checkpoint commit
All checks were successful
Podman DDNS Image / build-and-push-ddns (push) Successful in 1m3s

This commit is contained in:
2026-05-05 06:26:40 -04:00
parent e43c534ceb
commit f2015e2c71
76 changed files with 4265 additions and 235 deletions

View File

@@ -0,0 +1,103 @@
import json
import os
from typing import TypedDict
import requests
UNIFI_API_ENDPOINT = os.getenv("UNIFI_API_ENDPOINT", "https://192.168.1.1")
UNIFI_SITE_NAME = os.getenv("UNIFI_SITE_NAME", "Default")
UNIFI_API_KEY = os.getenv("UNIFI_API_KEY")
if not UNIFI_API_KEY:
try:
with open("active/device_unifi/secrets/api-key", "r") as f:
UNIFI_API_KEY = f.read()
except (FileNotFoundError, PermissionError) as e:
print(e)
print("UNIFI_API_KEY required.")
type uuid_type = str
class UnifiSite(TypedDict):
id: uuid_type
internalReference: str
name: str
class ApiPaths:
@classmethod
def list_sites(cls) -> str:
return "/proxy/network/integration/v1/sites"
@classmethod
def list_records(cls, site_id: uuid_type) -> str:
return f"/proxy/network/integration/v1/sites/{site_id}/dns/policies"
@classmethod
def create_record(cls, site_id: uuid_type) -> str:
return f"/proxy/network/integration/v1/sites/{site_id}/dns/policies"
@classmethod
def update_record(cls, site_id: uuid_type, record_id: uuid_type) -> str:
return f"/proxy/network/integration/v1/sites/{site_id}/dns/policies/{record_id}"
class ApiWrapper:
@classmethod
def api_get(cls, path: str):
return requests.get(
f"{UNIFI_API_ENDPOINT}{path}",
headers={"X-API-Key": UNIFI_API_KEY},
verify=False,
).json()
@classmethod
def api_put(cls, path: str, body: dict):
return requests.put(
f"{UNIFI_API_ENDPOINT}{path}",
headers={"X-API-Key": UNIFI_API_KEY},
verify=False,
json=json.dumps(body),
).json()
class ApiHelperMethods:
@classmethod
def site_name_to_id(cls, site_name: str) -> uuid_type:
results = ApiWrapper.api_get(ApiPaths.list_sites())
data: list[UnifiSite] = results.get("data")
if not data:
print("No sites found")
exit(1)
filtered_sites: list[UnifiSite] = list(
filter(lambda data_item: data_item["name"] == site_name, data)
)
if not filtered_sites:
print("Site with that name not found")
exit(1)
site_id = filtered_sites[0]["id"]
return site_id
# @classmethod
# def upsert_dns_ipv4_record(cls, record_name: str, ipv4_addr: str) -> uuid_type:
@classmethod
def dns_record_exists(cls, record_name: str) -> bool:
site_id = ApiHelperMethods.site_name_to_id(UNIFI_SITE_NAME)
record_id = ApiWrapper.api_get(path=ApiPaths.list_records(site_id))
matched_dns_records = list(
filter(
lambda record_item: record_item["domain"] == record_name,
record_id["data"],
)
)
if matched_dns_records:
return True
return False
if __name__ == "__main__":
import sys
site_name = sys.argv[1]