From 7521e152b9d66ed5724cbe919aaa9667f857341e Mon Sep 17 00:00:00 2001 From: cybermaggedon Date: Wed, 22 Apr 2026 15:17:17 +0100 Subject: [PATCH] fix: uuid-ify flow-svc ConfigClient subscription to avoid Pulsar ConsumerBusy on restart (#843) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit flow-svc's long-lived ConfigClient was constructed with subscription=f"{self.id}--config--{id}", where id=params.get("id") is the deterministic processor id. On Pulsar the config-response topic maps to class=response -> Exclusive subscription; when the supervisor restarts flow-svc within Pulsar's inactive-subscription TTL (minutes), the previous process's ghost consumer still holds the subscription and the new process's re-subscribe is rejected with ConsumerBusy, crash-looping flow-svc. This is a v2.2 -> v2.3 regression in practice, but not a change in subscription semantics: the Exclusive mapping for response/notify is identical between releases. The regression is that PR #822 split flow-svc out of config-svc and added this new, long-lived request/response call site — the new site simply didn't follow the uuid convention used by the equivalent sites elsewhere (gateway/config/receiver.py, AsyncProcessor._create_config_client). Fix: generate a fresh uuid per process instance for the subscription suffix, matching that convention. --- trustgraph-flow/trustgraph/flow/service/service.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/trustgraph-flow/trustgraph/flow/service/service.py b/trustgraph-flow/trustgraph/flow/service/service.py index 74077ccb..3dacf47d 100644 --- a/trustgraph-flow/trustgraph/flow/service/service.py +++ b/trustgraph-flow/trustgraph/flow/service/service.py @@ -5,6 +5,7 @@ by coordinating with the config service via pub/sub. """ import logging +import uuid from trustgraph.schema import Error @@ -83,9 +84,17 @@ class Processor(AsyncProcessor): processor=self.id, flow=None, name="config-response", ) + # Unique subscription suffix per process instance. Pulsar's + # exclusive subscriptions reject a second consumer on the same + # (topic, subscription-name) — so a deterministic name here + # collides with its own ghost when the supervisor restarts the + # process before Pulsar has timed out the previous session + # (ConsumerBusy). Matches the uuid convention used elsewhere + # (gateway/config/receiver.py, AsyncProcessor._create_config_client). + config_rr_id = str(uuid.uuid4()) self.config_client = ConfigClient( backend=self.pubsub, - subscription=f"{self.id}--config--{id}", + subscription=f"{self.id}--config--{config_rr_id}", consumer_name=self.id, request_topic=config_request_queue, request_schema=ConfigRequest,