fix: api-gateway evicts cached dispatchers when a flow stops (#841)
DispatcherManager caches one ServiceRequestor per (workspace, flow_id, kind) in
self.dispatchers, lazily created on first use. stop_flow dropped the
flow from self.flows but never touched the cached dispatchers, so
their publisher/subscriber connections persisted — bound to the
per-flow exchanges that flow-svc tears down when the flow stops.
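For orientation, a minimal sketch of the caching pattern described here; the
class and helper names are illustrative stand-ins, not the real gateway code:

import asyncio

# Sketch only: stand-in for the gateway's dispatcher cache. The real
# DispatcherManager wires each requestor's publisher/subscriber to the
# flow's per-flow exchanges; make_requestor below is a placeholder.
class DispatcherCacheSketch:
    def __init__(self):
        self.flows = {}        # (workspace, flow_id) -> flow state
        self.dispatchers = {}  # (workspace, flow_id, kind) -> requestor
        self.dispatcher_lock = asyncio.Lock()

    async def get_dispatcher(self, workspace, flow_id, kind):
        key = (workspace, flow_id, kind)
        async with self.dispatcher_lock:
            if key not in self.dispatchers:
                # Created lazily on first use; before this fix,
                # stop_flow never removed entries from this cache.
                self.dispatchers[key] = await self.make_requestor(key)
            return self.dispatchers[key]

    async def make_requestor(self, key):
        return object()  # placeholder for a real ServiceRequestor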
If the same flow id was later re-created, flow-svc re-declared fresh
per-flow exchanges, but the gateway's cached dispatcher still held a
subscription queue bound to the now-gone old response exchange.
Requests went out fine (publishers target exchanges by name and the
new exchange has the right name), but responses landed on an exchange
with no binding to the dispatcher's queue and were silently dropped.
The calling CLI or websocket session hung waiting for a reply that
would never arrive.
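A tiny self-contained simulation makes the silent drop concrete; exchanges and
bindings are modeled here as plain dicts, and the exchange name is made up, so
this only illustrates the semantics described above:

# Simulation only: exchanges as name -> list of bound queues.
exchanges = {}

def declare_exchange(name):
    exchanges[name] = []  # re-declaring yields a fresh, unbound exchange

def bind(exchange, queue):
    exchanges[exchange].append(queue)

def publish(exchange, message):
    # Publishing targets the exchange by name; with no bound queue the
    # message is accepted and then silently dropped.
    for queue in exchanges.get(exchange, []):
        queue.append(message)

dispatcher_queue = []
declare_exchange("flow:test-flow-1:response")
bind("flow:test-flow-1:response", dispatcher_queue)  # cached dispatcher

# Stop and restart the flow: same exchange name, fresh exchange, and the
# old queue binding is gone.
declare_exchange("flow:test-flow-1:response")

publish("flow:test-flow-1:response", "reply")  # publish itself succeeds
assert dispatcher_queue == []                  # the reply never arrives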
Reproduction before fix:
tg-start-flow -i test-flow-1 ...
# any query on test-flow-1 works
tg-stop-flow -i test-flow-1
tg-start-flow -i test-flow-1 ...
tg-show-graph -f test-flow-1 -C <collection> # hangs
Flows that were never stopped (e.g. "default" in a typical session)
were unaffected — their cached dispatcher still pointed at live
plumbing. That's why the bug appeared flow-name-specific at first
glance; it's actually lifecycle-specific.
Fix: in stop_flow, evict and cleanly stop() every cached dispatcher
keyed on the stopped flow id. Next request after restart constructs
a fresh dispatcher against the freshly-declared exchanges. Tuple
shape check preserves global dispatchers, which use (None, kind) as
their key and must survive flow churn.
Uses pop((workspace, id), None) instead of del in case stop_flow is invoked
defensively for a flow the gateway never saw.
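The per-flow/global key distinction can be checked in isolation; a small
sketch with made-up keys and kinds, applying the same predicate the diff
below uses:

# Made-up cache contents: per-flow keys are 3-tuples, global keys are
# (None, kind) 2-tuples and must survive flow churn.
dispatchers = {
    ("ws", "test-flow-1", "query"): "per-flow requestor",
    ("ws", "other-flow", "query"): "per-flow requestor",
    (None, "config"): "global requestor",
}

workspace, id = "ws", "test-flow-1"
stale_keys = [
    k for k in dispatchers
    if isinstance(k, tuple) and len(k) == 3
    and k[0] == workspace and k[1] == id
]
assert stale_keys == [("ws", "test-flow-1", "query")]
assert (None, "config") in dispatchers  # global dispatcher untouched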
parent d35473f7f7
commit 8be128aa59
1 changed file with 29 additions and 1 deletion
@@ -129,7 +129,35 @@ class DispatcherManager:
 
     async def stop_flow(self, workspace, id, flow):
         logger.info(f"Stopping flow {workspace}/{id}")
-        del self.flows[(workspace, id)]
+        self.flows.pop((workspace, id), None)
+
+        # Drop any cached dispatchers for this (workspace, flow).
+        # Their publishers and subscribers were wired to the flow's
+        # per-flow exchanges, which flow-svc tears down when the flow
+        # stops. Leaving the cached dispatcher in place means a
+        # subsequent restart of the same flow id would reuse a
+        # dispatcher whose subscription queue is still bound to the
+        # torn-down (now re-created) response exchange — requests go
+        # out but responses are silently dropped and the caller hangs.
+        #
+        # Per-flow dispatchers are keyed (workspace, flow_id, kind).
+        # Global dispatchers are keyed (None, kind) — the len==3
+        # check naturally excludes them.
+        async with self.dispatcher_lock:
+            stale_keys = [
+                k for k in self.dispatchers
+                if isinstance(k, tuple) and len(k) == 3
+                and k[0] == workspace and k[1] == id
+            ]
+            for key in stale_keys:
+                dispatcher = self.dispatchers.pop(key)
+                try:
+                    await dispatcher.stop()
+                except Exception as e:
+                    logger.warning(
+                        f"Error stopping cached dispatcher {key}: {e}"
+                    )
+
         return
 
     def dispatch_global_service(self):