import discord from discord.ext import commands import os import base64 from io import BytesIO from openai import OpenAI import logging from database import get_database, CustomBotManager # Configure logging logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" ) logger = logging.getLogger(__name__) DISCORD_TOKEN = os.getenv("DISCORD_TOKEN", "placeholder") OPENAI_API_ENDPOINT = os.getenv("OPENAI_API_ENDPOINT") IMAGE_GEN_ENDPOINT = os.getenv("IMAGE_GEN_ENDPOINT") IMAGE_EDIT_ENDPOINT = os.getenv("IMAGE_EDIT_ENDPOINT") MAX_COMPLETION_TOKENS = int(os.getenv("MAX_COMPLETION_TOKENS", "1000")) if not OPENAI_API_ENDPOINT: raise Exception("OPENAI_API_ENDPOINT required.") if not IMAGE_GEN_ENDPOINT: raise Exception("IMAGE_GEN_ENDPOINT required.") # Set your OpenAI API key as an environment variable # You can also pass it directly but environment variables are safer OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "placeholder") # Initialize the bot intents = discord.Intents.default() intents.message_content = True bot = commands.Bot(command_prefix="!", intents=intents) # OpenAI Completions API endpoint OPENAI_COMPLETIONS_URL = f"{OPENAI_API_ENDPOINT}/chat/completions" @bot.event async def on_ready(): logger.info("Bot is starting up...") print(f"Bot logged in as {bot.user}") logger.info(f"Bot logged in as {bot.user}") @bot.command(name="custom-bot") async def custom_bot(ctx, bot_name: str, *, personality: str): """Create a custom bot with a name and personality Usage: !custom-bot Example: !custom-bot alfred you are a proper british butler """ logger.info( f"Custom bot command initiated by {ctx.author.name}: name='{bot_name}', personality length={len(personality)}" ) # Validate bot name if not bot_name or len(bot_name) < 2 or len(bot_name) > 50: logger.warning( f"Invalid bot name from {ctx.author.name}: '{bot_name}' (length: {len(bot_name) if bot_name else 0})" ) await ctx.send("❌ Invalid bot name. Name must be between 2 and 50 characters.") return logger.info(f"Bot name validation passed for '{bot_name}'") # Validate personality if not personality or len(personality) < 10: logger.warning( f"Invalid personality from {ctx.author.name}: length={len(personality) if personality else 0}" ) await ctx.send( "❌ Invalid personality. Description must be at least 10 characters." ) return logger.info(f"Personality validation passed for bot '{bot_name}'") # Create custom bot manager logger.info(f"Initializing CustomBotManager for user {ctx.author.name}") custom_bot_manager = CustomBotManager() # Create the custom bot logger.info( f"Attempting to create custom bot '{bot_name}' for user {ctx.author.name}" ) success = custom_bot_manager.create_custom_bot( bot_name=bot_name, system_prompt=personality, created_by=str(ctx.author.id) ) if success: logger.info( f"Successfully created custom bot '{bot_name}' for user {ctx.author.name}" ) await ctx.send( f"✅ Custom bot **'{bot_name}'** has been created with personality: *{personality}*" ) await ctx.send(f"\nYou can now use this bot with: `!{bot_name} `") else: logger.warning( f"Failed to create custom bot '{bot_name}' for user {ctx.author.name}" ) await ctx.send("❌ Failed to create custom bot. It may already exist.") @bot.command(name="list-custom-bots") async def list_custom_bots(ctx): """List all custom bots available in the server""" logger.info(f"Listing custom bots requested by {ctx.author.name}") # Create custom bot manager logger.info("Initializing CustomBotManager to list custom bots") custom_bot_manager = CustomBotManager() logger.info("Fetching list of custom bots from database") bots = custom_bot_manager.list_custom_bots() if not bots: logger.info(f"No custom bots found for user {ctx.author.name}") await ctx.send( "No custom bots have been created yet. Use `!custom-bot ` to create one." ) return logger.info( f"Found {len(bots)} custom bots, displaying top 10 for {ctx.author.name}" ) bot_list = "🤖 **Available Custom Bots**:\n\n" for name, prompt, creator in bots[:10]: # Limit to 10 bots bot_list += f"• **{name}** (created by {creator})\n" logger.info(f"Sending bot list response to {ctx.author.name}") await ctx.send(bot_list) @bot.command(name="delete-custom-bot") async def delete_custom_bot(ctx, bot_name: str): """Delete a custom bot (only the creator can delete) Usage: !delete-custom-bot """ logger.info( f"Delete custom bot command initiated by {ctx.author.name}: bot_name='{bot_name}'" ) # Create custom bot manager logger.info("Initializing CustomBotManager for delete operation") custom_bot_manager = CustomBotManager() # Get bot info logger.info(f"Looking up custom bot '{bot_name}' in database") bot_info = custom_bot_manager.get_custom_bot(bot_name) if not bot_info: logger.warning(f"Custom bot '{bot_name}' not found by user {ctx.author.name}") await ctx.send(f"❌ Custom bot '{bot_name}' not found.") return logger.info(f"Custom bot '{bot_name}' found, owned by user {bot_info[2]}") # Check ownership if bot_info[2] != str(ctx.author.id): logger.warning( f"User {ctx.author.name} attempted to delete bot '{bot_name}' they don't own" ) await ctx.send("❌ You can only delete your own custom bots.") return logger.info(f"User {ctx.author.name} is authorized to delete bot '{bot_name}'") # Delete the bot logger.info(f"Deleting custom bot '{bot_name}' from database") success = custom_bot_manager.delete_custom_bot(bot_name) if success: logger.info( f"Successfully deleted custom bot '{bot_name}' by user {ctx.author.name}" ) await ctx.send(f"✅ Custom bot '{bot_name}' has been deleted.") else: logger.warning( f"Failed to delete custom bot '{bot_name}' by user {ctx.author.name}" ) await ctx.send("❌ Failed to delete custom bot.") # Handle custom bot commands @bot.event async def on_message(message): # Skip bot messages if message.author == bot.user: return logger.debug( f"Processing message from {message.author.name}: '{message.content[:50]}...'" ) ctx = await bot.get_context(message) # Check if the message starts with a custom bot command content = message.content.lower() logger.info(f"Initializing CustomBotManager to check for custom bot commands") custom_bot_manager = CustomBotManager() logger.info("Fetching list of custom bots to check for matching commands") custom_bots = custom_bot_manager.list_custom_bots() logger.info(f"Checking {len(custom_bots)} custom bots for command match") for bot_name, system_prompt, _ in custom_bots: # Check if message starts with the custom bot name followed by a space if content.startswith(f"!{bot_name} "): logger.info( f"Custom bot command detected: '{bot_name}' triggered by {message.author.name}" ) # Extract the actual message (remove the bot name prefix) user_message = message.content[len(f"!{bot_name} ") :] logger.debug( f"Extracted user message for bot '{bot_name}': '{user_message[:50]}...'" ) # Prepare the payload with custom personality payload = { "model": "qwen3-vl-30b-a3b-instruct", "messages": [ { "role": "system", "content": system_prompt, }, {"role": "user", "content": user_message}, ], "max_completion_tokens": MAX_COMPLETION_TOKENS, } response_prefix = f"**{bot_name} response**" logger.info(f"Sending request to OpenAI API for bot '{bot_name}'") await handle_chat( ctx=ctx, message=user_message, payload=payload, response_prefix=response_prefix, ) return # If no custom bot matched, call the default event handler await bot.process_commands(message) @bot.command(name="doodlebob") async def doodlebob(ctx, *, message: str): # add some logging logger.info(f"Doodlebob command triggered by {ctx.author.name}: {message[:100]}") await ctx.send(f"**Doodlebob erasing {message[:100]}...**") image_prompt_payload = { "model": "qwen3-vl-30b-a3b-instruct", "messages": [ { "role": "system", "content": ( "Given the following message, convert it to a detailed image generation prompt that will be passed directly into an image generation model." "If told to generate an image of yourself, generate a picture of a rat. If told to generate a picture of 'me', 'myself', or some other self" " reference, generate a picture of a rat. Only respond with a valid image generation prompt, do not affirm the user or respond to the user's" " questions." ), }, {"role": "user", "content": message}, ], } # Wait for the generated image prompt image_prompt = await call_llm(ctx, image_prompt_payload) # If the string is empty we had an error if image_prompt == "": print("No image prompt supplied. Check for errors.") return # Alert the user we're generating the image await ctx.send(f"**Doodlebob calling drone strike on {image_prompt[:100]}...**") # Create the image prompt payload image_payload = { "model": "default", "prompt": image_prompt, "n": 1, "size": "1024x1024", } # Call the image generation endpoint response = requests.post( f"{IMAGE_GEN_ENDPOINT}/images/generations", json=image_payload, timeout=120, ) if response.status_code == 200: result = response.json() # Send image image_data = BytesIO(base64.b64decode(result["data"][0]["b64_json"])) send_img = discord.File(image_data, filename="image.png") await ctx.send(file=send_img) else: print(f"❌ Error: {response.status_code}") print(response.text) return None @bot.command(name="retcon") async def retcon(ctx, *, message: str): image_url = ctx.message.attachments[0].url image_data = requests.get(image_url).content image_bytestream = BytesIO(image_data) await ctx.send(f"**Rewriting history to match {message[:100]}...**") client = OpenAI(base_url=IMAGE_EDIT_ENDPOINT, api_key=OPENAI_API_KEY) result = client.images.edit( model="placeholder", image=[image_bytestream], prompt=message, size="1024x1024", ) image_base64 = result.data[0].b64_json image_bytes = base64.b64decode(image_base64) # Save the image to a file edited_image_data = BytesIO(image_bytes) send_img = discord.File(edited_image_data, filename="image.png") await ctx.send(file=send_img) async def handle_chat(ctx, *, message: str, payload: dict, response_prefix: str): # Check if API key is set if not OPENAI_API_KEY: await ctx.send( "Error: OpenAI API key is not configured. Please set the OPENAI_API_KEY environment variable." ) return # Get database instance db = get_database() # Get conversation context using RAG context = db.get_conversation_context( user_id=str(ctx.author.id), current_message=message, max_context=5 ) if context: payload["messages"][0][ "content" ] += f"\n\nRelevant conversation history:\n{context}" payload["messages"][1]["content"] = message print(payload) try: # Initialize OpenAI client client = OpenAI(api_key=OPENAI_API_KEY, base_url=OPENAI_API_ENDPOINT) # Call OpenAI API response = client.chat.completions.create( model=payload["model"], messages=payload["messages"], max_completion_tokens=MAX_COMPLETION_TOKENS, frequency_penalty=1.5, presence_penalty=1.5, temperature=1, seed=-1, ) # Extract the generated text generated_text = response.choices[0].message.content.strip() # Store both user message and bot response in the database db.add_message( message_id=f"{ctx.message.id}", user_id=str(ctx.author.id), username=ctx.author.name, content=f"User: {message}", channel_id=str(ctx.channel.id), guild_id=str(ctx.guild.id) if ctx.guild else None, ) db.add_message( message_id=f"{ctx.message.id}_response", user_id=str(bot.user.id), username=bot.user.name, content=f"Bot: {generated_text}", channel_id=str(ctx.channel.id), guild_id=str(ctx.guild.id) if ctx.guild else None, ) # Send the response back to the chat await ctx.send(response_prefix) while generated_text: send_chunk = generated_text[:1000] generated_text = generated_text[1000:] await ctx.send(send_chunk) except requests.exceptions.HTTPError as e: await ctx.send(f"Error: OpenAI API error - {e}") except requests.exceptions.Timeout: await ctx.send("Error: Request timed out. Please try again.") except Exception as e: await ctx.send(f"Error: {str(e)}") async def call_llm(ctx, payload: dict) -> str: # Check if API key is set if not OPENAI_API_KEY: await ctx.send( "Error: OpenAI API key is not configured. Please set the OPENAI_API_KEY environment variable." ) return "" # Set headers headers = { "Authorization": f"Bearer {OPENAI_API_KEY}", "Content-Type": "application/json", } try: # Initialize OpenAI client client = OpenAI(api_key=OPENAI_API_KEY, base_url=OPENAI_API_ENDPOINT) # Call OpenAI API response = client.chat.completions.create( model=payload["model"], messages=payload["messages"], max_tokens=MAX_COMPLETION_TOKENS, ) # Extract the generated text generated_text = response.choices[0].message.content.strip() print(generated_text) return generated_text except requests.exceptions.HTTPError as e: await ctx.send(f"Error: OpenAI API error - {e}") except requests.exceptions.Timeout: await ctx.send("Error: Request timed out. Please try again.") except Exception as e: await ctx.send(f"Error: {str(e)}") return "" # Run the bot if __name__ == "__main__": bot.run(DISCORD_TOKEN)