""" Route53 DNS record updater. """ import logging import os from typing import TYPE_CHECKING import boto3 from dotenv import load_dotenv try: from mypy_boto3_route53 import Route53Client except ImportError: Route53Client = object # type: ignore load_dotenv() logging.basicConfig( level=logging.INFO, format="%(asctime)s.%(msecs)03d %(levelname)s %(module)s - %(funcName)s: %(message)s", datefmt="%Y-%m-%d %H:%M:%S", ) logger = logging.getLogger(__name__) def _get_route53_client() -> Route53Client: access_key = os.getenv("AWS_ACCESS_KEY_ID") secret_key = os.getenv("AWS_SECRET_ACCESS_KEY") if access_key and secret_key: return boto3.client( "route53", aws_access_key_id=access_key, aws_secret_access_key=secret_key, ) return boto3.client("route53") def _get_current_record(hosted_zone_id: str, record: str, record_type: str) -> str | None: client = _get_route53_client() try: response = client.list_resource_record_sets(HostedZoneId=hosted_zone_id) record_normalized = record.rstrip(".") for rrset in response["ResourceRecordSets"]: if rrset["Name"].rstrip(".") == record_normalized and rrset["Type"] == record_type: records = rrset.get("ResourceRecords", []) if records: return records[0]["Value"].rstrip(".") except Exception as e: logger.warning("Failed to get current %s record for %s: %s", record_type, record, e) return None def update_ipv4(hosted_zone_id: str, record: str, public_ipv4: str) -> bool: current = _get_current_record(hosted_zone_id, record, "A") if current == public_ipv4: logger.info("IPv4 for %s is already %s, no update needed", record, public_ipv4) return False logger.debug("Creating Route53 client for hosted zone %s", hosted_zone_id) client: Route53Client = _get_route53_client() logger.debug("Building ChangeBatch for IPv4: record=%s, ip=%s, ttl=300", record, public_ipv4) try: logger.info("Calling Route53 upsert for IPv4 on %s", record) client.change_resource_record_sets( HostedZoneId=hosted_zone_id, ChangeBatch={ "Comment": "Update Public Addresses", "Changes": [ { "Action": "UPSERT", "ResourceRecordSet": { "Name": f"{record}", "Type": "A", "TTL": 300, "ResourceRecords": [{"Value": public_ipv4}], }, } ], }, ) logger.info("Successfully updated IPv4 for %s -> %s", record, public_ipv4) return True except Exception as e: logger.error("Error updating IPv4 for %s: %s", record, e) raise e def update_ipv6(hosted_zone_id: str, record: str, public_ipv6: str) -> bool: current = _get_current_record(hosted_zone_id, record, "AAAA") if current == public_ipv6: logger.info("IPv6 for %s is already %s, no update needed", record, public_ipv6) return False logger.debug("Creating Route53 client for hosted zone %s", hosted_zone_id) client = _get_route53_client() logger.debug("Building ChangeBatch for IPv6: record=%s, ip=%s, ttl=300", record, public_ipv6) try: logger.info("Calling Route53 upsert for IPv6 on %s", record) client.change_resource_record_sets( HostedZoneId=hosted_zone_id, ChangeBatch={ "Comment": "Update Public Addresses", "Changes": [ { "Action": "UPSERT", "ResourceRecordSet": { "Name": f"{record}", "Type": "AAAA", "TTL": 300, "ResourceRecords": [{"Value": public_ipv6}], }, } ], }, ) logger.info("Successfully updated IPv6 for %s -> %s", record, public_ipv6) return True except Exception as e: logger.error("Error updating IPv6 for %s: %s", record, e) raise e