Files
vibe-bot/vibe_bot/main.py
T
2026-05-22 14:35:49 -04:00

567 lines
19 KiB
Python

import discord
from discord.ext import commands
import os
import base64
import traceback
from io import BytesIO
from openai import OpenAI
import logging
from database import get_database, CustomBotManager # type: ignore
from config import ( # type: ignore
CHAT_ENDPOINT_KEY,
DISCORD_TOKEN,
CHAT_ENDPOINT,
CHAT_MODEL,
IMAGE_EDIT_ENDPOINT_KEY,
IMAGE_GEN_ENDPOINT,
IMAGE_EDIT_ENDPOINT,
MAX_COMPLETION_TOKENS,
TTS_MODEL_PATH,
TTS_VOICES_PATH,
TTS_VOICE,
TTS_SPEED,
)
import tts # type: ignore
import llama_wrapper # type: ignore
import requests
# Configure logging
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
# Initialize the bot
intents = discord.Intents.default()
intents.message_content = True
bot = commands.Bot(command_prefix="!", intents=intents)
# Initialize TTS engine
try:
tts_engine = tts.TTSEngine(TTS_MODEL_PATH, TTS_VOICES_PATH)
logger.info("TTS engine initialized successfully")
except Exception as e:
logger.error(f"Failed to initialize TTS engine: {e}")
logger.info("Make sure kokoro-v1.0.onnx and voices-v1.0.bin are in the project directory")
tts_engine = None
@bot.event
async def on_ready():
logger.info("Bot is starting up...")
print(f"Bot logged in as {bot.user}")
logger.info(f"Bot logged in as {bot.user}")
@bot.command(name="custom-bot") # type: ignore
async def custom_bot(ctx, bot_name: str, *, personality: str):
"""Create a custom bot with a name and personality
Usage: !custom-bot <bot_name> <personality_description>
Example: !custom-bot alfred you are a proper british butler
"""
logger.info(
f"Custom bot command initiated by {ctx.author.name}: name='{bot_name}', personality length={len(personality)}"
)
# Validate bot name
if not bot_name or len(bot_name) < 2 or len(bot_name) > 50:
logger.warning(
f"Invalid bot name from {ctx.author.name}: '{bot_name}' (length: {len(bot_name) if bot_name else 0})"
)
await ctx.send("❌ Invalid bot name. Name must be between 2 and 50 characters.")
return
logger.info(f"Bot name validation passed for '{bot_name}'")
# Validate personality
if not personality or len(personality) < 10:
logger.warning(
f"Invalid personality from {ctx.author.name}: length={len(personality) if personality else 0}"
)
await ctx.send(
"❌ Invalid personality. Description must be at least 10 characters."
)
return
logger.info(f"Personality validation passed for bot '{bot_name}'")
# Create custom bot manager
logger.info(f"Initializing CustomBotManager for user {ctx.author.name}")
custom_bot_manager = CustomBotManager()
# Create the custom bot
logger.info(
f"Attempting to create custom bot '{bot_name}' for user {ctx.author.name}"
)
success = custom_bot_manager.create_custom_bot(
bot_name=bot_name, system_prompt=personality, created_by=str(ctx.author.id)
)
if success:
logger.info(
f"Successfully created custom bot '{bot_name}' for user {ctx.author.name}"
)
await ctx.send(
f"✅ Custom bot **'{bot_name}'** has been created with personality: *{personality}*"
)
await ctx.send(f"\nYou can now use this bot with: `!{bot_name} <your message>`")
else:
logger.warning(
f"Failed to create custom bot '{bot_name}' for user {ctx.author.name}"
)
await ctx.send("❌ Failed to create custom bot. It may already exist.")
@bot.command(name="list-custom-bots")
async def list_custom_bots(ctx):
"""List all custom bots available in the server"""
logger.info(f"Listing custom bots requested by {ctx.author.name}")
# Create custom bot manager
logger.info("Initializing CustomBotManager to list custom bots")
custom_bot_manager = CustomBotManager()
logger.info("Fetching list of custom bots from database")
bots = custom_bot_manager.list_custom_bots()
if not bots:
logger.info(f"No custom bots found for user {ctx.author.name}")
await ctx.send(
"No custom bots have been created yet. Use `!custom-bot <name> <personality>` to create one."
)
return
logger.info(
f"Found {len(bots)} custom bots, displaying top 10 for {ctx.author.name}"
)
bot_list = "🤖 **Available Custom Bots**:\n\n"
for name, prompt, creator in bots:
bot_list += f"• **{name}**\n"
logger.info(f"Sending bot list response to {ctx.author.name}")
await ctx.send(bot_list)
@bot.command(name="delete-custom-bot") # type: ignore
async def delete_custom_bot(ctx, bot_name: str):
"""Delete a custom bot (only the creator can delete)
Usage: !delete-custom-bot <bot_name>
"""
logger.info(
f"Delete custom bot command initiated by {ctx.author.name}: bot_name='{bot_name}'"
)
# Create custom bot manager
logger.info("Initializing CustomBotManager for delete operation")
custom_bot_manager = CustomBotManager()
# Get bot info
logger.info(f"Looking up custom bot '{bot_name}' in database")
bot_info = custom_bot_manager.get_custom_bot(bot_name)
if not bot_info:
logger.warning(f"Custom bot '{bot_name}' not found by user {ctx.author.name}")
await ctx.send(f"❌ Custom bot '{bot_name}' not found.")
return
logger.info(f"Custom bot '{bot_name}' found, owned by user {bot_info[2]}")
# Check ownership
if bot_info[2] != str(ctx.author.id):
logger.warning(
f"User {ctx.author.name} attempted to delete bot '{bot_name}' they don't own"
)
await ctx.send("❌ You can only delete your own custom bots.")
return
logger.info(f"User {ctx.author.name} is authorized to delete bot '{bot_name}'")
# Delete the bot
logger.info(f"Deleting custom bot '{bot_name}' from database")
success = custom_bot_manager.delete_custom_bot(bot_name)
if success:
logger.info(
f"Successfully deleted custom bot '{bot_name}' by user {ctx.author.name}"
)
await ctx.send(f"✅ Custom bot '{bot_name}' has been deleted.")
else:
logger.warning(
f"Failed to delete custom bot '{bot_name}' by user {ctx.author.name}"
)
await ctx.send("❌ Failed to delete custom bot.")
# Handle custom bot commands
@bot.event
async def on_message(message):
# Skip bot messages
if message.author == bot.user:
return
message_author = message.author.name
message_content = message.content.lower()
logger.debug(
f"Processing message from {message_author}: '{message_content[:50]}...'"
)
ctx = await bot.get_context(message)
logger.info("Initializing CustomBotManager to check for custom bot commands")
custom_bot_manager = CustomBotManager()
logger.info("Fetching list of custom bots to check for matching commands")
custom_bots = custom_bot_manager.list_custom_bots()
logger.info(f"Checking {len(custom_bots)} custom bots for command match")
for bot_name, system_prompt, _ in custom_bots:
# Check if message starts with the custom bot name followed by a space
if message_content.startswith(f"!{bot_name} "):
logger.info(
f"Custom bot command detected: '{bot_name}' triggered by {message.author.name}"
)
# Extract the actual message (remove the bot name prefix)
user_message = message.content[len(f"!{bot_name} ") :]
logger.debug(
f"Extracted user message for bot '{bot_name}': '{user_message[:50]}...'"
)
# Prepare the payload with custom personality
response_prefix = f"**{bot_name} response**"
logger.info(f"Sending request to OpenAI API for bot '{bot_name}'")
await handle_chat(
ctx=ctx,
bot_name=bot_name,
message=user_message,
system_prompt=system_prompt,
response_prefix=response_prefix,
)
return
# If no custom bot matched, call the default event handler
await bot.process_commands(message)
@bot.command(name="speak")
async def speak(ctx, *, message: str):
"""Have the bot speak the given text using Kokoro TTS, or have a custom bot speak
Usage: !speak <text> - plain text to speech
Usage: !speak <bot_name> <text> - have a custom bot respond and speak
Example: !speak hello world
Example: !speak alfred what time is it
"""
if tts_engine is None:
await ctx.send("❌ TTS engine not initialized. Make sure kokoro-v1.0.onnx and voices-v1.0.bin are present.")
return
if not message or len(message.strip()) == 0:
await ctx.send("❌ Please provide text to speak.")
return
custom_bot_manager = CustomBotManager()
custom_bots = custom_bot_manager.list_custom_bots()
bot_names = [b[0] for b in custom_bots]
first_word = message.split()[0] if message.split() else ""
if first_word in bot_names:
bot_name = first_word
text_to_speak = message[len(bot_name):].lstrip()
if not text_to_speak:
await ctx.send("❌ Please provide text for the bot to respond to.")
return
await ctx.send(f"🔊 **{bot_name}** is thinking...")
bot_info = custom_bot_manager.get_custom_bot(bot_name)
if not bot_info:
await ctx.send(f"❌ Custom bot '{bot_name}' not found.")
return
_, system_prompt, _, _ = bot_info
system_prompt_edit = f"{system_prompt}\nKeep your responses under 2-3 sentences."
try:
bot_response = llama_wrapper.chat_completion_with_history(
system_prompt=system_prompt_edit,
prompts=[{"role": "user", "content": text_to_speak}],
openai_url=CHAT_ENDPOINT,
openai_api_key=CHAT_ENDPOINT_KEY,
model=CHAT_MODEL,
max_tokens=MAX_COMPLETION_TOKENS,
)
if not bot_response:
await ctx.send(f"❌ **{bot_name}** failed to generate a response.")
return
await ctx.send(f"🔊 Generating speech for **{bot_name}**...")
audio_buffer = tts_engine.generate_audio(bot_response, voice=TTS_VOICE, speed=TTS_SPEED)
audio_file = discord.File(audio_buffer, filename="speech.mp3")
await ctx.send(file=audio_file)
except Exception as e:
logger.error(f"Error in !speak command with bot '{bot_name}': {traceback.format_exc()}")
await ctx.send(f"❌ Error generating speech: {str(e)}")
else:
if not message or len(message.strip()) == 0:
await ctx.send("❌ Please provide text to speak.")
return
try:
await ctx.send("🔊 Generating speech...")
audio_buffer = tts_engine.generate_audio(message, voice=TTS_VOICE, speed=TTS_SPEED)
audio_file = discord.File(audio_buffer, filename="speech.mp3")
await ctx.send(file=audio_file)
except Exception as e:
logger.error(f"Error in !speak command: {e}")
await ctx.send(f"❌ Error generating speech: {str(e)}")
@bot.command(name="doodlebob")
async def doodlebob(ctx, *, message: str):
# add some logging
logger.info(f"Doodlebob command triggered by {ctx.author.name}: {message[:100]}")
await ctx.send(f"**Doodlebob erasing {message[:100]}...**")
system_prompt = (
"Given the following message, convert it to a detailed image generation prompt that will be passed directly into an image generation model."
"If told to generate an image of yourself, generate a picture of a rat. If told to generate a picture of 'me', 'myself', or some other self"
" reference, generate a picture of a rat. Only respond with a valid image generation prompt, do not affirm the user or respond to the user's"
" questions."
)
# Wait for the generated image prompt
image_prompt = llama_wrapper.chat_completion_instruct(
system_prompt=system_prompt,
user_prompt=message,
openai_url=CHAT_ENDPOINT,
openai_api_key=CHAT_ENDPOINT_KEY,
model=CHAT_MODEL,
max_tokens=MAX_COMPLETION_TOKENS,
)
# If the string is empty we had an error
if image_prompt == "":
print("No image prompt supplied. Check for errors.")
return
# Alert the user we're generating the image
await ctx.send(f"**Doodlebob calling drone strike on {image_prompt[:100]}...**")
image_b64 = llama_wrapper.image_generation(
prompt=image_prompt,
openai_url=IMAGE_EDIT_ENDPOINT,
openai_api_key=IMAGE_EDIT_ENDPOINT_KEY,
)
# Save the image to a file
edited_image_data = BytesIO(base64.b64decode(image_b64))
send_img = discord.File(edited_image_data, filename="image.png")
await ctx.send(file=send_img)
@bot.command(name="retcon")
async def retcon(ctx, *, message: str):
image_data_list = []
for discord_image in ctx.message.attachments:
image_url = discord_image.url
image_data = requests.get(image_url).content
image_bytestream = BytesIO(image_data)
image_data_list.append(image_bytestream)
await ctx.send(f"**Rewriting history to match {message[:100]}...**")
image_b64 = llama_wrapper.image_edit(
image=image_data_list,
prompt=message,
openai_url=IMAGE_EDIT_ENDPOINT,
openai_api_key=IMAGE_EDIT_ENDPOINT_KEY,
)
# Save the image to a file
edited_image_data = BytesIO(base64.b64decode(image_b64))
send_img = discord.File(edited_image_data, filename="image.png")
await ctx.send(file=send_img)
@bot.command(name="talkforme")
async def talkforme(ctx, *, message: str):
"""Have two bots talk to each other about a topic
Usage: !talkforme bot1 bot2 4 some conversation topic
"""
TALK_LIMIT = 20
bot1_name, bot2_name, limit, topic_list = (
message.split(" ")[0],
message.split(" ")[1],
message.split(" ")[2],
message.split(" ")[3:],
)
topic = " ".join(topic_list)
custom_bot_manager = CustomBotManager()
bot1 = custom_bot_manager.get_custom_bot(bot1_name)
if not bot1:
await ctx.send(f"{bot1_name} is not a real bot...")
return
else:
_, bot1_prompt, _, _ = bot1
bot2 = custom_bot_manager.get_custom_bot(bot2_name)
if not bot2:
await ctx.send(f"{bot2_name} is not a real bot...")
return
else:
_, bot2_prompt, _, _ = bot2
await ctx.send(
f'{bot1_name} is going to talk to {bot2_name} about "{topic[:50]}" for {limit} replies.'
)
bot_list = [(bot1_name, bot1_prompt), (bot2_name, bot2_prompt)]
message_limit = int(limit)
def flip_counter(counter: int):
if counter == 0:
return 1
else:
return 0
def flip_user(user: str):
if user == "user":
return "assistant"
else:
return "user"
message_counter = 0
bot_counter = 0
current_bot = bot_list[bot_counter]
prompt_histories = [
[{"role": "user", "content": topic}],
[{"role": "assistant", "content": topic}],
]
first_bot_response = llama_wrapper.chat_completion_with_history(
system_prompt=current_bot[1]
+ f"\nKeep your responses under 2-3 sentences. You are talking to {current_bot[flip_counter(bot_counter)][0]}",
prompts=prompt_histories[bot_counter], # type: ignore
openai_url=CHAT_ENDPOINT,
openai_api_key=CHAT_ENDPOINT_KEY,
model=CHAT_MODEL,
max_tokens=MAX_COMPLETION_TOKENS,
)
await ctx.send(f"## {current_bot[0]}\n{first_bot_response}")
prompt_histories[0].append({"role": "assistant", "content": first_bot_response})
prompt_histories[1].append({"role": "user", "content": first_bot_response})
bot_counter = flip_counter(counter=bot_counter)
while message_counter < min(message_limit, TALK_LIMIT):
current_bot = bot_list[bot_counter]
logger.info(f"Current bot is {current_bot}")
bot_response = llama_wrapper.chat_completion_with_history(
system_prompt=current_bot[1]
+ f"\nKeep your responses under 2-3 sentences. {current_bot[flip_counter(bot_counter)]}",
prompts=prompt_histories[bot_counter], # type: ignore
openai_url=CHAT_ENDPOINT,
openai_api_key=CHAT_ENDPOINT_KEY,
model=CHAT_MODEL,
max_tokens=MAX_COMPLETION_TOKENS,
)
message_counter += 1
prompt_histories[bot_counter].append(
{"role": "assistant", "content": bot_response}
)
prompt_histories[flip_counter(bot_counter)].append(
{"role": "user", "content": bot_response}
)
await ctx.send(f"## {current_bot[0]}")
while bot_response:
send_chunk = bot_response[:1000]
bot_response = bot_response[1000:]
await ctx.send(send_chunk)
bot_counter = flip_counter(counter=bot_counter)
logger.info(f"Message counter is {message_counter}/{limit}")
async def handle_chat(
ctx, *, bot_name: str, message: str, system_prompt: str, response_prefix: str
):
await ctx.send(f"{bot_name} is searching its databanks for {message[:50]}...")
# Get database instance
db = get_database()
# Get conversation context using RAG
context = db.get_conversation_context(
user_id=str(ctx.author.id), current_message=message, max_context=5
)
prompts = [{"role": "user", "content": message}]
if context:
prompts = context + prompts
logger.info(prompts)
system_prompt_edit = f"{system_prompt}\nKeep your responses under 2-3 sentences."
try:
bot_response = llama_wrapper.chat_completion_with_history(
system_prompt=system_prompt_edit,
prompts=prompts, # type: ignore
openai_url=CHAT_ENDPOINT,
openai_api_key=CHAT_ENDPOINT_KEY,
model=CHAT_MODEL,
max_tokens=MAX_COMPLETION_TOKENS,
)
# Store both user message and bot response in the database
db.add_message(
message_id=f"{ctx.message.id}",
user_id=str(ctx.author.id),
username=ctx.author.name,
content=f"User: {message}",
channel_id=str(ctx.channel.id),
guild_id=str(ctx.guild.id) if ctx.guild else None,
)
db.add_message(
message_id=f"{ctx.message.id}_response",
user_id=str(bot.user.id), # type: ignore
username=bot.user.name, # type: ignore
content=f"Bot: {bot_response}",
channel_id=str(ctx.channel.id),
guild_id=str(ctx.guild.id) if ctx.guild else None,
)
# Send the response back to the chat
await ctx.send(response_prefix)
while bot_response:
send_chunk = bot_response[:1000]
bot_response = bot_response[1000:]
await ctx.send(send_chunk)
except Exception as e:
await ctx.send(f"Error: {str(e)}")
# Run the bot
if __name__ == "__main__":
bot.run(DISCORD_TOKEN)