From fd6e3e1269d954cd76b5047a3ad97ff678e6e534 Mon Sep 17 00:00:00 2001 From: cybermaggedon Date: Tue, 19 May 2026 13:26:39 +0100 Subject: [PATCH] fix: stop dropping messages on Pulsar flow restarts (#938) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit consumer.py called unsubscribe() on every flow stop, deleting the server-side subscription cursor. On restart, initial_position='latest' skipped any messages published during the gap — causing intermittent data loss (e.g. graph embeddings silently never reaching Qdrant). Replace unsubscribe() with close() so the cursor survives restarts. Move subscription cleanup to where it belongs: the Pulsar backend's delete_topic(), called by the flow controller on deliberate flow deletion. This was previously a no-op TODO. --- trustgraph-base/trustgraph/base/consumer.py | 26 ++-- trustgraph-base/trustgraph/base/pubsub.py | 10 ++ .../trustgraph/base/pulsar_backend.py | 136 ++++++++++++++++-- 3 files changed, 151 insertions(+), 21 deletions(-) diff --git a/trustgraph-base/trustgraph/base/consumer.py b/trustgraph-base/trustgraph/base/consumer.py index 86cc4ceb..b9f2ee0b 100644 --- a/trustgraph-base/trustgraph/base/consumer.py +++ b/trustgraph-base/trustgraph/base/consumer.py @@ -76,8 +76,10 @@ class Consumer: if hasattr(self, "consumer"): if self.consumer: - self.consumer.unsubscribe() - self.consumer.close() + try: + self.consumer.close() + except Exception: + pass self.consumer = None async def stop(self): @@ -157,12 +159,14 @@ class Consumer: except Exception as e: logger.error(f"Consumer loop exception: {e}", exc_info=True) - for c in consumers: + for i, c in enumerate(consumers): try: - c.unsubscribe() c.close() - except Exception: - pass + except Exception as ce: + logger.warning( + f"Consumer {i} close failed (error path): " + f"{type(ce).__name__}: {ce}" + ) for ex in executors: ex.shutdown(wait=False) consumers = [] @@ -171,12 +175,14 @@ class Consumer: continue finally: - for c in consumers: + for i, c in enumerate(consumers): try: - c.unsubscribe() c.close() - except Exception: - pass + except Exception as ce: + logger.warning( + f"Consumer {i} close failed: " + f"{type(ce).__name__}: {ce}" + ) for ex in executors: ex.shutdown(wait=False) diff --git a/trustgraph-base/trustgraph/base/pubsub.py b/trustgraph-base/trustgraph/base/pubsub.py index fb4765c1..4ae8d2d0 100644 --- a/trustgraph-base/trustgraph/base/pubsub.py +++ b/trustgraph-base/trustgraph/base/pubsub.py @@ -10,6 +10,7 @@ logger = logging.getLogger(__name__) # Default connection settings from environment DEFAULT_PULSAR_HOST = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650') DEFAULT_PULSAR_API_KEY = os.getenv("PULSAR_API_KEY", None) +DEFAULT_PULSAR_ADMIN_URL = os.getenv("PULSAR_ADMIN_URL", 'http://pulsar:8080') DEFAULT_RABBITMQ_HOST = os.getenv("RABBITMQ_HOST", 'rabbitmq') DEFAULT_RABBITMQ_PORT = int(os.getenv("RABBITMQ_PORT", '5672')) @@ -43,6 +44,7 @@ def get_pubsub(**config: Any) -> Any: host=config.get('pulsar_host', DEFAULT_PULSAR_HOST), api_key=config.get('pulsar_api_key', DEFAULT_PULSAR_API_KEY), listener=config.get('pulsar_listener'), + admin_url=config.get('pulsar_admin_url', DEFAULT_PULSAR_ADMIN_URL), ) elif backend_type == 'rabbitmq': from .rabbitmq_backend import RabbitMQBackend @@ -77,6 +79,7 @@ def get_pubsub(**config: Any) -> Any: STANDALONE_PULSAR_HOST = 'pulsar://localhost:6650' +STANDALONE_PULSAR_ADMIN_URL = 'http://localhost:8080' def add_pubsub_args(parser: ArgumentParser, standalone: bool = False) -> None: @@ -88,6 +91,7 @@ def add_pubsub_args(parser: ArgumentParser, standalone: bool = False) -> None: that run outside containers) """ pulsar_host = STANDALONE_PULSAR_HOST if standalone else DEFAULT_PULSAR_HOST + pulsar_admin_url = STANDALONE_PULSAR_ADMIN_URL if standalone else DEFAULT_PULSAR_ADMIN_URL pulsar_listener = 'localhost' if standalone else None rabbitmq_host = 'localhost' if standalone else DEFAULT_RABBITMQ_HOST kafka_bootstrap = 'localhost:9092' if standalone else DEFAULT_KAFKA_BOOTSTRAP @@ -105,6 +109,12 @@ def add_pubsub_args(parser: ArgumentParser, standalone: bool = False) -> None: help=f'Pulsar host (default: {pulsar_host})', ) + parser.add_argument( + '--pulsar-admin-url', + default=pulsar_admin_url, + help=f'Pulsar admin REST API URL (default: {pulsar_admin_url})', + ) + parser.add_argument( '--pulsar-api-key', default=DEFAULT_PULSAR_API_KEY, diff --git a/trustgraph-base/trustgraph/base/pulsar_backend.py b/trustgraph-base/trustgraph/base/pulsar_backend.py index dc5e4083..e85dfbef 100644 --- a/trustgraph-base/trustgraph/base/pulsar_backend.py +++ b/trustgraph-base/trustgraph/base/pulsar_backend.py @@ -7,8 +7,12 @@ handling topic mapping, serialization, and Pulsar client management. import pulsar import _pulsar +import asyncio import json import logging +import urllib.request +import urllib.error +import urllib.parse from typing import Any from .backend import PubSubBackend, BackendProducer, BackendConsumer, Message @@ -117,7 +121,10 @@ class PulsarBackend: producers and consumers. """ - def __init__(self, host: str, api_key: str = None, listener: str = None): + def __init__( + self, host: str, api_key: str = None, listener: str = None, + admin_url: str = None, + ): """ Initialize Pulsar backend. @@ -125,10 +132,12 @@ class PulsarBackend: host: Pulsar broker URL (e.g., pulsar://localhost:6650) api_key: Optional API key for authentication listener: Optional listener name for multi-homed setups + admin_url: Pulsar admin REST API URL (e.g., http://pulsar:8080) """ self.host = host self.api_key = api_key self.listener = listener + self.admin_url = admin_url # Create Pulsar client client_args = {'service_url': host} @@ -270,24 +279,129 @@ class PulsarBackend: return PulsarBackendConsumer(pulsar_consumer, schema) + def _admin_api_path(self, pulsar_uri: str) -> str: + """ + Convert a Pulsar topic URI to an admin REST API path. + + persistent://tg/flow/triples-store:default:explain-flow + -> /admin/v2/persistent/tg/flow/triples-store%3Adefault%3Aexplain-flow + """ + scheme, rest = pulsar_uri.split('://', 1) + tenant, namespace, topic = rest.split('/', 2) + encoded_topic = urllib.parse.quote(topic, safe='') + return f"/admin/v2/{scheme}/{tenant}/{namespace}/{encoded_topic}" + + def _admin_request(self, method, path): + """ + Make a synchronous admin REST API request. + + Returns parsed JSON for GET, None for DELETE/PUT. + Raises urllib.error.HTTPError for non-404 errors. + 404 is treated as success (idempotent deletion). + """ + url = f"{self.admin_url}{path}" + req = urllib.request.Request(url, method=method) + + try: + with urllib.request.urlopen(req) as resp: + if method == 'GET': + return json.loads(resp.read().decode('utf-8')) + return None + except urllib.error.HTTPError as e: + if e.code == 404: + return None + raise + + def _delete_topic_sync(self, topic: str): + """ + Delete a persistent topic and all its subscriptions. + + Subscriptions must be removed first — Pulsar rejects topic + deletion while subscriptions exist. Force-deletes each + subscription to disconnect any lingering consumers. + """ + pulsar_uri = self.map_topic(topic) + + if pulsar_uri.startswith('non-persistent://'): + return + + api_path = self._admin_api_path(pulsar_uri) + + try: + subs = self._admin_request('GET', f"{api_path}/subscriptions") + except Exception as e: + logger.warning(f"Failed to list subscriptions for {topic}: {e}") + return + + if subs: + for sub in subs: + encoded_sub = urllib.parse.quote(sub, safe='') + try: + self._admin_request( + 'DELETE', + f"{api_path}/subscription/{encoded_sub}" + f"?force=true" + ) + logger.info( + f"Deleted subscription {sub} from {topic}" + ) + except Exception as e: + logger.warning( + f"Failed to delete subscription {sub} " + f"from {topic}: {e}" + ) + + try: + self._admin_request('DELETE', api_path) + logger.info(f"Deleted topic: {topic}") + except Exception as e: + logger.warning(f"Failed to delete topic {topic}: {e}") + + def _topic_exists_sync(self, topic: str) -> bool: + """Check topic existence via admin API.""" + pulsar_uri = self.map_topic(topic) + + if pulsar_uri.startswith('non-persistent://'): + return False + + api_path = self._admin_api_path(pulsar_uri) + + try: + result = self._admin_request('GET', f"{api_path}/stats") + return result is not None + except Exception: + return False + async def create_topic(self, topic: str) -> None: - """No-op — Pulsar auto-creates topics on first use. - TODO: Use admin REST API for explicit persistent topic creation.""" + """No-op — Pulsar auto-creates topics on first use.""" pass async def delete_topic(self, topic: str) -> None: - """No-op — to be replaced with admin REST API calls. - TODO: Delete persistent topic via admin API.""" - pass + """ + Delete a persistent topic and all its subscriptions via + the admin REST API. + + Called by the flow controller during deliberate flow deletion. + Non-persistent topics are skipped. Idempotent. + """ + if not self.admin_url: + logger.warning( + f"Cannot delete topic {topic}: " + f"no admin URL configured" + ) + return + + await asyncio.to_thread(self._delete_topic_sync, topic) async def topic_exists(self, topic: str) -> bool: - """Returns True — Pulsar auto-creates on subscribe. - TODO: Use admin REST API for actual existence check.""" - return True + """Check whether a persistent topic exists via the admin API.""" + if not self.admin_url: + return True + + return await asyncio.to_thread(self._topic_exists_sync, topic) async def ensure_topic(self, topic: str) -> None: - """No-op — Pulsar auto-creates topics on first use. - TODO: Use admin REST API for explicit creation.""" + """No-op — Pulsar auto-creates topics on first use.""" pass def close(self) -> None: