diff --git a/bot.py b/bot.py index 566252e..4e0f4ba 100644 --- a/bot.py +++ b/bot.py @@ -5,7 +5,7 @@ from discord import app_commands from discord.app_commands import Choice from functools import reduce from dotenv import load_dotenv -from typing import List, Optional, Union +from typing import Dict, List, Optional, Union # TODO: Reenable + extend textgen # from textgen import textgen @@ -38,7 +38,8 @@ LINUS_GUILD = discord.Object(id=431154792308408340) TEST_GUILD = discord.Object(id=821511861178204161) -CONFIGPATH = "/config/Heidi.conf" if DOCKER else "./Heidi.conf" +CONFIGPATH = "/config" if DOCKER else "." +USERCONFIGNAME = "Heidi_User.conf" class HeidiClient(discord.Client): @@ -48,26 +49,37 @@ class HeidiClient(discord.Client): # Separate object that keeps all application command state self.tree = app_commands.CommandTree(self) - # Handle persistent configuration - self.config = configparser.ConfigParser() - if not os.path.exists(CONFIGPATH): - os.mknod(CONFIGPATH) - self.config.read(CONFIGPATH) - self.update_to_default_config() - self.print_config() + # Handle persistent user configuration + self.user_config = configparser.ConfigParser() + if not os.path.exists(f"{CONFIGPATH}/{USERCONFIGNAME}"): + os.mknod(f"{CONFIGPATH}/{USERCONFIGNAME}") + self.user_config.read(f"{CONFIGPATH}/{USERCONFIGNAME}") + self.update_to_default_user_config() + self.print_user_config() # self.models = Models() # scraped model list # automatic actions on all messages - # auto_triggers is a map with tuples of two functions: (predicate, action) + # on_message_triggers is a map with tuples of two functions: (predicate, action) + # the predicate receives the message as argument # if the predicate is true the action is performed - self.auto_triggers = { + self.on_message_triggers = { # lambda m: m.author.nick.lower() in self.models.get_in_names(): self.autoreact_to_girls, lambda m: "jeremy" in m.author.nick.lower(): self._autoreact_to_jeremy, lambda m: "kardashian" in m.author.nick.lower() or "jenner" in m.author.nick.lower(): self._autoreact_to_kardashian, } + # automatic actions on voice state changes + # on_voice_state_triggers is a map with tuples of two functions: (predicate, action) + # the predicate receives the member, before- and after-state as arguments + # if the predicate is true, the action is performed + self.on_voice_state_triggers = { + lambda m, b, a: b.channel != a.channel + and a.channel != None + and isinstance(a.channel, discord.VoiceChannel): self._play_entrance_sound, + } + # Textgen # self.textgen_models: dict[str, textgen] = { # # The name must correspond to the name of the training text file @@ -100,31 +112,39 @@ class HeidiClient(discord.Client): self.tree.copy_global_to(guild=TEST_GUILD) await self.tree.sync(guild=TEST_GUILD) - def update_to_default_config(self): + def update_to_default_user_config(self): """ Adds config keys to the config, if they don't exist yet. """ - config_sections = ["ENTRANCE.SOUND"] + user_config_sections = ["ENTRANCE.SOUND"] - for section in config_sections: - if section not in self.config: - print(f"Adding section {section} to Heidi.conf") - self.config[section] = dict() + for section in user_config_sections: + if section not in self.user_config: + print(f"Adding section {section} to {CONFIGPATH}/{USERCONFIGNAME}") + self.user_config[section] = dict() - with open(CONFIGPATH, "w") as file: - print("Writing Heidi.conf") - self.config.write(file) + self.write_user_config() - def print_config(self): + def print_user_config(self): print("Read persistent configuration:\n") - for section in self.config.sections(): + for section in self.user_config.sections(): print(f"[{section}]") - for key in self.config[section]: - print(f"{key}={self.config[section][key]}") + for key in self.user_config[section]: + print(f"{key}={self.user_config[section][key]}") print("") + def write_user_config(self): + if not os.path.exists(f"{CONFIGPATH}/{USERCONFIGNAME}"): + print(f"Error: {CONFIGPATH}/{USERCONFIGNAME} doesn't exist!") + return + + print(f"Writing {CONFIGPATH}/{USERCONFIGNAME}") + + with open(f"{CONFIGPATH}/{USERCONFIGNAME}", "w") as file: + self.user_config.write(file) + # Commands ----------------------------------------------------------------------------------- # async def list_models_in(self, message): @@ -159,19 +179,40 @@ class HeidiClient(discord.Client): # await message.add_reaction("❤") @staticmethod - async def _autoreact_to_jeremy(message): + async def _autoreact_to_jeremy(message: discord.Message): """ 🧀 Jeremy """ await message.add_reaction("🧀") @staticmethod - async def _autoreact_to_kardashian(message): + async def _autoreact_to_kardashian(message: discord.Message): """ 💄 Kardashian """ await message.add_reaction("💄") + async def _play_entrance_sound( + self, + member: discord.Member, + before: discord.VoiceState, + after: discord.VoiceState, + ): + soundpath: Union[str, None] = self.user_config["ENTRANCE.SOUND"].get( + member.name, None + ) + + if soundpath == None: + print(f"User {member.name} has not set an entrance sound") + return + + board, sound = soundpath.split("/") + + # Wait a bit to not have simultaneous joins + await asyncio.sleep(1) + + await play_voice_line_for_member(None, member, board, sound) + # ------------------------------------------------------------------------------------------------ @@ -182,6 +223,7 @@ handler = logging.FileHandler(filename="discord.log", encoding="utf-8", mode="w" intents = discord.Intents.default() intents.members = True # Allow to react to member join/leave etc intents.message_content = True # Allow to read message content from arbitrary messages +intents.voice_states = True # Allow to process on_voice_state_update # Setup our client client = HeidiClient(intents=intents) @@ -195,41 +237,91 @@ async def on_ready(): if client.user != None: print(f"{client.user} (id: {client.user.id}) has connected to Discord!") else: - print(f"client.user is None!") + print("client.user is None!") @client.event -async def on_message(message): +async def on_message(message: discord.Message): # Skip Heidis own messages if message.author == client.user: return # Automatic actions for all messages # python iterates over the keys of a map - for predicate in client.auto_triggers: + for predicate in client.on_message_triggers: if predicate(message): - action = client.auto_triggers[predicate] + action = client.on_message_triggers[predicate] + print(f"on_message: calling {action.__name__}") await action(message) - break + + +@client.event +async def on_voice_state_update( + member: discord.Member, before: discord.VoiceState, after: discord.VoiceState +): + # Skip Heidis own voice state updates (e.g. on /say) + if member._user == client.user: + return + + # Automatic acions for all voice state changes + # python iterates over the keys of a map + for predicate in client.on_voice_state_triggers: + if predicate(member, before, after): + action = client.on_voice_state_triggers[predicate] + print(f"on_voice_state_update: calling {action.__name__}") + await action(member, before, after) # Config Commands -------------------------------------------------------------------------------- -async def config_key_autocomplete( +async def user_config_key_autocomplete( interaction: discord.Interaction, current: str ) -> List[Choice[str]]: return [ Choice(name=key, value=key) - for key in client.config.sections() # @todo can access client here? + for key in client.user_config.sections() if key.lower().startswith(current.lower()) ] -async def config_value_autocomplete( +async def user_config_value_autocomplete( interaction: discord.Interaction, current: str ) -> List[Choice[str]]: - return [Choice(name="None", value="None")] + """ + Calls an autocomplete function depending on the entered config_key. + """ + autocompleters = {"ENTRANCE.SOUND": user_entrance_sound_autocomplete} + autocompleter = autocompleters[interaction.namespace.option] + + print(f"config_value_autocomplete: calling {autocompleter.__name__}") + + return autocompleter(interaction, current) + + +def user_entrance_sound_autocomplete( + interaction: discord.Interaction, current: str +) -> List[Choice[str]]: + """ + Generates autocomplete options for the ENTRANCE.SOUND config key. + """ + boards: List[str] = os.listdir(SOUNDDIR) + all_sounds: Dict[str, List[str]] = { + board: list(map(lambda x: x.split(".")[0], os.listdir(f"{SOUNDDIR}/{board}/"))) + for board in boards + } # These are all sounds, organized per board + + completions: List[Choice[str]] = [] + for ( + board, + board_sounds, + ) in all_sounds.items(): # Iterate over all sounds, organized per board + for sound in board_sounds: # Iterate over board specific sounds + soundpath = f"{board}/{sound}" + if soundpath.lower().startswith(current.lower()): + completions += [Choice(name=soundpath, value=soundpath)] + + return completions @client.tree.command( @@ -238,16 +330,25 @@ async def config_value_autocomplete( ) @app_commands.rename(config_key="option") @app_commands.describe(config_key="Die Option, welche du ändern willst.") -@app_commands.autocomplete(config_key=config_key_autocomplete) +@app_commands.autocomplete(config_key=user_config_key_autocomplete) @app_commands.rename(config_value="wert") @app_commands.describe( config_value="Der Wert, auf welche die Option gesetzt werden soll." ) -@app_commands.autocomplete(config_value=config_value_autocomplete) +@app_commands.autocomplete(config_value=user_config_value_autocomplete) async def user_config( interaction: discord.Interaction, config_key: str, config_value: str ): - pass + # Only Members can set settings + if not isinstance(interaction.user, discord.Member): + print("User not a member") + await interaction.response.send_message("Heidi sagt: Komm in die Gruppe!") + return + + member: discord.Member = interaction.user + + client.user_config[config_key][member.name] = config_value + client.write_user_config() # Commands --------------------------------------------------------------------------------------- @@ -346,7 +447,7 @@ async def choose(interaction: discord.Interaction, option_a: str, option_b: str) # Sounds ----------------------------------------------------------------------------------------- -SOUNDDIR: str = "/sounds/" if DOCKER else "./heidi-sounds/" +SOUNDDIR: str = "/sounds" if DOCKER else "./heidi-sounds" # Example: https://discordpy.readthedocs.io/en/latest/interactions/api.html?highlight=autocomplete#discord.app_commands.autocomplete @@ -367,7 +468,7 @@ async def sound_autocomplete( ) -> List[Choice[str]]: board: str = interaction.namespace.board sounds: List[str] = list( - map(lambda x: x.split(".")[0], os.listdir(SOUNDDIR + board + "/")) + map(lambda x: x.split(".")[0], os.listdir(f"{SOUNDDIR}/{board}/")) ) return [ @@ -392,39 +493,7 @@ async def say_voiceline(interaction: discord.Interaction, board: str, sound: str member: discord.Member = interaction.user - # Member needs to be in voice channel to hear audio (Heidi needs to know the channel to join) - if ( - (not member.voice) - or (not member.voice.channel) - or (not isinstance(member.voice.channel, discord.VoiceChannel)) - ): - print("User not in (valid) voice channel!") - await interaction.response.send_message("Heidi sagt: Komm in den Channel!") - return - - voice_channel: discord.VoiceChannel = member.voice.channel - - try: - open(SOUNDDIR + board + "/" + sound + ".mkv") - except IOError: - print("Error: Invalid soundfile!") - await interaction.response.send_message( - f'Heidi sagt: "{board}/{sound}" kanninich finden bruder' - ) - return - - await interaction.response.send_message(f'Heidi sagt: "{board}/{sound}"') - - audio_source = discord.FFmpegPCMAudio( - SOUNDDIR + board + "/" + sound + ".mkv" - ) # only works from docker - voice_client = await voice_channel.connect() - voice_client.play(audio_source) - - while voice_client.is_playing(): - await asyncio.sleep(1) - - await voice_client.disconnect() + await play_voice_line_for_member(interaction, member, board, sound) # Contextmenu ------------------------------------------------------------------------------------ @@ -465,6 +534,63 @@ async def insult( ) # with ephemeral = True only the caller can see the answer +# Helpers ---------------------------------------------------------------------------------------- + + +async def play_voice_line( + interaction: Union[discord.Interaction, None], + voice_channel: discord.VoiceChannel, + board: str, + sound: str, +): + try: + open(f"{SOUNDDIR}/{board}/{sound}.mkv") + except IOError: + print("Error: Invalid soundfile!") + if interaction != None: + await interaction.response.send_message( + f'Heidi sagt: "{board}/{sound}" kanninich finden bruder' + ) + return + + if interaction != None: + await interaction.response.send_message(f'Heidi sagt: "{board}/{sound}"') + + audio_source = discord.FFmpegPCMAudio( + f"{SOUNDDIR}/{board}/{sound}.mkv" + ) # only works from docker + voice_client = await voice_channel.connect() + voice_client.play(audio_source) + + while voice_client.is_playing(): + await asyncio.sleep(1) + + await voice_client.disconnect() + + +async def play_voice_line_for_member( + interaction: Union[discord.Interaction, None], + member: discord.Member, + board: str, + sound: str, +): + # Member needs to be in voice channel to hear audio (Heidi needs to know the channel to join) + if ( + member == None + or member.voice == None + or member.voice.channel == None + or not isinstance(member.voice.channel, discord.VoiceChannel) + ): + print("User not in (valid) voice channel!") + if interaction != None: + await interaction.response.send_message("Heidi sagt: Komm in den Channel!") + return + + voice_channel: discord.VoiceChannel = member.voice.channel + + await play_voice_line(interaction, voice_channel, board, sound) + + # ------------------------------------------------------------------------------------------------