120 lines
4.2 KiB
Python
120 lines
4.2 KiB
Python
"""
|
|
Route53 DNS record updater.
|
|
"""
|
|
|
|
import logging
|
|
import os
|
|
from typing import TYPE_CHECKING
|
|
|
|
import boto3
|
|
from dotenv import load_dotenv
|
|
|
|
# Route53Client is only used in annotations in this module. When the
# mypy_boto3_route53 package cannot be imported, fall back to ``object`` so the
# annotations below still evaluate.
try:
    from mypy_boto3_route53 import Route53Client
except ImportError:
    Route53Client = object  # type: ignore

# Run python-dotenv's loader at import time, before any function in this module
# reads its settings through os.getenv.
load_dotenv()

# Logging setup: INFO level, timestamps with milliseconds, and the originating
# module and function name on every line.
_LOG_FORMAT = "%(asctime)s.%(msecs)03d %(levelname)s %(module)s - %(funcName)s: %(message)s"
_LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
logging.basicConfig(level=logging.INFO, format=_LOG_FORMAT, datefmt=_LOG_DATE_FORMAT)

logger = logging.getLogger(__name__)
|
|
|
|
|
|
def _get_route53_client() -> Route53Client:
    """Build a boto3 Route53 client.

    When both ``AWS_ACCESS_KEY_ID`` and ``AWS_SECRET_ACCESS_KEY`` are set in the
    environment they are passed to boto3 explicitly, together with
    ``AWS_SESSION_TOKEN`` when that is set as well. Otherwise boto3 is left to
    resolve credentials on its own.

    Returns:
        The client returned by ``boto3.client("route53", ...)``.
    """
    access_key = os.getenv("AWS_ACCESS_KEY_ID")
    secret_key = os.getenv("AWS_SECRET_ACCESS_KEY")
    if not (access_key and secret_key):
        return boto3.client("route53")

    # Explicitly passed keys take precedence over boto3's own environment
    # lookup, so the session token has to be forwarded by hand; without it
    # temporary (STS) credentials would be sent incomplete and rejected.
    # An unset or empty token is passed as None, i.e. "no token".
    session_token = os.getenv("AWS_SESSION_TOKEN") or None
    return boto3.client(
        "route53",
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
        aws_session_token=session_token,
    )
|
|
|
|
|
|
def _get_current_record(hosted_zone_id: str, record: str, record_type: str) -> str | None:
    """Look up the value currently stored for a record in a hosted zone.

    Args:
        hosted_zone_id: ID of the Route53 hosted zone to query.
        record: Record name, with or without a trailing dot.
        record_type: Record type to match, e.g. ``"A"`` or ``"AAAA"``.

    Returns:
        The first value of the matching record set with any trailing dot
        removed, or ``None`` when no matching record set with values is found
        or the lookup fails. Lookup failures are logged as warnings rather
        than raised, so callers can treat ``None`` as "unknown".
    """
    client = _get_route53_client()
    try:
        # Route53 stores record names in lowercase, so compare case-insensitively.
        wanted_name = record.rstrip(".").lower()

        # Start the listing at the wanted name and type. An unpositioned listing
        # only returns the first page of the zone, so a record further down a
        # large zone would never be seen and would be rewritten on every run.
        response = client.list_resource_record_sets(
            HostedZoneId=hosted_zone_id,
            StartRecordName=wanted_name,
            StartRecordType=record_type,
        )
        for rrset in response["ResourceRecordSets"]:
            if rrset["Type"] != record_type:
                continue
            if rrset["Name"].rstrip(".").lower() != wanted_name:
                continue
            # Record sets without "ResourceRecords" carry no literal value to
            # compare; keep looking.
            values = rrset.get("ResourceRecords", [])
            if values:
                return values[0]["Value"].rstrip(".")
    except Exception as e:
        # Best effort: a failed lookup must not prevent the caller from updating.
        logger.warning("Failed to get current %s record for %s: %s", record_type, record, e)
    return None
|
|
|
|
|
|
def update_ipv4(
    hosted_zone_id: str,
    record: str,
    public_ipv4: str,
    *,
    ttl: int = 300,
) -> bool:
    """Point the ``A`` record for ``record`` at ``public_ipv4``.

    The current value is looked up first and the Route53 call is skipped when
    it already equals ``public_ipv4``. Only the address is compared, so a
    different ``ttl`` alone does not trigger an update.

    Args:
        hosted_zone_id: ID of the Route53 hosted zone containing the record.
        record: Name of the record to update.
        public_ipv4: IPv4 address to store in the record.
        ttl: TTL written to the record set. Defaults to 300.

    Returns:
        ``True`` if an UPSERT was sent, ``False`` if the record already held
        ``public_ipv4``.

    Raises:
        Exception: Any error raised by the Route53 call is logged and re-raised.
    """
    current = _get_current_record(hosted_zone_id, record, "A")
    if current == public_ipv4:
        logger.info("IPv4 for %s is already %s, no update needed", record, public_ipv4)
        return False

    logger.debug("Creating Route53 client for hosted zone %s", hosted_zone_id)
    client: Route53Client = _get_route53_client()

    logger.debug("Building ChangeBatch for IPv4: record=%s, ip=%s, ttl=%s", record, public_ipv4, ttl)
    change_batch = {
        "Comment": "Update Public Addresses",
        "Changes": [
            {
                "Action": "UPSERT",
                "ResourceRecordSet": {
                    "Name": record,
                    "Type": "A",
                    "TTL": ttl,
                    "ResourceRecords": [{"Value": public_ipv4}],
                },
            }
        ],
    }

    try:
        logger.info("Calling Route53 upsert for IPv4 on %s", record)
        client.change_resource_record_sets(
            HostedZoneId=hosted_zone_id,
            ChangeBatch=change_batch,
        )
    except Exception as e:
        logger.error("Error updating IPv4 for %s: %s", record, e)
        # Bare raise keeps the original traceback intact.
        raise

    logger.info("Successfully updated IPv4 for %s -> %s", record, public_ipv4)
    return True
|
|
|
|
|
|
def update_ipv6(
    hosted_zone_id: str,
    record: str,
    public_ipv6: str,
    *,
    ttl: int = 300,
) -> bool:
    """Point the ``AAAA`` record for ``record`` at ``public_ipv6``.

    The current value is looked up first and the Route53 call is skipped when
    it already equals ``public_ipv6``. The comparison is a plain string
    comparison of the address, so a different ``ttl`` alone does not trigger
    an update.

    Args:
        hosted_zone_id: ID of the Route53 hosted zone containing the record.
        record: Name of the record to update.
        public_ipv6: IPv6 address to store in the record.
        ttl: TTL written to the record set. Defaults to 300.

    Returns:
        ``True`` if an UPSERT was sent, ``False`` if the record already held
        ``public_ipv6``.

    Raises:
        Exception: Any error raised by the Route53 call is logged and re-raised.
    """
    current = _get_current_record(hosted_zone_id, record, "AAAA")
    if current == public_ipv6:
        logger.info("IPv6 for %s is already %s, no update needed", record, public_ipv6)
        return False

    logger.debug("Creating Route53 client for hosted zone %s", hosted_zone_id)
    client: Route53Client = _get_route53_client()

    logger.debug("Building ChangeBatch for IPv6: record=%s, ip=%s, ttl=%s", record, public_ipv6, ttl)
    change_batch = {
        "Comment": "Update Public Addresses",
        "Changes": [
            {
                "Action": "UPSERT",
                "ResourceRecordSet": {
                    "Name": record,
                    "Type": "AAAA",
                    "TTL": ttl,
                    "ResourceRecords": [{"Value": public_ipv6}],
                },
            }
        ],
    }

    try:
        logger.info("Calling Route53 upsert for IPv6 on %s", record)
        client.change_resource_record_sets(
            HostedZoneId=hosted_zone_id,
            ChangeBatch=change_batch,
        )
    except Exception as e:
        logger.error("Error updating IPv6 for %s: %s", record, e)
        # Bare raise keeps the original traceback intact.
        raise

    logger.info("Successfully updated IPv6 for %s -> %s", record, public_ipv6)
    return True
|