diff --git a/trustgraph-flow/trustgraph/flow/service/service.py b/trustgraph-flow/trustgraph/flow/service/service.py index 74077ccb..3dacf47d 100644 --- a/trustgraph-flow/trustgraph/flow/service/service.py +++ b/trustgraph-flow/trustgraph/flow/service/service.py @@ -5,6 +5,7 @@ by coordinating with the config service via pub/sub. """ import logging +import uuid from trustgraph.schema import Error @@ -83,9 +84,17 @@ class Processor(AsyncProcessor): processor=self.id, flow=None, name="config-response", ) + # Unique subscription suffix per process instance. Pulsar's + # exclusive subscriptions reject a second consumer on the same + # (topic, subscription-name) — so a deterministic name here + # collides with its own ghost when the supervisor restarts the + # process before Pulsar has timed out the previous session + # (ConsumerBusy). Matches the uuid convention used elsewhere + # (gateway/config/receiver.py, AsyncProcessor._create_config_client). + config_rr_id = str(uuid.uuid4()) self.config_client = ConfigClient( backend=self.pubsub, - subscription=f"{self.id}--config--{id}", + subscription=f"{self.id}--config--{config_rr_id}", consumer_name=self.id, request_topic=config_request_queue, request_schema=ConfigRequest,