diff --git a/trustgraph-flow/trustgraph/gateway/dispatch/manager.py b/trustgraph-flow/trustgraph/gateway/dispatch/manager.py index f3db3290..b238bb5b 100644 --- a/trustgraph-flow/trustgraph/gateway/dispatch/manager.py +++ b/trustgraph-flow/trustgraph/gateway/dispatch/manager.py @@ -129,7 +129,35 @@ class DispatcherManager: async def stop_flow(self, workspace, id, flow): logger.info(f"Stopping flow {workspace}/{id}") - del self.flows[(workspace, id)] + self.flows.pop((workspace, id), None) + + # Drop any cached dispatchers for this (workspace, flow). + # Their publishers and subscribers were wired to the flow's + # per-flow exchanges, which flow-svc tears down when the flow + # stops. Leaving the cached dispatcher in place means a + # subsequent restart of the same flow id would reuse a + # dispatcher whose subscription queue is still bound to the + # torn-down (now re-created) response exchange — requests go + # out but responses are silently dropped and the caller hangs. + # + # Per-flow dispatchers are keyed (workspace, flow_id, kind). + # Global dispatchers are keyed (None, kind) — the len==3 + # check naturally excludes them. + async with self.dispatcher_lock: + stale_keys = [ + k for k in self.dispatchers + if isinstance(k, tuple) and len(k) == 3 + and k[0] == workspace and k[1] == id + ] + for key in stale_keys: + dispatcher = self.dispatchers.pop(key) + try: + await dispatcher.stop() + except Exception as e: + logger.warning( + f"Error stopping cached dispatcher {key}: {e}" + ) + return def dispatch_global_service(self):