Files
vibe-bot/vibe_bot/main.py

467 lines
15 KiB
Python

import discord
from discord.ext import commands
import os
import base64
from io import BytesIO
from openai import OpenAI
import logging
from database import get_database, CustomBotManager # type: ignore
from config import ( # type: ignore
CHAT_ENDPOINT_KEY,
DISCORD_TOKEN,
CHAT_ENDPOINT,
CHAT_MODEL,
IMAGE_EDIT_ENDPOINT_KEY,
IMAGE_GEN_ENDPOINT,
IMAGE_EDIT_ENDPOINT,
MAX_COMPLETION_TOKENS,
)
import llama_wrapper # type: ignore
import requests
# Configure logging
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
# Initialize the bot
intents = discord.Intents.default()
intents.message_content = True
bot = commands.Bot(command_prefix="!", intents=intents)
@bot.event
async def on_ready():
logger.info("Bot is starting up...")
print(f"Bot logged in as {bot.user}")
logger.info(f"Bot logged in as {bot.user}")
@bot.command(name="custom-bot") # type: ignore
async def custom_bot(ctx, bot_name: str, *, personality: str):
"""Create a custom bot with a name and personality
Usage: !custom-bot <bot_name> <personality_description>
Example: !custom-bot alfred you are a proper british butler
"""
logger.info(
f"Custom bot command initiated by {ctx.author.name}: name='{bot_name}', personality length={len(personality)}"
)
# Validate bot name
if not bot_name or len(bot_name) < 2 or len(bot_name) > 50:
logger.warning(
f"Invalid bot name from {ctx.author.name}: '{bot_name}' (length: {len(bot_name) if bot_name else 0})"
)
await ctx.send("❌ Invalid bot name. Name must be between 2 and 50 characters.")
return
logger.info(f"Bot name validation passed for '{bot_name}'")
# Validate personality
if not personality or len(personality) < 10:
logger.warning(
f"Invalid personality from {ctx.author.name}: length={len(personality) if personality else 0}"
)
await ctx.send(
"❌ Invalid personality. Description must be at least 10 characters."
)
return
logger.info(f"Personality validation passed for bot '{bot_name}'")
# Create custom bot manager
logger.info(f"Initializing CustomBotManager for user {ctx.author.name}")
custom_bot_manager = CustomBotManager()
# Create the custom bot
logger.info(
f"Attempting to create custom bot '{bot_name}' for user {ctx.author.name}"
)
success = custom_bot_manager.create_custom_bot(
bot_name=bot_name, system_prompt=personality, created_by=str(ctx.author.id)
)
if success:
logger.info(
f"Successfully created custom bot '{bot_name}' for user {ctx.author.name}"
)
await ctx.send(
f"✅ Custom bot **'{bot_name}'** has been created with personality: *{personality}*"
)
await ctx.send(f"\nYou can now use this bot with: `!{bot_name} <your message>`")
else:
logger.warning(
f"Failed to create custom bot '{bot_name}' for user {ctx.author.name}"
)
await ctx.send("❌ Failed to create custom bot. It may already exist.")
@bot.command(name="list-custom-bots")
async def list_custom_bots(ctx):
"""List all custom bots available in the server"""
logger.info(f"Listing custom bots requested by {ctx.author.name}")
# Create custom bot manager
logger.info("Initializing CustomBotManager to list custom bots")
custom_bot_manager = CustomBotManager()
logger.info("Fetching list of custom bots from database")
bots = custom_bot_manager.list_custom_bots()
if not bots:
logger.info(f"No custom bots found for user {ctx.author.name}")
await ctx.send(
"No custom bots have been created yet. Use `!custom-bot <name> <personality>` to create one."
)
return
logger.info(
f"Found {len(bots)} custom bots, displaying top 10 for {ctx.author.name}"
)
bot_list = "🤖 **Available Custom Bots**:\n\n"
for name, prompt, creator in bots:
bot_list += f"• **{name}**\n"
logger.info(f"Sending bot list response to {ctx.author.name}")
await ctx.send(bot_list)
@bot.command(name="delete-custom-bot") # type: ignore
async def delete_custom_bot(ctx, bot_name: str):
"""Delete a custom bot (only the creator can delete)
Usage: !delete-custom-bot <bot_name>
"""
logger.info(
f"Delete custom bot command initiated by {ctx.author.name}: bot_name='{bot_name}'"
)
# Create custom bot manager
logger.info("Initializing CustomBotManager for delete operation")
custom_bot_manager = CustomBotManager()
# Get bot info
logger.info(f"Looking up custom bot '{bot_name}' in database")
bot_info = custom_bot_manager.get_custom_bot(bot_name)
if not bot_info:
logger.warning(f"Custom bot '{bot_name}' not found by user {ctx.author.name}")
await ctx.send(f"❌ Custom bot '{bot_name}' not found.")
return
logger.info(f"Custom bot '{bot_name}' found, owned by user {bot_info[2]}")
# Check ownership
if bot_info[2] != str(ctx.author.id):
logger.warning(
f"User {ctx.author.name} attempted to delete bot '{bot_name}' they don't own"
)
await ctx.send("❌ You can only delete your own custom bots.")
return
logger.info(f"User {ctx.author.name} is authorized to delete bot '{bot_name}'")
# Delete the bot
logger.info(f"Deleting custom bot '{bot_name}' from database")
success = custom_bot_manager.delete_custom_bot(bot_name)
if success:
logger.info(
f"Successfully deleted custom bot '{bot_name}' by user {ctx.author.name}"
)
await ctx.send(f"✅ Custom bot '{bot_name}' has been deleted.")
else:
logger.warning(
f"Failed to delete custom bot '{bot_name}' by user {ctx.author.name}"
)
await ctx.send("❌ Failed to delete custom bot.")
# Handle custom bot commands
@bot.event
async def on_message(message):
# Skip bot messages
if message.author == bot.user:
return
message_author = message.author.name
message_content = message.content.lower()
logger.debug(
f"Processing message from {message_author}: '{message_content[:50]}...'"
)
ctx = await bot.get_context(message)
logger.info("Initializing CustomBotManager to check for custom bot commands")
custom_bot_manager = CustomBotManager()
logger.info("Fetching list of custom bots to check for matching commands")
custom_bots = custom_bot_manager.list_custom_bots()
logger.info(f"Checking {len(custom_bots)} custom bots for command match")
for bot_name, system_prompt, _ in custom_bots:
# Check if message starts with the custom bot name followed by a space
if message_content.startswith(f"!{bot_name} "):
logger.info(
f"Custom bot command detected: '{bot_name}' triggered by {message.author.name}"
)
# Extract the actual message (remove the bot name prefix)
user_message = message.content[len(f"!{bot_name} ") :]
logger.debug(
f"Extracted user message for bot '{bot_name}': '{user_message[:50]}...'"
)
# Prepare the payload with custom personality
response_prefix = f"**{bot_name} response**"
logger.info(f"Sending request to OpenAI API for bot '{bot_name}'")
await handle_chat(
ctx=ctx,
bot_name=bot_name,
message=user_message,
system_prompt=system_prompt,
response_prefix=response_prefix,
)
return
# If no custom bot matched, call the default event handler
await bot.process_commands(message)
@bot.command(name="doodlebob")
async def doodlebob(ctx, *, message: str):
# add some logging
logger.info(f"Doodlebob command triggered by {ctx.author.name}: {message[:100]}")
await ctx.send(f"**Doodlebob erasing {message[:100]}...**")
system_prompt = (
"Given the following message, convert it to a detailed image generation prompt that will be passed directly into an image generation model."
"If told to generate an image of yourself, generate a picture of a rat. If told to generate a picture of 'me', 'myself', or some other self"
" reference, generate a picture of a rat. Only respond with a valid image generation prompt, do not affirm the user or respond to the user's"
" questions."
)
# Wait for the generated image prompt
image_prompt = llama_wrapper.chat_completion_instruct(
system_prompt=system_prompt,
user_prompt=message,
openai_url=CHAT_ENDPOINT,
openai_api_key=CHAT_ENDPOINT_KEY,
model=CHAT_MODEL,
max_tokens=MAX_COMPLETION_TOKENS,
)
# If the string is empty we had an error
if image_prompt == "":
print("No image prompt supplied. Check for errors.")
return
# Alert the user we're generating the image
await ctx.send(f"**Doodlebob calling drone strike on {image_prompt[:100]}...**")
image_b64 = llama_wrapper.image_generation(
prompt=message,
openai_url=IMAGE_EDIT_ENDPOINT,
openai_api_key=IMAGE_EDIT_ENDPOINT_KEY,
)
# Save the image to a file
edited_image_data = BytesIO(base64.b64decode(image_b64))
send_img = discord.File(edited_image_data, filename="image.png")
await ctx.send(file=send_img)
@bot.command(name="retcon")
async def retcon(ctx, *, message: str):
image_url = ctx.message.attachments[0].url
image_data = requests.get(image_url).content
image_bytestream = BytesIO(image_data)
await ctx.send(f"**Rewriting history to match {message[:100]}...**")
image_b64 = llama_wrapper.image_edit(
image=image_bytestream,
prompt=message,
openai_url=IMAGE_EDIT_ENDPOINT,
openai_api_key=IMAGE_EDIT_ENDPOINT_KEY,
)
# Save the image to a file
edited_image_data = BytesIO(base64.b64decode(image_b64))
send_img = discord.File(edited_image_data, filename="image.png")
await ctx.send(file=send_img)
@bot.command(name="talkforme")
async def talkforme(ctx, *, message: str):
"""Have two bots talk to each other about a topic
Usage: !talkforme bot1 bot2 4 some conversation topic
"""
TALK_LIMIT = 20
bot1_name, bot2_name, limit, topic_list = (
message.split(" ")[0],
message.split(" ")[1],
message.split(" ")[2],
message.split(" ")[3:],
)
topic = " ".join(topic_list)
custom_bot_manager = CustomBotManager()
bot1 = custom_bot_manager.get_custom_bot(bot1_name)
if not bot1:
await ctx.send(f"{bot1_name} is not a real bot...")
return
else:
_, bot1_prompt, _, _ = bot1
bot2 = custom_bot_manager.get_custom_bot(bot2_name)
if not bot2:
await ctx.send(f"{bot2_name} is not a real bot...")
return
else:
_, bot2_prompt, _, _ = bot2
await ctx.send(
f'{bot1_name} is going to talk to {bot2_name} about "{topic[:50]}" for {limit} replies.'
)
bot_list = [(bot1_name, bot1_prompt), (bot2_name, bot2_prompt)]
message_limit = int(limit)
def flip_counter(counter: int):
if counter == 0:
return 1
else:
return 0
def flip_user(user: str):
if user == "user":
return "assistant"
else:
return "user"
message_counter = 0
bot_counter = 0
current_user = "user"
current_bot = bot_list[bot_counter]
prompt_history = [{"role": current_user, "content": topic}]
first_bot_response = llama_wrapper.chat_completion_with_history(
system_prompt=current_bot[1] + "\nKeep your responses under 2-3 sentences.",
prompts=prompt_history, # type: ignore
openai_url=CHAT_ENDPOINT,
openai_api_key=CHAT_ENDPOINT_KEY,
model=CHAT_MODEL,
max_tokens=MAX_COMPLETION_TOKENS,
)
await ctx.send(f"## {current_bot[0]}\n{first_bot_response}")
prompt_history.append({"role": current_user, "content": first_bot_response})
current_user = flip_user(current_user)
bot_counter = flip_counter(counter=bot_counter)
while message_counter < min(message_limit, TALK_LIMIT):
current_bot = bot_list[bot_counter]
logger.info(f"Current user is {current_user}")
logger.info(f"Current bot is {current_bot}")
bot_response = llama_wrapper.chat_completion_with_history(
system_prompt=current_bot[1] + "\nKeep your responses under 2-3 sentences.",
prompts=prompt_history, # type: ignore
openai_url=CHAT_ENDPOINT,
openai_api_key=CHAT_ENDPOINT_KEY,
model=CHAT_MODEL,
max_tokens=MAX_COMPLETION_TOKENS,
)
message_counter += 1
await ctx.send(f"## {current_bot[0]}")
while bot_response:
send_chunk = bot_response[:1000]
bot_response = bot_response[1000:]
await ctx.send(send_chunk)
prompt_history.append({"role": current_user, "content": bot_response})
bot_counter = flip_counter(counter=bot_counter)
current_user = flip_user(current_user)
logger.info(f"Message counter is {message_counter}/{limit}")
async def handle_chat(
ctx, *, bot_name: str, message: str, system_prompt: str, response_prefix: str
):
await ctx.send(f"{bot_name} is searching its databanks for {message[:50]}...")
# Get database instance
db = get_database()
# Get conversation context using RAG
context = db.get_conversation_context(
user_id=str(ctx.author.id), current_message=message, max_context=5
)
if context:
user_message = f"\n\nRelevant conversation history:\n{context}\n\n{message}"
else:
user_message = message
logger.info(user_message)
system_prompt_edit = (
"Keep your responses somewhat short, limited to 500 words or less. "
f"{system_prompt}"
)
try:
bot_response = llama_wrapper.chat_completion_instruct(
system_prompt=system_prompt_edit,
user_prompt=user_message,
openai_url=CHAT_ENDPOINT,
openai_api_key=CHAT_ENDPOINT_KEY,
model=CHAT_MODEL,
max_tokens=MAX_COMPLETION_TOKENS,
)
# Store both user message and bot response in the database
db.add_message(
message_id=f"{ctx.message.id}",
user_id=str(ctx.author.id),
username=ctx.author.name,
content=f"User: {message}",
channel_id=str(ctx.channel.id),
guild_id=str(ctx.guild.id) if ctx.guild else None,
)
db.add_message(
message_id=f"{ctx.message.id}_response",
user_id=str(bot.user.id), # type: ignore
username=bot.user.name, # type: ignore
content=f"Bot: {bot_response}",
channel_id=str(ctx.channel.id),
guild_id=str(ctx.guild.id) if ctx.guild else None,
)
# Send the response back to the chat
await ctx.send(response_prefix)
while bot_response:
send_chunk = bot_response[:1000]
bot_response = bot_response[1000:]
await ctx.send(send_chunk)
except Exception as e:
await ctx.send(f"Error: {str(e)}")
# Run the bot
if __name__ == "__main__":
bot.run(DISCORD_TOKEN)