rename podman_ projects to container_
This commit is contained in:
173
active/container_ddns/update.py
Normal file
173
active/container_ddns/update.py
Normal file
@@ -0,0 +1,173 @@
|
||||
"""
|
||||
export HOSTED_ZONE_ID=<aws hosted zone ID>
|
||||
export ROUTE53_RECORD=something.mydomain.com
|
||||
"""
|
||||
|
||||
import logging
|
||||
import os
|
||||
import subprocess
|
||||
import yaml
|
||||
import sys
|
||||
from typing import TYPE_CHECKING, TypedDict
|
||||
|
||||
import boto3
|
||||
|
||||
try:
|
||||
from yaml import CLoader as Loader
|
||||
except ImportError:
|
||||
from yaml import Loader # type: ignore
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from mypy_boto3_route53 import Route53Client
|
||||
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format="%(asctime)s.%(msecs)03d %(levelname)s %(module)s - %(funcName)s: %(message)s",
|
||||
datefmt="%Y-%m-%d %H:%M:%S",
|
||||
)
|
||||
logger = logging.getLogger(__name__)
|
||||
logger.setLevel(logging.INFO)
|
||||
|
||||
ROUTE53_RECORDS_FILE = os.getenv("ROUTE53_RECORDS_FILE")
|
||||
GLOBAL_SKIP_IPV4 = os.getenv("GLOBAL_SKIP_IPV4", "false").lower() == "true"
|
||||
GLOBAL_SKIP_IPV6 = os.getenv("GLOBAL_SKIP_IPV6", "false").lower() == "true"
|
||||
|
||||
|
||||
class RecordType(TypedDict):
|
||||
record: str
|
||||
hosted_zone_id: str
|
||||
skip_ipv4: bool | None
|
||||
skip_ipv6: bool | None
|
||||
|
||||
|
||||
class RecordYamlStruct(TypedDict):
|
||||
records: list[RecordType]
|
||||
|
||||
|
||||
def get_ipv4() -> str:
|
||||
result = subprocess.run(["curl", "-4", "ifconfig.me"], capture_output=True)
|
||||
return result.stdout.decode()
|
||||
|
||||
|
||||
def get_ipv6() -> str:
|
||||
result = subprocess.run(["curl", "-6", "ifconfig.me"], capture_output=True)
|
||||
return result.stdout.decode()
|
||||
|
||||
|
||||
def update_ipv4(hosted_zone_id: str, record: str, public_ipv4: str):
|
||||
client: Route53Client = boto3.client("route53")
|
||||
try:
|
||||
logger.info("Calling upsert for ipv4.")
|
||||
client.change_resource_record_sets(
|
||||
HostedZoneId=hosted_zone_id,
|
||||
ChangeBatch={
|
||||
"Comment": "Update Public Addresses",
|
||||
"Changes": [
|
||||
{
|
||||
"Action": "UPSERT",
|
||||
"ResourceRecordSet": {
|
||||
"Name": f"{record}",
|
||||
"Type": "A",
|
||||
"TTL": 300,
|
||||
"ResourceRecords": [{"Value": public_ipv4}],
|
||||
},
|
||||
}
|
||||
],
|
||||
},
|
||||
)
|
||||
logger.info(f"Successfully updated ipv4 for {record}")
|
||||
except Exception as e:
|
||||
logger.error(f"Error updating ipv4 for {record}.")
|
||||
raise e
|
||||
|
||||
|
||||
def update_ipv6(hosted_zone_id: str, record: str, public_ipv6: str):
|
||||
client = boto3.client("route53")
|
||||
try:
|
||||
logger.info("Calling upsert for ipv6.")
|
||||
client.change_resource_record_sets(
|
||||
HostedZoneId=hosted_zone_id,
|
||||
ChangeBatch={
|
||||
"Comment": "Update Public Addresses",
|
||||
"Changes": [
|
||||
{
|
||||
"Action": "UPSERT",
|
||||
"ResourceRecordSet": {
|
||||
"Name": f"{record}",
|
||||
"Type": "AAAA",
|
||||
"TTL": 300,
|
||||
"ResourceRecords": [{"Value": public_ipv6}],
|
||||
},
|
||||
}
|
||||
],
|
||||
},
|
||||
)
|
||||
logger.info(f"Successfully updated ipv6 for {record}")
|
||||
except Exception as e:
|
||||
logger.error(f"Error updating ipv6 for {record}.")
|
||||
raise e
|
||||
|
||||
|
||||
def main():
|
||||
if not ROUTE53_RECORDS_FILE:
|
||||
logger.error("ROUTE53_RECORDS_FILE env var not found!")
|
||||
exit(1)
|
||||
|
||||
try:
|
||||
with open(ROUTE53_RECORDS_FILE) as f:
|
||||
records_file_contents: RecordYamlStruct = yaml.load(f, Loader)
|
||||
except FileNotFoundError as e:
|
||||
logger.error(e)
|
||||
sys.exit(1)
|
||||
|
||||
if GLOBAL_SKIP_IPV4:
|
||||
public_ipv4 = None
|
||||
logger.warning("Globally skipping IPv4.")
|
||||
else:
|
||||
logger.info("Getting IPv4 address from ifconfig.me")
|
||||
public_ipv4 = get_ipv4()
|
||||
if not public_ipv4:
|
||||
logger.error("Public IPv4 not found.")
|
||||
exit(1)
|
||||
logger.info(f"Public IPv4 is {public_ipv4}")
|
||||
|
||||
if GLOBAL_SKIP_IPV6:
|
||||
public_ipv6 = None
|
||||
logger.warning("Globally Skipping IPv6")
|
||||
else:
|
||||
logger.info("Getting IPv6 address from ifconfig.me")
|
||||
public_ipv6 = get_ipv6()
|
||||
if not public_ipv6:
|
||||
logger.error("Public IPv6 not found.")
|
||||
exit(1)
|
||||
logger.info(f"Public IPv6 is {public_ipv6}")
|
||||
|
||||
for record in records_file_contents["records"]:
|
||||
|
||||
logger.info(f"Attempting to update {record['record']} from {record['hosted_zone_id']}.")
|
||||
|
||||
if record.get("skip_ipv4"):
|
||||
logger.info(f"{record['record']} requested to skip IPv4")
|
||||
elif GLOBAL_SKIP_IPV4 or not public_ipv4:
|
||||
logger.info("Globally skipping IPv4")
|
||||
else:
|
||||
update_ipv4(
|
||||
hosted_zone_id=record["hosted_zone_id"],
|
||||
record=record["record"],
|
||||
public_ipv4=public_ipv4,
|
||||
)
|
||||
|
||||
if record.get("skip_ipv6"):
|
||||
logger.info(f"{record['record']} requested to skip IPv6")
|
||||
elif GLOBAL_SKIP_IPV6 or not public_ipv6:
|
||||
logger.info("Globally skipping IPv6")
|
||||
else:
|
||||
update_ipv6(
|
||||
hosted_zone_id=record["hosted_zone_id"],
|
||||
record=record["record"],
|
||||
public_ipv6=public_ipv6,
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user