Close producers on flow stop to prevent stale non-persistent topics (#930)

Flow.stop() only stopped consumers, leaving response producers
connected to non-persistent Pulsar topics. After flow restart, the
orphaned producers held stale broker routing state, causing response
messages to never reach new consumers — manifesting as 120s timeouts
on document-embeddings and similar RPC paths.

Fix: Flow.stop() now explicitly stops all producers. Producer.stop()
closes the underlying Pulsar producer connection rather than just
setting a flag.

Fixes #906
This commit is contained in:
cybermaggedon 2026-05-16 16:07:16 +01:00 committed by GitHub
parent 38d9c746a8
commit 2b70a1ea8e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 5 additions and 0 deletions

View file

@ -34,6 +34,8 @@ class Flow:
async def stop(self):
for c in self.consumer.values():
await c.stop()
for p in self.producer.values():
await p.stop()
if self.librarian:
await self.librarian.stop()

View file

@ -34,6 +34,9 @@ class Producer:
async def stop(self):
self.running = False
if self.producer:
self.producer.close()
self.producer = None
async def send(self, msg, properties={}):