From 2b70a1ea8e1c39a706db534fb990d192679c56dc Mon Sep 17 00:00:00 2001 From: cybermaggedon Date: Sat, 16 May 2026 16:07:16 +0100 Subject: [PATCH] Close producers on flow stop to prevent stale non-persistent topics (#930) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Flow.stop() only stopped consumers, leaving response producers connected to non-persistent Pulsar topics. After flow restart, the orphaned producers held stale broker routing state, causing response messages to never reach new consumers — manifesting as 120s timeouts on document-embeddings and similar RPC paths. Fix: Flow.stop() now explicitly stops all producers. Producer.stop() closes the underlying Pulsar producer connection rather than just setting a flag. Fixes #906 --- trustgraph-base/trustgraph/base/flow.py | 2 ++ trustgraph-base/trustgraph/base/producer.py | 3 +++ 2 files changed, 5 insertions(+) diff --git a/trustgraph-base/trustgraph/base/flow.py b/trustgraph-base/trustgraph/base/flow.py index 0f42bbe2..3b928d3e 100644 --- a/trustgraph-base/trustgraph/base/flow.py +++ b/trustgraph-base/trustgraph/base/flow.py @@ -34,6 +34,8 @@ class Flow: async def stop(self): for c in self.consumer.values(): await c.stop() + for p in self.producer.values(): + await p.stop() if self.librarian: await self.librarian.stop() diff --git a/trustgraph-base/trustgraph/base/producer.py b/trustgraph-base/trustgraph/base/producer.py index 20b4b0d6..9af9d22e 100644 --- a/trustgraph-base/trustgraph/base/producer.py +++ b/trustgraph-base/trustgraph/base/producer.py @@ -34,6 +34,9 @@ class Producer: async def stop(self): self.running = False + if self.producer: + self.producer.close() + self.producer = None async def send(self, msg, properties={}):