fix linting, formatting, and add tests

This commit is contained in:
2026-05-23 19:06:53 -04:00
parent 5e708b009c
commit 6ec9fbe85f
15 changed files with 2911 additions and 491 deletions
+354 -216
View File
@@ -1,33 +1,41 @@
import discord
from discord.ext import commands
import os
"""Main Discord bot application."""
from __future__ import annotations
import base64
import traceback
from io import BytesIO
from openai import OpenAI
import logging
from database import get_database, CustomBotManager # type: ignore
from config import ( # type: ignore
CHAT_ENDPOINT_KEY,
DISCORD_TOKEN,
from io import BytesIO
from typing import TYPE_CHECKING
import discord
import requests
from discord import Message
from discord.ext import commands
from vibe_bot import llama_wrapper, tts
from vibe_bot.config import (
CHAT_ENDPOINT,
CHAT_ENDPOINT_KEY,
CHAT_MODEL,
IMAGE_EDIT_ENDPOINT_KEY,
IMAGE_GEN_ENDPOINT,
DISCORD_TOKEN,
IMAGE_EDIT_ENDPOINT,
IMAGE_EDIT_ENDPOINT_KEY,
MAX_COMPLETION_TOKENS,
TTS_MODEL_PATH,
TTS_VOICES_PATH,
TTS_VOICE,
TTS_SPEED,
TTS_VOICE,
TTS_VOICES_PATH,
)
import tts # type: ignore
import llama_wrapper # type: ignore
import requests
from vibe_bot.database import CustomBotManager, get_database
if TYPE_CHECKING:
from discord.ext.commands import Bot
from discord.ext.commands import Context as CommandsContext
# Configure logging
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)
@@ -37,86 +45,123 @@ intents.message_content = True
bot = commands.Bot(command_prefix="!", intents=intents)
# Initialize TTS engine
tts_engine: tts.TTSEngine | None = None
try:
tts_engine = tts.TTSEngine(TTS_MODEL_PATH, TTS_VOICES_PATH)
logger.info("TTS engine initialized successfully")
except Exception as e:
logger.error(f"Failed to initialize TTS engine: {e}")
logger.info("Make sure kokoro-v1.0.onnx and voices-v1.0.bin are in the project directory")
tts_engine = None
except Exception:
logger.exception("Failed to initialize TTS engine")
logger.info(
"Make sure kokoro-v1.0.onnx and voices-v1.0.bin are in the project directory",
)
# Name and personality validation constants
MIN_BOT_NAME_LENGTH = 2
MAX_BOT_NAME_LENGTH = 50
MIN_PERSONALITY_LENGTH = 10
@bot.event
async def on_ready():
async def on_ready() -> None:
"""Log when the bot is ready and logged in."""
logger.info("Bot is starting up...")
print(f"Bot logged in as {bot.user}")
logger.info(f"Bot logged in as {bot.user}")
logger.info("Bot logged in as %s", bot.user)
@bot.command(name="custom-bot") # type: ignore
async def custom_bot(ctx, bot_name: str, *, personality: str):
"""Create a custom bot with a name and personality
@bot.command(name="custom-bot")
async def custom_bot(
ctx: CommandsContext[Bot],
bot_name: str,
*,
personality: str,
) -> None:
"""Create a custom bot with a name and personality.
Usage: !custom-bot <bot_name> <personality_description>
Example: !custom-bot alfred you are a proper british butler
"""
logger.info(
f"Custom bot command initiated by {ctx.author.name}: name='{bot_name}', personality length={len(personality)}"
"Custom bot command initiated by %s: name=%r, personality length=%d",
ctx.author.name,
bot_name,
len(personality),
)
# Validate bot name
if not bot_name or len(bot_name) < 2 or len(bot_name) > 50:
name_length = 0 if not bot_name else len(bot_name)
if (
not bot_name
or name_length < MIN_BOT_NAME_LENGTH
or name_length > MAX_BOT_NAME_LENGTH
):
logger.warning(
f"Invalid bot name from {ctx.author.name}: '{bot_name}' (length: {len(bot_name) if bot_name else 0})"
"Invalid bot name from %s: %r (length: %d)",
ctx.author.name,
bot_name,
name_length,
)
await ctx.send("Invalid bot name. Name must be between 2 and 50 characters.")
await ctx.send("Invalid bot name. Name must be between 2 and 50 characters.")
return
logger.info(f"Bot name validation passed for '{bot_name}'")
logger.info("Bot name validation passed for %r", bot_name)
# Validate personality
if not personality or len(personality) < 10:
personality_length = 0 if not personality else len(personality)
if not personality or personality_length < MIN_PERSONALITY_LENGTH:
logger.warning(
f"Invalid personality from {ctx.author.name}: length={len(personality) if personality else 0}"
"Invalid personality from %s: length=%d",
ctx.author.name,
personality_length,
)
await ctx.send(
"Invalid personality. Description must be at least 10 characters."
"Invalid personality. Description must be at least 10 characters.",
)
return
logger.info(f"Personality validation passed for bot '{bot_name}'")
logger.info("Personality validation passed for bot %r", bot_name)
# Create custom bot manager
logger.info(f"Initializing CustomBotManager for user {ctx.author.name}")
logger.info("Initializing CustomBotManager for user %s", ctx.author.name)
custom_bot_manager = CustomBotManager()
# Create the custom bot
logger.info(
f"Attempting to create custom bot '{bot_name}' for user {ctx.author.name}"
"Attempting to create custom bot %r for user %s",
bot_name,
ctx.author.name,
)
success = custom_bot_manager.create_custom_bot(
bot_name=bot_name, system_prompt=personality, created_by=str(ctx.author.id)
bot_name=bot_name,
system_prompt=personality,
created_by=str(ctx.author.id),
)
if success:
logger.info(
f"Successfully created custom bot '{bot_name}' for user {ctx.author.name}"
"Successfully created custom bot %r for user %s",
bot_name,
ctx.author.name,
)
await ctx.send(
f"Custom bot **'{bot_name}'** has been created with personality: *{personality}*"
f"Custom bot **'{bot_name}'** has been created "
f"with personality: *{personality}*",
)
await ctx.send(
f"\nYou can now use this bot with: " f"`!{bot_name} <your message>`",
)
await ctx.send(f"\nYou can now use this bot with: `!{bot_name} <your message>`")
else:
logger.warning(
f"Failed to create custom bot '{bot_name}' for user {ctx.author.name}"
"Failed to create custom bot %r for user %s",
bot_name,
ctx.author.name,
)
await ctx.send("Failed to create custom bot. It may already exist.")
await ctx.send("Failed to create custom bot. It may already exist.")
@bot.command(name="list-custom-bots")
async def list_custom_bots(ctx):
"""List all custom bots available in the server"""
logger.info(f"Listing custom bots requested by {ctx.author.name}")
async def list_custom_bots(ctx: CommandsContext[Bot]) -> None:
"""List all custom bots available in the server."""
logger.info("Listing custom bots requested by %s", ctx.author.name)
# Create custom bot manager
logger.info("Initializing CustomBotManager to list custom bots")
@@ -126,31 +171,36 @@ async def list_custom_bots(ctx):
bots = custom_bot_manager.list_custom_bots()
if not bots:
logger.info(f"No custom bots found for user {ctx.author.name}")
logger.info("No custom bots found for user %s", ctx.author.name)
await ctx.send(
"No custom bots have been created yet. Use `!custom-bot <name> <personality>` to create one."
"No custom bots have been created yet. "
"Use `!custom-bot <name> <personality>` to create one.",
)
return
logger.info(
f"Found {len(bots)} custom bots, displaying top 10 for {ctx.author.name}"
"Found %d custom bots, displaying top 10 for %s",
len(bots),
ctx.author.name,
)
bot_list = "🤖 **Available Custom Bots**:\n\n"
for name, prompt, creator in bots:
bot_list += f"• **{name}**\n"
bot_list = "Available Custom Bots:\n\n"
for name, _prompt, _creator in bots:
bot_list += f"* {name}\n"
logger.info(f"Sending bot list response to {ctx.author.name}")
logger.info("Sending bot list response to %s", ctx.author.name)
await ctx.send(bot_list)
@bot.command(name="delete-custom-bot") # type: ignore
async def delete_custom_bot(ctx, bot_name: str):
"""Delete a custom bot (only the creator can delete)
@bot.command(name="delete-custom-bot")
async def delete_custom_bot(ctx: CommandsContext[Bot], bot_name: str) -> None:
"""Delete a custom bot (only the creator can delete).
Usage: !delete-custom-bot <bot_name>
"""
logger.info(
f"Delete custom bot command initiated by {ctx.author.name}: bot_name='{bot_name}'"
"Delete custom bot command initiated by %s: bot_name=%r",
ctx.author.name,
bot_name,
)
# Create custom bot manager
@@ -158,45 +208,64 @@ async def delete_custom_bot(ctx, bot_name: str):
custom_bot_manager = CustomBotManager()
# Get bot info
logger.info(f"Looking up custom bot '{bot_name}' in database")
logger.info("Looking up custom bot %r in database", bot_name)
bot_info = custom_bot_manager.get_custom_bot(bot_name)
if not bot_info:
logger.warning(f"Custom bot '{bot_name}' not found by user {ctx.author.name}")
await ctx.send(f"Custom bot '{bot_name}' not found.")
logger.warning(
"Custom bot %r not found by user %s",
bot_name,
ctx.author.name,
)
await ctx.send(f"Custom bot '{bot_name}' not found.")
return
logger.info(f"Custom bot '{bot_name}' found, owned by user {bot_info[2]}")
logger.info(
"Custom bot %r found, owned by user %s",
bot_name,
bot_info[2],
)
# Check ownership
if bot_info[2] != str(ctx.author.id):
logger.warning(
f"User {ctx.author.name} attempted to delete bot '{bot_name}' they don't own"
"User %s attempted to delete bot %r they don't own",
ctx.author.name,
bot_name,
)
await ctx.send("You can only delete your own custom bots.")
await ctx.send("You can only delete your own custom bots.")
return
logger.info(f"User {ctx.author.name} is authorized to delete bot '{bot_name}'")
logger.info(
"User %s is authorized to delete bot %r",
ctx.author.name,
bot_name,
)
# Delete the bot
logger.info(f"Deleting custom bot '{bot_name}' from database")
logger.info("Deleting custom bot %r from database", bot_name)
success = custom_bot_manager.delete_custom_bot(bot_name)
if success:
logger.info(
f"Successfully deleted custom bot '{bot_name}' by user {ctx.author.name}"
"Successfully deleted custom bot %r by user %s",
bot_name,
ctx.author.name,
)
await ctx.send(f"Custom bot '{bot_name}' has been deleted.")
await ctx.send(f"Custom bot '{bot_name}' has been deleted.")
else:
logger.warning(
f"Failed to delete custom bot '{bot_name}' by user {ctx.author.name}"
"Failed to delete custom bot %r by user %s",
bot_name,
ctx.author.name,
)
await ctx.send("Failed to delete custom bot.")
await ctx.send("Failed to delete custom bot.")
# Handle custom bot commands
@bot.event
async def on_message(message):
async def on_message(message: Message) -> None:
"""Handle incoming messages for custom bot command detection."""
# Skip bot messages
if message.author == bot.user:
return
@@ -205,7 +274,9 @@ async def on_message(message):
message_content = message.content.lower()
logger.debug(
f"Processing message from {message_author}: '{message_content[:50]}...'"
"Processing message from %s: %r...",
message_author,
message_content[:50],
)
ctx = await bot.get_context(message)
@@ -216,24 +287,28 @@ async def on_message(message):
logger.info("Fetching list of custom bots to check for matching commands")
custom_bots = custom_bot_manager.list_custom_bots()
logger.info(f"Checking {len(custom_bots)} custom bots for command match")
logger.info("Checking %d custom bots for command match", len(custom_bots))
for bot_name, system_prompt, _ in custom_bots:
# Check if message starts with the custom bot name followed by a space
if message_content.startswith(f"!{bot_name} "):
logger.info(
f"Custom bot command detected: '{bot_name}' triggered by {message.author.name}"
"Custom bot command detected: %r triggered by %s",
bot_name,
message.author.name,
)
# Extract the actual message (remove the bot name prefix)
user_message = message.content[len(f"!{bot_name} ") :]
logger.debug(
f"Extracted user message for bot '{bot_name}': '{user_message[:50]}...'"
"Extracted user message for bot %r: %r...",
bot_name,
user_message[:50],
)
# Prepare the payload with custom personality
response_prefix = f"**{bot_name} response**"
response_prefix = f"{bot_name} response"
logger.info(f"Sending request to OpenAI API for bot '{bot_name}'")
logger.info("Sending request to OpenAI API for bot %r", bot_name)
await handle_chat(
ctx=ctx,
bot_name=bot_name,
@@ -248,8 +323,8 @@ async def on_message(message):
@bot.command(name="speak")
async def speak(ctx, *, message: str):
"""Have the bot speak the given text using Kokoro TTS, or have a custom bot speak
async def speak(ctx: CommandsContext[Bot], *, message: str) -> None:
"""Have the bot speak the given text using Kokoro TTS, or have a custom bot speak.
Usage: !speak <text> - plain text to speech
Usage: !speak <bot_name> <text> - have a custom bot respond and speak
@@ -257,113 +332,149 @@ async def speak(ctx, *, message: str):
Example: !speak alfred what time is it
"""
if tts_engine is None:
await ctx.send("❌ TTS engine not initialized. Make sure kokoro-v1.0.onnx and voices-v1.0.bin are present.")
await ctx.send(
"TTS engine not initialized. "
"Make sure kokoro-v1.0.onnx and voices-v1.0.bin are present.",
)
return
if not message or len(message.strip()) == 0:
await ctx.send("Please provide text to speak.")
if not message or not message.strip():
await ctx.send("Please provide text to speak.")
return
custom_bot_manager = CustomBotManager()
custom_bots = custom_bot_manager.list_custom_bots()
bot_names = [b[0] for b in custom_bots]
first_word = message.split()[0] if message.split() else ""
first_word = message.split(maxsplit=1)[0] if message.split() else ""
if first_word in bot_names:
bot_name = first_word
text_to_speak = message[len(bot_name):].lstrip()
if not text_to_speak:
await ctx.send("❌ Please provide text for the bot to respond to.")
await _speak_with_bot(ctx, first_word, message, tts_engine, custom_bot_manager)
else:
await _speak_plain(ctx, message, tts_engine)
async def _speak_with_bot(
ctx: CommandsContext[Bot],
bot_name: str,
message: str,
engine: tts.TTSEngine,
custom_bot_manager: CustomBotManager,
) -> None:
"""Handle speak command for a custom bot."""
text_to_speak = message[len(bot_name) :].lstrip()
if not text_to_speak:
await ctx.send("Please provide text for the bot to respond to.")
return
await ctx.send(f"**{bot_name}** is thinking...")
bot_info = custom_bot_manager.get_custom_bot(bot_name)
if not bot_info:
await ctx.send(f"Custom bot '{bot_name}' not found.")
return
_, system_prompt, _, _ = bot_info
system_prompt_edit = f"{system_prompt}\nKeep your responses under 2-3 sentences."
try:
db = get_database()
context = db.get_conversation_context(
user_id=str(ctx.author.id),
current_message=text_to_speak,
max_context=5,
)
prompts = [{"role": "user", "content": text_to_speak}]
if context:
prompts = context + prompts
bot_response = llama_wrapper.chat_completion_with_history(
system_prompt=system_prompt_edit,
prompts=prompts,
openai_url=CHAT_ENDPOINT,
openai_api_key=CHAT_ENDPOINT_KEY,
model=CHAT_MODEL,
max_tokens=MAX_COMPLETION_TOKENS,
)
if not bot_response:
await ctx.send(f"**{bot_name}** failed to generate a response.")
return
await ctx.send(f"🔊 **{bot_name}** is thinking...")
bot_info = custom_bot_manager.get_custom_bot(bot_name)
if not bot_info:
await ctx.send(f"❌ Custom bot '{bot_name}' not found.")
return
_, system_prompt, _, _ = bot_info
system_prompt_edit = f"{system_prompt}\nKeep your responses under 2-3 sentences."
try:
db = get_database()
context = db.get_conversation_context(
user_id=str(ctx.author.id), current_message=text_to_speak, max_context=5
)
prompts = [{"role": "user", "content": text_to_speak}]
if context:
prompts = context + prompts
bot_response = llama_wrapper.chat_completion_with_history(
system_prompt=system_prompt_edit,
prompts=prompts,
openai_url=CHAT_ENDPOINT,
openai_api_key=CHAT_ENDPOINT_KEY,
model=CHAT_MODEL,
max_tokens=MAX_COMPLETION_TOKENS,
)
if not bot_response:
await ctx.send(f"❌ **{bot_name}** failed to generate a response.")
return
db.add_message(
message_id=f"{ctx.message.id}",
user_id=str(ctx.author.id),
username=ctx.author.name,
content=f"User: {text_to_speak}",
channel_id=str(ctx.channel.id),
guild_id=str(ctx.guild.id) if ctx.guild else None,
)
db.add_message(
message_id=f"{ctx.message.id}",
user_id=str(ctx.author.id),
username=ctx.author.name,
content=f"User: {text_to_speak}",
channel_id=str(ctx.channel.id),
guild_id=str(ctx.guild.id) if ctx.guild else None,
)
if ctx.bot.user is not None:
db.add_message(
message_id=f"{ctx.message.id}_response",
user_id=str(bot.user.id),
username=bot.user.name,
user_id=str(ctx.bot.user.id),
username=ctx.bot.user.name,
content=f"Bot: {bot_response}",
channel_id=str(ctx.channel.id),
guild_id=str(ctx.guild.id) if ctx.guild else None,
)
await ctx.send(f"🔊 Generating speech for **{bot_name}**...")
audio_buffer = tts_engine.generate_audio(bot_response, voice=TTS_VOICE, speed=TTS_SPEED)
await ctx.send(f"Generating speech for **{bot_name}**...")
audio_buffer = engine.generate_audio(
bot_response,
voice=TTS_VOICE,
speed=TTS_SPEED,
)
audio_file = discord.File(audio_buffer, filename="speech.mp3")
await ctx.send(file=audio_file)
except Exception as e:
logger.error(f"Error in !speak command with bot '{bot_name}': {traceback.format_exc()}")
await ctx.send(f"Error generating speech: {str(e)}")
else:
if not message or len(message.strip()) == 0:
await ctx.send("❌ Please provide text to speak.")
return
audio_file = discord.File(audio_buffer, filename="speech.mp3")
await ctx.send(file=audio_file)
except Exception:
logger.exception(
"Error in speak command with bot %r",
bot_name,
)
await ctx.send("Error generating speech.")
try:
await ctx.send("🔊 Generating speech...")
audio_buffer = tts_engine.generate_audio(message, voice=TTS_VOICE, speed=TTS_SPEED)
audio_file = discord.File(audio_buffer, filename="speech.mp3")
await ctx.send(file=audio_file)
except Exception as e:
logger.error(f"Error in !speak command: {e}")
await ctx.send(f"❌ Error generating speech: {str(e)}")
async def _speak_plain(
ctx: CommandsContext[Bot],
message: str,
engine: tts.TTSEngine,
) -> None:
"""Handle speak command for plain text."""
try:
await ctx.send("Generating speech...")
audio_buffer = engine.generate_audio(
message,
voice=TTS_VOICE,
speed=TTS_SPEED,
)
audio_file = discord.File(audio_buffer, filename="speech.mp3")
await ctx.send(file=audio_file)
except Exception:
logger.exception("Error in speak command")
await ctx.send("Error generating speech.")
@bot.command(name="doodlebob")
async def doodlebob(ctx, *, message: str):
# add some logging
logger.info(f"Doodlebob command triggered by {ctx.author.name}: {message[:100]}")
async def doodlebob(ctx: CommandsContext[Bot], *, message: str) -> None:
"""Convert a message into an image using Doodlebob."""
logger.info(
"Doodlebob command triggered by %s: %s",
ctx.author.name,
message[:100],
)
await ctx.send(f"**Doodlebob erasing {message[:100]}...**")
system_prompt = (
"Given the following message, convert it to a detailed image generation prompt that will be passed directly into an image generation model."
"If told to generate an image of yourself, generate a picture of a rat. If told to generate a picture of 'me', 'myself', or some other self"
" reference, generate a picture of a rat. Only respond with a valid image generation prompt, do not affirm the user or respond to the user's"
" questions."
"Given the following message, convert it to a detailed image generation "
"prompt that will be passed directly into an image generation model. "
"If told to generate an image of yourself, generate a picture of a rat. "
"If told to generate a picture of 'me', 'myself', or some other self "
"reference, generate a picture of a rat. Only respond with a valid image "
"generation prompt, do not affirm the user or respond to the user's questions."
)
# Wait for the generated image prompt
@@ -378,7 +489,7 @@ async def doodlebob(ctx, *, message: str):
# If the string is empty we had an error
if image_prompt == "":
print("No image prompt supplied. Check for errors.")
logger.warning("No image prompt supplied. Check for errors.")
return
# Alert the user we're generating the image
@@ -397,11 +508,17 @@ async def doodlebob(ctx, *, message: str):
@bot.command(name="retcon")
async def retcon(ctx, *, message: str):
image_data_list = []
async def retcon(ctx: CommandsContext[Bot], *, message: str) -> None:
"""Edit an attached image based on a text prompt."""
image_data_list: list[BytesIO] = []
for discord_image in ctx.message.attachments:
image_url = discord_image.url
image_data = requests.get(image_url).content
try:
response = requests.get(image_url, timeout=30)
image_data = response.content
except requests.RequestException as e:
logger.warning("Failed to download image from %s: %s", image_url, e)
continue
image_bytestream = BytesIO(image_data)
image_data_list.append(image_bytestream)
@@ -421,20 +538,23 @@ async def retcon(ctx, *, message: str):
@bot.command(name="talkforme")
async def talkforme(ctx, *, message: str):
"""Have two bots talk to each other about a topic
async def talkforme(ctx: CommandsContext[Bot], *, message: str) -> None:
"""Have two bots talk to each other about a topic.
Usage: !talkforme bot1 bot2 4 some conversation topic
"""
talk_limit = 20
TALK_LIMIT = 20
MIN_TALKFORME_PARTS = 4
parts = message.split(" ", maxsplit=MIN_TALKFORME_PARTS - 1)
if len(parts) < MIN_TALKFORME_PARTS:
await ctx.send("Usage: !talkforme bot1 bot2 <number> <topic>")
return
bot1_name, bot2_name, limit, topic_list = (
message.split(" ")[0],
message.split(" ")[1],
message.split(" ")[2],
message.split(" ")[3:],
)
bot1_name = parts[0]
bot2_name = parts[1]
limit = parts[2]
topic_list = parts[3:]
topic = " ".join(topic_list)
@@ -444,49 +564,46 @@ async def talkforme(ctx, *, message: str):
if not bot1:
await ctx.send(f"{bot1_name} is not a real bot...")
return
else:
_, bot1_prompt, _, _ = bot1
_, bot1_prompt, _, _ = bot1
bot2 = custom_bot_manager.get_custom_bot(bot2_name)
if not bot2:
await ctx.send(f"{bot2_name} is not a real bot...")
return
else:
_, bot2_prompt, _, _ = bot2
_, bot2_prompt, _, _ = bot2
await ctx.send(
f'{bot1_name} is going to talk to {bot2_name} about "{topic[:50]}" for {limit} replies.'
f"{bot1_name} is going to talk to {bot2_name} "
f'about "{topic[:50]}" for {limit} replies.',
)
bot_list = [(bot1_name, bot1_prompt), (bot2_name, bot2_prompt)]
message_limit = int(limit)
try:
message_limit = int(limit)
except ValueError:
await ctx.send("Message limit must be an integer.")
return
def flip_counter(counter: int):
if counter == 0:
return 1
else:
return 0
def flip_user(user: str):
if user == "user":
return "assistant"
else:
return "user"
def flip_counter(counter: int) -> int:
"""Flip between 0 and 1."""
return 1 if counter == 0 else 0
message_counter = 0
bot_counter = 0
current_bot = bot_list[bot_counter]
prompt_histories = [
prompt_histories: list[list[dict[str, str]]] = [
[{"role": "user", "content": topic}],
[{"role": "assistant", "content": topic}],
]
first_bot_response = llama_wrapper.chat_completion_with_history(
system_prompt=current_bot[1]
+ f"\nKeep your responses under 2-3 sentences. You are talking to {current_bot[flip_counter(bot_counter)][0]}",
prompts=prompt_histories[bot_counter], # type: ignore
system_prompt=(
current_bot[1] + f"\nKeep your responses under 2-3 sentences. "
f"You are talking to {current_bot[flip_counter(bot_counter)][0]}"
),
prompts=prompt_histories[bot_counter],
openai_url=CHAT_ENDPOINT,
openai_api_key=CHAT_ENDPOINT_KEY,
model=CHAT_MODEL,
@@ -498,13 +615,15 @@ async def talkforme(ctx, *, message: str):
bot_counter = flip_counter(counter=bot_counter)
while message_counter < min(message_limit, TALK_LIMIT):
while message_counter < min(message_limit, talk_limit):
current_bot = bot_list[bot_counter]
logger.info(f"Current bot is {current_bot}")
logger.info("Current bot is %s", current_bot[0])
bot_response = llama_wrapper.chat_completion_with_history(
system_prompt=current_bot[1]
+ f"\nKeep your responses under 2-3 sentences. {current_bot[flip_counter(bot_counter)]}",
prompts=prompt_histories[bot_counter], # type: ignore
system_prompt=(
current_bot[1] + f"\nKeep your responses under 2-3 sentences. "
f"{current_bot[flip_counter(bot_counter)]}"
),
prompts=prompt_histories[bot_counter],
openai_url=CHAT_ENDPOINT,
openai_api_key=CHAT_ENDPOINT_KEY,
model=CHAT_MODEL,
@@ -512,10 +631,10 @@ async def talkforme(ctx, *, message: str):
)
message_counter += 1
prompt_histories[bot_counter].append(
{"role": "assistant", "content": bot_response}
{"role": "assistant", "content": bot_response},
)
prompt_histories[flip_counter(bot_counter)].append(
{"role": "user", "content": bot_response}
{"role": "user", "content": bot_response},
)
await ctx.send(f"## {current_bot[0]}")
while bot_response:
@@ -523,12 +642,27 @@ async def talkforme(ctx, *, message: str):
bot_response = bot_response[1000:]
await ctx.send(send_chunk)
bot_counter = flip_counter(counter=bot_counter)
logger.info(f"Message counter is {message_counter}/{limit}")
logger.info("Message counter is %d/%s", message_counter, limit)
async def handle_chat(
ctx, *, bot_name: str, message: str, system_prompt: str, response_prefix: str
):
ctx: CommandsContext[Bot],
*,
bot_name: str,
message: str,
system_prompt: str,
response_prefix: str,
) -> None:
"""Handle chat completion for a custom bot command.
Args:
ctx: The Discord command context.
bot_name: The name of the custom bot.
message: The user message to process.
system_prompt: The system prompt for the bot.
response_prefix: The prefix for the response message.
"""
await ctx.send(f"{bot_name} is searching its databanks for {message[:50]}...")
# Get database instance
@@ -536,7 +670,9 @@ async def handle_chat(
# Get conversation context using RAG
context = db.get_conversation_context(
user_id=str(ctx.author.id), current_message=message, max_context=5
user_id=str(ctx.author.id),
current_message=message,
max_context=5,
)
prompts = [{"role": "user", "content": message}]
@@ -544,14 +680,14 @@ async def handle_chat(
if context:
prompts = context + prompts
logger.info(prompts)
logger.info("Chat prompts: %s", prompts)
system_prompt_edit = f"{system_prompt}\nKeep your responses under 2-3 sentences."
try:
bot_response = llama_wrapper.chat_completion_with_history(
system_prompt=system_prompt_edit,
prompts=prompts, # type: ignore
prompts=prompts,
openai_url=CHAT_ENDPOINT,
openai_api_key=CHAT_ENDPOINT_KEY,
model=CHAT_MODEL,
@@ -568,14 +704,15 @@ async def handle_chat(
guild_id=str(ctx.guild.id) if ctx.guild else None,
)
db.add_message(
message_id=f"{ctx.message.id}_response",
user_id=str(bot.user.id), # type: ignore
username=bot.user.name, # type: ignore
content=f"Bot: {bot_response}",
channel_id=str(ctx.channel.id),
guild_id=str(ctx.guild.id) if ctx.guild else None,
)
if ctx.bot.user is not None:
db.add_message(
message_id=f"{ctx.message.id}_response",
user_id=str(ctx.bot.user.id),
username=ctx.bot.user.name,
content=f"Bot: {bot_response}",
channel_id=str(ctx.channel.id),
guild_id=str(ctx.guild.id) if ctx.guild else None,
)
# Send the response back to the chat
await ctx.send(response_prefix)
@@ -584,8 +721,9 @@ async def handle_chat(
bot_response = bot_response[1000:]
await ctx.send(send_chunk)
except Exception as e:
await ctx.send(f"Error: {str(e)}")
except Exception:
logger.exception("Error in handle_chat")
await ctx.send("An error occurred while processing your request.")
# Run the bot