746 lines
23 KiB
Python
746 lines
23 KiB
Python
"""Main Discord bot application."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import base64
|
|
import logging
|
|
from io import BytesIO
|
|
from typing import TYPE_CHECKING
|
|
|
|
import discord
|
|
import requests
|
|
from discord import Message
|
|
from discord.ext import commands
|
|
|
|
from vibe_bot import llama_wrapper, tts
|
|
from vibe_bot.config import (
|
|
CHAT_ENDPOINT,
|
|
CHAT_ENDPOINT_KEY,
|
|
CHAT_MODEL,
|
|
DISCORD_TOKEN,
|
|
IMAGE_EDIT_ENDPOINT,
|
|
IMAGE_EDIT_ENDPOINT_KEY,
|
|
IMAGE_EDIT_MODEL,
|
|
IMAGE_GEN_ENDPOINT,
|
|
IMAGE_GEN_ENDPOINT_KEY,
|
|
IMAGE_GEN_MODEL,
|
|
MAX_COMPLETION_TOKENS,
|
|
TTS_MODEL_PATH,
|
|
TTS_SPEED,
|
|
TTS_VOICE,
|
|
TTS_VOICES_PATH,
|
|
)
|
|
from vibe_bot.database import CustomBotManager, get_database
|
|
|
|
if TYPE_CHECKING:
|
|
from discord.ext.commands import Bot
|
|
from discord.ext.commands import Context as CommandsContext
|
|
|
|
# Configure logging
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
|
|
)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Initialize the bot
|
|
intents = discord.Intents.default()
|
|
intents.message_content = True
|
|
bot = commands.Bot(command_prefix="!", intents=intents)
|
|
|
|
# Initialize TTS engine
|
|
tts_engine: tts.TTSEngine | None = None
|
|
try:
|
|
tts_engine = tts.TTSEngine(TTS_MODEL_PATH, TTS_VOICES_PATH)
|
|
logger.info("TTS engine initialized successfully")
|
|
except Exception:
|
|
logger.exception("Failed to initialize TTS engine")
|
|
logger.info(
|
|
"Make sure kokoro-v1.0.onnx and voices-v1.0.bin are in the project directory",
|
|
)
|
|
|
|
# Name and personality validation constants
|
|
MIN_BOT_NAME_LENGTH = 2
|
|
MAX_BOT_NAME_LENGTH = 50
|
|
MIN_PERSONALITY_LENGTH = 10
|
|
|
|
|
|
@bot.event
|
|
async def on_ready() -> None:
|
|
"""Log when the bot is ready and logged in."""
|
|
logger.info("Bot is starting up...")
|
|
logger.info("Bot logged in as %s", bot.user)
|
|
|
|
|
|
@bot.command(name="custom-bot")
|
|
async def custom_bot(
|
|
ctx: CommandsContext[Bot],
|
|
bot_name: str,
|
|
*,
|
|
personality: str,
|
|
) -> None:
|
|
"""Create a custom bot with a name and personality.
|
|
|
|
Usage: !custom-bot <bot_name> <personality_description>
|
|
Example: !custom-bot alfred you are a proper british butler
|
|
"""
|
|
logger.info(
|
|
"Custom bot command initiated by %s: name=%r, personality length=%d",
|
|
ctx.author.name,
|
|
bot_name,
|
|
len(personality),
|
|
)
|
|
|
|
# Validate bot name
|
|
name_length = 0 if not bot_name else len(bot_name)
|
|
if (
|
|
not bot_name
|
|
or name_length < MIN_BOT_NAME_LENGTH
|
|
or name_length > MAX_BOT_NAME_LENGTH
|
|
):
|
|
logger.warning(
|
|
"Invalid bot name from %s: %r (length: %d)",
|
|
ctx.author.name,
|
|
bot_name,
|
|
name_length,
|
|
)
|
|
await ctx.send("Invalid bot name. Name must be between 2 and 50 characters.")
|
|
return
|
|
|
|
logger.info("Bot name validation passed for %r", bot_name)
|
|
|
|
# Validate personality
|
|
personality_length = 0 if not personality else len(personality)
|
|
if not personality or personality_length < MIN_PERSONALITY_LENGTH:
|
|
logger.warning(
|
|
"Invalid personality from %s: length=%d",
|
|
ctx.author.name,
|
|
personality_length,
|
|
)
|
|
await ctx.send(
|
|
"Invalid personality. Description must be at least 10 characters.",
|
|
)
|
|
return
|
|
|
|
logger.info("Personality validation passed for bot %r", bot_name)
|
|
|
|
# Create custom bot manager
|
|
logger.info("Initializing CustomBotManager for user %s", ctx.author.name)
|
|
custom_bot_manager = CustomBotManager()
|
|
|
|
# Create the custom bot
|
|
logger.info(
|
|
"Attempting to create custom bot %r for user %s",
|
|
bot_name,
|
|
ctx.author.name,
|
|
)
|
|
success = custom_bot_manager.create_custom_bot(
|
|
bot_name=bot_name,
|
|
system_prompt=personality,
|
|
created_by=str(ctx.author.id),
|
|
)
|
|
|
|
if success:
|
|
logger.info(
|
|
"Successfully created custom bot %r for user %s",
|
|
bot_name,
|
|
ctx.author.name,
|
|
)
|
|
await ctx.send(
|
|
f"Custom bot **'{bot_name}'** has been created "
|
|
f"with personality: *{personality}*",
|
|
)
|
|
await ctx.send(
|
|
f"\nYou can now use this bot with: " f"`!{bot_name} <your message>`",
|
|
)
|
|
else:
|
|
logger.warning(
|
|
"Failed to create custom bot %r for user %s",
|
|
bot_name,
|
|
ctx.author.name,
|
|
)
|
|
await ctx.send("Failed to create custom bot. It may already exist.")
|
|
|
|
|
|
@bot.command(name="list-custom-bots")
|
|
async def list_custom_bots(ctx: CommandsContext[Bot]) -> None:
|
|
"""List all custom bots available in the server."""
|
|
logger.info("Listing custom bots requested by %s", ctx.author.name)
|
|
|
|
# Create custom bot manager
|
|
logger.info("Initializing CustomBotManager to list custom bots")
|
|
custom_bot_manager = CustomBotManager()
|
|
|
|
logger.info("Fetching list of custom bots from database")
|
|
bots = custom_bot_manager.list_custom_bots()
|
|
|
|
if not bots:
|
|
logger.info("No custom bots found for user %s", ctx.author.name)
|
|
await ctx.send(
|
|
"No custom bots have been created yet. "
|
|
"Use `!custom-bot <name> <personality>` to create one.",
|
|
)
|
|
return
|
|
|
|
logger.info(
|
|
"Found %d custom bots, displaying top 10 for %s",
|
|
len(bots),
|
|
ctx.author.name,
|
|
)
|
|
bot_list = "Available Custom Bots:\n\n"
|
|
for name, _prompt, _creator in bots:
|
|
bot_list += f"* {name}\n"
|
|
|
|
logger.info("Sending bot list response to %s", ctx.author.name)
|
|
await ctx.send(bot_list)
|
|
|
|
|
|
@bot.command(name="delete-custom-bot")
|
|
async def delete_custom_bot(ctx: CommandsContext[Bot], bot_name: str) -> None:
|
|
"""Delete a custom bot (only the creator can delete).
|
|
|
|
Usage: !delete-custom-bot <bot_name>
|
|
"""
|
|
logger.info(
|
|
"Delete custom bot command initiated by %s: bot_name=%r",
|
|
ctx.author.name,
|
|
bot_name,
|
|
)
|
|
|
|
# Create custom bot manager
|
|
logger.info("Initializing CustomBotManager for delete operation")
|
|
custom_bot_manager = CustomBotManager()
|
|
|
|
# Get bot info
|
|
logger.info("Looking up custom bot %r in database", bot_name)
|
|
bot_info = custom_bot_manager.get_custom_bot(bot_name)
|
|
|
|
if not bot_info:
|
|
logger.warning(
|
|
"Custom bot %r not found by user %s",
|
|
bot_name,
|
|
ctx.author.name,
|
|
)
|
|
await ctx.send(f"Custom bot '{bot_name}' not found.")
|
|
return
|
|
|
|
logger.info(
|
|
"Custom bot %r found, owned by user %s",
|
|
bot_name,
|
|
bot_info[2],
|
|
)
|
|
|
|
# Check ownership
|
|
if bot_info[2] != str(ctx.author.id):
|
|
logger.warning(
|
|
"User %s attempted to delete bot %r they don't own",
|
|
ctx.author.name,
|
|
bot_name,
|
|
)
|
|
await ctx.send("You can only delete your own custom bots.")
|
|
return
|
|
|
|
logger.info(
|
|
"User %s is authorized to delete bot %r",
|
|
ctx.author.name,
|
|
bot_name,
|
|
)
|
|
|
|
# Delete the bot
|
|
logger.info("Deleting custom bot %r from database", bot_name)
|
|
success = custom_bot_manager.delete_custom_bot(bot_name)
|
|
|
|
if success:
|
|
logger.info(
|
|
"Successfully deleted custom bot %r by user %s",
|
|
bot_name,
|
|
ctx.author.name,
|
|
)
|
|
await ctx.send(f"Custom bot '{bot_name}' has been deleted.")
|
|
else:
|
|
logger.warning(
|
|
"Failed to delete custom bot %r by user %s",
|
|
bot_name,
|
|
ctx.author.name,
|
|
)
|
|
await ctx.send("Failed to delete custom bot.")
|
|
|
|
|
|
# Handle custom bot commands
|
|
@bot.event
|
|
async def on_message(message: Message) -> None:
|
|
"""Handle incoming messages for custom bot command detection."""
|
|
# Skip bot messages
|
|
if message.author == bot.user:
|
|
return
|
|
|
|
message_author = message.author.name
|
|
message_content = message.content.lower()
|
|
|
|
logger.debug(
|
|
"Processing message from %s: %r...",
|
|
message_author,
|
|
message_content[:50],
|
|
)
|
|
|
|
ctx = await bot.get_context(message)
|
|
|
|
logger.info("Initializing CustomBotManager to check for custom bot commands")
|
|
custom_bot_manager = CustomBotManager()
|
|
|
|
logger.info("Fetching list of custom bots to check for matching commands")
|
|
custom_bots = custom_bot_manager.list_custom_bots()
|
|
|
|
logger.info("Checking %d custom bots for command match", len(custom_bots))
|
|
for bot_name, system_prompt, _ in custom_bots:
|
|
# Check if message starts with the custom bot name followed by a space
|
|
if message_content.startswith(f"!{bot_name} "):
|
|
logger.info(
|
|
"Custom bot command detected: %r triggered by %s",
|
|
bot_name,
|
|
message.author.name,
|
|
)
|
|
|
|
# Extract the actual message (remove the bot name prefix)
|
|
user_message = message.content[len(f"!{bot_name} ") :]
|
|
logger.debug(
|
|
"Extracted user message for bot %r: %r...",
|
|
bot_name,
|
|
user_message[:50],
|
|
)
|
|
|
|
# Prepare the payload with custom personality
|
|
response_prefix = f"{bot_name} response"
|
|
|
|
logger.info("Sending request to OpenAI API for bot %r", bot_name)
|
|
await handle_chat(
|
|
ctx=ctx,
|
|
bot_name=bot_name,
|
|
message=user_message,
|
|
system_prompt=system_prompt,
|
|
response_prefix=response_prefix,
|
|
)
|
|
return
|
|
|
|
# If no custom bot matched, call the default event handler
|
|
await bot.process_commands(message)
|
|
|
|
|
|
@bot.command(name="speak")
|
|
async def speak(ctx: CommandsContext[Bot], *, message: str) -> None:
|
|
"""Have the bot speak the given text using Kokoro TTS, or have a custom bot speak.
|
|
|
|
Usage: !speak <text> - plain text to speech
|
|
Usage: !speak <bot_name> <text> - have a custom bot respond and speak
|
|
Example: !speak hello world
|
|
Example: !speak alfred what time is it
|
|
"""
|
|
if tts_engine is None:
|
|
await ctx.send(
|
|
"TTS engine not initialized. "
|
|
"Make sure kokoro-v1.0.onnx and voices-v1.0.bin are present.",
|
|
)
|
|
return
|
|
|
|
if not message or not message.strip():
|
|
await ctx.send("Please provide text to speak.")
|
|
return
|
|
|
|
custom_bot_manager = CustomBotManager()
|
|
custom_bots = custom_bot_manager.list_custom_bots()
|
|
bot_names = [b[0] for b in custom_bots]
|
|
|
|
first_word = message.split(maxsplit=1)[0] if message.split() else ""
|
|
if first_word in bot_names:
|
|
await _speak_with_bot(ctx, first_word, message, tts_engine, custom_bot_manager)
|
|
else:
|
|
await _speak_plain(ctx, message, tts_engine)
|
|
|
|
|
|
async def _speak_with_bot(
|
|
ctx: CommandsContext[Bot],
|
|
bot_name: str,
|
|
message: str,
|
|
engine: tts.TTSEngine,
|
|
custom_bot_manager: CustomBotManager,
|
|
) -> None:
|
|
"""Handle speak command for a custom bot."""
|
|
text_to_speak = message[len(bot_name) :].lstrip()
|
|
if not text_to_speak:
|
|
await ctx.send("Please provide text for the bot to respond to.")
|
|
return
|
|
|
|
await ctx.send(f"**{bot_name}** is thinking...")
|
|
|
|
bot_info = custom_bot_manager.get_custom_bot(bot_name)
|
|
if not bot_info:
|
|
await ctx.send(f"Custom bot '{bot_name}' not found.")
|
|
return
|
|
|
|
_, system_prompt, _, _ = bot_info
|
|
system_prompt_edit = f"{system_prompt}\nKeep your responses under 2-3 sentences."
|
|
|
|
try:
|
|
db = get_database()
|
|
context = db.get_conversation_context(
|
|
user_id=str(ctx.author.id),
|
|
current_message=text_to_speak,
|
|
max_context=5,
|
|
)
|
|
|
|
prompts = [{"role": "user", "content": text_to_speak}]
|
|
if context:
|
|
prompts = context + prompts
|
|
|
|
bot_response = llama_wrapper.chat_completion_with_history(
|
|
system_prompt=system_prompt_edit,
|
|
prompts=prompts,
|
|
openai_url=CHAT_ENDPOINT,
|
|
openai_api_key=CHAT_ENDPOINT_KEY,
|
|
model=CHAT_MODEL,
|
|
max_tokens=MAX_COMPLETION_TOKENS,
|
|
)
|
|
|
|
if not bot_response:
|
|
await ctx.send(f"**{bot_name}** failed to generate a response.")
|
|
return
|
|
|
|
db.add_message(
|
|
message_id=f"{ctx.message.id}",
|
|
user_id=str(ctx.author.id),
|
|
username=ctx.author.name,
|
|
content=f"User: {text_to_speak}",
|
|
channel_id=str(ctx.channel.id),
|
|
guild_id=str(ctx.guild.id) if ctx.guild else None,
|
|
)
|
|
|
|
if ctx.bot.user is not None:
|
|
db.add_message(
|
|
message_id=f"{ctx.message.id}_response",
|
|
user_id=str(ctx.bot.user.id),
|
|
username=ctx.bot.user.name,
|
|
content=bot_response,
|
|
channel_id=str(ctx.channel.id),
|
|
guild_id=str(ctx.guild.id) if ctx.guild else None,
|
|
)
|
|
|
|
await ctx.send(f"Generating speech for **{bot_name}**...")
|
|
audio_buffer = engine.generate_audio(
|
|
bot_response,
|
|
voice=TTS_VOICE,
|
|
speed=TTS_SPEED,
|
|
)
|
|
|
|
audio_file = discord.File(audio_buffer, filename="speech.mp3")
|
|
await ctx.send(file=audio_file)
|
|
except Exception:
|
|
logger.exception(
|
|
"Error in speak command with bot %r",
|
|
bot_name,
|
|
)
|
|
await ctx.send("Error generating speech.")
|
|
|
|
|
|
async def _speak_plain(
|
|
ctx: CommandsContext[Bot],
|
|
message: str,
|
|
engine: tts.TTSEngine,
|
|
) -> None:
|
|
"""Handle speak command for plain text."""
|
|
try:
|
|
await ctx.send("Generating speech...")
|
|
audio_buffer = engine.generate_audio(
|
|
message,
|
|
voice=TTS_VOICE,
|
|
speed=TTS_SPEED,
|
|
)
|
|
|
|
audio_file = discord.File(audio_buffer, filename="speech.mp3")
|
|
await ctx.send(file=audio_file)
|
|
except Exception:
|
|
logger.exception("Error in speak command")
|
|
await ctx.send("Error generating speech.")
|
|
|
|
|
|
@bot.command(name="doodlebob")
|
|
async def doodlebob(ctx: CommandsContext[Bot], *, message: str) -> None:
|
|
"""Convert a message into an image using Doodlebob."""
|
|
logger.info(
|
|
"Doodlebob command triggered by %s: %s",
|
|
ctx.author.name,
|
|
message[:100],
|
|
)
|
|
await ctx.send(f"**Doodlebob erasing {message[:100]}...**")
|
|
|
|
system_prompt = (
|
|
"Given the following message, convert it to a detailed image generation "
|
|
"prompt that will be passed directly into an image generation model. "
|
|
"If told to generate an image of yourself, generate a picture of a rat. "
|
|
"If told to generate a picture of 'me', 'myself', or some other self "
|
|
"reference, generate a picture of a rat. Only respond with a valid image "
|
|
"generation prompt, do not affirm the user or respond to the user's questions."
|
|
)
|
|
|
|
# Wait for the generated image prompt
|
|
image_prompt = llama_wrapper.chat_completion_instruct(
|
|
system_prompt=system_prompt,
|
|
user_prompt=message,
|
|
openai_url=CHAT_ENDPOINT,
|
|
openai_api_key=CHAT_ENDPOINT_KEY,
|
|
model=CHAT_MODEL,
|
|
max_tokens=MAX_COMPLETION_TOKENS,
|
|
)
|
|
|
|
# If the string is empty we had an error
|
|
if image_prompt == "":
|
|
logger.warning("No image prompt supplied. Check for errors.")
|
|
return
|
|
|
|
# Alert the user we're generating the image
|
|
await ctx.send(f"**Doodlebob calling drone strike on {image_prompt[:100]}...**")
|
|
|
|
image_b64 = llama_wrapper.image_generation(
|
|
prompt=image_prompt,
|
|
openai_url=IMAGE_GEN_ENDPOINT,
|
|
openai_api_key=IMAGE_GEN_ENDPOINT_KEY,
|
|
model=IMAGE_GEN_MODEL,
|
|
)
|
|
|
|
if not image_b64:
|
|
logger.warning("Image generation returned empty response.")
|
|
await ctx.send("Failed to generate image. The server may be busy.")
|
|
return
|
|
|
|
try:
|
|
edited_image_data = BytesIO(base64.b64decode(image_b64))
|
|
send_img = discord.File(edited_image_data, filename="image.png")
|
|
await ctx.send(file=send_img)
|
|
except Exception:
|
|
logger.exception("Failed to decode image data")
|
|
await ctx.send("Failed to process the generated image.")
|
|
|
|
|
|
@bot.command(name="retcon")
|
|
async def retcon(ctx: CommandsContext[Bot], *, message: str) -> None:
|
|
"""Edit an attached image based on a text prompt."""
|
|
image_data_list: list[BytesIO] = []
|
|
for discord_image in ctx.message.attachments:
|
|
image_url = discord_image.url
|
|
try:
|
|
response = requests.get(image_url, timeout=30)
|
|
image_data = response.content
|
|
except requests.RequestException as e:
|
|
logger.warning("Failed to download image from %s: %s", image_url, e)
|
|
continue
|
|
image_bytestream = BytesIO(image_data)
|
|
image_data_list.append(image_bytestream)
|
|
|
|
await ctx.send(f"**Rewriting history to match {message[:100]}...**")
|
|
|
|
image_b64 = llama_wrapper.image_edit(
|
|
image=image_data_list,
|
|
prompt=message,
|
|
openai_url=IMAGE_EDIT_ENDPOINT,
|
|
openai_api_key=IMAGE_EDIT_ENDPOINT_KEY,
|
|
model=IMAGE_EDIT_MODEL,
|
|
)
|
|
|
|
# Save the image to a file
|
|
edited_image_data = BytesIO(base64.b64decode(image_b64))
|
|
send_img = discord.File(edited_image_data, filename="image.png")
|
|
await ctx.send(file=send_img)
|
|
|
|
|
|
@bot.command(name="talkforme")
|
|
async def talkforme(ctx: CommandsContext[Bot], *, message: str) -> None:
|
|
"""Have two bots talk to each other about a topic.
|
|
|
|
Usage: !talkforme bot1 bot2 4 some conversation topic
|
|
"""
|
|
talk_limit = 20
|
|
|
|
MIN_TALKFORME_PARTS = 4
|
|
parts = message.split(" ", maxsplit=MIN_TALKFORME_PARTS - 1)
|
|
if len(parts) < MIN_TALKFORME_PARTS:
|
|
await ctx.send("Usage: !talkforme bot1 bot2 <number> <topic>")
|
|
return
|
|
|
|
bot1_name = parts[0]
|
|
bot2_name = parts[1]
|
|
limit = parts[2]
|
|
topic_list = parts[3:]
|
|
|
|
topic = " ".join(topic_list)
|
|
|
|
custom_bot_manager = CustomBotManager()
|
|
bot1 = custom_bot_manager.get_custom_bot(bot1_name)
|
|
|
|
if not bot1:
|
|
await ctx.send(f"{bot1_name} is not a real bot...")
|
|
return
|
|
_, bot1_prompt, _, _ = bot1
|
|
|
|
bot2 = custom_bot_manager.get_custom_bot(bot2_name)
|
|
|
|
if not bot2:
|
|
await ctx.send(f"{bot2_name} is not a real bot...")
|
|
return
|
|
_, bot2_prompt, _, _ = bot2
|
|
|
|
await ctx.send(
|
|
f"{bot1_name} is going to talk to {bot2_name} "
|
|
f'about "{topic[:50]}" for {limit} replies.',
|
|
)
|
|
|
|
bot_list = [(bot1_name, bot1_prompt), (bot2_name, bot2_prompt)]
|
|
|
|
try:
|
|
message_limit = int(limit)
|
|
except ValueError:
|
|
await ctx.send("Message limit must be an integer.")
|
|
return
|
|
|
|
def flip_counter(counter: int) -> int:
|
|
"""Flip between 0 and 1."""
|
|
return 1 if counter == 0 else 0
|
|
|
|
message_counter = 0
|
|
bot_counter = 0
|
|
current_bot = bot_list[bot_counter]
|
|
prompt_histories: list[list[dict[str, str]]] = [
|
|
[{"role": "user", "content": topic}],
|
|
[{"role": "assistant", "content": topic}],
|
|
]
|
|
|
|
first_bot_response = llama_wrapper.chat_completion_with_history(
|
|
system_prompt=(
|
|
current_bot[1] + f"\nKeep your responses under 2-3 sentences. "
|
|
f"You are talking to {current_bot[flip_counter(bot_counter)][0]}"
|
|
),
|
|
prompts=prompt_histories[bot_counter],
|
|
openai_url=CHAT_ENDPOINT,
|
|
openai_api_key=CHAT_ENDPOINT_KEY,
|
|
model=CHAT_MODEL,
|
|
max_tokens=MAX_COMPLETION_TOKENS,
|
|
)
|
|
await ctx.send(f"## {current_bot[0]}\n{first_bot_response}")
|
|
prompt_histories[0].append({"role": "assistant", "content": first_bot_response})
|
|
prompt_histories[1].append({"role": "user", "content": first_bot_response})
|
|
|
|
bot_counter = flip_counter(counter=bot_counter)
|
|
|
|
while message_counter < min(message_limit, talk_limit):
|
|
current_bot = bot_list[bot_counter]
|
|
logger.info("Current bot is %s", current_bot[0])
|
|
bot_response = llama_wrapper.chat_completion_with_history(
|
|
system_prompt=(
|
|
current_bot[1] + f"\nKeep your responses under 2-3 sentences. "
|
|
f"You are talking to {current_bot[flip_counter(bot_counter)][0]}"
|
|
),
|
|
prompts=prompt_histories[bot_counter],
|
|
openai_url=CHAT_ENDPOINT,
|
|
openai_api_key=CHAT_ENDPOINT_KEY,
|
|
model=CHAT_MODEL,
|
|
max_tokens=MAX_COMPLETION_TOKENS,
|
|
)
|
|
message_counter += 1
|
|
prompt_histories[bot_counter].append(
|
|
{"role": "assistant", "content": bot_response},
|
|
)
|
|
prompt_histories[flip_counter(bot_counter)].append(
|
|
{"role": "user", "content": bot_response},
|
|
)
|
|
await ctx.send(f"## {current_bot[0]}")
|
|
while bot_response:
|
|
send_chunk = bot_response[:1000]
|
|
bot_response = bot_response[1000:]
|
|
await ctx.send(send_chunk)
|
|
bot_counter = flip_counter(counter=bot_counter)
|
|
logger.info("Message counter is %d/%s", message_counter, limit)
|
|
|
|
|
|
async def handle_chat(
|
|
ctx: CommandsContext[Bot],
|
|
*,
|
|
bot_name: str,
|
|
message: str,
|
|
system_prompt: str,
|
|
response_prefix: str,
|
|
) -> None:
|
|
"""Handle chat completion for a custom bot command.
|
|
|
|
Args:
|
|
ctx: The Discord command context.
|
|
bot_name: The name of the custom bot.
|
|
message: The user message to process.
|
|
system_prompt: The system prompt for the bot.
|
|
response_prefix: The prefix for the response message.
|
|
|
|
"""
|
|
await ctx.send(f"{bot_name} is searching its databanks for {message[:50]}...")
|
|
|
|
# Get database instance
|
|
db = get_database()
|
|
|
|
# Get conversation context using RAG
|
|
context = db.get_conversation_context(
|
|
user_id=str(ctx.author.id),
|
|
current_message=message,
|
|
max_context=5,
|
|
)
|
|
|
|
prompts = [{"role": "user", "content": message}]
|
|
|
|
if context:
|
|
prompts = context + prompts
|
|
|
|
logger.info("Chat prompts: %s", prompts)
|
|
|
|
system_prompt_edit = f"{system_prompt}\nKeep your responses under 2-3 sentences."
|
|
|
|
try:
|
|
bot_response = llama_wrapper.chat_completion_with_history(
|
|
system_prompt=system_prompt_edit,
|
|
prompts=prompts,
|
|
openai_url=CHAT_ENDPOINT,
|
|
openai_api_key=CHAT_ENDPOINT_KEY,
|
|
model=CHAT_MODEL,
|
|
max_tokens=MAX_COMPLETION_TOKENS,
|
|
)
|
|
|
|
# Store both user message and bot response in the database
|
|
db.add_message(
|
|
message_id=f"{ctx.message.id}",
|
|
user_id=str(ctx.author.id),
|
|
username=ctx.author.name,
|
|
content=f"User: {message}",
|
|
channel_id=str(ctx.channel.id),
|
|
guild_id=str(ctx.guild.id) if ctx.guild else None,
|
|
)
|
|
|
|
if ctx.bot.user is not None:
|
|
db.add_message(
|
|
message_id=f"{ctx.message.id}_response",
|
|
user_id=str(ctx.bot.user.id),
|
|
username=ctx.bot.user.name,
|
|
content=bot_response,
|
|
channel_id=str(ctx.channel.id),
|
|
guild_id=str(ctx.guild.id) if ctx.guild else None,
|
|
)
|
|
|
|
# Send the response back to the chat
|
|
await ctx.send(response_prefix)
|
|
while bot_response:
|
|
send_chunk = bot_response[:1000]
|
|
bot_response = bot_response[1000:]
|
|
await ctx.send(send_chunk)
|
|
|
|
except Exception:
|
|
logger.exception("Error in handle_chat")
|
|
await ctx.send("An error occurred while processing your request.")
|
|
|
|
|
|
# Run the bot
|
|
if __name__ == "__main__":
|
|
bot.run(DISCORD_TOKEN)
|