import discord from discord.ext import commands import os import base64 from io import BytesIO from openai import OpenAI import logging from database import get_database, CustomBotManager # type: ignore from config import ( # type: ignore CHAT_ENDPOINT_KEY, DISCORD_TOKEN, CHAT_ENDPOINT, CHAT_MODEL, IMAGE_EDIT_ENDPOINT_KEY, IMAGE_GEN_ENDPOINT, IMAGE_EDIT_ENDPOINT, MAX_COMPLETION_TOKENS, ) import llama_wrapper # type: ignore import requests # Configure logging logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" ) logger = logging.getLogger(__name__) # Initialize the bot intents = discord.Intents.default() intents.message_content = True bot = commands.Bot(command_prefix="!", intents=intents) @bot.event async def on_ready(): logger.info("Bot is starting up...") print(f"Bot logged in as {bot.user}") logger.info(f"Bot logged in as {bot.user}") @bot.command(name="custom-bot") # type: ignore async def custom_bot(ctx, bot_name: str, *, personality: str): """Create a custom bot with a name and personality Usage: !custom-bot Example: !custom-bot alfred you are a proper british butler """ logger.info( f"Custom bot command initiated by {ctx.author.name}: name='{bot_name}', personality length={len(personality)}" ) # Validate bot name if not bot_name or len(bot_name) < 2 or len(bot_name) > 50: logger.warning( f"Invalid bot name from {ctx.author.name}: '{bot_name}' (length: {len(bot_name) if bot_name else 0})" ) await ctx.send("❌ Invalid bot name. Name must be between 2 and 50 characters.") return logger.info(f"Bot name validation passed for '{bot_name}'") # Validate personality if not personality or len(personality) < 10: logger.warning( f"Invalid personality from {ctx.author.name}: length={len(personality) if personality else 0}" ) await ctx.send( "❌ Invalid personality. Description must be at least 10 characters." ) return logger.info(f"Personality validation passed for bot '{bot_name}'") # Create custom bot manager logger.info(f"Initializing CustomBotManager for user {ctx.author.name}") custom_bot_manager = CustomBotManager() # Create the custom bot logger.info( f"Attempting to create custom bot '{bot_name}' for user {ctx.author.name}" ) success = custom_bot_manager.create_custom_bot( bot_name=bot_name, system_prompt=personality, created_by=str(ctx.author.id) ) if success: logger.info( f"Successfully created custom bot '{bot_name}' for user {ctx.author.name}" ) await ctx.send( f"✅ Custom bot **'{bot_name}'** has been created with personality: *{personality}*" ) await ctx.send(f"\nYou can now use this bot with: `!{bot_name} `") else: logger.warning( f"Failed to create custom bot '{bot_name}' for user {ctx.author.name}" ) await ctx.send("❌ Failed to create custom bot. It may already exist.") @bot.command(name="list-custom-bots") async def list_custom_bots(ctx): """List all custom bots available in the server""" logger.info(f"Listing custom bots requested by {ctx.author.name}") # Create custom bot manager logger.info("Initializing CustomBotManager to list custom bots") custom_bot_manager = CustomBotManager() logger.info("Fetching list of custom bots from database") bots = custom_bot_manager.list_custom_bots() if not bots: logger.info(f"No custom bots found for user {ctx.author.name}") await ctx.send( "No custom bots have been created yet. Use `!custom-bot ` to create one." ) return logger.info( f"Found {len(bots)} custom bots, displaying top 10 for {ctx.author.name}" ) bot_list = "🤖 **Available Custom Bots**:\n\n" for name, prompt, creator in bots: bot_list += f"• **{name}**\n" logger.info(f"Sending bot list response to {ctx.author.name}") await ctx.send(bot_list) @bot.command(name="delete-custom-bot") # type: ignore async def delete_custom_bot(ctx, bot_name: str): """Delete a custom bot (only the creator can delete) Usage: !delete-custom-bot """ logger.info( f"Delete custom bot command initiated by {ctx.author.name}: bot_name='{bot_name}'" ) # Create custom bot manager logger.info("Initializing CustomBotManager for delete operation") custom_bot_manager = CustomBotManager() # Get bot info logger.info(f"Looking up custom bot '{bot_name}' in database") bot_info = custom_bot_manager.get_custom_bot(bot_name) if not bot_info: logger.warning(f"Custom bot '{bot_name}' not found by user {ctx.author.name}") await ctx.send(f"❌ Custom bot '{bot_name}' not found.") return logger.info(f"Custom bot '{bot_name}' found, owned by user {bot_info[2]}") # Check ownership if bot_info[2] != str(ctx.author.id): logger.warning( f"User {ctx.author.name} attempted to delete bot '{bot_name}' they don't own" ) await ctx.send("❌ You can only delete your own custom bots.") return logger.info(f"User {ctx.author.name} is authorized to delete bot '{bot_name}'") # Delete the bot logger.info(f"Deleting custom bot '{bot_name}' from database") success = custom_bot_manager.delete_custom_bot(bot_name) if success: logger.info( f"Successfully deleted custom bot '{bot_name}' by user {ctx.author.name}" ) await ctx.send(f"✅ Custom bot '{bot_name}' has been deleted.") else: logger.warning( f"Failed to delete custom bot '{bot_name}' by user {ctx.author.name}" ) await ctx.send("❌ Failed to delete custom bot.") # Handle custom bot commands @bot.event async def on_message(message): # Skip bot messages if message.author == bot.user: return message_author = message.author.name message_content = message.content.lower() logger.debug( f"Processing message from {message_author}: '{message_content[:50]}...'" ) ctx = await bot.get_context(message) logger.info("Initializing CustomBotManager to check for custom bot commands") custom_bot_manager = CustomBotManager() logger.info("Fetching list of custom bots to check for matching commands") custom_bots = custom_bot_manager.list_custom_bots() logger.info(f"Checking {len(custom_bots)} custom bots for command match") for bot_name, system_prompt, _ in custom_bots: # Check if message starts with the custom bot name followed by a space if message_content.startswith(f"!{bot_name} "): logger.info( f"Custom bot command detected: '{bot_name}' triggered by {message.author.name}" ) # Extract the actual message (remove the bot name prefix) user_message = message.content[len(f"!{bot_name} ") :] logger.debug( f"Extracted user message for bot '{bot_name}': '{user_message[:50]}...'" ) # Prepare the payload with custom personality response_prefix = f"**{bot_name} response**" logger.info(f"Sending request to OpenAI API for bot '{bot_name}'") await handle_chat( ctx=ctx, bot_name=bot_name, message=user_message, system_prompt=system_prompt, response_prefix=response_prefix, ) return # If no custom bot matched, call the default event handler await bot.process_commands(message) @bot.command(name="doodlebob") async def doodlebob(ctx, *, message: str): # add some logging logger.info(f"Doodlebob command triggered by {ctx.author.name}: {message[:100]}") await ctx.send(f"**Doodlebob erasing {message[:100]}...**") system_prompt = ( "Given the following message, convert it to a detailed image generation prompt that will be passed directly into an image generation model." "If told to generate an image of yourself, generate a picture of a rat. If told to generate a picture of 'me', 'myself', or some other self" " reference, generate a picture of a rat. Only respond with a valid image generation prompt, do not affirm the user or respond to the user's" " questions." ) # Wait for the generated image prompt image_prompt = llama_wrapper.chat_completion_instruct( system_prompt=system_prompt, user_prompt=message, openai_url=CHAT_ENDPOINT, openai_api_key=CHAT_ENDPOINT_KEY, model=CHAT_MODEL, max_tokens=MAX_COMPLETION_TOKENS, ) # If the string is empty we had an error if image_prompt == "": print("No image prompt supplied. Check for errors.") return # Alert the user we're generating the image await ctx.send(f"**Doodlebob calling drone strike on {image_prompt[:100]}...**") image_b64 = llama_wrapper.image_generation( prompt=message, openai_url=IMAGE_EDIT_ENDPOINT, openai_api_key=IMAGE_EDIT_ENDPOINT_KEY, ) # Save the image to a file edited_image_data = BytesIO(base64.b64decode(image_b64)) send_img = discord.File(edited_image_data, filename="image.png") await ctx.send(file=send_img) @bot.command(name="retcon") async def retcon(ctx, *, message: str): image_url = ctx.message.attachments[0].url image_data = requests.get(image_url).content image_bytestream = BytesIO(image_data) await ctx.send(f"**Rewriting history to match {message[:100]}...**") image_b64 = llama_wrapper.image_edit( image=image_bytestream, prompt=message, openai_url=IMAGE_EDIT_ENDPOINT, openai_api_key=IMAGE_EDIT_ENDPOINT_KEY, ) # Save the image to a file edited_image_data = BytesIO(base64.b64decode(image_b64)) send_img = discord.File(edited_image_data, filename="image.png") await ctx.send(file=send_img) @bot.command(name="talkforme") async def talkforme(ctx, *, message: str): """Have two bots talk to each other about a topic Usage: !talkforme bot1 bot2 4 some conversation topic """ TALK_LIMIT = 20 bot1_name, bot2_name, limit, topic_list = ( message.split(" ")[0], message.split(" ")[1], message.split(" ")[2], message.split(" ")[3:], ) topic = " ".join(topic_list) custom_bot_manager = CustomBotManager() bot1 = custom_bot_manager.get_custom_bot(bot1_name) if not bot1: await ctx.send(f"{bot1_name} is not a real bot...") return else: _, bot1_prompt, _, _ = bot1 bot2 = custom_bot_manager.get_custom_bot(bot2_name) if not bot2: await ctx.send(f"{bot2_name} is not a real bot...") return else: _, bot2_prompt, _, _ = bot2 await ctx.send( f'{bot1_name} is going to talk to {bot2_name} about "{topic[:50]}" for {limit} replies.' ) bot_list = [(bot1_name, bot1_prompt), (bot2_name, bot2_prompt)] message_limit = int(limit) def flip_counter(counter: int): if counter == 0: return 1 else: return 0 def flip_user(user: str): if user == "user": return "assistant" else: return "user" message_counter = 0 bot_counter = 0 current_bot = bot_list[bot_counter] prompt_histories = [ [{"role": "user", "content": topic}], [{"role": "assistant", "content": topic}], ] first_bot_response = llama_wrapper.chat_completion_with_history( system_prompt=current_bot[1] + f"\nKeep your responses under 2-3 sentences. You are talking to {current_bot[flip_counter(bot_counter)][0]}", prompts=prompt_histories[bot_counter], # type: ignore openai_url=CHAT_ENDPOINT, openai_api_key=CHAT_ENDPOINT_KEY, model=CHAT_MODEL, max_tokens=MAX_COMPLETION_TOKENS, ) await ctx.send(f"## {current_bot[0]}\n{first_bot_response}") prompt_histories[0].append({"role": "assistant", "content": first_bot_response}) prompt_histories[1].append({"role": "user", "content": first_bot_response}) bot_counter = flip_counter(counter=bot_counter) while message_counter < min(message_limit, TALK_LIMIT): current_bot = bot_list[bot_counter] logger.info(f"Current bot is {current_bot}") bot_response = llama_wrapper.chat_completion_with_history( system_prompt=current_bot[1] + f"\nKeep your responses under 2-3 sentences. {current_bot[flip_counter(bot_counter)]}", prompts=prompt_histories[bot_counter], # type: ignore openai_url=CHAT_ENDPOINT, openai_api_key=CHAT_ENDPOINT_KEY, model=CHAT_MODEL, max_tokens=MAX_COMPLETION_TOKENS, ) message_counter += 1 prompt_histories[bot_counter].append( {"role": "assistant", "content": bot_response} ) prompt_histories[flip_counter(bot_counter)].append( {"role": "user", "content": bot_response} ) await ctx.send(f"## {current_bot[0]}") while bot_response: send_chunk = bot_response[:1000] bot_response = bot_response[1000:] await ctx.send(send_chunk) bot_counter = flip_counter(counter=bot_counter) logger.info(f"Message counter is {message_counter}/{limit}") async def handle_chat( ctx, *, bot_name: str, message: str, system_prompt: str, response_prefix: str ): await ctx.send(f"{bot_name} is searching its databanks for {message[:50]}...") # Get database instance db = get_database() # Get conversation context using RAG context = db.get_conversation_context( user_id=str(ctx.author.id), current_message=message, max_context=5 ) prompts = [{"role": "user", "content": message}] if context: prompts = context + prompts logger.info(prompts) system_prompt_edit = f"{system_prompt}\nKeep your responses under 2-3 sentences." try: bot_response = llama_wrapper.chat_completion_with_history( system_prompt=system_prompt_edit, prompts=prompts, # type: ignore openai_url=CHAT_ENDPOINT, openai_api_key=CHAT_ENDPOINT_KEY, model=CHAT_MODEL, max_tokens=MAX_COMPLETION_TOKENS, ) # Store both user message and bot response in the database db.add_message( message_id=f"{ctx.message.id}", user_id=str(ctx.author.id), username=ctx.author.name, content=f"User: {message}", channel_id=str(ctx.channel.id), guild_id=str(ctx.guild.id) if ctx.guild else None, ) db.add_message( message_id=f"{ctx.message.id}_response", user_id=str(bot.user.id), # type: ignore username=bot.user.name, # type: ignore content=f"Bot: {bot_response}", channel_id=str(ctx.channel.id), guild_id=str(ctx.guild.id) if ctx.guild else None, ) # Send the response back to the chat await ctx.send(response_prefix) while bot_response: send_chunk = bot_response[:1000] bot_response = bot_response[1000:] await ctx.send(send_chunk) except Exception as e: await ctx.send(f"Error: {str(e)}") # Run the bot if __name__ == "__main__": bot.run(DISCORD_TOKEN)