import json import os import smtplib import ssl import subprocess from email.message import EmailMessage from pathlib import Path from typing import Iterable, TypedDict, cast from dotenv import dotenv_values from openai import OpenAI from openai.types.chat import ChatCompletionMessageParam, ChatCompletionToolUnionParam class AWS_SES_DOTENV(TypedDict): USER: str PASSWORD: str ENDPOINT: str TLS_PORT: str SENDER: str RECEIVER: str class ToolCallController: def __init__(self, max_tool_calls=10): self.max_tool_calls = max_tool_calls self.tool_call_count = 0 def is_tool_call_allowed(self): return self.tool_call_count < self.max_tool_calls def increment(self): self.tool_call_count += 1 def reset(self): self.tool_call_count = 0 # Register tools tools: Iterable[ChatCompletionToolUnionParam] = [ { "type": "function", "function": { "name": "list_btrfs_pools", "description": "List the btrfs pools on this system. Returns a list like ['/btrfs/pool0', '/btrfs/backup0', etc.]", "parameters": { "type": "object", "properties": {"server_id": {"type": "string", "enum": ["all"]}}, "required": ["server_id"], }, }, }, { "type": "function", "function": { "name": "btrfs_device_stats", "description": "Runs `btrfs device stats` on a given pool.", "parameters": { "type": "object", "properties": { "pool_path": { "type": "string", } }, "required": ["pool_path"], }, }, }, { "type": "function", "function": { "name": "btrfs_filesystem_show", "description": "Runs `btrfs filesystem show` on a given pool.", "parameters": { "type": "object", "properties": { "pool_path": { "type": "string", } }, "required": ["pool_path"], }, }, }, { "type": "function", "function": { "name": "btrfs_scrub_start", "description": "Runs `btrfs scrub start` on a given pool.", "parameters": { "type": "object", "properties": { "pool_path": { "type": "string", } }, "required": ["pool_path"], }, }, }, { "type": "function", "function": { "name": "lsblk_filesystems", "description": ( "Lists current filesystems on this server. Can be used to translate a " "luks ID to a device. Example: luks-pool0-1c9a755a-2a81-4a2f-9bd7-6b6d7caaa523 " "translates to /dev/sdb because /dev/sdb is listed below that filesystem. " ), "parameters": { "type": "object", "properties": {"server_id": {"type": "string", "enum": ["all"]}}, "required": ["server_id"], }, }, }, { "type": "function", "function": { "name": "smartctl", "description": ( "Runs `smartctl -a` on a given device. `device_path` should be " "a device at /dev. Example:`/dev/sda` or `/dev/nvme0n1`." ), "parameters": { "type": "object", "properties": { "device_path": { "type": "string", } }, "required": ["device_path"], }, }, }, { "type": "function", "function": { "name": "alert_user", "description": ( "Sends the provided message to the user via a messaging service." ), "parameters": { "type": "object", "properties": { "message": { "type": "string", } }, "required": ["message"], }, }, }, ] def list_btrfs_pools(server_id: str) -> list[str]: btrfs_path = Path("/btrfs") pools = btrfs_path.iterdir() list_pools = list(map(lambda item: str(item), pools)) return list_pools def btrfs_device_stats(pool_path: str) -> str: command_result = subprocess.run( ["btrfs", "device", "stats", pool_path], capture_output=True ) stdout = command_result.stdout.decode() stderr = command_result.stderr.decode() output = f"**STDOUT**\n{stdout}\n\n**STDERR**{stderr}" return output def btrfs_filesystem_show(pool_path: str) -> str: command_result = subprocess.run( ["btrfs", "filesystem", "show", pool_path], capture_output=True ) stdout = command_result.stdout.decode() stderr = command_result.stderr.decode() output = f"**STDOUT**\n{stdout}\n\n**STDERR**{stderr}" return output def btrfs_scrub_start(pool_path: str) -> str: command_result = subprocess.run( ["btrfs", "scrub", "start", pool_path], capture_output=True ) stdout = command_result.stdout.decode() stderr = command_result.stderr.decode() output = f"**STDOUT**\n{stdout}\n\n**STDERR**{stderr}" return output def lsblk_filesystems(server_id: str) -> str: command_result = subprocess.run(["lsblk", "-fs"], capture_output=True) stdout = command_result.stdout.decode() stderr = command_result.stderr.decode() output = f"**STDOUT**\n{stdout}\n\n**STDERR**{stderr}" return output def smartctl(device_path: str) -> str: command_result = subprocess.run( ["smartctl", "-a", device_path], capture_output=True ) stdout = command_result.stdout.decode() stderr = command_result.stderr.decode() output = f"**STDOUT**\n{stdout}\n\n**STDERR**{stderr}" return output def load_ses_creds() -> AWS_SES_DOTENV: ses_dotenv_location = Path(os.getenv("HOME", "/root"), ".env/aws_ses") print(f"Loading env from {ses_dotenv_location}") raw_values = dotenv_values(ses_dotenv_location) if raw_values: aws_ses_config = cast(AWS_SES_DOTENV, raw_values) # print(f"AWS SES Credentials loaded: {aws_ses_config}") return aws_ses_config print("No email credentials supplied. Exiting.") exit(1) def alert_user(message: str) -> str: ses_config = load_ses_creds() port = int(ses_config["TLS_PORT"]) user = ses_config["USER"] password = ses_config["PASSWORD"] sender = ses_config["SENDER"] receiver = ses_config["RECEIVER"] # Create a secure SSL context context = ssl.create_default_context() msg = EmailMessage() msg["Subject"] = "Agent Disk Report" msg["From"] = sender msg["To"] = receiver msg.set_content(message) with smtplib.SMTP_SSL(ses_config["ENDPOINT"], port, context=context) as server: server.login(user, password) result = server.send_message(msg) print(result) return "Sent." def execute_tool(tool_name, arguments): if tool_name == "list_btrfs_pools": return list_btrfs_pools(**arguments) elif tool_name == "btrfs_device_stats": return btrfs_device_stats(**arguments) elif tool_name == "btrfs_filesystem_show": return btrfs_filesystem_show(**arguments) elif tool_name == "btrfs_scrub_start": return btrfs_scrub_start(**arguments) elif tool_name == "lsblk_filesystems": return lsblk_filesystems(**arguments) elif tool_name == "smartctl": return smartctl(**arguments) elif tool_name == "alert_user": return alert_user(**arguments) raise ValueError(f"Unknown tool: {tool_name}") def run_conversation(user_message: str, max_tool_calls=10): print("Processing initial message") controller = ToolCallController(max_tool_calls=max_tool_calls) messages: Iterable[ChatCompletionMessageParam] = [ { "role": "system", "content": "You are a system administrator with access to a variety of administrator tools.", } ] messages.append({"role": "user", "content": user_message}) while True: if not controller.is_tool_call_allowed(): messages.append( { "role": "user", "content": "You've reached the maximum number of tool calls. Please summarize based on available information.", } ) break response = client.chat.completions.create( model="qwen3.5-35b-a3b", messages=messages, tools=tools, tool_choice="auto" ) message = response.choices[0].message print(f"\n{message}\n") messages.append(message) if message.tool_calls: for tool_call in message.tool_calls: controller.increment() tool_name = tool_call.function.name arguments = json.loads(tool_call.function.arguments) print(f"Attempting to call {tool_name} with arguments {arguments}...") # Give the user a chance to stop a problem before it starts # keep_going = input("Continue? (Y/n) ") # if keep_going.lower() == "n": # exit(1) result = execute_tool(tool_name, arguments) messages.append( { "role": "tool", "tool_call_id": tool_call.id, "content": str(result), } ) else: break if __name__ == "__main__": client = OpenAI(base_url="https://llama-cpp.reeselink.com", api_key="") # Example usage print( run_conversation( "Check the btrfs pools on this system. Take the appropriate action if any pools aren't " "healthy. Don't run scrubs unless necessary. Also check the btrfs pool space and report " "if any are getting full. At the very end of performing your checks send a single, " "concise message to the user explaining what you did and what concerns you might have." ) )