"""Main Discord bot application.""" from __future__ import annotations import base64 import logging from io import BytesIO from typing import TYPE_CHECKING import discord import requests from discord import Message from discord.ext import commands from vibe_bot import llama_wrapper, tts from vibe_bot.tts import DEFAULT_LANG from vibe_bot.config import ( CHAT_ENDPOINT, CHAT_ENDPOINT_KEY, CHAT_MODEL, DISCORD_TOKEN, IMAGE_EDIT_ENDPOINT, IMAGE_EDIT_ENDPOINT_KEY, IMAGE_EDIT_MODEL, IMAGE_GEN_ENDPOINT, IMAGE_GEN_ENDPOINT_KEY, IMAGE_GEN_MODEL, MAX_COMPLETION_TOKENS, TTS_MODEL_PATH, TTS_SPEED, TTS_VOICE, TTS_VOICES_PATH, VOICES_LIST, ) from vibe_bot.database import CustomBotManager, get_database if TYPE_CHECKING: from discord.ext.commands import Bot from discord.ext.commands import Context as CommandsContext # Configure logging logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", ) logger = logging.getLogger(__name__) # Initialize the bot intents = discord.Intents.default() intents.message_content = True bot = commands.Bot(command_prefix="!", intents=intents) def get_user_info(user: discord.User | discord.Member) -> str: """Format user information for inclusion in bot prompts.""" parts: list[str] = [] if user.global_name: parts.append(f"Global Name: {user.global_name}") nick = getattr(user, "nick", None) if nick: parts.append(f"Nickname: {nick}") top_role = getattr(user, "top_role", None) if top_role and top_role.name != "@everyone": parts.append(f"Top Role: {top_role.name}") activities = getattr(user, "activities", None) if activities: activity_names = [ getattr(a, "name", str(a)) for a in activities if getattr(a, "name", "") != "custom_status" ] if activity_names: parts.append(f"Activities: {', '.join(activity_names)}") joined_at = getattr(user, "joined_at", None) if joined_at: parts.append(f"Joined: {joined_at.strftime('%Y-%m-%d')}") parts.append(f"Username: {user.name}") parts.append(f"User ID: {user.id}") parts.append( f"Account Created: {user.created_at.strftime('%Y-%m-%d') if user.created_at else 'Unknown'}" ) return "\n".join(parts) # Initialize TTS engine tts_engine: tts.TTSEngine | None = None try: tts_engine = tts.TTSEngine(TTS_MODEL_PATH, TTS_VOICES_PATH) logger.info("TTS engine initialized successfully") except Exception: logger.exception("Failed to initialize TTS engine") logger.info( "Make sure kokoro-v1.0.onnx and voices-v1.0.bin are in the project directory", ) # Name and personality validation constants MIN_BOT_NAME_LENGTH = 2 MAX_BOT_NAME_LENGTH = 50 MIN_PERSONALITY_LENGTH = 10 @bot.event async def on_ready() -> None: """Log when the bot is ready and logged in.""" logger.info("Bot is starting up...") logger.info("Bot logged in as %s", bot.user) @bot.command(name="custom-bot") async def custom_bot( ctx: CommandsContext[Bot], bot_name: str, *, personality: str, ) -> None: """Create a custom bot with a name and personality. Usage: !custom-bot Example: !custom-bot alfred you are a proper british butler """ logger.info( "Custom bot command initiated by %s: name=%r, personality length=%d", ctx.author.name, bot_name, len(personality), ) # Validate bot name name_length = 0 if not bot_name else len(bot_name) if ( not bot_name or name_length < MIN_BOT_NAME_LENGTH or name_length > MAX_BOT_NAME_LENGTH ): logger.warning( "Invalid bot name from %s: %r (length: %d)", ctx.author.name, bot_name, name_length, ) await ctx.send("Invalid bot name. Name must be between 2 and 50 characters.") return logger.info("Bot name validation passed for %r", bot_name) # Validate personality personality_length = 0 if not personality else len(personality) if not personality or personality_length < MIN_PERSONALITY_LENGTH: logger.warning( "Invalid personality from %s: length=%d", ctx.author.name, personality_length, ) await ctx.send( "Invalid personality. Description must be at least 10 characters.", ) return logger.info("Personality validation passed for bot %r", bot_name) # Create custom bot manager logger.info("Initializing CustomBotManager for user %s", ctx.author.name) custom_bot_manager = CustomBotManager() # Create the custom bot logger.info( "Attempting to create custom bot %r for user %s", bot_name, ctx.author.name, ) success = custom_bot_manager.create_custom_bot( bot_name=bot_name, system_prompt=personality, created_by=str(ctx.author.id), ) if success: logger.info( "Successfully created custom bot %r for user %s", bot_name, ctx.author.name, ) await ctx.send( f"Custom bot **'{bot_name}'** has been created " f"with personality: *{personality}*", ) await ctx.send( f"\nYou can now use this bot with: " f"`!{bot_name} `", ) else: logger.warning( "Failed to create custom bot %r for user %s", bot_name, ctx.author.name, ) await ctx.send("Failed to create custom bot. It may already exist.") @bot.command(name="list-custom-bots") async def list_custom_bots(ctx: CommandsContext[Bot]) -> None: """List all custom bots available in the server.""" logger.info("Listing custom bots requested by %s", ctx.author.name) # Create custom bot manager logger.info("Initializing CustomBotManager to list custom bots") custom_bot_manager = CustomBotManager() logger.info("Fetching list of custom bots from database") bots = custom_bot_manager.list_custom_bots() if not bots: logger.info("No custom bots found for user %s", ctx.author.name) await ctx.send( "No custom bots have been created yet. " "Use `!custom-bot ` to create one.", ) return logger.info( "Found %d custom bots, displaying top 10 for %s", len(bots), ctx.author.name, ) bot_list = "Available Custom Bots:\n\n" for name, _prompt, _creator in bots: bot_list += f"* {name}\n" logger.info("Sending bot list response to %s", ctx.author.name) await ctx.send(bot_list) @bot.command(name="delete-custom-bot") async def delete_custom_bot(ctx: CommandsContext[Bot], bot_name: str) -> None: """Delete a custom bot (only the creator can delete). Usage: !delete-custom-bot """ logger.info( "Delete custom bot command initiated by %s: bot_name=%r", ctx.author.name, bot_name, ) # Create custom bot manager logger.info("Initializing CustomBotManager for delete operation") custom_bot_manager = CustomBotManager() # Get bot info logger.info("Looking up custom bot %r in database", bot_name) bot_info = custom_bot_manager.get_custom_bot(bot_name) if not bot_info: logger.warning( "Custom bot %r not found by user %s", bot_name, ctx.author.name, ) await ctx.send(f"Custom bot '{bot_name}' not found.") return logger.info( "Custom bot %r found, owned by user %s", bot_name, bot_info[2], ) # Check ownership if bot_info[2] != str(ctx.author.id): logger.warning( "User %s attempted to delete bot %r they don't own", ctx.author.name, bot_name, ) await ctx.send("You can only delete your own custom bots.") return logger.info( "User %s is authorized to delete bot %r", ctx.author.name, bot_name, ) # Delete the bot logger.info("Deleting custom bot %r from database", bot_name) success = custom_bot_manager.delete_custom_bot(bot_name) if success: logger.info( "Successfully deleted custom bot %r by user %s", bot_name, ctx.author.name, ) await ctx.send(f"Custom bot '{bot_name}' has been deleted.") else: logger.warning( "Failed to delete custom bot %r by user %s", bot_name, ctx.author.name, ) await ctx.send("Failed to delete custom bot.") # Handle custom bot commands @bot.event async def on_message(message: Message) -> None: """Handle incoming messages for custom bot command detection.""" # Skip bot messages if message.author == bot.user: return message_author = message.author.name message_content = message.content.lower() logger.debug( "Processing message from %s: %r...", message_author, message_content[:50], ) ctx = await bot.get_context(message) logger.info("Initializing CustomBotManager to check for custom bot commands") custom_bot_manager = CustomBotManager() logger.info("Fetching list of custom bots to check for matching commands") custom_bots = custom_bot_manager.list_custom_bots() logger.info("Checking %d custom bots for command match", len(custom_bots)) for bot_name, system_prompt, _ in custom_bots: # Check if message starts with the custom bot name followed by a space if message_content.startswith(f"!{bot_name} "): logger.info( "Custom bot command detected: %r triggered by %s", bot_name, message.author.name, ) # Extract the actual message (remove the bot name prefix) user_message = message.content[len(f"!{bot_name} ") :] logger.debug( "Extracted user message for bot %r: %r...", bot_name, user_message[:50], ) # Prepare the payload with custom personality response_prefix = f"{bot_name} response" logger.info("Sending request to OpenAI API for bot %r", bot_name) await handle_chat( ctx=ctx, bot_name=bot_name, message=user_message, system_prompt=system_prompt, response_prefix=response_prefix, ) return # If no custom bot matched, call the default event handler await bot.process_commands(message) @bot.command(name="lobotomize") async def lobotomize(ctx: CommandsContext[Bot]) -> None: """Clear all conversation history and memory for all bots.""" logger.info("Lobotomize command triggered by %s", ctx.author.name) db = get_database() db.clear_all_messages() await ctx.send("All conversation history and memory has been cleared. 🧠✨") @bot.command(name="voices") async def voices(ctx: CommandsContext[Bot]) -> None: """List all available TTS voices organized by category.""" voice_list = "Available Voices:\n\n" for category, info in VOICES_LIST.items(): voice_list += f"{category} ({info['language']}):\n" for v in info["voices"]: voice_list += f" - {v}\n" voice_list += "\n" voice_list += "Use `!speak --voice ` to choose a voice." chunk_size = 1900 chunks: list[str] = [] current_chunk = voice_list while current_chunk: if len(current_chunk) <= chunk_size: chunks.append(current_chunk) break split_pos = current_chunk.rfind("\n", 0, chunk_size) if split_pos == -1: split_pos = chunk_size chunks.append(current_chunk[:split_pos]) current_chunk = current_chunk[split_pos:].lstrip("\n") for chunk in chunks: await ctx.send(chunk) @bot.command(name="speak") async def speak( ctx: CommandsContext[Bot], *, message: str, ) -> None: """Have the bot speak the given text using Kokoro TTS, or have a custom bot speak. Usage: !speak --voice - plain text to speech Usage: !speak --voice - have a custom bot respond and speak Example: !speak hello world Example: !speak hello world --voice af_bella Example: !speak alfred what time is it --voice am_puck """ if tts_engine is None: await ctx.send( "TTS engine not initialized. " "Make sure kokoro-v1.0.onnx and voices-v1.0.bin are present.", ) return # Parse --voice flag from the message voice = None voice_match = message.rsplit("--voice ", 1) if len(voice_match) == 2: voice = voice_match[1].strip() message = voice_match[0].rstrip() if not message or not message.strip(): await ctx.send("Please provide text to speak.") return # Validate voice if provided if voice: all_voices = [v for cat in VOICES_LIST.values() for v in cat["voices"]] if voice not in all_voices: await ctx.send( f"Unknown voice '{voice}'. Use `!voices` to see available voices." ) return custom_bot_manager = CustomBotManager() custom_bots = custom_bot_manager.list_custom_bots() bot_names = [b[0] for b in custom_bots] first_word = message.split(maxsplit=1)[0] if message.split() else "" if first_word in bot_names: await _speak_with_bot( ctx, first_word, message, tts_engine, custom_bot_manager, voice ) else: await _speak_plain(ctx, message, tts_engine, voice) async def _speak_with_bot( ctx: CommandsContext[Bot], bot_name: str, message: str, engine: tts.TTSEngine, custom_bot_manager: CustomBotManager, voice: str | None = None, ) -> None: """Handle speak command for a custom bot.""" text_to_speak = message[len(bot_name) :].lstrip() if not text_to_speak: await ctx.send("Please provide text for the bot to respond to.") return await ctx.send(f"**{bot_name}** is thinking...") bot_info = custom_bot_manager.get_custom_bot(bot_name) if not bot_info: await ctx.send(f"Custom bot '{bot_name}' not found.") return _, system_prompt, _, _ = bot_info system_prompt_edit = f"{system_prompt}\nKeep your responses under 2-3 sentences.\n\nUser Information:\n{get_user_info(ctx.author)}" # Determine language for the chosen voice chosen_voice = voice or TTS_VOICE lang = DEFAULT_LANG for cat in VOICES_LIST.values(): if chosen_voice in cat["voices"]: lang = str(cat["language"]) break try: db = get_database() context = db.get_conversation_context( user_id=str(ctx.author.id), current_message=text_to_speak, max_context=5, ) prompts = [{"role": "user", "content": text_to_speak}] if context: prompts = context + prompts bot_response = llama_wrapper.chat_completion_with_history( system_prompt=system_prompt_edit, prompts=prompts, openai_url=CHAT_ENDPOINT, openai_api_key=CHAT_ENDPOINT_KEY, model=CHAT_MODEL, max_tokens=MAX_COMPLETION_TOKENS, ) if not bot_response: await ctx.send(f"**{bot_name}** failed to generate a response.") return db.add_message( message_id=f"{ctx.message.id}", user_id=str(ctx.author.id), username=ctx.author.name, content=f"User: {text_to_speak}", bot_name=bot_name, channel_id=str(ctx.channel.id), guild_id=str(ctx.guild.id) if ctx.guild else None, ) if ctx.bot.user is not None: db.add_message( message_id=f"{ctx.message.id}_response", user_id=str(ctx.bot.user.id), username=ctx.bot.user.name, content=bot_response, bot_name=bot_name, channel_id=str(ctx.channel.id), guild_id=str(ctx.guild.id) if ctx.guild else None, ) await ctx.send(f"**{bot_name}**: {bot_response}") await ctx.send(f"Generating speech for **{bot_name}**...") audio_buffer = engine.generate_audio( bot_response, voice=chosen_voice, speed=TTS_SPEED, lang=lang, ) audio_file = discord.File(audio_buffer, filename="speech.mp3") await ctx.send(file=audio_file) except Exception: logger.exception( "Error in speak command with bot %r", bot_name, ) await ctx.send("Error generating speech.") async def _speak_plain( ctx: CommandsContext[Bot], message: str, engine: tts.TTSEngine, voice: str | None = None, ) -> None: """Handle speak command for plain text.""" chosen_voice = voice or TTS_VOICE # Determine language for the chosen voice lang = DEFAULT_LANG for cat in VOICES_LIST.values(): if chosen_voice in cat["voices"]: lang = str(cat["language"]) break try: await ctx.send("Generating speech...") audio_buffer = engine.generate_audio( message, voice=chosen_voice, speed=TTS_SPEED, lang=lang, ) audio_file = discord.File(audio_buffer, filename="speech.mp3") await ctx.send(file=audio_file) except Exception: logger.exception("Error in speak command") await ctx.send("Error generating speech.") @bot.command(name="doodlebob") async def doodlebob(ctx: CommandsContext[Bot], *, message: str) -> None: """Convert a message into an image using Doodlebob.""" logger.info( "Doodlebob command triggered by %s: %s", ctx.author.name, message[:100], ) await ctx.send(f"**Doodlebob erasing {message[:100]}...**") system_prompt = ( "Given the following message, convert it to a detailed image generation " "prompt that will be passed directly into an image generation model. " "If told to generate an image of yourself, generate a picture of a rat. " "If told to generate a picture of 'me', 'myself', or some other self " "reference, generate a picture of a rat. Only respond with a valid image " "generation prompt, do not affirm the user or respond to the user's questions." ) # Wait for the generated image prompt image_prompt = llama_wrapper.chat_completion_instruct( system_prompt=system_prompt, user_prompt=message, openai_url=CHAT_ENDPOINT, openai_api_key=CHAT_ENDPOINT_KEY, model=CHAT_MODEL, max_tokens=MAX_COMPLETION_TOKENS, ) # If the string is empty we had an error if image_prompt == "": logger.warning("No image prompt supplied. Check for errors.") return # Alert the user we're generating the image await ctx.send(f"**Doodlebob calling drone strike on {image_prompt[:100]}...**") image_b64 = llama_wrapper.image_generation( prompt=image_prompt, openai_url=IMAGE_GEN_ENDPOINT, openai_api_key=IMAGE_GEN_ENDPOINT_KEY, model=IMAGE_GEN_MODEL, ) if not image_b64: logger.warning("Image generation returned empty response.") await ctx.send("Failed to generate image. The server may be busy.") return try: edited_image_data = BytesIO(base64.b64decode(image_b64)) send_img = discord.File(edited_image_data, filename="image.png") await ctx.send(file=send_img) except Exception: logger.exception("Failed to decode image data") await ctx.send("Failed to process the generated image.") @bot.command(name="retcon") async def retcon(ctx: CommandsContext[Bot], *, message: str) -> None: """Edit an attached image based on a text prompt.""" image_data_list: list[BytesIO] = [] for discord_image in ctx.message.attachments: image_url = discord_image.url try: response = requests.get(image_url, timeout=30) image_data = response.content except requests.RequestException as e: logger.warning("Failed to download image from %s: %s", image_url, e) continue image_bytestream = BytesIO(image_data) image_data_list.append(image_bytestream) await ctx.send(f"**Rewriting history to match {message[:100]}...**") image_b64 = llama_wrapper.image_edit( image=image_data_list, prompt=message, openai_url=IMAGE_EDIT_ENDPOINT, openai_api_key=IMAGE_EDIT_ENDPOINT_KEY, model=IMAGE_EDIT_MODEL, ) # Save the image to a file edited_image_data = BytesIO(base64.b64decode(image_b64)) send_img = discord.File(edited_image_data, filename="image.png") await ctx.send(file=send_img) @bot.command(name="history") async def history(ctx: CommandsContext[Bot], bot_name: str) -> None: """View the chat history of a custom bot. Usage: !history """ logger.info( "History command triggered by %s for bot %r", ctx.author.name, bot_name, ) custom_bot_manager = CustomBotManager() bot_info = custom_bot_manager.get_custom_bot(bot_name) if not bot_info: await ctx.send(f"Custom bot '{bot_name}' not found.") return db = get_database() history = db.get_bot_history(bot_name=bot_name, limit=20) if not history: await ctx.send(f"No chat history found for **{bot_name}**. ") return history.reverse() formatted_history: list[str] = [] for user_msg, bot_resp in history: formatted_history.append(user_msg) formatted_history.append(f"{bot_name}: {bot_resp}") header = f"Chat History for **{bot_name}**:\n\n" full_text = header + "\n---\n".join(formatted_history) chunk_size = 1900 chunks: list[str] = [] current_chunk = full_text while current_chunk: if len(current_chunk) <= chunk_size: chunks.append(current_chunk) break split_pos = current_chunk.rfind("\n", 0, chunk_size) if split_pos == -1: split_pos = chunk_size chunks.append(current_chunk[:split_pos]) current_chunk = current_chunk[split_pos:].lstrip("\n") for chunk in chunks: await ctx.send(chunk) @bot.command(name="talkforme") async def talkforme(ctx: CommandsContext[Bot], *, message: str) -> None: """Have two bots talk to each other about a topic. Usage: !talkforme bot1 bot2 4 some conversation topic """ talk_limit = 20 MIN_TALKFORME_PARTS = 4 parts = message.split(" ", maxsplit=MIN_TALKFORME_PARTS - 1) if len(parts) < MIN_TALKFORME_PARTS: await ctx.send("Usage: !talkforme bot1 bot2 ") return bot1_name = parts[0] bot2_name = parts[1] limit = parts[2] topic_list = parts[3:] topic = " ".join(topic_list) custom_bot_manager = CustomBotManager() bot1 = custom_bot_manager.get_custom_bot(bot1_name) if not bot1: await ctx.send(f"{bot1_name} is not a real bot...") return _, bot1_prompt, _, _ = bot1 bot2 = custom_bot_manager.get_custom_bot(bot2_name) if not bot2: await ctx.send(f"{bot2_name} is not a real bot...") return _, bot2_prompt, _, _ = bot2 await ctx.send( f"{bot1_name} is going to talk to {bot2_name} " f'about "{topic[:50]}" for {limit} replies.', ) bot_list = [(bot1_name, bot1_prompt), (bot2_name, bot2_prompt)] try: message_limit = int(limit) except ValueError: await ctx.send("Message limit must be an integer.") return def flip_counter(counter: int) -> int: """Flip between 0 and 1.""" return 1 if counter == 0 else 0 message_counter = 0 bot_counter = 0 current_bot = bot_list[bot_counter] prompt_histories: list[list[dict[str, str]]] = [ [{"role": "user", "content": topic}], [{"role": "assistant", "content": topic}], ] first_bot_response = llama_wrapper.chat_completion_with_history( system_prompt=( current_bot[1] + f"\nKeep your responses under 2-3 sentences. " f"You are talking to {current_bot[flip_counter(bot_counter)][0]}" ), prompts=prompt_histories[bot_counter], openai_url=CHAT_ENDPOINT, openai_api_key=CHAT_ENDPOINT_KEY, model=CHAT_MODEL, max_tokens=MAX_COMPLETION_TOKENS, ) await ctx.send(f"## {current_bot[0]}\n{first_bot_response}") prompt_histories[0].append({"role": "assistant", "content": first_bot_response}) prompt_histories[1].append({"role": "user", "content": first_bot_response}) bot_counter = flip_counter(counter=bot_counter) while message_counter < min(message_limit, talk_limit): current_bot = bot_list[bot_counter] logger.info("Current bot is %s", current_bot[0]) bot_response = llama_wrapper.chat_completion_with_history( system_prompt=( current_bot[1] + f"\nKeep your responses under 2-3 sentences. " f"You are talking to {current_bot[flip_counter(bot_counter)][0]}" ), prompts=prompt_histories[bot_counter], openai_url=CHAT_ENDPOINT, openai_api_key=CHAT_ENDPOINT_KEY, model=CHAT_MODEL, max_tokens=MAX_COMPLETION_TOKENS, ) message_counter += 1 prompt_histories[bot_counter].append( {"role": "assistant", "content": bot_response}, ) prompt_histories[flip_counter(bot_counter)].append( {"role": "user", "content": bot_response}, ) await ctx.send(f"## {current_bot[0]}") while bot_response: send_chunk = bot_response[:1000] bot_response = bot_response[1000:] await ctx.send(send_chunk) bot_counter = flip_counter(counter=bot_counter) logger.info("Message counter is %d/%s", message_counter, limit) async def handle_chat( ctx: CommandsContext[Bot], *, bot_name: str, message: str, system_prompt: str, response_prefix: str, ) -> None: """Handle chat completion for a custom bot command. Args: ctx: The Discord command context. bot_name: The name of the custom bot. message: The user message to process. system_prompt: The system prompt for the bot. response_prefix: The prefix for the response message. """ await ctx.send(f"{bot_name} is searching its databanks for {message[:50]}...") # Get database instance db = get_database() # Get conversation context using RAG context = db.get_conversation_context( user_id=str(ctx.author.id), current_message=message, max_context=5, ) prompts = [{"role": "user", "content": message}] if context: prompts = context + prompts logger.info("Chat prompts: %s", prompts) system_prompt_edit = f"{system_prompt}\nKeep your responses under 2-3 sentences.\n\nUser Information:\n{get_user_info(ctx.author)}" try: bot_response = llama_wrapper.chat_completion_with_history( system_prompt=system_prompt_edit, prompts=prompts, openai_url=CHAT_ENDPOINT, openai_api_key=CHAT_ENDPOINT_KEY, model=CHAT_MODEL, max_tokens=MAX_COMPLETION_TOKENS, ) # Store both user message and bot response in the database db.add_message( message_id=f"{ctx.message.id}", user_id=str(ctx.author.id), username=ctx.author.name, content=f"User: {message}", bot_name=bot_name, channel_id=str(ctx.channel.id), guild_id=str(ctx.guild.id) if ctx.guild else None, ) if ctx.bot.user is not None: db.add_message( message_id=f"{ctx.message.id}_response", user_id=str(ctx.bot.user.id), username=ctx.bot.user.name, content=bot_response, bot_name=bot_name, channel_id=str(ctx.channel.id), guild_id=str(ctx.guild.id) if ctx.guild else None, ) # Send the response back to the chat await ctx.send(response_prefix) while bot_response: send_chunk = bot_response[:1000] bot_response = bot_response[1000:] await ctx.send(send_chunk) except Exception: logger.exception("Error in handle_chat") await ctx.send("An error occurred while processing your request.") # Run the bot if __name__ == "__main__": bot.run(DISCORD_TOKEN)