Close producers on flow stop to prevent stale non-persistent topics (#906)

Flow.stop() only stopped consumers, leaving response producers
connected to non-persistent Pulsar topics. After flow restart, the
orphaned producers held stale broker routing state, causing response
messages to never reach new consumers — manifesting as 120s timeouts
on document-embeddings and similar RPC paths.

Fix: Flow.stop() now explicitly stops all producers. Producer.stop()
closes the underlying Pulsar producer connection rather than just
setting a flag.
This commit is contained in:
Cyber MacGeddon 2026-05-16 15:55:56 +01:00
parent 38d9c746a8
commit 4557131715
2 changed files with 5 additions and 0 deletions

View file

@ -34,6 +34,8 @@ class Flow:
async def stop(self):
for c in self.consumer.values():
await c.stop()
for p in self.producer.values():
await p.stop()
if self.librarian:
await self.librarian.stop()

View file

@ -34,6 +34,9 @@ class Producer:
async def stop(self):
self.running = False
if self.producer:
self.producer.close()
self.producer = None
async def send(self, msg, properties={}):