From 8be128aa59561f28ecfa9bd2df23e48afe261bad Mon Sep 17 00:00:00 2001 From: cybermaggedon Date: Wed, 22 Apr 2026 12:05:24 +0100 Subject: [PATCH] fix: api-gateway evicts cached dispatchers when a flow stops (#841) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit DispatcherManager caches one ServiceRequestor per (workspace, flow_id, kind) in self.dispatchers, lazily created on first use. stop_flow dropped the flow from self.flows but never touched the cached dispatchers, so their publisher/subscriber connections persisted — bound to the per-flow exchanges that flow-svc tears down when the flow stops. If the same flow id was later re-created, flow-svc re-declared fresh per-flow exchanges, but the gateway's cached dispatcher still held a subscription queue bound to the now-gone old response exchange. Requests went out fine (publishers target exchanges by name and the new exchange has the right name), but responses landed on an exchange with no binding to the dispatcher's queue and were silently dropped. The calling CLI or websocket session hung waiting for a reply that would never arrive. Reproduction before fix: tg-start-flow -i test-flow-1 ... # any query on test-flow-1 works tg-stop-flow -i test-flow-1 tg-start-flow -i test-flow-1 ... tg-show-graph -f test-flow-1 -C # hangs Flows that were never stopped (e.g. "default" in a typical session) were unaffected — their cached dispatcher still pointed at live plumbing. That's why the bug appeared flow-name-specific at first glance; it's actually lifecycle-specific. Fix: in stop_flow, evict and cleanly stop() every cached dispatcher keyed on the stopped flow id. Next request after restart constructs a fresh dispatcher against the freshly-declared exchanges. Tuple shape check preserves global dispatchers, which use (None, kind) as their key and must survive flow churn. Uses pop(id, None) instead of del in case stop_flow is invoked defensively for a flow the gateway never saw. 
--- .../trustgraph/gateway/dispatch/manager.py | 30 ++++++++++++++++++- 1 file changed, 29 insertions(+), 1 deletion(-) diff --git a/trustgraph-flow/trustgraph/gateway/dispatch/manager.py b/trustgraph-flow/trustgraph/gateway/dispatch/manager.py index f3db3290..b238bb5b 100644 --- a/trustgraph-flow/trustgraph/gateway/dispatch/manager.py +++ b/trustgraph-flow/trustgraph/gateway/dispatch/manager.py @@ -129,7 +129,35 @@ class DispatcherManager: async def stop_flow(self, workspace, id, flow): logger.info(f"Stopping flow {workspace}/{id}") - del self.flows[(workspace, id)] + self.flows.pop((workspace, id), None) + + # Drop any cached dispatchers for this (workspace, flow). + # Their publishers and subscribers were wired to the flow's + # per-flow exchanges, which flow-svc tears down when the flow + # stops. Leaving the cached dispatcher in place means a + # subsequent restart of the same flow id would reuse a + # dispatcher whose subscription queue is still bound to the + # torn-down (now re-created) response exchange — requests go + # out but responses are silently dropped and the caller hangs. + # + # Per-flow dispatchers are keyed (workspace, flow_id, kind). + # Global dispatchers are keyed (None, kind) — the len==3 + # check naturally excludes them. + async with self.dispatcher_lock: + stale_keys = [ + k for k in self.dispatchers + if isinstance(k, tuple) and len(k) == 3 + and k[0] == workspace and k[1] == id + ] + for key in stale_keys: + dispatcher = self.dispatchers.pop(key) + try: + await dispatcher.stop() + except Exception as e: + logger.warning( + f"Error stopping cached dispatcher {key}: {e}" + ) + return def dispatch_global_service(self):