372 lines
12 KiB
Python
372 lines
12 KiB
Python
import asyncio
import base64
import functools
import os
from io import BytesIO

import discord
import requests
from discord.ext import commands
from openai import OpenAI

from database import get_database, CustomBotManager
|
|
|
|
# Discord credentials; a dummy value is used when the variable is unset.
DISCORD_TOKEN = os.environ.get("DISCORD_TOKEN", "placeholder")

# Service endpoints and generation limit, all read from the environment once
# at import time.
OPENAI_API_ENDPOINT = os.environ.get("OPENAI_API_ENDPOINT")
IMAGE_GEN_ENDPOINT = os.environ.get("IMAGE_GEN_ENDPOINT")
IMAGE_EDIT_ENDPOINT = os.environ.get("IMAGE_EDIT_ENDPOINT")
MAX_COMPLETION_TOKENS = int(os.environ.get("MAX_COMPLETION_TOKENS", "1000"))

# The chat and image-generation endpoints are mandatory: importing the module
# fails if either is missing or empty. IMAGE_EDIT_ENDPOINT is not checked here.
if not OPENAI_API_ENDPOINT:
    raise Exception("OPENAI_API_ENDPOINT required.")
if not IMAGE_GEN_ENDPOINT:
    raise Exception("IMAGE_GEN_ENDPOINT required.")

# The API key comes from the environment rather than being hard-coded; a
# dummy value is substituted when the variable is unset.
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "placeholder")
|
|
|
|
# Build the command bot: default intents plus message content, "!" prefix.
intents = discord.Intents.default()
intents.message_content = True
bot = commands.Bot(intents=intents, command_prefix="!")

# Full URL of the chat-completions route on the configured endpoint.
OPENAI_COMPLETIONS_URL = "{}/chat/completions".format(OPENAI_API_ENDPOINT)
|
|
|
|
|
|
@bot.event
async def on_ready():
    """Print the bot's user when the ``on_ready`` event fires."""
    logged_in_as = bot.user
    print(f"Bot logged in as {logged_in_as}")
|
|
|
|
|
|
@bot.command(name="custom-bot")
async def custom_bot(ctx, bot_name: str, *, personality: str):
    """Create a custom bot with a name and personality

    Usage: !custom-bot <bot_name> <personality_description>
    Example: !custom-bot alfred you are a proper british butler
    """
    # Validate bot name
    if not bot_name or len(bot_name) < 2 or len(bot_name) > 50:
        await ctx.send("❌ Invalid bot name. Name must be between 2 and 50 characters.")
        return

    # on_message tries custom bots before it dispatches regular commands, so
    # a custom bot sharing its name with a registered command (e.g.
    # "doodlebob") would intercept that command. The lowercase form is
    # checked as well because on_message lowercases the incoming text.
    if bot.get_command(bot_name) or bot.get_command(bot_name.lower()):
        await ctx.send(
            f"❌ Invalid bot name. '{bot_name}' is already used by a built-in command."
        )
        return

    # Validate personality
    if not personality or len(personality) < 10:
        await ctx.send(
            "❌ Invalid personality. Description must be at least 10 characters."
        )
        return

    # Persist the new bot; the personality is stored as its system prompt.
    custom_bot_manager = CustomBotManager()
    success = custom_bot_manager.create_custom_bot(
        bot_name=bot_name, system_prompt=personality, created_by=str(ctx.author.id)
    )

    if not success:
        await ctx.send("❌ Failed to create custom bot. It may already exist.")
        return

    # Echo only a preview of long personalities: the full text plus the
    # surrounding wording could exceed Discord's 2000-character message limit
    # and make the confirmation fail after the bot was already created.
    preview_limit = 1500
    if len(personality) > preview_limit:
        preview = personality[:preview_limit] + "…"
    else:
        preview = personality

    await ctx.send(
        f"✅ Custom bot **'{bot_name}'** has been created with personality: *{preview}*"
    )
    await ctx.send(f"\nYou can now use this bot with: `!{bot_name} <your message>`")
|
|
|
|
|
|
@bot.command(name="list-custom-bots")
async def list_custom_bots(ctx):
    """List all custom bots available in the server"""
    # NOTE: discord.py exposes the docstring as the command's help text, so it
    # is kept word for word.
    registry = CustomBotManager()
    entries = registry.list_custom_bots()

    if entries:
        # Each entry unpacks to (name, prompt, creator); the prompt is not
        # shown. Only the first ten entries are listed.
        rows = [
            f"• **{name}** (created by {creator})\n"
            for name, _prompt, creator in entries[:10]
        ]
        await ctx.send("🤖 **Available Custom Bots**:\n\n" + "".join(rows))
        return

    # Nothing stored yet: tell the user how to create one.
    await ctx.send(
        "No custom bots have been created yet. Use `!custom-bot <name> <personality>` to create one."
    )
|
|
|
|
|
|
@bot.command(name="delete-custom-bot")
async def delete_custom_bot(ctx, bot_name: str):
    """Delete a custom bot (only the creator can delete)

    Usage: !delete-custom-bot <bot_name>
    """
    # NOTE: discord.py exposes the docstring as the command's help text, so it
    # is kept word for word.
    manager = CustomBotManager()
    record = manager.get_custom_bot(bot_name)

    # Unknown bot: nothing to delete.
    if not record:
        await ctx.send(f"❌ Custom bot '{bot_name}' not found.")
        return

    # The third field of the record is compared with the caller's id, so only
    # the user recorded there may delete the bot.
    owner_id = record[2]
    requester_id = str(ctx.author.id)
    if owner_id != requester_id:
        await ctx.send("❌ You can only delete your own custom bots.")
        return

    deleted = manager.delete_custom_bot(bot_name)
    outcome = (
        f"✅ Custom bot '{bot_name}' has been deleted."
        if deleted
        else "❌ Failed to delete custom bot."
    )
    await ctx.send(outcome)
|
|
|
|
|
|
# Handle custom bot commands
|
|
@bot.event
async def on_message(message):
    """Route ``!<custom bot name> <text>`` messages to the matching custom bot.

    Messages sent by this bot are ignored. If the message starts with
    ``!<name> `` for a stored custom bot (compared case-insensitively), the
    remaining text is sent to the chat model with that bot's system prompt
    via ``handle_chat``. Otherwise the message is passed on to the regular
    command processing.
    """
    # Skip bot messages
    if message.author == bot.user:
        return

    # Custom bots are only addressed as "!<name> ...", so a message without
    # the "!" prefix cannot match and needs no database lookup.
    if message.content.startswith("!"):
        custom_bot_manager = CustomBotManager()
        custom_bots = custom_bot_manager.list_custom_bots() or ()

        for bot_name, system_prompt, _ in custom_bots:
            trigger = f"!{bot_name} "

            # Lowercase BOTH sides: lowercasing only the message would make
            # any bot whose stored name contains an uppercase letter
            # impossible to trigger.
            if message.content[: len(trigger)].lower() != trigger.lower():
                continue

            # Extract the actual message (remove the bot name prefix)
            user_message = message.content[len(trigger) :]

            # Prepare the payload with custom personality
            payload = {
                "model": "qwen3-vl-30b-a3b-instruct",
                "messages": [
                    {
                        "role": "system",
                        "content": system_prompt,
                    },
                    {"role": "user", "content": user_message},
                ],
                "max_completion_tokens": MAX_COMPLETION_TOKENS,
            }

            response_prefix = f"**{bot_name} response**"

            # The invocation context is only needed once a bot has matched.
            ctx = await bot.get_context(message)
            await handle_chat(
                ctx=ctx,
                message=user_message,
                payload=payload,
                response_prefix=response_prefix,
            )
            return

    # If no custom bot matched, call the default event handler
    await bot.process_commands(message)
|
|
|
|
|
|
@bot.command(name="doodlebob")
async def doodlebob(ctx, *, message: str):
    """Generate an image from a text description

    Usage: !doodlebob <description>
    """
    await ctx.send(f"**Doodlebob erasing {message[:100]}...**")

    # First ask the chat model to turn the request into an image prompt.
    image_prompt_payload = {
        "model": "qwen3-vl-30b-a3b-instruct",
        "messages": [
            {
                "role": "system",
                "content": (
                    "Given the following message, convert it to a detailed image generation prompt that will be passed directly into an image generation model."
                    "If told to generate an image of yourself, generate a picture of a rat. If told to generate a picture of 'me', 'myself', or some other self"
                    " reference, generate a picture of a rat. Only respond with a valid image generation prompt, do not affirm the user or respond to the user's"
                    " questions."
                ),
            },
            {"role": "user", "content": message},
        ],
    }

    # Wait for the generated image prompt
    image_prompt = await call_llm(ctx, image_prompt_payload)

    # An empty result means the call failed or produced no text.
    if not image_prompt:
        print("No image prompt supplied. Check for errors.")
        return

    # Alert the user we're generating the image
    await ctx.send(f"**Doodlebob calling drone strike on {image_prompt[:100]}...**")

    # Create the image prompt payload
    image_payload = {
        "model": "default",
        "prompt": image_prompt,
        "n": 1,
        "size": "1024x1024",
    }

    # requests is synchronous; calling it directly would freeze the event
    # loop (and the bot's gateway heartbeat) for up to the full timeout, so
    # the request runs in the default thread pool instead.
    try:
        response = await asyncio.get_running_loop().run_in_executor(
            None,
            functools.partial(
                requests.post,
                f"{IMAGE_GEN_ENDPOINT}/images/generations",
                json=image_payload,
                timeout=120,
            ),
        )
    except requests.exceptions.Timeout:
        await ctx.send("Error: Request timed out. Please try again.")
        return None
    except requests.exceptions.RequestException as e:
        await ctx.send(f"Error: {str(e)}")
        return None

    if response.status_code != 200:
        print(f"❌ Error: {response.status_code}")
        print(response.text)
        # Tell the user as well; previously the failure was only printed.
        await ctx.send(
            f"Error: image generation failed (HTTP {response.status_code})."
        )
        return None

    # Decode the base64 image from the response and send it as a file.
    try:
        result = response.json()
        image_data = BytesIO(base64.b64decode(result["data"][0]["b64_json"]))
    except (ValueError, KeyError, IndexError, TypeError) as e:
        print(f"❌ Error: unexpected image response: {e}")
        await ctx.send("Error: image generation returned an unexpected response.")
        return None

    send_img = discord.File(image_data, filename="image.png")
    await ctx.send(file=send_img)
|
|
|
|
|
|
@bot.command(name="retcon")
async def retcon(ctx, *, message: str):
    """Edit the attached image according to a text instruction

    Usage: !retcon <instruction> (attach the image to the same message)
    """
    # Without this guard a missing attachment raises IndexError and the user
    # never learns why nothing happened.
    if not ctx.message.attachments:
        await ctx.send("❌ Please attach an image to edit.")
        return

    # Download through discord.py's own async API instead of a blocking
    # requests.get() call that had no timeout.
    try:
        image_data = await ctx.message.attachments[0].read()
    except discord.HTTPException as e:
        await ctx.send(f"Error: {str(e)}")
        return
    image_bytestream = BytesIO(image_data)

    await ctx.send(f"**Rewriting history to match {message[:100]}...**")

    client = OpenAI(base_url=IMAGE_EDIT_ENDPOINT, api_key=OPENAI_API_KEY)

    try:
        # The OpenAI client used here is synchronous, so the edit request
        # runs in the default thread pool to keep the event loop responsive.
        result = await asyncio.get_running_loop().run_in_executor(
            None,
            functools.partial(
                client.images.edit,
                model="placeholder",
                image=[image_bytestream],
                prompt=message,
                size="1024x1024",
            ),
        )
        image_bytes = base64.b64decode(result.data[0].b64_json)
    except Exception as e:
        # Command boundary: report the failure to the user instead of
        # letting the command fail without any reply.
        print(f"❌ Error: {e}")
        await ctx.send(f"Error: {str(e)}")
        return

    # Wrap the decoded bytes in an in-memory file and send it.
    edited_image_data = BytesIO(image_bytes)
    send_img = discord.File(edited_image_data, filename="image.png")
    await ctx.send(file=send_img)
|
|
|
|
|
|
async def handle_chat(ctx, *, message: str, payload: dict, response_prefix: str):
    """Send a chat completion request and relay the answer to the channel.

    Args:
        ctx: Invocation context; used for the author, channel, guild and for
            sending replies.
        message: The user's text. It overwrites ``payload["messages"][1]``.
        payload: Chat-completions request body. ``payload["messages"][0]`` is
            expected to be the system message. NOTE: the payload is modified
            in place (context appended, user content replaced).
        response_prefix: Sent as its own message before the answer.

    Both the user message and the generated answer are stored through the
    database object before the answer is sent. Errors are reported to the
    channel rather than raised.
    """
    # Check if API key is set
    if not OPENAI_API_KEY:
        await ctx.send(
            "Error: OpenAI API key is not configured. Please set the OPENAI_API_KEY environment variable."
        )
        return

    # Set headers
    headers = {
        "Authorization": f"Bearer {OPENAI_API_KEY}",
        "Content-Type": "application/json",
    }

    # Get database instance
    db = get_database()

    # Fetch prior conversation context for this user from the database.
    context = db.get_conversation_context(
        user_id=str(ctx.author.id), current_message=message, max_context=5
    )

    if context:
        # Append the retrieved history to the system prompt.
        payload["messages"][0][
            "content"
        ] += f"\n\nRelevant conversation history:\n{context}"

    payload["messages"][1]["content"] = message

    print(payload)

    try:
        # requests is synchronous; with a 300 s timeout a direct call would
        # stall the whole bot, so the request runs in the default thread pool.
        response = await asyncio.get_running_loop().run_in_executor(
            None,
            functools.partial(
                requests.post,
                OPENAI_COMPLETIONS_URL,
                json=payload,
                headers=headers,
                timeout=300,
            ),
        )
        response.raise_for_status()

        result = response.json()

        # Extract the generated text; the content may be null, which is
        # treated as an empty answer instead of crashing on .strip().
        generated_text = (result["choices"][0]["message"]["content"] or "").strip()

        # Store both user message and bot response in the database
        db.add_message(
            message_id=f"{ctx.message.id}",
            user_id=str(ctx.author.id),
            username=ctx.author.name,
            content=f"User: {message}",
            channel_id=str(ctx.channel.id),
            guild_id=str(ctx.guild.id) if ctx.guild else None,
        )

        db.add_message(
            message_id=f"{ctx.message.id}_response",
            user_id=str(bot.user.id),
            username=bot.user.name,
            content=f"Bot: {generated_text}",
            channel_id=str(ctx.channel.id),
            guild_id=str(ctx.guild.id) if ctx.guild else None,
        )

        # Send the response back to the chat in 1000-character pieces.
        await ctx.send(response_prefix)
        while generated_text:
            send_chunk = generated_text[:1000]
            generated_text = generated_text[1000:]
            await ctx.send(send_chunk)

    except requests.exceptions.HTTPError as e:
        await ctx.send(f"Error: OpenAI API error - {e}")
    except requests.exceptions.Timeout:
        await ctx.send("Error: Request timed out. Please try again.")
    except Exception as e:
        await ctx.send(f"Error: {str(e)}")
|
|
|
|
|
|
async def call_llm(ctx, payload: dict) -> str:
    """Send ``payload`` to the chat-completions endpoint and return the text.

    Args:
        ctx: Invocation context; only used to report errors to the channel.
        payload: Chat-completions request body, sent unchanged.

    Returns:
        The stripped text of the first choice, or ``""`` when the API key is
        missing, the request fails, or the response has no text. Errors are
        reported to the channel rather than raised.
    """
    # Check if API key is set
    if not OPENAI_API_KEY:
        await ctx.send(
            "Error: OpenAI API key is not configured. Please set the OPENAI_API_KEY environment variable."
        )
        return ""

    # Set headers
    headers = {
        "Authorization": f"Bearer {OPENAI_API_KEY}",
        "Content-Type": "application/json",
    }

    try:
        # requests is synchronous; with a 300 s timeout a direct call would
        # stall the whole bot, so the request runs in the default thread pool.
        response = await asyncio.get_running_loop().run_in_executor(
            None,
            functools.partial(
                requests.post,
                OPENAI_COMPLETIONS_URL,
                json=payload,
                headers=headers,
                timeout=300,
            ),
        )
        response.raise_for_status()

        result = response.json()

        # Extract the generated text; the content may be null, which is
        # treated as an empty answer instead of crashing on .strip().
        generated_text = (result["choices"][0]["message"]["content"] or "").strip()
        print(generated_text)

        return generated_text

    except requests.exceptions.HTTPError as e:
        await ctx.send(f"Error: OpenAI API error - {e}")
    except requests.exceptions.Timeout:
        await ctx.send("Error: Request timed out. Please try again.")
    except Exception as e:
        await ctx.send(f"Error: {str(e)}")

    # Every failure path ends here, so callers always receive a string.
    return ""
|
|
|
|
|
|
# Script entry point: start the bot only when this file is executed directly,
# not when it is imported.
if __name__ == "__main__":
    bot.run(DISCORD_TOKEN)
|