trustgraph/trustgraph-base/trustgraph/base/producer.py
cybermaggedon 2b70a1ea8e
Close producers on flow stop to prevent stale non-persistent topics (#930)
Flow.stop() only stopped consumers, leaving response producers
connected to non-persistent Pulsar topics. After flow restart, the
orphaned producers held stale broker routing state, causing response
messages to never reach new consumers — manifesting as 120s timeouts
on document-embeddings and similar RPC paths.

Fix: Flow.stop() now explicitly stops all producers. Producer.stop()
closes the underlying Pulsar producer connection rather than just
setting a flag.

Fixes #906
2026-05-16 16:07:16 +01:00

80 lines
2 KiB
Python

import asyncio
import logging
# Module-level logger, named after this module via __name__; used by
# Producer for connection and send diagnostics.
logger = logging.getLogger(__name__)
class Producer:
    """Lazily-connected message producer bound to a single topic.

    The underlying producer object is not created in the constructor; it
    is obtained from ``backend.create_producer(...)`` on the first call
    to :meth:`send`, and re-created whenever a send fails.

    Args:
        backend: Object exposing
            ``create_producer(topic=..., schema=..., chunking_enabled=...)``.
            The returned producer must provide ``send(msg, properties)``
            and ``close()``.
        topic: Topic identifier passed through to the backend.
        schema: Message schema passed through to the backend.
        metrics: Optional object whose ``inc()`` is called once for every
            successfully delivered message.
        chunking_enabled: Passed through to the backend unchanged.
    """

    def __init__(self, backend, topic, schema, metrics=None,
                 chunking_enabled=True):
        self.backend = backend  # Changed from 'client' to 'backend'
        self.topic = topic
        self.schema = schema

        self.metrics = metrics

        # send() is a no-op while this is False.
        self.running = True
        # Created on demand by send(); None means "not connected".
        self.producer = None

        self.chunking_enabled = chunking_enabled

    def __del__(self):
        """Best-effort close of the underlying producer on collection."""
        self.running = False

        # __init__ may have raised before 'producer' was ever assigned,
        # so do not assume the attribute exists.
        producer = getattr(self, "producer", None)
        if producer:
            producer.close()

    async def start(self):
        """Mark the producer as running so that send() delivers messages."""
        self.running = True

    async def stop(self):
        """Stop sending and close the underlying producer connection.

        The reference is dropped *before* ``close()`` is called, so a
        failing ``close()`` cannot leave a half-closed producer behind for
        a later send() to reuse. Any exception from ``close()`` still
        propagates to the caller.
        """
        self.running = False

        producer, self.producer = self.producer, None
        if producer:
            producer.close()

    def _discard_producer(self, producer):
        """Close *producer* after a failed send, if it is still current.

        If ``self.producer`` no longer refers to *producer*, something else
        (for example stop()) has already replaced or closed it, and it is
        left alone.
        """
        if producer is None or self.producer is not producer:
            return

        self.producer = None

        try:
            producer.close()
        except Exception as e:
            # A broken producer may also fail to close; that must not
            # prevent the caller from reconnecting.
            logger.warning(
                f"Exception closing publisher: {e}", exc_info=True
            )

    async def send(self, msg, properties=None):
        """Deliver *msg*, (re)connecting as needed, until it succeeds.

        Returns immediately without sending if the producer is not
        running. Otherwise the method keeps trying until the message has
        been delivered or the producer is stopped: connection failures
        are retried after a 2 second pause, and a failed send discards
        the current producer so the next attempt uses a fresh one.

        Args:
            msg: Message object handed to the underlying ``send``.
            properties: Optional mapping of message properties; an empty
                dict is used when omitted.
        """
        if not self.running:
            return

        # Fresh dict per call rather than a shared mutable default.
        if properties is None:
            properties = {}

        while self.running:

            if self.producer is None:
                try:
                    logger.info(f"Connecting publisher to {self.topic}...")
                    self.producer = self.backend.create_producer(
                        topic=self.topic,
                        schema=self.schema,
                        chunking_enabled=self.chunking_enabled,
                    )
                    logger.info(f"Connected publisher to {self.topic}")
                except Exception as e:
                    logger.error(
                        f"Exception connecting publisher: {e}",
                        exc_info=True,
                    )
                    await asyncio.sleep(2)
                    continue

            # Hold a local reference: stop() may clear self.producer
            # while the blocking send runs in the worker thread.
            producer = self.producer

            try:
                await asyncio.to_thread(producer.send, msg, properties)
            except Exception as e:
                logger.error(f"Exception sending message: {e}", exc_info=True)
                # Drop the failed producer so the next pass reconnects
                # instead of retrying on a dead (or None) producer.
                self._discard_producer(producer)
                continue

            # Counted outside the try so that a metrics failure is never
            # mistaken for a delivery failure and does not cause a resend.
            if self.metrics:
                self.metrics.inc()

            # Delivery success
            return