init agentic server monitor scripts
All checks were successful
Podman DDNS Image / build-and-push-ddns (push) Successful in 1m4s

This commit is contained in:
2026-04-06 11:46:51 -04:00
parent 3bc92c5889
commit f359a64218
2 changed files with 145 additions and 3 deletions

View File

@@ -0,0 +1,141 @@
import os
import subprocess
from functools import wraps
from typing import Callable
from langchain.agents import create_agent
from langchain_openai import ChatOpenAI
def make_verbose(func: Callable):
@wraps(func)
def wrapper(*args, **kwargs):
print("==========")
print(f"Calling {func.__name__} with params ({', '.join(args)}) and ({kwargs})")
result = func(*args, **kwargs)
print("==========")
return result
return wrapper
def run_command(command: list[str]) -> str:
"""Runs a command with subprocess.run and returns the stdout, stderr in a single string"""
process = subprocess.Popen(
command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
stdouts: list[str] = []
stderrs: list[str] = []
if process.stdout:
print("**STDOUT**")
for line in process.stdout:
line = line.strip()
print(line)
stdouts.append(line)
if process.stderr:
print("**STDERR**")
for line in process.stderr:
line = line.strip()
print(line)
stderrs.append(line)
output = f"**STDOUT**\n{'\n'.join(stdouts)}\n\n**STDERR**{'\n'.join(stderrs)}"
return output
@make_verbose
def get_fstab() -> str:
"""Retruns the"""
output = run_command(["ssh", "driveripper", "virsh", "list", "--name"])
return output
@make_verbose
def check_vm_type(server_name: str) -> str:
"""Returns various information about a given server like OS and version. server_name should be a server from list_vms."""
output = run_command(["ssh", f"{server_name}-root", "cat", "/etc/*-release"])
return output
@make_verbose
def get_updates_fedora(server_name: str) -> str:
"""Check for updates for a given Fedora server"""
output = run_command(["ssh", f"{server_name}-root"])
return output
@make_verbose
def get_security_updates_fedora(server_name: str) -> str:
"""Checks only for security updates for a given Fedora server"""
output = run_command(
["ssh", f"{server_name}-root", "dnf", "check-update", "--security"]
)
return output
@make_verbose
def perform_security_updates_fedora(server_name: str) -> str:
"""Applies security updates for a given Fedora server"""
output = run_command(
["ssh", f"{server_name}-root", "dnf", "update", "--security", "-y"]
)
return output
@make_verbose
def perform_security_updates_ubuntu(server_name: str) -> str:
"""Applies security updates for a given Ubuntu server"""
output = run_command(["ssh", f"{server_name}-root", "apt", "update", "-y"])
return output
def get_api_key() -> str:
return os.getenv("OPENAI_API_KEY", "placeholder")
if __name__ == "__main__":
# Run the agent
llm = ChatOpenAI(
model="instruct",
base_url="https://llama-instruct.reeselink.com",
api_key=get_api_key,
temperature=0.7,
timeout=30,
max_retries=2,
verbose=True,
top_p=1,
)
agent = create_agent(
model=llm,
tools=[
list_vms,
check_vm_type,
get_updates_fedora,
get_security_updates_fedora,
perform_security_updates_fedora,
perform_security_updates_ubuntu,
],
system_prompt="You are a helpful assistant",
)
result = agent.invoke(
{
"messages": [
{
"role": "user",
"content": (
"List all the available servers. Then, for each server, check the "
"server's OS and use the appropriate update check tool to check for "
"security updates. If any server needs security updates, apply them with "
"the appropriate update tool. Finally, provide a brief summary of what "
"you did."
),
}
]
}
)
print(result["messages"][-1].content)

View File

@@ -100,13 +100,14 @@ def get_api_key() -> str:
if __name__ == "__main__":
# Run the agent
llm = ChatOpenAI(
model="qwen3.5-35b-a3b",
base_url="https://llama-cpp.reeselink.com",
model="instruct",
base_url="https://llama-instruct.reeselink.com",
api_key=get_api_key,
temperature=0.95,
temperature=0.7,
timeout=30,
max_retries=2,
verbose=True,
top_p=1,
)
agent = create_agent(