Files
vibe-bot/vibe_bot/main.py
T
2026-05-24 00:20:42 -04:00

803 lines
25 KiB
Python

"""Main Discord bot application."""
from __future__ import annotations
import base64
import logging
from io import BytesIO
from typing import TYPE_CHECKING
import discord
import requests
from discord import Message
from discord.ext import commands
from vibe_bot import llama_wrapper, tts
from vibe_bot.config import (
CHAT_ENDPOINT,
CHAT_ENDPOINT_KEY,
CHAT_MODEL,
DISCORD_TOKEN,
IMAGE_EDIT_ENDPOINT,
IMAGE_EDIT_ENDPOINT_KEY,
IMAGE_EDIT_MODEL,
IMAGE_GEN_ENDPOINT,
IMAGE_GEN_ENDPOINT_KEY,
IMAGE_GEN_MODEL,
MAX_COMPLETION_TOKENS,
TTS_MODEL_PATH,
TTS_SPEED,
TTS_VOICE,
TTS_VOICES_PATH,
)
from vibe_bot.database import CustomBotManager, get_database
if TYPE_CHECKING:
from discord.ext.commands import Bot
from discord.ext.commands import Context as CommandsContext
# Validation limits applied to custom bot names and personality descriptions
MIN_BOT_NAME_LENGTH = 2
MAX_BOT_NAME_LENGTH = 50
MIN_PERSONALITY_LENGTH = 10

# Logging setup: timestamped records at INFO level and above
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Discord client: "!"-prefixed commands with the message content intent enabled
intents = discord.Intents.default()
intents.message_content = True
bot = commands.Bot(intents=intents, command_prefix="!")

# Text-to-speech engine; remains None if constructing TTSEngine raises
tts_engine: tts.TTSEngine | None = None
try:
    tts_engine = tts.TTSEngine(TTS_MODEL_PATH, TTS_VOICES_PATH)
    logger.info("TTS engine initialized successfully")
except Exception:
    logger.exception("Failed to initialize TTS engine")
    logger.info(
        "Make sure kokoro-v1.0.onnx and voices-v1.0.bin are in the project directory",
    )
@bot.event
async def on_ready() -> None:
    """Write startup log lines, including the account the bot is logged in as."""
    logged_in_as = bot.user
    logger.info("Bot is starting up...")
    logger.info("Bot logged in as %s", logged_in_as)
@bot.command(name="custom-bot")
async def custom_bot(
    ctx: CommandsContext[Bot],
    bot_name: str,
    *,
    personality: str,
) -> None:
    """Create a custom bot with a name and personality.

    Usage: !custom-bot <bot_name> <personality_description>
    Example: !custom-bot alfred you are a proper british butler

    The name must be between MIN_BOT_NAME_LENGTH and MAX_BOT_NAME_LENGTH
    characters and must not match a registered command: ``on_message`` checks
    custom bot names before dispatching regular commands, so such a name would
    shadow the command. The personality must be at least
    MIN_PERSONALITY_LENGTH characters. The outcome is reported in the channel.

    Args:
        ctx: The Discord command context.
        bot_name: Name used to invoke the bot as ``!<bot_name> <message>``.
        personality: System prompt describing how the bot should behave.
    """
    # Upper bound on how much of the personality is echoed back, so the
    # confirmation cannot exceed Discord's message length limit.
    max_personality_echo = 1500

    logger.info(
        "Custom bot command initiated by %s: name=%r, personality length=%d",
        ctx.author.name,
        bot_name,
        len(personality),
    )

    # Validate bot name
    name_length = len(bot_name)
    if not MIN_BOT_NAME_LENGTH <= name_length <= MAX_BOT_NAME_LENGTH:
        logger.warning(
            "Invalid bot name from %s: %r (length: %d)",
            ctx.author.name,
            bot_name,
            name_length,
        )
        await ctx.send(
            f"Invalid bot name. Name must be between {MIN_BOT_NAME_LENGTH} "
            f"and {MAX_BOT_NAME_LENGTH} characters.",
        )
        return

    # Refuse names that would shadow a registered command
    if ctx.bot.get_command(bot_name.lower()) is not None:
        logger.warning(
            "Bot name from %s collides with a registered command: %r",
            ctx.author.name,
            bot_name,
        )
        await ctx.send(
            f"Invalid bot name. '{bot_name}' is already a built-in command.",
        )
        return
    logger.info("Bot name validation passed for %r", bot_name)

    # Validate personality
    personality_length = len(personality)
    if personality_length < MIN_PERSONALITY_LENGTH:
        logger.warning(
            "Invalid personality from %s: length=%d",
            ctx.author.name,
            personality_length,
        )
        await ctx.send(
            "Invalid personality. Description must be at least "
            f"{MIN_PERSONALITY_LENGTH} characters.",
        )
        return
    logger.info("Personality validation passed for bot %r", bot_name)

    # Create the custom bot
    logger.info("Initializing CustomBotManager for user %s", ctx.author.name)
    custom_bot_manager = CustomBotManager()
    logger.info(
        "Attempting to create custom bot %r for user %s",
        bot_name,
        ctx.author.name,
    )
    success = custom_bot_manager.create_custom_bot(
        bot_name=bot_name,
        system_prompt=personality,
        created_by=str(ctx.author.id),
    )
    if not success:
        logger.warning(
            "Failed to create custom bot %r for user %s",
            bot_name,
            ctx.author.name,
        )
        await ctx.send("Failed to create custom bot. It may already exist.")
        return

    logger.info(
        "Successfully created custom bot %r for user %s",
        bot_name,
        ctx.author.name,
    )
    if personality_length > max_personality_echo:
        personality_echo = f"{personality[:max_personality_echo]}..."
    else:
        personality_echo = personality
    await ctx.send(
        f"Custom bot **'{bot_name}'** has been created "
        f"with personality: *{personality_echo}*",
    )
    await ctx.send(
        f"\nYou can now use this bot with: `!{bot_name} <your message>`",
    )
@bot.command(name="list-custom-bots")
async def list_custom_bots(ctx: CommandsContext[Bot]) -> None:
    """List the names of all custom bots returned by the CustomBotManager.

    The list is split across several messages when it would not fit into a
    single Discord message.

    Args:
        ctx: The Discord command context.
    """
    # Stay safely below Discord's per-message character limit
    max_message_length = 1900

    logger.info("Listing custom bots requested by %s", ctx.author.name)
    logger.info("Initializing CustomBotManager to list custom bots")
    custom_bot_manager = CustomBotManager()
    logger.info("Fetching list of custom bots from database")
    bots = custom_bot_manager.list_custom_bots()
    if not bots:
        logger.info("No custom bots found for user %s", ctx.author.name)
        await ctx.send(
            "No custom bots have been created yet. "
            "Use `!custom-bot <name> <personality>` to create one.",
        )
        return

    logger.info(
        "Found %d custom bots, displaying all of them for %s",
        len(bots),
        ctx.author.name,
    )
    logger.info("Sending bot list response to %s", ctx.author.name)
    pending = "Available Custom Bots:\n\n"
    for name, _prompt, _creator in bots:
        line = f"* {name}\n"
        # Flush the current message before it would grow past the limit
        if len(pending) + len(line) > max_message_length:
            await ctx.send(pending)
            pending = ""
        pending += line
    if pending:
        await ctx.send(pending)
@bot.command(name="delete-custom-bot")
async def delete_custom_bot(ctx: CommandsContext[Bot], bot_name: str) -> None:
    """Remove a custom bot, provided the caller is the user who created it.

    Usage: !delete-custom-bot <bot_name>
    """
    requester = ctx.author.name
    logger.info(
        "Delete custom bot command initiated by %s: bot_name=%r",
        requester,
        bot_name,
    )

    logger.info("Initializing CustomBotManager for delete operation")
    manager = CustomBotManager()

    # Look the bot up first so a missing bot and a foreign bot get
    # different replies.
    logger.info("Looking up custom bot %r in database", bot_name)
    record = manager.get_custom_bot(bot_name)
    if not record:
        logger.warning("Custom bot %r not found by user %s", bot_name, requester)
        await ctx.send(f"Custom bot '{bot_name}' not found.")
        return

    # The third field of the record holds the creator's user id as a string
    owner_id = record[2]
    logger.info("Custom bot %r found, owned by user %s", bot_name, owner_id)
    if owner_id != str(ctx.author.id):
        logger.warning(
            "User %s attempted to delete bot %r they don't own",
            requester,
            bot_name,
        )
        await ctx.send("You can only delete your own custom bots.")
        return
    logger.info("User %s is authorized to delete bot %r", requester, bot_name)

    logger.info("Deleting custom bot %r from database", bot_name)
    deleted = manager.delete_custom_bot(bot_name)
    if not deleted:
        logger.warning(
            "Failed to delete custom bot %r by user %s",
            bot_name,
            requester,
        )
        await ctx.send("Failed to delete custom bot.")
        return

    logger.info(
        "Successfully deleted custom bot %r by user %s",
        bot_name,
        requester,
    )
    await ctx.send(f"Custom bot '{bot_name}' has been deleted.")
# Handle custom bot commands
@bot.event
async def on_message(message: Message) -> None:
    """Route ``!<custom bot name> <text>`` messages to the matching custom bot.

    Messages written by this bot are ignored. A message that starts with
    ``!<bot_name> `` (compared case-insensitively) for a known custom bot is
    answered through ``handle_chat``. Everything else is handed to the
    regular command processing.

    Args:
        message: The incoming Discord message.
    """
    # Skip the bot's own messages
    if message.author == bot.user:
        return

    content = message.content
    logger.debug(
        "Processing message from %s: %r...",
        message.author.name,
        content.lower()[:50],
    )

    # A custom bot invocation always starts with the command prefix, so other
    # messages do not need a database lookup.
    if not content.startswith("!"):
        await bot.process_commands(message)
        return

    logger.info("Initializing CustomBotManager to check for custom bot commands")
    custom_bot_manager = CustomBotManager()
    logger.info("Fetching list of custom bots to check for matching commands")
    custom_bots = custom_bot_manager.list_custom_bots()
    logger.info("Checking %d custom bots for command match", len(custom_bots))

    for bot_name, system_prompt, _ in custom_bots:
        # Compare the leading slice of the original text with the prefix,
        # lowercasing both sides, so names stored with uppercase letters match
        # and the slice below uses offsets of the original text.
        raw_prefix = f"!{bot_name} "
        if content[: len(raw_prefix)].lower() != raw_prefix.lower():
            continue

        logger.info(
            "Custom bot command detected: %r triggered by %s",
            bot_name,
            message.author.name,
        )
        # Extract the actual message (remove the bot name prefix)
        user_message = content[len(raw_prefix) :]
        logger.debug(
            "Extracted user message for bot %r: %r...",
            bot_name,
            user_message[:50],
        )
        logger.info("Sending request to OpenAI API for bot %r", bot_name)
        ctx = await bot.get_context(message)
        await handle_chat(
            ctx=ctx,
            bot_name=bot_name,
            message=user_message,
            system_prompt=system_prompt,
            response_prefix=f"{bot_name} response",
        )
        return

    # If no custom bot matched, fall back to regular command processing
    await bot.process_commands(message)
@bot.command(name="speak")
async def speak(ctx: CommandsContext[Bot], *, message: str) -> None:
    """Speak text with Kokoro TTS, optionally letting a custom bot answer first.

    Usage: !speak <text> - plain text to speech
    Usage: !speak <bot_name> <text> - have a custom bot respond and speak
    Example: !speak hello world
    Example: !speak alfred what time is it
    """
    if tts_engine is None:
        await ctx.send(
            "TTS engine not initialized. "
            "Make sure kokoro-v1.0.onnx and voices-v1.0.bin are present.",
        )
        return
    if not message or not message.strip():
        await ctx.send("Please provide text to speak.")
        return

    manager = CustomBotManager()
    known_names = [entry[0] for entry in manager.list_custom_bots()]

    # The first whitespace-separated word selects a custom bot when it
    # matches a known name.
    words = message.split(maxsplit=1)
    first_word = words[0] if words else ""
    if first_word not in known_names:
        await _speak_plain(ctx, message, tts_engine)
        return
    await _speak_with_bot(ctx, first_word, message, tts_engine, manager)
async def _speak_with_bot(
    ctx: CommandsContext[Bot],
    bot_name: str,
    message: str,
    engine: tts.TTSEngine,
    custom_bot_manager: CustomBotManager,
) -> None:
    """Let a custom bot answer the message and upload the answer as speech.

    ``message`` is expected to begin with ``bot_name``; the rest of it is the
    user's prompt. The exchange is stored in the database before the audio is
    generated.
    """
    # Drop the leading bot name to get the user's actual prompt
    user_text = message[len(bot_name) :].lstrip()
    if not user_text:
        await ctx.send("Please provide text for the bot to respond to.")
        return

    await ctx.send(f"**{bot_name}** is thinking...")
    record = custom_bot_manager.get_custom_bot(bot_name)
    if not record:
        await ctx.send(f"Custom bot '{bot_name}' not found.")
        return
    _, system_prompt, _, _ = record
    system_prompt_edit = f"{system_prompt}\nKeep your responses under 2-3 sentences."

    try:
        db = get_database()
        author_id = str(ctx.author.id)
        context = db.get_conversation_context(
            user_id=author_id,
            current_message=user_text,
            max_context=5,
        )
        user_turn = [{"role": "user", "content": user_text}]
        prompts = context + user_turn if context else user_turn

        bot_response = llama_wrapper.chat_completion_with_history(
            system_prompt=system_prompt_edit,
            prompts=prompts,
            openai_url=CHAT_ENDPOINT,
            openai_api_key=CHAT_ENDPOINT_KEY,
            model=CHAT_MODEL,
            max_tokens=MAX_COMPLETION_TOKENS,
        )
        if not bot_response:
            await ctx.send(f"**{bot_name}** failed to generate a response.")
            return

        # Store the user's message, then the bot's reply
        channel_id = str(ctx.channel.id)
        guild_id = str(ctx.guild.id) if ctx.guild else None
        db.add_message(
            message_id=f"{ctx.message.id}",
            user_id=author_id,
            username=ctx.author.name,
            content=f"User: {user_text}",
            bot_name=bot_name,
            channel_id=channel_id,
            guild_id=guild_id,
        )
        responder = ctx.bot.user
        if responder is not None:
            db.add_message(
                message_id=f"{ctx.message.id}_response",
                user_id=str(responder.id),
                username=responder.name,
                content=bot_response,
                bot_name=bot_name,
                channel_id=channel_id,
                guild_id=guild_id,
            )

        await ctx.send(f"Generating speech for **{bot_name}**...")
        speech = engine.generate_audio(
            bot_response,
            voice=TTS_VOICE,
            speed=TTS_SPEED,
        )
        await ctx.send(file=discord.File(speech, filename="speech.mp3"))
    except Exception:
        logger.exception(
            "Error in speak command with bot %r",
            bot_name,
        )
        await ctx.send("Error generating speech.")
async def _speak_plain(
    ctx: CommandsContext[Bot],
    message: str,
    engine: tts.TTSEngine,
) -> None:
    """Turn ``message`` into speech and upload the audio to the channel."""
    try:
        await ctx.send("Generating speech...")
        speech = engine.generate_audio(message, voice=TTS_VOICE, speed=TTS_SPEED)
        await ctx.send(file=discord.File(speech, filename="speech.mp3"))
    except Exception:
        # Any failure is logged and reported to the channel
        logger.exception("Error in speak command")
        await ctx.send("Error generating speech.")
@bot.command(name="doodlebob")
async def doodlebob(ctx: CommandsContext[Bot], *, message: str) -> None:
    """Convert a message into an image using Doodlebob.

    The message is first rewritten into an image generation prompt by the chat
    model, then the prompt is sent to the image generation endpoint and the
    result is uploaded as ``image.png``. The user is told when either step
    yields nothing.

    Args:
        ctx: The Discord command context.
        message: Free-form description of the desired image.
    """
    logger.info(
        "Doodlebob command triggered by %s: %s",
        ctx.author.name,
        message[:100],
    )
    await ctx.send(f"**Doodlebob erasing {message[:100]}...**")
    system_prompt = (
        "Given the following message, convert it to a detailed image generation "
        "prompt that will be passed directly into an image generation model. "
        "If told to generate an image of yourself, generate a picture of a rat. "
        "If told to generate a picture of 'me', 'myself', or some other self "
        "reference, generate a picture of a rat. Only respond with a valid image "
        "generation prompt, do not affirm the user or respond to the user's questions."
    )

    # Ask the chat model for the image prompt
    image_prompt = llama_wrapper.chat_completion_instruct(
        system_prompt=system_prompt,
        user_prompt=message,
        openai_url=CHAT_ENDPOINT,
        openai_api_key=CHAT_ENDPOINT_KEY,
        model=CHAT_MODEL,
        max_tokens=MAX_COMPLETION_TOKENS,
    )
    # An empty (or missing) prompt means the chat completion failed; report it
    # instead of leaving the user waiting.
    if not image_prompt:
        logger.warning("No image prompt supplied. Check for errors.")
        await ctx.send("Failed to create an image prompt. The server may be busy.")
        return

    # Alert the user we're generating the image
    await ctx.send(f"**Doodlebob calling drone strike on {image_prompt[:100]}...**")
    image_b64 = llama_wrapper.image_generation(
        prompt=image_prompt,
        openai_url=IMAGE_GEN_ENDPOINT,
        openai_api_key=IMAGE_GEN_ENDPOINT_KEY,
        model=IMAGE_GEN_MODEL,
    )
    if not image_b64:
        logger.warning("Image generation returned empty response.")
        await ctx.send("Failed to generate image. The server may be busy.")
        return

    try:
        generated_image = BytesIO(base64.b64decode(image_b64))
        await ctx.send(file=discord.File(generated_image, filename="image.png"))
    except Exception:
        logger.exception("Failed to decode image data")
        await ctx.send("Failed to process the generated image.")
@bot.command(name="retcon")
async def retcon(ctx: CommandsContext[Bot], *, message: str) -> None:
    """Edit the images attached to the command message based on a text prompt.

    Each attachment is downloaded and passed, together with ``message``, to the
    image edit endpoint; the result is uploaded as ``image.png``. Attachments
    that fail to download are skipped. The user is told when there is nothing
    to edit or when no usable image comes back.

    Args:
        ctx: The Discord command context; its message carries the attachments.
        message: Description of the edit to apply.
    """
    attachments = ctx.message.attachments
    if not attachments:
        await ctx.send("Please attach at least one image to edit.")
        return

    image_data_list: list[BytesIO] = []
    for discord_image in attachments:
        image_url = discord_image.url
        try:
            response = requests.get(image_url, timeout=30)
            # Treat HTTP error statuses as failed downloads rather than
            # forwarding an error body as image data.
            response.raise_for_status()
        except requests.RequestException as e:
            logger.warning("Failed to download image from %s: %s", image_url, e)
            continue
        image_data_list.append(BytesIO(response.content))

    if not image_data_list:
        await ctx.send("Failed to download the attached image(s).")
        return

    await ctx.send(f"**Rewriting history to match {message[:100]}...**")
    image_b64 = llama_wrapper.image_edit(
        image=image_data_list,
        prompt=message,
        openai_url=IMAGE_EDIT_ENDPOINT,
        openai_api_key=IMAGE_EDIT_ENDPOINT_KEY,
        model=IMAGE_EDIT_MODEL,
    )
    if not image_b64:
        logger.warning("Image edit returned empty response.")
        await ctx.send("Failed to edit image. The server may be busy.")
        return

    try:
        edited_image_data = BytesIO(base64.b64decode(image_b64))
        await ctx.send(file=discord.File(edited_image_data, filename="image.png"))
    except Exception:
        logger.exception("Failed to decode edited image data")
        await ctx.send("Failed to process the edited image.")
@bot.command(name="history")
async def history(ctx: CommandsContext[Bot], bot_name: str) -> None:
    """Show the stored chat history of a custom bot.

    Usage: !history <bot_name>
    """
    logger.info(
        "History command triggered by %s for bot %r",
        ctx.author.name,
        bot_name,
    )
    if not CustomBotManager().get_custom_bot(bot_name):
        await ctx.send(f"Custom bot '{bot_name}' not found.")
        return

    records = get_database().get_bot_history(bot_name=bot_name, limit=20)
    if not records:
        await ctx.send(f"No chat history found for **{bot_name}**. ")
        return

    # Reverse the order returned by the database before formatting
    records.reverse()
    entries: list[str] = []
    for user_msg, bot_resp in records:
        entries.extend((user_msg, f"{bot_name}: {bot_resp}"))
    remaining = f"Chat History for **{bot_name}**:\n\n" + "\n---\n".join(entries)

    # Cut the text into pieces of at most chunk_size characters, preferring
    # to break at the last newline inside the window.
    chunk_size = 1900
    pieces: list[str] = []
    while remaining:
        if len(remaining) <= chunk_size:
            pieces.append(remaining)
            break
        cut = remaining.rfind("\n", 0, chunk_size)
        if cut == -1:
            cut = chunk_size
        pieces.append(remaining[:cut])
        remaining = remaining[cut:].lstrip("\n")

    for piece in pieces:
        await ctx.send(piece)
@bot.command(name="talkforme")
async def talkforme(ctx: CommandsContext[Bot], *, message: str) -> None:
    """Have two bots talk to each other about a topic.

    Usage: !talkforme bot1 bot2 4 some conversation topic

    The first bot opens the conversation about the topic; afterwards the bots
    alternate for the requested number of replies, capped at 20. Each bot is
    told the name of the bot it is talking to. The conversation stops early
    if a bot produces no response.

    Args:
        ctx: The Discord command context.
        message: ``<bot1> <bot2> <number of replies> <topic>``.
    """
    talk_limit = 20
    min_parts = 4
    chunk_size = 1000

    # Split on any whitespace so repeated spaces do not create empty fields
    parts = message.split(maxsplit=min_parts - 1)
    if len(parts) < min_parts:
        await ctx.send("Usage: !talkforme bot1 bot2 <number> <topic>")
        return
    bot1_name, bot2_name, limit, topic = parts

    custom_bot_manager = CustomBotManager()
    bot1 = custom_bot_manager.get_custom_bot(bot1_name)
    if not bot1:
        await ctx.send(f"{bot1_name} is not a real bot...")
        return
    _, bot1_prompt, _, _ = bot1
    bot2 = custom_bot_manager.get_custom_bot(bot2_name)
    if not bot2:
        await ctx.send(f"{bot2_name} is not a real bot...")
        return
    _, bot2_prompt, _, _ = bot2

    # Validate the limit before announcing anything
    try:
        message_limit = int(limit)
    except ValueError:
        await ctx.send("Message limit must be an integer.")
        return
    reply_count = max(0, min(message_limit, talk_limit))

    await ctx.send(
        f"{bot1_name} is going to talk to {bot2_name} "
        f'about "{topic[:50]}" for {reply_count} replies.',
    )

    bot_list = [(bot1_name, bot1_prompt), (bot2_name, bot2_prompt)]

    def flip_counter(counter: int) -> int:
        """Flip between 0 and 1."""
        return 1 if counter == 0 else 0

    def system_prompt_for(index: int) -> str:
        """Build the system prompt for the bot at ``index``."""
        # The partner is the other entry of bot_list, not a field of the
        # speaking bot's own (name, prompt) tuple.
        partner_name = bot_list[flip_counter(index)][0]
        return (
            f"{bot_list[index][1]}\nKeep your responses under 2-3 sentences. "
            f"You are talking to {partner_name}"
        )

    # Each bot sees its own lines as "assistant" and the other bot's as "user"
    prompt_histories: list[list[dict[str, str]]] = [
        [{"role": "user", "content": topic}],
        [{"role": "assistant", "content": topic}],
    ]

    bot_counter = 0
    # Turn 0 is the opening statement; turns 1..reply_count are the replies
    for turn in range(reply_count + 1):
        speaker_name = bot_list[bot_counter][0]
        other = flip_counter(bot_counter)
        logger.info("Current bot is %s", speaker_name)
        bot_response = llama_wrapper.chat_completion_with_history(
            system_prompt=system_prompt_for(bot_counter),
            prompts=prompt_histories[bot_counter],
            openai_url=CHAT_ENDPOINT,
            openai_api_key=CHAT_ENDPOINT_KEY,
            model=CHAT_MODEL,
            max_tokens=MAX_COMPLETION_TOKENS,
        )
        if not bot_response:
            # Do not feed an empty turn back into the histories
            logger.warning("Bot %r returned no response; stopping", speaker_name)
            await ctx.send(f"**{speaker_name}** failed to generate a response.")
            return

        prompt_histories[bot_counter].append(
            {"role": "assistant", "content": bot_response},
        )
        prompt_histories[other].append(
            {"role": "user", "content": bot_response},
        )

        await ctx.send(f"## {speaker_name}")
        # Send long responses in pieces to stay under the message size limit
        for start in range(0, len(bot_response), chunk_size):
            await ctx.send(bot_response[start : start + chunk_size])

        bot_counter = other
        logger.info("Message counter is %d/%s", turn, limit)
async def handle_chat(
    ctx: CommandsContext[Bot],
    *,
    bot_name: str,
    message: str,
    system_prompt: str,
    response_prefix: str,
) -> None:
    """Handle chat completion for a custom bot command.

    Fetches conversation context from the database, asks the chat model for a
    response, stores the user message and the response, and sends the response
    to the channel in pieces of at most 1000 characters. Any failure, including
    an empty response from the model, is reported to the channel.

    Args:
        ctx: The Discord command context.
        bot_name: The name of the custom bot.
        message: The user message to process.
        system_prompt: The system prompt for the bot.
        response_prefix: The prefix for the response message.
    """
    chunk_size = 1000
    await ctx.send(f"{bot_name} is searching its databanks for {message[:50]}...")
    system_prompt_edit = f"{system_prompt}\nKeep your responses under 2-3 sentences."

    try:
        # Database access happens inside the try so a failure is reported to
        # the user as well.
        db = get_database()
        context = db.get_conversation_context(
            user_id=str(ctx.author.id),
            current_message=message,
            max_context=5,
        )
        prompts = [{"role": "user", "content": message}]
        if context:
            prompts = context + prompts
        logger.info("Chat prompts: %s", prompts)

        bot_response = llama_wrapper.chat_completion_with_history(
            system_prompt=system_prompt_edit,
            prompts=prompts,
            openai_url=CHAT_ENDPOINT,
            openai_api_key=CHAT_ENDPOINT_KEY,
            model=CHAT_MODEL,
            max_tokens=MAX_COMPLETION_TOKENS,
        )
        if not bot_response:
            # Do not store or send an empty response
            logger.warning("Empty chat completion for bot %r", bot_name)
            await ctx.send(f"**{bot_name}** failed to generate a response.")
            return

        # Store both user message and bot response in the database
        channel_id = str(ctx.channel.id)
        guild_id = str(ctx.guild.id) if ctx.guild else None
        db.add_message(
            message_id=f"{ctx.message.id}",
            user_id=str(ctx.author.id),
            username=ctx.author.name,
            content=f"User: {message}",
            bot_name=bot_name,
            channel_id=channel_id,
            guild_id=guild_id,
        )
        if ctx.bot.user is not None:
            db.add_message(
                message_id=f"{ctx.message.id}_response",
                user_id=str(ctx.bot.user.id),
                username=ctx.bot.user.name,
                content=bot_response,
                bot_name=bot_name,
                channel_id=channel_id,
                guild_id=guild_id,
            )

        # Send the response back to the chat
        await ctx.send(response_prefix)
        for start in range(0, len(bot_response), chunk_size):
            await ctx.send(bot_response[start : start + chunk_size])
    except Exception:
        logger.exception("Error in handle_chat")
        await ctx.send("An error occurred while processing your request.")
# Run the bot only when this module is executed as a script, so that merely
# importing the module does not call bot.run().
if __name__ == "__main__":
    bot.run(DISCORD_TOKEN)