mirror of
https://github.com/trustgraph-ai/trustgraph.git
synced 2026-04-25 00:16:23 +02:00
fix: uuid-ify flow-svc ConfigClient subscription to avoid Pulsar ConsumerBusy on restart (#843)
flow-svc's long-lived ConfigClient was constructed with
subscription=f"{self.id}--config--{id}", where id=params.get("id") is
the deterministic processor id. On Pulsar the config-response topic
maps to class=response -> Exclusive subscription; when the supervisor
restarts flow-svc within Pulsar's inactive-subscription TTL (minutes),
the previous process's ghost consumer still holds the subscription
and the new process's re-subscribe is rejected with ConsumerBusy,
crash-looping flow-svc.
This is a v2.2 -> v2.3 regression in practice, but not a change in
subscription semantics: the Exclusive mapping for response/notify is
identical between releases. The regression is that PR #822 split
flow-svc out of config-svc and added this new, long-lived
request/response call site — the new site simply didn't follow the
uuid convention used by the equivalent sites elsewhere
(gateway/config/receiver.py, AsyncProcessor._create_config_client).
Fix: generate a fresh uuid per process instance for the subscription
suffix, matching that convention.
This commit is contained in:
parent
f04f7fa154
commit
fad005e030
1 changed files with 10 additions and 1 deletions
|
|
@ -5,6 +5,7 @@ by coordinating with the config service via pub/sub.
|
|||
"""
|
||||
|
||||
import logging
|
||||
import uuid
|
||||
|
||||
from trustgraph.schema import Error
|
||||
|
||||
|
|
@ -83,9 +84,17 @@ class Processor(AsyncProcessor):
|
|||
processor=self.id, flow=None, name="config-response",
|
||||
)
|
||||
|
||||
# Unique subscription suffix per process instance. Pulsar's
|
||||
# exclusive subscriptions reject a second consumer on the same
|
||||
# (topic, subscription-name) — so a deterministic name here
|
||||
# collides with its own ghost when the supervisor restarts the
|
||||
# process before Pulsar has timed out the previous session
|
||||
# (ConsumerBusy). Matches the uuid convention used elsewhere
|
||||
# (gateway/config/receiver.py, AsyncProcessor._create_config_client).
|
||||
config_rr_id = str(uuid.uuid4())
|
||||
self.config_client = ConfigClient(
|
||||
backend=self.pubsub,
|
||||
subscription=f"{self.id}--config--{id}",
|
||||
subscription=f"{self.id}--config--{config_rr_id}",
|
||||
consumer_name=self.id,
|
||||
request_topic=config_request_queue,
|
||||
request_schema=ConfigRequest,
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue