347 lines
12 KiB
Python
347 lines
12 KiB
Python
"""
|
|
DDNS service that updates Route53 and UniFi DNS policies.
|
|
|
|
Exported env vars for Route53:
|
|
RECORDS_FILE - path to YAML file with all DNS records
|
|
ROUTE53_HOSTED_ZONE_ID - AWS Route53 hosted zone ID (single zone supported)
|
|
GLOBAL_SKIP_IPV4 - skip IPv4 updates globally
|
|
GLOBAL_SKIP_IPV6 - skip IPv6 updates globally
|
|
LOG_LEVEL - logging level (DEBUG, INFO, WARNING, ERROR; default: INFO)
|
|
|
|
Exported env vars for UniFi:
|
|
UNIFI_HOST - UniFi controller URL (e.g., https://unifi.local:8443)
|
|
UNIFI_SITE_ID - UniFi site ID
|
|
UNIFI_API_TOKEN - UniFi API token
|
|
UNIFI_VERIFY_SSL - verify SSL certificates (default: false)
|
|
|
|
Exported env vars for debugging:
|
|
DEBUG - if true, starts debugpy and waits for a remote debugger connection on port 5678
|
|
"""
|
|
|
|
import os
|
|
|
|
DEBUG = os.getenv("DEBUG", "false").lower() == "true"
|
|
if DEBUG:
|
|
import debugpy
|
|
|
|
debugpy.listen(("0.0.0.0", 5678))
|
|
print("DEBUG: debugpy listening on 0.0.0.0:5678, waiting for debugger to attach...")
|
|
debugpy.wait_for_client()
|
|
print("DEBUG: debugger attached, resuming execution...")
|
|
|
|
import logging
|
|
import subprocess
|
|
import sys
|
|
from typing import Literal, TypedDict
|
|
|
|
import requests
|
|
import yaml
|
|
from dotenv import load_dotenv
|
|
|
|
load_dotenv()
|
|
|
|
from route53_update import update_ipv4 as route53_update_ipv4, update_ipv6 as route53_update_ipv6
|
|
from unifi_update import UnifiConfig, update_records as unifi_update_records
|
|
|
|
try:
|
|
from yaml import CLoader as Loader
|
|
except ImportError:
|
|
from yaml import Loader # type: ignore
|
|
|
|
log_level = os.getenv("LOG_LEVEL", "INFO").upper()
|
|
|
|
logging.basicConfig(
|
|
level=log_level,
|
|
format="%(asctime)s.%(msecs)03d %(levelname)s %(module)s - %(funcName)s: %(message)s",
|
|
datefmt="%Y-%m-%d %H:%M:%S",
|
|
)
|
|
logger = logging.getLogger(__name__)
|
|
logger.setLevel(log_level)
|
|
|
|
RECORDS_FILE = os.getenv("RECORDS_FILE")
|
|
ROUTE53_HOSTED_ZONE_ID = os.getenv("ROUTE53_HOSTED_ZONE_ID")
|
|
GLOBAL_SKIP_IPV4 = os.getenv("GLOBAL_SKIP_IPV4", "false").lower() == "true"
|
|
GLOBAL_SKIP_IPV6 = os.getenv("GLOBAL_SKIP_IPV6", "false").lower() == "true"
|
|
|
|
UNIFI_HOST = os.getenv("UNIFI_HOST")
|
|
UNIFI_SITE_ID = os.getenv("UNIFI_SITE_ID")
|
|
UNIFI_API_TOKEN = os.getenv("UNIFI_API_TOKEN")
|
|
UNIFI_VERIFY_SSL = os.getenv("UNIFI_VERIFY_SSL", "false").lower() == "true"
|
|
|
|
NTFY_URL = os.getenv("NTFY_URL", "")
|
|
NTFY_TOPIC = os.getenv("NTFY_TOPIC", "")
|
|
NTFY_API_KEY = os.getenv("NTFY_API_KEY", "")
|
|
|
|
|
|
class Route53RecordType(TypedDict):
|
|
record: str
|
|
provider: Literal["route53"]
|
|
skip_ipv4: bool | None
|
|
skip_ipv6: bool | None
|
|
ttl_seconds: int | None
|
|
|
|
|
|
class UnifiRecordType(TypedDict):
|
|
record: str
|
|
provider: Literal["unifi"]
|
|
skip_ipv4: bool | None
|
|
skip_ipv6: bool | None
|
|
ttl_seconds: int | None
|
|
|
|
|
|
class RecordYamlStruct(TypedDict):
|
|
records: list[Route53RecordType | UnifiRecordType]
|
|
|
|
|
|
def send_ntfy_notification(title: str, message: str, priority: int = 3) -> None:
|
|
if not NTFY_URL or not NTFY_TOPIC:
|
|
return
|
|
try:
|
|
headers = {
|
|
"Title": title,
|
|
"Priority": str(priority),
|
|
}
|
|
if NTFY_API_KEY:
|
|
headers["Authorization"] = f"Bearer {NTFY_API_KEY}"
|
|
logger.info("Sending NTFY notification: %s", title)
|
|
response = requests.post(
|
|
f"{NTFY_URL}/{NTFY_TOPIC}",
|
|
data=message.encode(),
|
|
headers=headers,
|
|
timeout=10,
|
|
)
|
|
response.raise_for_status()
|
|
logger.info("NTFY notification sent: %s", title)
|
|
except Exception as e:
|
|
logger.warning("Failed to send NTFY notification: %s", e)
|
|
|
|
|
|
def get_ipv4() -> str:
|
|
logger.debug("Executing: curl -4 ifconfig.me")
|
|
result = subprocess.run(["curl", "-4", "ifconfig.me"], capture_output=True)
|
|
ip = result.stdout.decode()
|
|
logger.debug("IPv4 response: %s", ip.strip())
|
|
return ip
|
|
|
|
|
|
def get_ipv6() -> str:
|
|
logger.debug("Executing: curl -6 ifconfig.me")
|
|
result = subprocess.run(["curl", "-6", "ifconfig.me"], capture_output=True)
|
|
ip = result.stdout.decode()
|
|
logger.debug("IPv6 response: %s", ip.strip())
|
|
return ip
|
|
|
|
|
|
def _update_route53_records(
|
|
records: list[Route53RecordType],
|
|
public_ipv4: str | None,
|
|
public_ipv6: str | None,
|
|
) -> tuple[set[str], set[str]]:
|
|
logger.info("Processing %d Route53 record(s)", len(records))
|
|
updated_ipv4: set[str] = set()
|
|
updated_ipv6: set[str] = set()
|
|
|
|
for record in records:
|
|
logger.info(
|
|
"=== Processing Route53 record: %s (hosted_zone_id=%s) ===",
|
|
record["record"],
|
|
ROUTE53_HOSTED_ZONE_ID,
|
|
)
|
|
|
|
if record.get("skip_ipv4"):
|
|
logger.info("Skipping IPv4 for %s (skip_ipv4=true)", record["record"])
|
|
elif GLOBAL_SKIP_IPV4 or not public_ipv4:
|
|
logger.info("Skipping IPv4 for %s (global skip or no IPv4 available)", record["record"])
|
|
else:
|
|
logger.info("Updating IPv4 for %s -> %s", record["record"], public_ipv4)
|
|
changed = route53_update_ipv4(
|
|
hosted_zone_id=ROUTE53_HOSTED_ZONE_ID, # type: ignore[arg-type]
|
|
record=record["record"],
|
|
public_ipv4=public_ipv4,
|
|
)
|
|
if changed:
|
|
updated_ipv4.add(record["record"])
|
|
|
|
if record.get("skip_ipv6"):
|
|
logger.info("Skipping IPv6 for %s (skip_ipv6=true)", record["record"])
|
|
elif GLOBAL_SKIP_IPV6 or not public_ipv6:
|
|
logger.info("Skipping IPv6 for %s (global skip or no IPv6 available)", record["record"])
|
|
else:
|
|
logger.info("Updating IPv6 for %s -> %s", record["record"], public_ipv6)
|
|
changed = route53_update_ipv6(
|
|
hosted_zone_id=ROUTE53_HOSTED_ZONE_ID, # type: ignore[arg-type]
|
|
record=record["record"],
|
|
public_ipv6=public_ipv6,
|
|
)
|
|
if changed:
|
|
updated_ipv6.add(record["record"])
|
|
|
|
logger.info("=== Done processing Route53 record: %s ===", record["record"])
|
|
|
|
return updated_ipv4, updated_ipv6
|
|
|
|
|
|
def _update_unifi_records(
|
|
unifi_config: UnifiConfig,
|
|
public_ipv4: str | None,
|
|
public_ipv6: str | None,
|
|
) -> tuple[set[str], set[str]]:
|
|
logger.info("Processing %d UniFi record(s)", len(unifi_config["records"]))
|
|
updated_ipv4, updated_ipv6 = unifi_update_records(
|
|
unifi_config=unifi_config,
|
|
ipv4=public_ipv4,
|
|
ipv6=public_ipv6,
|
|
)
|
|
return updated_ipv4, updated_ipv6
|
|
|
|
|
|
def main() -> None:
|
|
logger.info("=== DDNS Update Starting ===")
|
|
logger.debug("Log level: %s", log_level)
|
|
logger.debug("RECORDS_FILE: %s", RECORDS_FILE)
|
|
logger.debug("ROUTE53_HOSTED_ZONE_ID: %s", ROUTE53_HOSTED_ZONE_ID)
|
|
logger.debug("GLOBAL_SKIP_IPV4: %s", GLOBAL_SKIP_IPV4)
|
|
logger.debug("GLOBAL_SKIP_IPV6: %s", GLOBAL_SKIP_IPV6)
|
|
logger.debug("UNIFI_HOST: %s", UNIFI_HOST)
|
|
logger.debug("UNIFI_SITE_ID: %s", UNIFI_SITE_ID)
|
|
logger.debug("UNIFI_API_TOKEN: %s", "****" if UNIFI_API_TOKEN else "not set")
|
|
logger.debug("UNIFI_VERIFY_SSL: %s", UNIFI_VERIFY_SSL)
|
|
|
|
if not RECORDS_FILE:
|
|
logger.error("RECORDS_FILE env var not found!")
|
|
exit(1)
|
|
|
|
logger.info("Loading records file: %s", RECORDS_FILE)
|
|
try:
|
|
with open(RECORDS_FILE) as f:
|
|
records_file_contents: RecordYamlStruct = yaml.load(f, Loader)
|
|
logger.debug("Loaded %d record(s) from YAML", len(records_file_contents["records"]))
|
|
except FileNotFoundError as e:
|
|
logger.error("Records file not found: %s", e)
|
|
sys.exit(1)
|
|
|
|
route53_records: list[Route53RecordType] = []
|
|
unifi_records: list[UnifiRecordType] = []
|
|
|
|
for record in records_file_contents["records"]:
|
|
provider = record.get("provider", "route53")
|
|
logger.debug("Categorizing record %s as provider=%s", record["record"], provider)
|
|
if provider == "unifi":
|
|
unifi_records.append(record) # type: ignore[arg-type]
|
|
else:
|
|
route53_records.append(record) # type: ignore[arg-type]
|
|
|
|
logger.info("Found %d Route53 record(s) and %d UniFi record(s)", len(route53_records), len(unifi_records))
|
|
|
|
if GLOBAL_SKIP_IPV4:
|
|
public_ipv4 = None
|
|
logger.warning("Globally skipping IPv4.")
|
|
else:
|
|
logger.info("Fetching public IPv4 address from ifconfig.me")
|
|
public_ipv4 = get_ipv4()
|
|
if not public_ipv4:
|
|
logger.error("Public IPv4 not found.")
|
|
exit(1)
|
|
logger.info("Public IPv4 is %s", public_ipv4)
|
|
|
|
if GLOBAL_SKIP_IPV6:
|
|
public_ipv6 = None
|
|
logger.warning("Globally skipping IPv6.")
|
|
else:
|
|
logger.info("Fetching public IPv6 address from ifconfig.me")
|
|
public_ipv6 = get_ipv6()
|
|
if not public_ipv6:
|
|
logger.error("Public IPv6 not found.")
|
|
exit(1)
|
|
logger.info("Public IPv6 is %s", public_ipv6)
|
|
|
|
route53_success = True
|
|
unifi_success = True
|
|
route53_updated_ipv4: set[str] = set()
|
|
route53_updated_ipv6: set[str] = set()
|
|
unifi_updated_ipv4: set[str] = set()
|
|
unifi_updated_ipv6: set[str] = set()
|
|
|
|
if route53_records:
|
|
if not ROUTE53_HOSTED_ZONE_ID:
|
|
logger.error("ROUTE53_HOSTED_ZONE_ID must be set for Route53 records!")
|
|
route53_success = False
|
|
else:
|
|
logger.info("=== Starting Route53 updates (hosted_zone_id=%s) ===", ROUTE53_HOSTED_ZONE_ID)
|
|
try:
|
|
route53_updated_ipv4, route53_updated_ipv6 = _update_route53_records(route53_records, public_ipv4, public_ipv6)
|
|
logger.info("=== Finished Route53 updates ===")
|
|
except Exception as e:
|
|
logger.error("Route53 updates failed: %s", e)
|
|
route53_success = False
|
|
|
|
if unifi_records:
|
|
logger.info("=== Starting UniFi updates ===")
|
|
if not all([UNIFI_HOST, UNIFI_SITE_ID, UNIFI_API_TOKEN]):
|
|
logger.error("UNIFI_HOST, UNIFI_SITE_ID, and UNIFI_API_TOKEN must be set for UniFi records!")
|
|
unifi_success = False
|
|
else:
|
|
try:
|
|
unifi_config: UnifiConfig = {
|
|
"host": UNIFI_HOST, # type: ignore
|
|
"site_id": UNIFI_SITE_ID, # type: ignore
|
|
"api_token": UNIFI_API_TOKEN, # type: ignore
|
|
"verify_ssl": UNIFI_VERIFY_SSL,
|
|
"records": unifi_records, # type: ignore[arg-type]
|
|
}
|
|
|
|
unifi_updated_ipv4, unifi_updated_ipv6 = _update_unifi_records(unifi_config, public_ipv4, public_ipv6)
|
|
logger.info("=== Finished UniFi updates ===")
|
|
except Exception as e:
|
|
logger.error("UniFi updates failed: %s", e)
|
|
unifi_success = False
|
|
|
|
route53_domains = [r["record"] for r in route53_records]
|
|
unifi_domains = [r["record"] for r in unifi_records]
|
|
|
|
route53_lines = []
|
|
if public_ipv4:
|
|
for domain in route53_domains:
|
|
if domain in route53_updated_ipv4:
|
|
route53_lines.append(f"{domain} (A): {public_ipv4}")
|
|
if public_ipv6:
|
|
for domain in route53_domains:
|
|
if domain in route53_updated_ipv6:
|
|
route53_lines.append(f"{domain} (AAAA): {public_ipv6}")
|
|
route53_text = "\n".join(route53_lines) if route53_lines else "No IPs available"
|
|
|
|
unifi_lines = []
|
|
if public_ipv4:
|
|
for domain in unifi_domains:
|
|
if domain in unifi_updated_ipv4:
|
|
unifi_lines.append(f"{domain} (A): {public_ipv4}")
|
|
if public_ipv6:
|
|
for domain in unifi_domains:
|
|
if domain in unifi_updated_ipv6:
|
|
unifi_lines.append(f"{domain} (AAAA): {public_ipv6}")
|
|
unifi_text = "\n".join(unifi_lines) if unifi_lines else "No IPs available"
|
|
|
|
if route53_records:
|
|
if route53_success:
|
|
has_changes = bool(route53_updated_ipv4 or route53_updated_ipv6)
|
|
ntfy_priority = 4 if has_changes else 2
|
|
ntfy_title = "Route53 IP Changed" if has_changes else "Route53 IP Unchanged"
|
|
send_ntfy_notification(ntfy_title, f"Records updated:\n{route53_text}", priority=ntfy_priority)
|
|
else:
|
|
send_ntfy_notification("Route53 Update Failed", f"Records:\n{route53_text}", priority=2)
|
|
|
|
if unifi_records:
|
|
if unifi_success:
|
|
has_changes = bool(unifi_updated_ipv4 or unifi_updated_ipv6)
|
|
ntfy_priority = 4 if has_changes else 2
|
|
ntfy_title = "UniFi IP Changed" if has_changes else "UniFi IP Unchanged"
|
|
send_ntfy_notification(ntfy_title, f"Records updated:\n{unifi_text}", priority=ntfy_priority)
|
|
else:
|
|
send_ntfy_notification("UniFi Update Failed", f"Records:\n{unifi_text}", priority=2)
|
|
|
|
logger.info("=== DDNS Update Complete ===")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|