diff --git a/trustgraph-flow/trustgraph/gateway/dispatch/manager.py b/trustgraph-flow/trustgraph/gateway/dispatch/manager.py index 592120b1..817a87e9 100644 --- a/trustgraph-flow/trustgraph/gateway/dispatch/manager.py +++ b/trustgraph-flow/trustgraph/gateway/dispatch/manager.py @@ -127,7 +127,30 @@ class DispatcherManager: async def stop_flow(self, id, flow): logger.info(f"Stopping flow {id}") - del self.flows[id] + self.flows.pop(id, None) + + # Drop any cached dispatchers for this flow. Their publishers + # and subscribers were wired to the flow's per-flow exchanges, + # which flow-svc tears down when the flow stops. Leaving the + # cached dispatcher in place means a subsequent restart of the + # same flow id would reuse a dispatcher whose subscription + # queue is still bound to the torn-down (now re-created) + # response exchange — requests go out but responses are + # silently dropped and the caller hangs. + async with self.dispatcher_lock: + stale_keys = [ + k for k in self.dispatchers + if isinstance(k, tuple) and len(k) == 2 and k[0] == id + ] + for key in stale_keys: + dispatcher = self.dispatchers.pop(key) + try: + await dispatcher.stop() + except Exception as e: + logger.warning( + f"Error stopping cached dispatcher {key}: {e}" + ) + return def dispatch_global_service(self):