diff --git a/tests/unit/test_pubsub/test_rabbitmq_backend.py b/tests/unit/test_pubsub/test_rabbitmq_backend.py index ffe18fd7..54599723 100644 --- a/tests/unit/test_pubsub/test_rabbitmq_backend.py +++ b/tests/unit/test_pubsub/test_rabbitmq_backend.py @@ -1,5 +1,5 @@ """ -Unit tests for RabbitMQ backend — queue name mapping and factory dispatch. +Unit tests for RabbitMQ backend — topic parsing and factory dispatch. Does not require a running RabbitMQ instance. """ @@ -12,7 +12,7 @@ from trustgraph.base.rabbitmq_backend import RabbitMQBackend from trustgraph.base.pubsub import get_pubsub, add_pubsub_args -class TestRabbitMQMapQueueName: +class TestRabbitMQParseTopic: @pytest.fixture def backend(self): @@ -20,43 +20,48 @@ class TestRabbitMQMapQueueName: return b def test_flow_is_durable(self, backend): - name, durable = backend.map_queue_name('flow:tg:text-completion-request') + exchange, cls, durable = backend._parse_topic('flow:tg:text-completion-request') assert durable is True - assert name == 'tg.flow.text-completion-request' + assert cls == 'flow' + assert exchange == 'tg.flow.text-completion-request' def test_notify_is_not_durable(self, backend): - name, durable = backend.map_queue_name('notify:tg:config') + exchange, cls, durable = backend._parse_topic('notify:tg:config') assert durable is False - assert name == 'tg.notify.config' + assert cls == 'notify' + assert exchange == 'tg.notify.config' def test_request_is_not_durable(self, backend): - name, durable = backend.map_queue_name('request:tg:config') + exchange, cls, durable = backend._parse_topic('request:tg:config') assert durable is False - assert name == 'tg.request.config' + assert cls == 'request' + assert exchange == 'tg.request.config' def test_response_is_not_durable(self, backend): - name, durable = backend.map_queue_name('response:tg:librarian') + exchange, cls, durable = backend._parse_topic('response:tg:librarian') assert durable is False - assert name == 'tg.response.librarian' + assert cls == 
'response' + assert exchange == 'tg.response.librarian' def test_custom_topicspace(self, backend): - name, durable = backend.map_queue_name('flow:prod:my-queue') - assert name == 'prod.flow.my-queue' + exchange, cls, durable = backend._parse_topic('flow:prod:my-queue') + assert exchange == 'prod.flow.my-queue' assert durable is True def test_no_colon_defaults_to_flow(self, backend): - name, durable = backend.map_queue_name('simple-queue') - assert name == 'tg.simple-queue' - assert durable is False + exchange, cls, durable = backend._parse_topic('simple-queue') + assert exchange == 'tg.flow.simple-queue' + assert cls == 'flow' + assert durable is True def test_invalid_class_raises(self, backend): - with pytest.raises(ValueError, match="Invalid queue class"): - backend.map_queue_name('unknown:tg:topic') + with pytest.raises(ValueError, match="Invalid topic class"): + backend._parse_topic('unknown:tg:topic') - def test_flow_with_flow_suffix(self, backend): - """Queue names with flow suffix (e.g. :default) are preserved.""" - name, durable = backend.map_queue_name('request:tg:prompt:default') - assert name == 'tg.request.prompt:default' + def test_topic_with_flow_suffix(self, backend): + """Topic names with flow suffix (e.g. :default) are preserved.""" + exchange, cls, durable = backend._parse_topic('request:tg:prompt:default') + assert exchange == 'tg.request.prompt:default' class TestGetPubsubRabbitMQ: diff --git a/trustgraph-base/trustgraph/base/backend.py b/trustgraph-base/trustgraph/base/backend.py index 0f95ca1b..a105ca17 100644 --- a/trustgraph-base/trustgraph/base/backend.py +++ b/trustgraph-base/trustgraph/base/backend.py @@ -121,7 +121,7 @@ class PubSubBackend(Protocol): Create a producer for a topic. 
Args: - topic: Generic topic format (qos/tenant/namespace/queue) + topic: Topic identifier in class:topicspace:topic format schema: Dataclass type for messages **options: Backend-specific options (e.g., chunking_enabled) @@ -159,59 +159,55 @@ class PubSubBackend(Protocol): """ ... - async def create_queue(self, topic: str, subscription: str) -> None: + async def create_topic(self, topic: str) -> None: """ - Pre-create a queue so it exists before any consumer connects. + Create the broker-side resources for a logical topic. - The topic and subscription together identify the queue, mirroring - create_consumer where the queue name is derived from both. + For RabbitMQ this creates a fanout exchange. For Pulsar this is + a no-op (topics auto-create on first use). - Idempotent — creating an already-existing queue succeeds silently. + Idempotent — creating an already-existing topic succeeds silently. Args: - topic: Queue identifier in class:topicspace:topic format - subscription: Subscription/consumer group name + topic: Topic identifier in class:topicspace:topic format """ ... - async def delete_queue(self, topic: str, subscription: str) -> None: + async def delete_topic(self, topic: str) -> None: """ - Delete a queue and any messages it contains. + Delete a topic and discard any in-flight messages. - The topic and subscription together identify the queue, mirroring - create_consumer where the queue name is derived from both. + For RabbitMQ this deletes the fanout exchange; consumer queues + lose their binding and drain naturally. - Idempotent — deleting a non-existent queue succeeds silently. + Idempotent — deleting a non-existent topic succeeds silently. Args: - topic: Queue identifier in class:topicspace:topic format - subscription: Subscription/consumer group name + topic: Topic identifier in class:topicspace:topic format """ ... 
- async def queue_exists(self, topic: str, subscription: str) -> bool: + async def topic_exists(self, topic: str) -> bool: """ - Check whether a queue exists. + Check whether a topic exists. Args: - topic: Queue identifier in class:topicspace:topic format - subscription: Subscription/consumer group name + topic: Topic identifier in class:topicspace:topic format Returns: - True if the queue exists, False otherwise. + True if the topic exists, False otherwise. """ ... - async def ensure_queue(self, topic: str, subscription: str) -> None: + async def ensure_topic(self, topic: str) -> None: """ - Ensure a queue exists, creating it if necessary. + Ensure a topic exists, creating it if necessary. Convenience wrapper — checks existence, creates if missing. - Used by system services on startup. + Used by the flow service and system services on startup. Args: - topic: Queue identifier in class:topicspace:topic format - subscription: Subscription/consumer group name + topic: Topic identifier in class:topicspace:topic format """ ... diff --git a/trustgraph-base/trustgraph/base/pulsar_backend.py b/trustgraph-base/trustgraph/base/pulsar_backend.py index 2100483d..e27d16af 100644 --- a/trustgraph-base/trustgraph/base/pulsar_backend.py +++ b/trustgraph-base/trustgraph/base/pulsar_backend.py @@ -266,22 +266,22 @@ class PulsarBackend: return PulsarBackendConsumer(pulsar_consumer, schema) - async def create_queue(self, topic: str, subscription: str) -> None: + async def create_topic(self, topic: str) -> None: """No-op — Pulsar auto-creates topics on first use. TODO: Use admin REST API for explicit persistent topic creation.""" pass - async def delete_queue(self, topic: str, subscription: str) -> None: + async def delete_topic(self, topic: str) -> None: """No-op — to be replaced with admin REST API calls. 
- TODO: Delete subscription and persistent topic via admin API.""" + TODO: Delete persistent topic via admin API.""" pass - async def queue_exists(self, topic: str, subscription: str) -> bool: + async def topic_exists(self, topic: str) -> bool: """Returns True — Pulsar auto-creates on subscribe. TODO: Use admin REST API for actual existence check.""" return True - async def ensure_queue(self, topic: str, subscription: str) -> None: + async def ensure_topic(self, topic: str) -> None: """No-op — Pulsar auto-creates topics on first use. TODO: Use admin REST API for explicit creation.""" pass diff --git a/trustgraph-base/trustgraph/base/rabbitmq_backend.py b/trustgraph-base/trustgraph/base/rabbitmq_backend.py index 43c717c3..73b80cb9 100644 --- a/trustgraph-base/trustgraph/base/rabbitmq_backend.py +++ b/trustgraph-base/trustgraph/base/rabbitmq_backend.py @@ -1,22 +1,24 @@ """ RabbitMQ backend implementation for pub/sub abstraction. -Uses a single topic exchange per topicspace. The logical queue name -becomes the routing key. Consumer behavior is determined by the -subscription name: +Each logical topic maps to its own fanout exchange. The exchange name +encodes the full topic identity: -- Same subscription + same topic = shared queue (competing consumers) -- Different subscriptions = separate queues (broadcast / fan-out) + class:topicspace:topic → exchange topicspace.class.topic -This mirrors Pulsar's subscription model using idiomatic RabbitMQ. +Producers publish to the exchange with an empty routing key. +Consumers declare and bind their own queues: + + - flow / request: named durable/non-durable queue (competing consumers) + - response / notify: anonymous exclusive auto-delete queue (per-subscriber) + +The flow service manages topic lifecycle (create/delete exchanges). +Consumers manage their own queue lifecycle (declare + bind on connect). 
Architecture: - Producer --> [tg exchange] --routing key--> [named queue] --> Consumer - --routing key--> [named queue] --> Consumer - --routing key--> [exclusive q] --> Subscriber - -Uses basic_consume (push) instead of basic_get (polling) for -efficient message delivery. + Producer --> [fanout exchange] --> [named queue] --> Consumer + --> [named queue] --> Consumer + --> [exclusive queue] --> Subscriber """ import asyncio @@ -58,18 +60,16 @@ class RabbitMQMessage: class RabbitMQBackendProducer: - """Publishes messages to a topic exchange with a routing key. + """Publishes messages to a fanout exchange. Uses thread-local connections so each thread gets its own connection/channel. This avoids wire corruption from concurrent threads writing to the same socket (pika is not thread-safe). """ - def __init__(self, connection_params, exchange_name, routing_key, - durable): + def __init__(self, connection_params, exchange_name, durable): self._connection_params = connection_params self._exchange_name = exchange_name - self._routing_key = routing_key self._durable = durable self._local = threading.local() @@ -90,7 +90,7 @@ class RabbitMQBackendProducer: chan = conn.channel() chan.exchange_declare( exchange=self._exchange_name, - exchange_type='topic', + exchange_type='fanout', durable=True, ) self._local.connection = conn @@ -113,7 +113,7 @@ class RabbitMQBackendProducer: channel = self._get_channel() channel.basic_publish( exchange=self._exchange_name, - routing_key=self._routing_key, + routing_key='', body=json_data.encode('utf-8'), properties=amqp_properties, ) @@ -144,19 +144,17 @@ class RabbitMQBackendProducer: class RabbitMQBackendConsumer: - """Consumes from a queue bound to a topic exchange. + """Consumes from a queue bound to a fanout exchange. Uses basic_consume (push model) with messages delivered to an internal thread-safe queue. process_data_events() drives both message delivery and heartbeat processing. 
""" - def __init__(self, connection_params, exchange_name, routing_key, - queue_name, schema_cls, durable, exclusive=False, - auto_delete=False): + def __init__(self, connection_params, exchange_name, queue_name, + schema_cls, durable, exclusive=False, auto_delete=False): self._connection_params = connection_params self._exchange_name = exchange_name - self._routing_key = routing_key self._queue_name = queue_name self._schema_cls = schema_cls self._durable = durable @@ -171,17 +169,16 @@ class RabbitMQBackendConsumer: self._connection = pika.BlockingConnection(self._connection_params) self._channel = self._connection.channel() - # Declare the topic exchange (idempotent, also done by producers) + # Declare the fanout exchange (idempotent) self._channel.exchange_declare( exchange=self._exchange_name, - exchange_type='topic', + exchange_type='fanout', durable=True, ) if self._exclusive: # Anonymous ephemeral queue (response/notify class). - # These are per-consumer and must be created here — the - # broker assigns the name. + # Per-consumer, broker assigns the name. result = self._channel.queue_declare( queue='', durable=False, @@ -189,20 +186,22 @@ class RabbitMQBackendConsumer: auto_delete=True, ) self._queue_name = result.method.queue - - self._channel.queue_bind( - queue=self._queue_name, - exchange=self._exchange_name, - routing_key=self._routing_key, - ) else: - # Named queue (flow/request class). Queue must already - # exist — created by the flow service or ensure_queue. - # We just verify it exists and bind to consume. + # Named queue (flow/request class). + # Consumer owns its queue — declare and bind here. 
self._channel.queue_declare( - queue=self._queue_name, passive=True, + queue=self._queue_name, + durable=self._durable, + exclusive=False, + auto_delete=False, ) + # Bind queue to the fanout exchange + self._channel.queue_bind( + queue=self._queue_name, + exchange=self._exchange_name, + ) + self._channel.basic_qos(prefetch_count=1) # Register push-based consumer @@ -318,7 +317,7 @@ class RabbitMQBackendConsumer: class RabbitMQBackend: - """RabbitMQ pub/sub backend using a topic exchange per topicspace.""" + """RabbitMQ pub/sub backend using one fanout exchange per topic.""" def __init__(self, host='localhost', port=5672, username='guest', password='guest', vhost='/'): @@ -331,20 +330,23 @@ class RabbitMQBackend: ) logger.info(f"RabbitMQ backend: {host}:{port} vhost={vhost}") - def _parse_queue_id(self, queue_id: str) -> tuple[str, str, str, bool]: + def _parse_topic(self, topic_id: str) -> tuple[str, str, bool]: """ - Parse queue identifier into exchange, routing key, and durability. + Parse topic identifier into exchange name and durability. 
Format: class:topicspace:topic - Returns: (exchange_name, routing_key, class, durable) - """ - if ':' not in queue_id: - return 'tg', queue_id, 'flow', False + Returns: (exchange_name, class, durable) - parts = queue_id.split(':', 2) + The exchange name encodes the full topic identity: + class:topicspace:topic → topicspace.class.topic + """ + if ':' not in topic_id: + return f'tg.flow.{topic_id}', 'flow', True + + parts = topic_id.split(':', 2) if len(parts) != 3: raise ValueError( - f"Invalid queue format: {queue_id}, " + f"Invalid topic format: {topic_id}, " f"expected class:topicspace:topic" ) @@ -356,36 +358,28 @@ class RabbitMQBackend: durable = False else: raise ValueError( - f"Invalid queue class: {cls}, " + f"Invalid topic class: {cls}, " f"expected flow, request, response, or notify" ) - # Exchange per topicspace, routing key includes class - exchange_name = topicspace - routing_key = f"{cls}.{topic}" + exchange_name = f"{topicspace}.{cls}.{topic}" - return exchange_name, routing_key, cls, durable - - # Keep map_queue_name for backward compatibility with tests - def map_queue_name(self, queue_id: str) -> tuple[str, bool]: - exchange, routing_key, cls, durable = self._parse_queue_id(queue_id) - return f"{exchange}.{routing_key}", durable + return exchange_name, cls, durable def create_producer(self, topic: str, schema: type, **options) -> BackendProducer: - exchange, routing_key, cls, durable = self._parse_queue_id(topic) + exchange, cls, durable = self._parse_topic(topic) logger.debug( - f"Creating producer: exchange={exchange}, " - f"routing_key={routing_key}" + f"Creating producer: exchange={exchange}" ) return RabbitMQBackendProducer( - self._connection_params, exchange, routing_key, durable, + self._connection_params, exchange, durable, ) def create_consumer(self, topic: str, subscription: str, schema: type, initial_position: str = 'latest', **options) -> BackendConsumer: - """Create a consumer with a queue bound to the topic exchange. 
+ """Create a consumer with a queue bound to the topic's exchange. Behaviour is determined by the topic's class prefix: - flow: named durable queue, competing consumers (round-robin) @@ -393,7 +387,7 @@ class RabbitMQBackend: - response: anonymous ephemeral queue, per-subscriber (auto-delete) - notify: anonymous ephemeral queue, per-subscriber (auto-delete) """ - exchange, routing_key, cls, durable = self._parse_queue_id(topic) + exchange, cls, durable = self._parse_topic(topic) if cls in ('response', 'notify'): # Per-subscriber: anonymous queue, auto-deleted on disconnect @@ -403,45 +397,33 @@ class RabbitMQBackend: auto_delete = True else: # Shared: named queue, competing consumers - queue_name = f"{exchange}.{routing_key}.{subscription}" + queue_name = f"{exchange}.{subscription}" queue_durable = durable exclusive = False auto_delete = False logger.debug( f"Creating consumer: exchange={exchange}, " - f"routing_key={routing_key}, queue={queue_name or '(anonymous)'}, " - f"cls={cls}" + f"queue={queue_name or '(anonymous)'}, cls={cls}" ) return RabbitMQBackendConsumer( - self._connection_params, exchange, routing_key, + self._connection_params, exchange, queue_name, schema, queue_durable, exclusive, auto_delete, ) - def _create_queue_sync(self, exchange, routing_key, queue_name, durable): - """Blocking queue creation — run via asyncio.to_thread.""" + def _create_topic_sync(self, exchange_name): + """Blocking exchange creation — run via asyncio.to_thread.""" connection = None try: connection = pika.BlockingConnection(self._connection_params) channel = connection.channel() channel.exchange_declare( - exchange=exchange, - exchange_type='topic', + exchange=exchange_name, + exchange_type='fanout', durable=True, ) - channel.queue_declare( - queue=queue_name, - durable=durable, - exclusive=False, - auto_delete=False, - ) - channel.queue_bind( - queue=queue_name, - exchange=exchange, - routing_key=routing_key, - ) - logger.info(f"Created queue: {queue_name}") + 
logger.info(f"Created topic (exchange): {exchange_name}") finally: if connection and connection.is_open: try: @@ -449,34 +431,30 @@ class RabbitMQBackend: except Exception: pass - async def create_queue(self, topic: str, subscription: str) -> None: - """Pre-create a named queue bound to the topic exchange. + async def create_topic(self, topic: str) -> None: + """Create the fanout exchange for a logical topic. - Only applies to shared queues (flow/request class). Response and - notify queues are anonymous/auto-delete and created by consumers. + Only applies to flow and request class topics. Response and + notify exchanges are created on demand by consumers. """ - exchange, routing_key, cls, durable = self._parse_queue_id(topic) + exchange, cls, durable = self._parse_topic(topic) if cls in ('response', 'notify'): return - queue_name = f"{exchange}.{routing_key}.{subscription}" - await asyncio.to_thread( - self._create_queue_sync, exchange, routing_key, - queue_name, durable, - ) + await asyncio.to_thread(self._create_topic_sync, exchange) - def _delete_queue_sync(self, queue_name): - """Blocking queue deletion — run via asyncio.to_thread.""" + def _delete_topic_sync(self, exchange_name): + """Blocking exchange deletion — run via asyncio.to_thread.""" connection = None try: connection = pika.BlockingConnection(self._connection_params) channel = connection.channel() - channel.queue_delete(queue=queue_name) - logger.info(f"Deleted queue: {queue_name}") + channel.exchange_delete(exchange=exchange_name) + logger.info(f"Deleted topic (exchange): {exchange_name}") except Exception as e: - # Idempotent — queue may already be gone - logger.debug(f"Queue delete for {queue_name}: {e}") + # Idempotent — exchange may already be gone + logger.debug(f"Exchange delete for {exchange_name}: {e}") finally: if connection and connection.is_open: try: @@ -484,31 +462,27 @@ class RabbitMQBackend: except Exception: pass - async def delete_queue(self, topic: str, subscription: str) -> None: 
- """Delete a named queue and any messages it contains. + async def delete_topic(self, topic: str) -> None: + """Delete a topic's fanout exchange. - Only applies to shared queues (flow/request class). Response and - notify queues are anonymous/auto-delete and managed by the broker. + Consumer queues lose their binding and drain naturally. """ - exchange, routing_key, cls, durable = self._parse_queue_id(topic) + exchange, cls, durable = self._parse_topic(topic) + await asyncio.to_thread(self._delete_topic_sync, exchange) - if cls in ('response', 'notify'): - return - - queue_name = f"{exchange}.{routing_key}.{subscription}" - await asyncio.to_thread(self._delete_queue_sync, queue_name) - - def _queue_exists_sync(self, queue_name): - """Blocking queue existence check — run via asyncio.to_thread. + def _topic_exists_sync(self, exchange_name): + """Blocking exchange existence check — run via asyncio.to_thread. Uses passive=True which checks without creating.""" connection = None try: connection = pika.BlockingConnection(self._connection_params) channel = connection.channel() - channel.queue_declare(queue=queue_name, passive=True) + channel.exchange_declare( + exchange=exchange_name, passive=True, + ) return True except pika.exceptions.ChannelClosedByBroker: - # 404 NOT_FOUND — queue does not exist + # 404 NOT_FOUND — exchange does not exist return False finally: if connection and connection.is_open: @@ -517,26 +491,25 @@ class RabbitMQBackend: except Exception: pass - async def queue_exists(self, topic: str, subscription: str) -> bool: - """Check whether a named queue exists. + async def topic_exists(self, topic: str) -> bool: + """Check whether a topic's exchange exists. - Only applies to shared queues (flow/request class). Response and - notify queues are anonymous/ephemeral — always returns False. + Only applies to flow and request class topics. Response and + notify topics are ephemeral — always returns False. 
""" - exchange, routing_key, cls, durable = self._parse_queue_id(topic) + exchange, cls, durable = self._parse_topic(topic) if cls in ('response', 'notify'): return False - queue_name = f"{exchange}.{routing_key}.{subscription}" return await asyncio.to_thread( - self._queue_exists_sync, queue_name + self._topic_exists_sync, exchange ) - async def ensure_queue(self, topic: str, subscription: str) -> None: - """Ensure a queue exists, creating it if necessary.""" - if not await self.queue_exists(topic, subscription): - await self.create_queue(topic, subscription) + async def ensure_topic(self, topic: str) -> None: + """Ensure a topic exists, creating it if necessary.""" + if not await self.topic_exists(topic): + await self.create_topic(topic) def close(self) -> None: pass diff --git a/trustgraph-flow/trustgraph/config/service/service.py b/trustgraph-flow/trustgraph/config/service/service.py index 75232315..fe44b852 100644 --- a/trustgraph-flow/trustgraph/config/service/service.py +++ b/trustgraph-flow/trustgraph/config/service/service.py @@ -124,9 +124,7 @@ class Processor(AsyncProcessor): async def start(self): - await self.pubsub.ensure_queue( - self.config_request_topic, self.config_request_subscriber - ) + await self.pubsub.ensure_topic(self.config_request_topic) await self.push() # Startup poke: empty types = everything await self.config_request_consumer.start() diff --git a/trustgraph-flow/trustgraph/cores/service.py b/trustgraph-flow/trustgraph/cores/service.py index 400f96d1..93017c30 100755 --- a/trustgraph-flow/trustgraph/cores/service.py +++ b/trustgraph-flow/trustgraph/cores/service.py @@ -119,9 +119,7 @@ class Processor(AsyncProcessor): async def start(self): - await self.pubsub.ensure_queue( - self.knowledge_request_topic, self.knowledge_request_subscriber - ) + await self.pubsub.ensure_topic(self.knowledge_request_topic) await super(Processor, self).start() await self.knowledge_request_consumer.start() await self.knowledge_response_producer.start() diff 
--git a/trustgraph-flow/trustgraph/flow/service/flow.py b/trustgraph-flow/trustgraph/flow/service/flow.py index 477c6a2c..b864faf9 100644 --- a/trustgraph-flow/trustgraph/flow/service/flow.py +++ b/trustgraph-flow/trustgraph/flow/service/flow.py @@ -7,7 +7,7 @@ import logging # Module logger logger = logging.getLogger(__name__) -# Queue deletion retry settings +# Topic deletion retry settings DELETE_RETRIES = 5 DELETE_RETRY_DELAY = 2 # seconds @@ -215,11 +215,11 @@ class FlowConfig: return result - # Pre-create flow-level queues so the data path is wired + # Pre-create topic exchanges so the data path is wired # before processors receive their config and start connecting. - queues = self._collect_flow_queues(cls, repl_template_with_params) - for topic, subscription in queues: - await self.pubsub.create_queue(topic, subscription) + topics = self._collect_flow_topics(cls, repl_template_with_params) + for topic in topics: + await self.pubsub.create_topic(topic) # Build all processor config updates, then write in a single batch. updates = [] @@ -283,8 +283,8 @@ class FlowConfig: error = None, ) - async def ensure_existing_flow_queues(self): - """Ensure queues exist for all already-running flows. + async def ensure_existing_flow_topics(self): + """Ensure topics exist for all already-running flows. Called on startup to handle flows that were started before this version of the flow service was deployed, or before a restart. 
@@ -315,7 +315,7 @@ class FlowConfig: if blueprint_data is None: logger.warning( f"Blueprint '{blueprint_name}' not found for " - f"flow '{flow_id}', skipping queue creation" + f"flow '{flow_id}', skipping topic creation" ) continue @@ -333,65 +333,63 @@ class FlowConfig: ) return result - queues = self._collect_flow_queues(cls, repl_template) - for topic, subscription in queues: - await self.pubsub.ensure_queue(topic, subscription) + topics = self._collect_flow_topics(cls, repl_template) + for topic in topics: + await self.pubsub.ensure_topic(topic) logger.info( - f"Ensured queues for existing flow '{flow_id}'" + f"Ensured topics for existing flow '{flow_id}'" ) except Exception as e: logger.error( - f"Failed to ensure queues for flow '{flow_id}': {e}" + f"Failed to ensure topics for flow '{flow_id}': {e}" ) - def _collect_flow_queues(self, cls, repl_template): - """Collect (topic, subscription) pairs for all flow-level queues. + def _collect_flow_topics(self, cls, repl_template): + """Collect unique topic identifiers from the blueprint. - Iterates the blueprint's "flow" section and reads only the - "topics" dict from each processor entry. + Iterates the blueprint's "flow" section and returns a + deduplicated set of resolved topic strings. The flow service + manages topic lifecycle (create/delete exchanges), not + individual consumer queues. """ - queues = [] + topics = set() for k, v in cls["flow"].items(): - processor, variant = k.split(":", 1) - variant = repl_template(variant) - for spec_name, topic_template in v.get("topics", {}).items(): topic = repl_template(topic_template) - subscription = f"{processor}--{variant}--{spec_name}" - queues.append((topic, subscription)) + topics.add(topic) - return queues + return topics - async def _delete_queues(self, queues): - """Delete queues with retries. Best-effort — logs failures but + async def _delete_topics(self, topics): + """Delete topics with retries. 
Best-effort — logs failures but does not raise.""" for attempt in range(DELETE_RETRIES): remaining = [] - for topic, subscription in queues: + for topic in topics: try: - await self.pubsub.delete_queue(topic, subscription) + await self.pubsub.delete_topic(topic) except Exception as e: logger.warning( - f"Queue delete failed (attempt {attempt + 1}/" + f"Topic delete failed (attempt {attempt + 1}/" f"{DELETE_RETRIES}): {topic}: {e}" ) - remaining.append((topic, subscription)) + remaining.append(topic) if not remaining: return - queues = remaining + topics = remaining if attempt < DELETE_RETRIES - 1: await asyncio.sleep(DELETE_RETRY_DELAY) - for topic, subscription in queues: + for topic in topics: logger.error( - f"Failed to delete queue after {DELETE_RETRIES} " + f"Failed to delete topic after {DELETE_RETRIES} " f"attempts: {topic}" ) @@ -426,8 +424,8 @@ class FlowConfig: result = result.replace(f"{{{param_name}}}", str(param_value)) return result - # Collect queue identifiers before removing config - queues = self._collect_flow_queues(cls, repl_template) + # Collect topic identifiers before removing config + topics = self._collect_flow_topics(cls, repl_template) # Phase 1: Set status to "stopping" and remove processor config. # The config push tells processors to shut down their consumers. @@ -448,8 +446,8 @@ class FlowConfig: await self.config.delete_many(deletes) - # Phase 2: Delete queues with retries, then remove the flow record. - await self._delete_queues(queues) + # Phase 2: Delete topics with retries, then remove the flow record. 
+ await self._delete_topics(topics) if msg.flow_id in await self.config.keys("flow"): await self.config.delete("flow", msg.flow_id) diff --git a/trustgraph-flow/trustgraph/flow/service/service.py b/trustgraph-flow/trustgraph/flow/service/service.py index a3f2fb6b..e1997452 100644 --- a/trustgraph-flow/trustgraph/flow/service/service.py +++ b/trustgraph-flow/trustgraph/flow/service/service.py @@ -101,11 +101,9 @@ class Processor(AsyncProcessor): async def start(self): - await self.pubsub.ensure_queue( - self.flow_request_topic, self.flow_request_subscriber - ) + await self.pubsub.ensure_topic(self.flow_request_topic) await self.config_client.start() - await self.flow.ensure_existing_flow_queues() + await self.flow.ensure_existing_flow_topics() await self.flow_request_consumer.start() async def on_flow_request(self, msg, consumer, flow): diff --git a/trustgraph-flow/trustgraph/librarian/service.py b/trustgraph-flow/trustgraph/librarian/service.py index 83f97bf3..ed005298 100755 --- a/trustgraph-flow/trustgraph/librarian/service.py +++ b/trustgraph-flow/trustgraph/librarian/service.py @@ -263,12 +263,8 @@ class Processor(AsyncProcessor): async def start(self): - await self.pubsub.ensure_queue( - self.librarian_request_topic, self.librarian_request_subscriber - ) - await self.pubsub.ensure_queue( - self.collection_request_topic, self.collection_request_subscriber - ) + await self.pubsub.ensure_topic(self.librarian_request_topic) + await self.pubsub.ensure_topic(self.collection_request_topic) await super(Processor, self).start() await self.librarian_request_consumer.start() await self.librarian_response_producer.start()