From feeb92b33fcf6e17924f026c763c6b474af79a31 Mon Sep 17 00:00:00 2001 From: cybermaggedon Date: Thu, 9 Apr 2026 09:55:41 +0100 Subject: [PATCH] Refactor: Derive consumer behaviour from queue class (#772) Derive consumer behaviour from queue class, remove consumer_type parameter The queue class prefix (flow, request, response, notify) now fully determines consumer behaviour in both RabbitMQ and Pulsar backends. Added 'notify' class for ephemeral broadcast (config push notifications). Response and notify classes always create per-subscriber auto-delete queues, eliminating orphaned queues that accumulated on service restarts. Change init-trustgraph to set up the 'notify' namespace in Pulsar instead of old hangover 'state'. Fixes 'stuck backlog' on RabbitMQ config notification queue. --- .../unit/test_gateway/test_config_receiver.py | 10 +-- tests/unit/test_pubsub/test_queue_naming.py | 12 ++-- .../unit/test_pubsub/test_rabbitmq_backend.py | 8 +-- trustgraph-base/trustgraph/base/backend.py | 10 ++- trustgraph-base/trustgraph/base/consumer.py | 7 +- .../trustgraph/base/librarian_client.py | 1 - .../trustgraph/base/pulsar_backend.py | 23 ++++--- .../trustgraph/base/rabbitmq_backend.py | 29 ++++----- trustgraph-base/trustgraph/base/subscriber.py | 1 - trustgraph-base/trustgraph/clients/base.py | 1 - .../trustgraph/schema/core/topic.py | 12 ++-- .../trustgraph/schema/services/config.py | 2 +- .../trustgraph/cli/init_trustgraph.py | 6 +- .../trustgraph/cli/monitor_prompts.py | 2 - .../trustgraph/gateway/config/receiver.py | 64 ++++++++++--------- 15 files changed, 93 insertions(+), 95 deletions(-) diff --git a/tests/unit/test_gateway/test_config_receiver.py b/tests/unit/test_gateway/test_config_receiver.py index 49dc48d8..c2a149d5 100644 --- a/tests/unit/test_gateway/test_config_receiver.py +++ b/tests/unit/test_gateway/test_config_receiver.py @@ -151,7 +151,7 @@ class TestConfigReceiver: mock_backend = Mock() config_receiver = ConfigReceiver(mock_backend) - # Mock config_client + # Mock _create_config_client to return a mock client mock_resp = Mock() mock_resp.error = None mock_resp.version = 5 @@ -164,7 +164,7 @@ class TestConfigReceiver: mock_client = AsyncMock() mock_client.request.return_value = mock_resp - config_receiver.config_client = mock_client + config_receiver._create_config_client = Mock(return_value=mock_client) start_flow_calls = [] async def mock_start_flow(id, flow): @@ -202,7 +202,7 @@ class TestConfigReceiver: mock_client = AsyncMock() mock_client.request.return_value = mock_resp - config_receiver.config_client = mock_client + config_receiver._create_config_client = Mock(return_value=mock_client) stop_flow_calls = [] async def mock_stop_flow(id, flow): @@ -229,7 +229,7 @@ class TestConfigReceiver: mock_client = AsyncMock() mock_client.request.return_value = mock_resp - config_receiver.config_client = mock_client + config_receiver._create_config_client = Mock(return_value=mock_client) await config_receiver.fetch_and_apply() @@ -353,7 +353,7 @@ class TestConfigReceiver: mock_client = AsyncMock() mock_client.request.return_value = mock_resp - config_receiver.config_client = mock_client + config_receiver._create_config_client = Mock(return_value=mock_client) start_calls = [] stop_calls = [] diff --git a/tests/unit/test_pubsub/test_queue_naming.py b/tests/unit/test_pubsub/test_queue_naming.py index 8ab09e5a..243b0e33 100644 --- a/tests/unit/test_pubsub/test_queue_naming.py +++ b/tests/unit/test_pubsub/test_queue_naming.py @@ -21,8 +21,8 @@ class TestQueueFunction: def test_response_class(self): assert queue('config', cls='response') == 'response:tg:config' - def test_state_class(self): - assert queue('config', cls='state') == 'state:tg:config' + def test_notify_class(self): + assert queue('config', cls='notify') == 'notify:tg:config' def test_custom_topicspace(self): assert queue('config', cls='request', topicspace='prod') == 'request:prod:config' @@ -44,9 +44,9 @@ class TestPulsarMapTopic: assert backend.map_topic('flow:tg:text-completion-request') == \ 'persistent://tg/flow/text-completion-request' - def test_state_maps_to_persistent(self, backend): - assert backend.map_topic('state:tg:config') == \ - 'persistent://tg/state/config' + def test_notify_maps_to_non_persistent(self, backend): + assert backend.map_topic('notify:tg:config') == \ + 'non-persistent://tg/notify/config' def test_request_maps_to_non_persistent(self, backend): assert backend.map_topic('request:tg:config') == \ @@ -153,7 +153,7 @@ class TestQueueDefinitions: def test_config_push(self): from trustgraph.schema.services.config import config_push_queue - assert config_push_queue == 'flow:tg:config' + assert config_push_queue == 'notify:tg:config' def test_librarian_request(self): from trustgraph.schema.services.library import librarian_request_queue diff --git a/tests/unit/test_pubsub/test_rabbitmq_backend.py b/tests/unit/test_pubsub/test_rabbitmq_backend.py index 578db3b6..ffe18fd7 100644 --- a/tests/unit/test_pubsub/test_rabbitmq_backend.py +++ b/tests/unit/test_pubsub/test_rabbitmq_backend.py @@ -24,10 +24,10 @@ class TestRabbitMQMapQueueName: assert durable is True assert name == 'tg.flow.text-completion-request' - def test_state_is_durable(self, backend): - name, durable = backend.map_queue_name('state:tg:config') - assert durable is True - assert name == 'tg.state.config' + def test_notify_is_not_durable(self, backend): + name, durable = backend.map_queue_name('notify:tg:config') + assert durable is False + assert name == 'tg.notify.config' def test_request_is_not_durable(self, backend): name, durable = backend.map_queue_name('request:tg:config') diff --git a/trustgraph-base/trustgraph/base/backend.py b/trustgraph-base/trustgraph/base/backend.py index b9f5f923..9b9a42af 100644 --- a/trustgraph-base/trustgraph/base/backend.py +++ b/trustgraph-base/trustgraph/base/backend.py @@ -124,18 +124,22 @@ class PubSubBackend(Protocol): subscription: str, schema: type, initial_position: str = 'latest', - consumer_type: str = 'shared', **options ) -> BackendConsumer: """ Create a consumer for a topic. + Consumer behaviour is determined by the topic's class prefix: + - flow: shared competing consumers, durable named queue + - request: shared competing consumers, non-durable named queue + - response: exclusive per-subscriber, anonymous auto-delete queue + - notify: exclusive per-subscriber, anonymous auto-delete queue + Args: - topic: Generic topic format (qos/tenant/namespace/queue) + topic: Queue identifier in class:topicspace:topic format subscription: Subscription/consumer group name schema: Dataclass type for messages initial_position: 'earliest' or 'latest' (some backends may ignore) - consumer_type: 'shared', 'exclusive', 'failover' (some backends may ignore) **options: Backend-specific options Returns: diff --git a/trustgraph-base/trustgraph/base/consumer.py b/trustgraph-base/trustgraph/base/consumer.py index b6c28bbe..1b9e5999 100644 --- a/trustgraph-base/trustgraph/base/consumer.py +++ b/trustgraph-base/trustgraph/base/consumer.py @@ -33,19 +33,17 @@ class Consumer: rate_limit_retry_time = 10, rate_limit_timeout = 7200, reconnect_time = 5, concurrency = 1, # Number of concurrent requests to handle - consumer_type = 'shared', + **kwargs, ): self.taskgroup = taskgroup self.flow = flow - self.backend = backend # Changed from 'client' to 'backend' + self.backend = backend self.topic = topic self.subscriber = subscriber self.schema = schema self.handler = handler - self.consumer_type = consumer_type - self.rate_limit_retry_time = rate_limit_retry_time self.rate_limit_timeout = rate_limit_timeout @@ -129,7 +127,6 @@ class Consumer: subscription = self.subscriber, schema = self.schema, initial_position = initial_pos, - consumer_type = self.consumer_type, ), ) consumers.append(c) diff --git a/trustgraph-base/trustgraph/base/librarian_client.py b/trustgraph-base/trustgraph/base/librarian_client.py index 6191cff8..5ad97f47 100644 --- a/trustgraph-base/trustgraph/base/librarian_client.py +++ b/trustgraph-base/trustgraph/base/librarian_client.py @@ -64,7 +64,6 @@ class LibrarianClient: schema=LibrarianResponse, handler=self._on_response, metrics=librarian_response_metrics, - consumer_type='exclusive', ) # Single-response requests: request_id -> asyncio.Future diff --git a/trustgraph-base/trustgraph/base/pulsar_backend.py b/trustgraph-base/trustgraph/base/pulsar_backend.py index 9480243e..a567191e 100644 --- a/trustgraph-base/trustgraph/base/pulsar_backend.py +++ b/trustgraph-base/trustgraph/base/pulsar_backend.py @@ -159,14 +159,16 @@ class PulsarBackend: cls, topicspace, topic = parts # Map class to Pulsar persistence and namespace - if cls in ('flow', 'state'): + if cls == 'flow': persistence = 'persistent' elif cls in ('request', 'response'): persistence = 'non-persistent' + elif cls == 'notify': + persistence = 'non-persistent' else: raise ValueError( f"Invalid queue class: {cls}, " - f"expected flow, request, response, or state" + f"expected flow, request, response, or notify" ) return f"{persistence}://{topicspace}/{cls}/{topic}" @@ -205,18 +207,20 @@ class PulsarBackend: subscription: str, schema: type, initial_position: str = 'latest', - consumer_type: str = 'shared', **options ) -> BackendConsumer: """ Create a Pulsar consumer. + Consumer type is derived from the topic's class prefix: + - flow/request: Shared (competing consumers) + - response/notify: Exclusive (per-subscriber) + Args: - topic: Generic topic format (qos/tenant/namespace/queue) + topic: Queue identifier in class:topicspace:topic format subscription: Subscription name schema: Dataclass type for messages initial_position: 'earliest' or 'latest' - consumer_type: 'shared', 'exclusive', or 'failover' **options: Backend-specific options Returns: @@ -224,17 +228,18 @@ class PulsarBackend: """ pulsar_topic = self.map_topic(topic) + # Extract class from topic for consumer type mapping + cls = topic.split(':', 1)[0] if ':' in topic else 'flow' + # Map initial position if initial_position == 'earliest': pos = pulsar.InitialPosition.Earliest else: pos = pulsar.InitialPosition.Latest - # Map consumer type - if consumer_type == 'exclusive': + # Map consumer type from class + if cls in ('response', 'notify'): ctype = pulsar.ConsumerType.Exclusive - elif consumer_type == 'failover': - ctype = pulsar.ConsumerType.Failover else: ctype = pulsar.ConsumerType.Shared diff --git a/trustgraph-base/trustgraph/base/rabbitmq_backend.py b/trustgraph-base/trustgraph/base/rabbitmq_backend.py index b9afe741..3fafcead 100644 --- a/trustgraph-base/trustgraph/base/rabbitmq_backend.py +++ b/trustgraph-base/trustgraph/base/rabbitmq_backend.py @@ -311,14 +311,14 @@ class RabbitMQBackend: cls, topicspace, topic = parts - if cls in ('flow', 'state'): + if cls == 'flow': durable = True - elif cls in ('request', 'response'): + elif cls in ('request', 'response', 'notify'): durable = False else: raise ValueError( f"Invalid queue class: {cls}, " - f"expected flow, request, response, or state" + f"expected flow, request, response, or notify" ) # Exchange per topicspace, routing key includes class @@ -345,26 +345,19 @@ class RabbitMQBackend: def create_consumer(self, topic: str, subscription: str, schema: type, initial_position: str = 'latest', - consumer_type: str = 'shared', **options) -> BackendConsumer: """Create a consumer with a queue bound to the topic exchange. - consumer_type='shared': Named durable queue. Multiple consumers - with the same subscription compete (round-robin). - consumer_type='exclusive': Anonymous ephemeral queue. Each - consumer gets its own copy of every message (broadcast). + Behaviour is determined by the topic's class prefix: + - flow: named durable queue, competing consumers (round-robin) + - request: named non-durable queue, competing consumers + - response: anonymous ephemeral queue, per-subscriber (auto-delete) + - notify: anonymous ephemeral queue, per-subscriber (auto-delete) """ exchange, routing_key, cls, durable = self._parse_queue_id(topic) - if consumer_type == 'exclusive' and cls == 'state': - # State broadcast: named durable queue per subscriber. - # Retains messages so late-starting processors see current state. - queue_name = f"{exchange}.{routing_key}.{subscription}" - queue_durable = True - exclusive = False - auto_delete = False - elif consumer_type == 'exclusive': - # Broadcast: anonymous queue, auto-deleted on disconnect + if cls in ('response', 'notify'): + # Per-subscriber: anonymous queue, auto-deleted on disconnect queue_name = '' queue_durable = False exclusive = True @@ -379,7 +372,7 @@ class RabbitMQBackend: logger.debug( f"Creating consumer: exchange={exchange}, " f"routing_key={routing_key}, queue={queue_name or '(anonymous)'}, " - f"type={consumer_type}" + f"cls={cls}" ) return RabbitMQBackendConsumer( diff --git a/trustgraph-base/trustgraph/base/subscriber.py b/trustgraph-base/trustgraph/base/subscriber.py index 6cb234b1..8c68e51c 100644 --- a/trustgraph-base/trustgraph/base/subscriber.py +++ b/trustgraph-base/trustgraph/base/subscriber.py @@ -84,7 +84,6 @@ class Subscriber: topic=self.topic, subscription=self.subscription, schema=self.schema, - consumer_type='exclusive', ), ) diff --git a/trustgraph-base/trustgraph/clients/base.py b/trustgraph-base/trustgraph/clients/base.py index cd4ad72e..02575c21 100644 --- a/trustgraph-base/trustgraph/clients/base.py +++ b/trustgraph-base/trustgraph/clients/base.py @@ -42,7 +42,6 @@ class BaseClient: topic=output_queue, subscription=subscriber, schema=output_schema, - consumer_type='shared', ) self.input_schema = input_schema diff --git a/trustgraph-base/trustgraph/schema/core/topic.py b/trustgraph-base/trustgraph/schema/core/topic.py index 036ea142..c42a6c9c 100644 --- a/trustgraph-base/trustgraph/schema/core/topic.py +++ b/trustgraph-base/trustgraph/schema/core/topic.py @@ -6,10 +6,10 @@ def queue(topic, cls='flow', topicspace='tg'): Args: topic: The logical queue name (e.g. 'config', 'librarian') cls: Queue class determining operational characteristics: - - 'flow' = persistent processing pipeline queue - - 'request' = non-persistent, short TTL request queue - - 'response' = non-persistent, short TTL response queue - - 'state' = persistent, last-value state broadcast + - 'flow' = persistent shared work queue (competing consumers) + - 'request' = non-persistent RPC request queue (shared) + - 'response' = non-persistent RPC response queue (per-subscriber) + - 'notify' = ephemeral broadcast (per-subscriber, auto-delete) topicspace: Deployment isolation prefix (default: 'tg') Returns: @@ -20,7 +20,7 @@ def queue(topic, cls='flow', topicspace='tg'): # flow:tg:text-completion-request queue('config', cls='request') # request:tg:config - queue('config', cls='state') - # state:tg:config + queue('config', cls='notify') + # notify:tg:config """ return f"{cls}:{topicspace}:{topic}" diff --git a/trustgraph-base/trustgraph/schema/services/config.py b/trustgraph-base/trustgraph/schema/services/config.py index fb219bd9..c08e96d7 100644 --- a/trustgraph-base/trustgraph/schema/services/config.py +++ b/trustgraph-base/trustgraph/schema/services/config.py @@ -62,7 +62,7 @@ class ConfigPush: config_request_queue = queue('config', cls='request') config_response_queue = queue('config', cls='response') -config_push_queue = queue('config', cls='flow') +config_push_queue = queue('config', cls='notify') ############################################################################ diff --git a/trustgraph-cli/trustgraph/cli/init_trustgraph.py b/trustgraph-cli/trustgraph/cli/init_trustgraph.py index 514dc75b..18c240ef 100644 --- a/trustgraph-cli/trustgraph/cli/init_trustgraph.py +++ b/trustgraph-cli/trustgraph/cli/init_trustgraph.py @@ -138,10 +138,10 @@ def init_pulsar(pulsar_admin_url, tenant): } }) - ensure_namespace(pulsar_admin_url, tenant, "state", { + ensure_namespace(pulsar_admin_url, tenant, "notify", { "retention_policies": { - "retentionSizeInMB": 10, - "retentionTimeInMinutes": -1, + "retentionSizeInMB": -1, + "retentionTimeInMinutes": 3, "subscriptionExpirationTimeMinutes": 5, } }) diff --git a/trustgraph-cli/trustgraph/cli/monitor_prompts.py b/trustgraph-cli/trustgraph/cli/monitor_prompts.py index 0cfe68ac..4e0e8456 100644 --- a/trustgraph-cli/trustgraph/cli/monitor_prompts.py +++ b/trustgraph-cli/trustgraph/cli/monitor_prompts.py @@ -123,7 +123,6 @@ async def monitor(flow, queue_type, max_lines, max_width, **config): topic=request_queue, subscription="prompt-monitor-req", schema=None, - consumer_type='shared', initial_position='latest', ) @@ -131,7 +130,6 @@ async def monitor(flow, queue_type, max_lines, max_width, **config): topic=response_queue, subscription="prompt-monitor-resp", schema=None, - consumer_type='shared', initial_position='latest', ) diff --git a/trustgraph-flow/trustgraph/gateway/config/receiver.py b/trustgraph-flow/trustgraph/gateway/config/receiver.py index 97f4e7eb..2323cd61 100755 --- a/trustgraph-flow/trustgraph/gateway/config/receiver.py +++ b/trustgraph-flow/trustgraph/gateway/config/receiver.py @@ -73,6 +73,31 @@ class ConfigReceiver: f"Config notify processing exception: {e}", exc_info=True ) + def _create_config_client(self): + """Create a short-lived config request/response client.""" + id = str(uuid.uuid4()) + + config_req_metrics = ProducerMetrics( + processor="api-gateway", flow=None, + name="config-request", + ) + config_resp_metrics = SubscriberMetrics( + processor="api-gateway", flow=None, + name="config-response", + ) + + return RequestResponse( + backend=self.backend, + subscription=f"api-gateway--config--{id}", + consumer_name="api-gateway", + request_topic=config_request_queue, + request_schema=ConfigRequest, + request_metrics=config_req_metrics, + response_topic=config_response_queue, + response_schema=ConfigResponse, + response_metrics=config_resp_metrics, + ) + async def fetch_and_apply(self, retry=False): """Fetch full config and apply flow changes. If retry=True, keeps retrying until successful.""" @@ -82,10 +107,15 @@ class ConfigReceiver: try: logger.info("Fetching config from config service...") - resp = await self.config_client.request( - ConfigRequest(operation="config"), - timeout=10, - ) + client = self._create_config_client() + try: + await client.start() + resp = await client.request( + ConfigRequest(operation="config"), + timeout=10, + ) + finally: + await client.stop() logger.info(f"Config response received") @@ -170,32 +200,6 @@ class ConfigReceiver: id = str(uuid.uuid4()) - # Config request/response client - config_req_metrics = ProducerMetrics( - processor="api-gateway", flow=None, - name="config-request", - ) - config_resp_metrics = SubscriberMetrics( - processor="api-gateway", flow=None, - name="config-response", - ) - - self.config_client = RequestResponse( - backend=self.backend, - subscription=f"api-gateway--config--{id}", - consumer_name="api-gateway", - request_topic=config_request_queue, - request_schema=ConfigRequest, - request_metrics=config_req_metrics, - response_topic=config_response_queue, - response_schema=ConfigResponse, - response_metrics=config_resp_metrics, - ) - - logger.info("Starting config request/response client...") - await self.config_client.start() - logger.info("Config request/response client started") - # Subscribe to notify queue self.config_cons = Consumer( taskgroup=tg,