#!/usr/bin/env python3
"""RabbitMQ setup & consumer using pika.

- Creates two **topic exchanges**: nic, reese
- For each exchange creates two queues: add, delete
- Binds the queues with routing keys "add" and "delete"
- Subscribes (consumes) from a **single** exchange / routing-key pair that is
  supplied via a tiny config file (config.yaml).

Run:
    python3 rabbit_demo.py
"""

import logging
import sys
from pathlib import Path

import pika  # type: ignore
import yaml

# ----------------------------------------------------------------------
# 1️⃣ Load configuration
# ----------------------------------------------------------------------
DEFAULT_CFG = """
rabbitmq:
  host: "localhost"
  port: 5672
  virtual_host: "/"
  username: "guest"
  password: "guest"

# Which *exchange* (topic) you actually want to listen to.
# The program will create a temporary queue, bind it to this exchange
# with the routing key supplied in `routing_key`.
subscriber:
  exchange: "nic"        # ← change to “reese” or any other exchange
  routing_key: "add"     # ← could be “add”, “delete”, or any pattern
"""

CONFIG_PATH = Path("active/device_unifi/config.yaml")

# Top-level sections that the rest of the script indexes into.
_REQUIRED_SECTIONS = ("rabbitmq", "subscriber")


def _load_config(path: Path) -> dict:
    """Return the parsed YAML config at *path*, creating a default if absent.

    Args:
        path: Location of the YAML configuration file.

    Returns:
        The parsed configuration mapping.

    Raises:
        ValueError: If the file does not contain a mapping with the
            ``rabbitmq`` and ``subscriber`` sections.
    """
    if not path.exists():
        # The parent directory may not exist on a first run; without this
        # write_text() fails with FileNotFoundError.
        path.parent.mkdir(parents=True, exist_ok=True)
        # DEFAULT_CFG contains non-ASCII characters, so pin the encoding
        # rather than relying on the platform default.
        path.write_text(DEFAULT_CFG, encoding="utf-8")

    with path.open(encoding="utf-8") as f:
        loaded = yaml.safe_load(f)

    # safe_load() returns None for an empty file; fail with a clear message
    # instead of a TypeError/KeyError further down.
    if not isinstance(loaded, dict):
        raise ValueError(f"{path}: expected a YAML mapping at the top level")
    for section in _REQUIRED_SECTIONS:
        if section not in loaded:
            raise ValueError(f"{path}: missing required section '{section}'")
    return loaded


cfg = _load_config(CONFIG_PATH)

# ----------------------------------------------------------------------
# 2️⃣ Build connection parameters
# ----------------------------------------------------------------------
cred = pika.PlainCredentials(
    cfg["rabbitmq"]["username"],
    cfg["rabbitmq"]["password"],
)
params = pika.ConnectionParameters(
    host=cfg["rabbitmq"]["host"],
    port=cfg["rabbitmq"]["port"],
    virtual_host=cfg["rabbitmq"]["virtual_host"],
    credentials=cred,
)


# ----------------------------------------------------------------------
# 3️⃣ Helper to declare exchanges / queues
# ----------------------------------------------------------------------
def declare_topology(
    channel: "pika.adapters.blocking_connection.BlockingChannel",
) -> None:
    """Declare the static exchanges and queues and bind them together.

    Creates the durable topic exchanges ``nic`` and ``reese``. For each one it
    declares the durable queues ``<exchange>_add`` and ``<exchange>_delete``
    and binds each queue to its exchange with the routing key ``add`` or
    ``delete`` respectively.

    Args:
        channel: An open channel on which the declarations are issued.
    """
    exchanges = ("nic", "reese")
    routing_keys = ("add", "delete")

    for exch in exchanges:
        channel.exchange_declare(
            exchange=exch, exchange_type="topic", durable=True
        )
        for key in routing_keys:
            queue_name = f"{exch}_{key}"  # e.g. nic_add, reese_delete
            channel.queue_declare(queue=queue_name, durable=True)
            # Bind the queue to the exchange with the same routing key.
            channel.queue_bind(
                queue=queue_name, exchange=exch, routing_key=key
            )
            logging.info(
                "Declared queue %s bound to %s with key '%s'",
                queue_name,
                exch,
                key,
            )


# ----------------------------------------------------------------------
# 4️⃣ Consumer callback
# ----------------------------------------------------------------------
def on_message(ch, method, properties, body) -> None:
    """Log a delivered message and acknowledge it.

    Args:
        ch: Channel the message was delivered on; used to send the ack.
        method: Delivery metadata; ``exchange``, ``routing_key`` and
            ``delivery_tag`` are read from it.
        properties: Message properties (unused).
        body: Raw message payload; logged via ``repr``.
    """
    logging.info(
        "Received from exchange '%s' routing_key='%s': %r",
        method.exchange,
        method.routing_key,
        body,
    )
    # Acknowledge the message
    ch.basic_ack(delivery_tag=method.delivery_tag)


# ----------------------------------------------------------------------
# 5️⃣ Main routine
# ----------------------------------------------------------------------
def main() -> None:
    """Declare the topology, subscribe per the config, and consume messages.

    Configures logging, opens a blocking connection using the module-level
    ``params``, declares the static topology, binds a server-named exclusive
    queue to the exchange / routing key from ``cfg["subscriber"]`` and then
    consumes until interrupted with Ctrl+C.
    """
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(message)s",
    )

    with pika.BlockingConnection(params) as conn:
        channel = conn.channel()

        # 1️⃣ Declare the static topology (exchanges + queues)
        declare_topology(channel)

        # 2️⃣ Set up a *temporary* queue for the subscriber defined in config
        result = channel.queue_declare(queue="", exclusive=True)
        tmp_queue = result.method.queue  # server-generated name

        exch = cfg["subscriber"]["exchange"]
        rkey = cfg["subscriber"]["routing_key"]
        channel.queue_bind(queue=tmp_queue, exchange=exch, routing_key=rkey)
        logging.info(
            "Subscribed to exchange '%s' with routing_key '%s' "
            "using temporary queue '%s'",
            exch,
            rkey,
            tmp_queue,
        )

        # 3️⃣ Start consuming
        channel.basic_consume(
            queue=tmp_queue, on_message_callback=on_message
        )
        try:
            logging.info("Waiting for messages. Press Ctrl+C to exit.")
            channel.start_consuming()
        except KeyboardInterrupt:
            logging.info("Interrupted – closing connection.")
            channel.stop_consuming()


if __name__ == "__main__":
    sys.exit(main())