# File listing metadata: 333 lines, 10 KiB, Python
import json
|
|
import os
|
|
import smtplib
|
|
import ssl
|
|
import subprocess
|
|
from email.message import EmailMessage
|
|
from pathlib import Path
|
|
from typing import Iterable, TypedDict, cast
|
|
|
|
from dotenv import dotenv_values
|
|
from openai import OpenAI
|
|
from openai.types.chat import ChatCompletionMessageParam, ChatCompletionToolUnionParam
|
|
|
|
|
|
class AWS_SES_DOTENV(TypedDict):
    """Keys expected in the AWS SES dotenv file.

    All values are plain strings, as read from the file.
    """

    # Login name handed to the SMTP server.
    USER: str
    # Password handed to the SMTP server together with USER.
    PASSWORD: str
    # Host name of the SMTP endpoint to connect to.
    ENDPOINT: str
    # Port number as text; it is converted with int() before connecting.
    TLS_PORT: str
    # Address placed in the "From" header of outgoing mail.
    SENDER: str
    # Address placed in the "To" header of outgoing mail.
    RECEIVER: str
|
|
|
|
|
|
class ToolCallController:
    """Keep a running count of tool calls and compare it against a limit."""

    def __init__(self, max_tool_calls: int = 10) -> None:
        """Start with an empty count and the given ceiling."""
        self.tool_call_count = 0
        self.max_tool_calls = max_tool_calls

    def is_tool_call_allowed(self) -> bool:
        """Return True while the count is still strictly below the ceiling."""
        return self.max_tool_calls > self.tool_call_count

    def increment(self) -> None:
        """Record one more tool call."""
        self.tool_call_count = self.tool_call_count + 1

    def reset(self) -> None:
        """Forget every call recorded so far."""
        self.tool_call_count = 0
|
|
|
|
|
|
# Register tools
def _function_tool(
    name: str, description: str, properties: dict, required: list[str]
) -> ChatCompletionToolUnionParam:
    """Build one function-tool entry in the layout shared by every item of ``tools``."""
    return {
        "type": "function",
        "function": {
            "name": name,
            "description": description,
            "parameters": {
                "type": "object",
                "properties": properties,
                "required": required,
            },
        },
    }


# Tool schemas advertised to the model. Each ``name`` matches a function of
# the same name in this module. The two tools that need no real input still
# declare a single ``server_id`` parameter whose only allowed value is "all".
tools: Iterable[ChatCompletionToolUnionParam] = [
    _function_tool(
        "list_btrfs_pools",
        "List the btrfs pools on this system. Returns a list like ['/btrfs/pool0', '/btrfs/backup0', etc.]",
        {"server_id": {"type": "string", "enum": ["all"]}},
        ["server_id"],
    ),
    _function_tool(
        "btrfs_device_stats",
        "Runs `btrfs device stats` on a given pool.",
        {"pool_path": {"type": "string"}},
        ["pool_path"],
    ),
    _function_tool(
        "btrfs_filesystem_show",
        "Runs `btrfs filesystem show` on a given pool.",
        {"pool_path": {"type": "string"}},
        ["pool_path"],
    ),
    _function_tool(
        "btrfs_scrub_start",
        "Runs `btrfs scrub start` on a given pool.",
        {"pool_path": {"type": "string"}},
        ["pool_path"],
    ),
    _function_tool(
        "lsblk_filesystems",
        (
            "Lists current filesystems on this server. Can be used to translate a "
            "luks ID to a device. Example: luks-pool0-1c9a755a-2a81-4a2f-9bd7-6b6d7caaa523 "
            "translates to /dev/sdb because /dev/sdb is listed below that filesystem. "
        ),
        {"server_id": {"type": "string", "enum": ["all"]}},
        ["server_id"],
    ),
    _function_tool(
        "smartctl",
        (
            "Runs `smartctl -a` on a given device. `device_path` should be "
            "a device at /dev. Example:`/dev/sda` or `/dev/nvme0n1`."
        ),
        {"device_path": {"type": "string"}},
        ["device_path"],
    ),
    _function_tool(
        "alert_user",
        "Sends the provided message to the user via a messaging service.",
        {"message": {"type": "string"}},
        ["message"],
    ),
]
|
|
|
|
|
|
def list_btrfs_pools(server_id: str) -> list[str]:
    """Return the entries found directly under ``/btrfs`` as path strings.

    Args:
        server_id: Not used by the function; it is accepted because the
            tool schema declares it as a required parameter.

    Returns:
        The entries of ``/btrfs`` as strings, sorted so repeated calls give
        the same order. An empty list when ``/btrfs`` does not exist, so the
        caller gets an answer instead of an unhandled exception.
    """
    try:
        return sorted(str(entry) for entry in Path("/btrfs").iterdir())
    except FileNotFoundError:
        return []
|
|
|
|
|
|
def _format_command_output(stdout: str, stderr: str) -> str:
    """Lay out captured output in the text format handed back to the caller."""
    return f"**STDOUT**\n{stdout}\n\n**STDERR**{stderr}"


def _run_command(argv: list[str]) -> str:
    """Run ``argv`` without a shell and return its formatted stdout and stderr.

    A command that cannot be started (for example a missing binary) is
    reported in the STDERR section instead of raising. Undecodable bytes in
    the output are replaced rather than raising ``UnicodeDecodeError``.
    """
    try:
        completed = subprocess.run(argv, capture_output=True)
    except OSError as exc:
        return _format_command_output("", f"Could not run {argv[0]}: {exc}")
    return _format_command_output(
        completed.stdout.decode(errors="replace"),
        completed.stderr.decode(errors="replace"),
    )


def _run_on_path(argv_prefix: list[str], path: str) -> str:
    """Run ``argv_prefix`` followed by ``path``, refusing option-like paths.

    ``path`` is supplied by the model, so a value starting with ``-`` would
    be parsed by the target program as an option rather than as a path.
    """
    if not isinstance(path, str) or not path or path.startswith("-"):
        return _format_command_output(
            "",
            f"Refusing to run {argv_prefix[0]}: expected a filesystem path, got {path!r}",
        )
    return _run_command([*argv_prefix, path])


def btrfs_device_stats(pool_path: str) -> str:
    """Run ``btrfs device stats <pool_path>`` and return its formatted output."""
    return _run_on_path(["btrfs", "device", "stats"], pool_path)


def btrfs_filesystem_show(pool_path: str) -> str:
    """Run ``btrfs filesystem show <pool_path>`` and return its formatted output."""
    return _run_on_path(["btrfs", "filesystem", "show"], pool_path)


def btrfs_scrub_start(pool_path: str) -> str:
    """Run ``btrfs scrub start <pool_path>`` and return its formatted output."""
    return _run_on_path(["btrfs", "scrub", "start"], pool_path)


def lsblk_filesystems(server_id: str) -> str:
    """Run ``lsblk -fs`` and return its formatted output.

    Args:
        server_id: Not used by the function; it is accepted because the
            tool schema declares it as a required parameter.
    """
    return _run_command(["lsblk", "-fs"])


def smartctl(device_path: str) -> str:
    """Run ``smartctl -a <device_path>`` and return its formatted output."""
    return _run_on_path(["smartctl", "-a"], device_path)
|
|
|
|
|
|
def load_ses_creds() -> AWS_SES_DOTENV:
    """Load the AWS SES mail settings from ``$HOME/.env/aws_ses``.

    ``HOME`` falls back to ``/root`` when the environment variable is unset.

    Returns:
        The values read from the dotenv file.

    Raises:
        SystemExit: With status 1 when the file yields no values, or when
            any key declared on ``AWS_SES_DOTENV`` is missing or empty.
    """
    ses_dotenv_location = Path(os.getenv("HOME", "/root"), ".env/aws_ses")
    print(f"Loading env from {ses_dotenv_location}")
    raw_values = dotenv_values(ses_dotenv_location)
    if not raw_values:
        print("No email credentials supplied. Exiting.")
        raise SystemExit(1)

    # ``cast`` performs no runtime check, so verify the keys here; otherwise a
    # missing entry only shows up later as a KeyError while sending mail.
    missing = [key for key in AWS_SES_DOTENV.__annotations__ if not raw_values.get(key)]
    if missing:
        print(f"Email credentials are missing values for: {', '.join(missing)}. Exiting.")
        raise SystemExit(1)

    aws_ses_config = cast(AWS_SES_DOTENV, raw_values)
    # Never echo the password: stdout may end up in logs or terminal scrollback.
    redacted = {**raw_values, "PASSWORD": "********"}
    print(f"AWS SES Credentials loaded: {redacted}")
    return aws_ses_config
|
|
|
|
|
|
def alert_user(message: str) -> str:
    """Email ``message`` to the configured receiver.

    The connection settings, login and addresses all come from
    ``load_ses_creds()``. The mail is sent with the fixed subject
    "Agent Disk Report" over an SMTP connection wrapped in TLS.

    Args:
        message: Plain-text body of the email.

    Returns:
        The literal string "Sent." once ``send_message`` has returned.
    """
    creds = load_ses_creds()
    smtp_port = int(creds["TLS_PORT"])
    login_name, secret, from_addr, to_addr = (
        creds["USER"],
        creds["PASSWORD"],
        creds["SENDER"],
        creds["RECEIVER"],
    )

    # Create a secure SSL context
    tls_context = ssl.create_default_context()

    email = EmailMessage()
    email["Subject"] = "Agent Disk Report"
    email["From"] = from_addr
    email["To"] = to_addr
    email.set_content(message)

    with smtplib.SMTP_SSL(creds["ENDPOINT"], smtp_port, context=tls_context) as smtp:
        smtp.login(login_name, secret)
        # send_message returns a mapping of refused recipients; show it.
        print(smtp.send_message(email))

    return "Sent."
|
|
|
|
|
|
def execute_tool(tool_name, arguments):
    """Call the module-level tool function called ``tool_name``.

    Args:
        tool_name: One of the tool names listed below.
        arguments: Mapping expanded into keyword arguments for the tool.

    Returns:
        Whatever the selected tool function returns.

    Raises:
        ValueError: If ``tool_name`` is not one of the known tools.
    """
    # Explicit allow-list: only these names may be looked up and invoked.
    known_tools = (
        "list_btrfs_pools",
        "btrfs_device_stats",
        "btrfs_filesystem_show",
        "btrfs_scrub_start",
        "lsblk_filesystems",
        "smartctl",
        "alert_user",
    )
    if tool_name not in known_tools:
        raise ValueError(f"Unknown tool: {tool_name}")

    # Each tool is a module-level function with the same name as the tool.
    handler = globals()[tool_name]
    return handler(**arguments)
|
|
|
|
|
|
def _run_tool_call(tool_call, controller) -> str:
    """Execute one tool call requested by the model and return the reply text.

    Every tool call in an assistant message needs a matching ``tool`` reply,
    so problems that are the model's doing (budget exhausted, malformed
    arguments) are returned as text rather than raised.
    """
    tool_name = tool_call.function.name

    # Enforce the budget per call: one assistant message may request several.
    if not controller.is_tool_call_allowed():
        return "Not executed: the maximum number of tool calls has been reached."
    controller.increment()

    try:
        # An empty argument string is treated as "no arguments".
        arguments = json.loads(tool_call.function.arguments or "{}")
    except json.JSONDecodeError as exc:
        return f"Not executed: the arguments for {tool_name} are not valid JSON ({exc})."
    if not isinstance(arguments, dict):
        return f"Not executed: the arguments for {tool_name} must be a JSON object."

    print(f"Attempting to call {tool_name} with arguments {arguments}...")

    # Give the user a chance to stop a problem before it starts
    keep_going = input("Continue? (Y/n) ")
    if keep_going.strip().lower() in ("n", "no"):
        raise SystemExit(1)

    return str(execute_tool(tool_name, arguments))


def run_conversation(
    user_message: str,
    max_tool_calls: int = 10,
    model: str = "qwen3.5-35b-a3b",
):
    """Run a tool-calling chat loop and return the model's final answer.

    The model is called repeatedly with the registered ``tools``. Each
    requested tool call is confirmed on stdin, executed, and its result fed
    back. The loop ends when the model answers without requesting tools, or
    when the tool-call budget is spent; in the latter case the model is asked
    once more, with tool use disabled, to summarize what it has.

    This function reads the module-level ``client``, which is only created
    when the file is run as a script.

    Args:
        user_message: The task given to the model.
        max_tool_calls: Upper bound on executed tool calls.
        model: Model name passed to the chat completions endpoint.

    Returns:
        The text content of the model's last message (may be ``None`` if the
        model returned no content).

    Raises:
        SystemExit: If the user declines a tool call at the prompt.
    """
    print("Processing initial message")
    controller = ToolCallController(max_tool_calls=max_tool_calls)
    messages: list[ChatCompletionMessageParam] = [
        {
            "role": "system",
            "content": "You are a system administrator with access to a variety of administrator tools.",
        },
        {"role": "user", "content": user_message},
    ]

    while controller.is_tool_call_allowed():
        response = client.chat.completions.create(
            model=model, messages=messages, tools=tools, tool_choice="auto"
        )
        message = response.choices[0].message
        print(f"\n{message}\n")
        messages.append(message)

        if not message.tool_calls:
            # No further tools requested: this is the final answer.
            return message.content

        for tool_call in message.tool_calls:
            messages.append(
                {
                    "role": "tool",
                    "tool_call_id": tool_call.id,
                    "content": _run_tool_call(tool_call, controller),
                }
            )

    # Budget spent: ask for a summary and actually send that request, with
    # tool use disabled so the reply is plain text.
    messages.append(
        {
            "role": "user",
            "content": "You've reached the maximum number of tool calls. Please summarize based on available information.",
        }
    )
    response = client.chat.completions.create(
        model=model, messages=messages, tools=tools, tool_choice="none"
    )
    message = response.choices[0].message
    print(f"\n{message}\n")
    return message.content
|
|
|
|
|
|
if __name__ == "__main__":
    # run_conversation reads ``client`` as a module-level name, so it has to
    # be bound here before the conversation starts.
    client = OpenAI(base_url="https://llama-cpp.reeselink.com", api_key="")

    # Example usage
    task_prompt = (
        "Check the btrfs pools on this system. Take the appropriate action if any pools aren't "
        "healthy. Don't run scrubs unless necessary. Also check the btrfs pool space and report "
        "if any are getting full. At the very end of performing your checks send a single, "
        "concise message to the user explaining what you did and what concerns you might have."
    )
    outcome = run_conversation(task_prompt)
    print(outcome)
|