fix: stop dropping messages on Pulsar flow restarts (#938)

consumer.py called unsubscribe() on every flow stop, deleting the
server-side subscription cursor. On restart, initial_position='latest'
skipped any messages published during the gap — causing intermittent
data loss (e.g. graph embeddings silently never reaching Qdrant).

Replace unsubscribe() with close() so the cursor survives restarts.
Move subscription cleanup to where it belongs: the Pulsar backend's
delete_topic(), called by the flow controller on deliberate flow
deletion. This was previously a no-op TODO.
This commit is contained in:
cybermaggedon 2026-05-19 13:26:39 +01:00 committed by GitHub
parent 47dfc30c1c
commit fd6e3e1269
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 151 additions and 21 deletions

View file

@ -76,8 +76,10 @@ class Consumer:
if hasattr(self, "consumer"):
if self.consumer:
self.consumer.unsubscribe()
self.consumer.close()
try:
self.consumer.close()
except Exception:
pass
self.consumer = None
async def stop(self):
@ -157,12 +159,14 @@ class Consumer:
except Exception as e:
logger.error(f"Consumer loop exception: {e}", exc_info=True)
for c in consumers:
for i, c in enumerate(consumers):
try:
c.unsubscribe()
c.close()
except Exception:
pass
except Exception as ce:
logger.warning(
f"Consumer {i} close failed (error path): "
f"{type(ce).__name__}: {ce}"
)
for ex in executors:
ex.shutdown(wait=False)
consumers = []
@ -171,12 +175,14 @@ class Consumer:
continue
finally:
for c in consumers:
for i, c in enumerate(consumers):
try:
c.unsubscribe()
c.close()
except Exception:
pass
except Exception as ce:
logger.warning(
f"Consumer {i} close failed: "
f"{type(ce).__name__}: {ce}"
)
for ex in executors:
ex.shutdown(wait=False)

View file

@ -10,6 +10,7 @@ logger = logging.getLogger(__name__)
# Default connection settings from environment
DEFAULT_PULSAR_HOST = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650')
DEFAULT_PULSAR_API_KEY = os.getenv("PULSAR_API_KEY", None)
DEFAULT_PULSAR_ADMIN_URL = os.getenv("PULSAR_ADMIN_URL", 'http://pulsar:8080')
DEFAULT_RABBITMQ_HOST = os.getenv("RABBITMQ_HOST", 'rabbitmq')
DEFAULT_RABBITMQ_PORT = int(os.getenv("RABBITMQ_PORT", '5672'))
@ -43,6 +44,7 @@ def get_pubsub(**config: Any) -> Any:
host=config.get('pulsar_host', DEFAULT_PULSAR_HOST),
api_key=config.get('pulsar_api_key', DEFAULT_PULSAR_API_KEY),
listener=config.get('pulsar_listener'),
admin_url=config.get('pulsar_admin_url', DEFAULT_PULSAR_ADMIN_URL),
)
elif backend_type == 'rabbitmq':
from .rabbitmq_backend import RabbitMQBackend
@ -77,6 +79,7 @@ def get_pubsub(**config: Any) -> Any:
STANDALONE_PULSAR_HOST = 'pulsar://localhost:6650'
STANDALONE_PULSAR_ADMIN_URL = 'http://localhost:8080'
def add_pubsub_args(parser: ArgumentParser, standalone: bool = False) -> None:
@ -88,6 +91,7 @@ def add_pubsub_args(parser: ArgumentParser, standalone: bool = False) -> None:
that run outside containers)
"""
pulsar_host = STANDALONE_PULSAR_HOST if standalone else DEFAULT_PULSAR_HOST
pulsar_admin_url = STANDALONE_PULSAR_ADMIN_URL if standalone else DEFAULT_PULSAR_ADMIN_URL
pulsar_listener = 'localhost' if standalone else None
rabbitmq_host = 'localhost' if standalone else DEFAULT_RABBITMQ_HOST
kafka_bootstrap = 'localhost:9092' if standalone else DEFAULT_KAFKA_BOOTSTRAP
@ -105,6 +109,12 @@ def add_pubsub_args(parser: ArgumentParser, standalone: bool = False) -> None:
help=f'Pulsar host (default: {pulsar_host})',
)
parser.add_argument(
'--pulsar-admin-url',
default=pulsar_admin_url,
help=f'Pulsar admin REST API URL (default: {pulsar_admin_url})',
)
parser.add_argument(
'--pulsar-api-key',
default=DEFAULT_PULSAR_API_KEY,

View file

@ -7,8 +7,12 @@ handling topic mapping, serialization, and Pulsar client management.
import pulsar
import _pulsar
import asyncio
import json
import logging
import urllib.request
import urllib.error
import urllib.parse
from typing import Any
from .backend import PubSubBackend, BackendProducer, BackendConsumer, Message
@ -117,7 +121,10 @@ class PulsarBackend:
producers and consumers.
"""
def __init__(self, host: str, api_key: str = None, listener: str = None):
def __init__(
self, host: str, api_key: str = None, listener: str = None,
admin_url: str = None,
):
"""
Initialize Pulsar backend.
@ -125,10 +132,12 @@ class PulsarBackend:
host: Pulsar broker URL (e.g., pulsar://localhost:6650)
api_key: Optional API key for authentication
listener: Optional listener name for multi-homed setups
admin_url: Pulsar admin REST API URL (e.g., http://pulsar:8080)
"""
self.host = host
self.api_key = api_key
self.listener = listener
self.admin_url = admin_url
# Create Pulsar client
client_args = {'service_url': host}
@ -270,24 +279,129 @@ class PulsarBackend:
return PulsarBackendConsumer(pulsar_consumer, schema)
def _admin_api_path(self, pulsar_uri: str) -> str:
    """
    Build the admin REST API path for a Pulsar topic URI.

    The URI is expected to look like scheme://tenant/namespace/topic.
    Everything after the namespace is treated as the topic name and is
    fully percent-encoded (no characters are left as "safe").

    Example:
        persistent://tg/flow/triples-store:default:explain-flow
        -> /admin/v2/persistent/tg/flow/triples-store%3Adefault%3Aexplain-flow

    Raises:
        ValueError: if the URI lacks '://' or has fewer than three
            '/'-separated parts after the scheme (unpacking fails).
    """
    scheme, remainder = pulsar_uri.split('://', 1)
    # maxsplit=2 keeps any further '/' inside the topic component
    tenant, namespace, topic = remainder.split('/', 2)
    segments = (
        "/admin/v2",
        scheme,
        tenant,
        namespace,
        urllib.parse.quote(topic, safe=''),
    )
    return "/".join(segments)
def _admin_request(self, method, path, timeout=30):
    """
    Make a synchronous admin REST API request.

    Args:
        method: HTTP verb, e.g. 'GET' or 'DELETE'.
        path: Admin API path (including any query string); appended
            to self.admin_url.
        timeout: Socket timeout in seconds, so an unresponsive admin
            endpoint cannot block the calling thread indefinitely.

    Returns:
        Parsed JSON for GET, None for any other method.
        A 404 response also returns None (idempotent deletion).

    Raises:
        urllib.error.HTTPError: for non-404 HTTP error statuses.
        urllib.error.URLError / OSError: on connection failure or
            timeout.
    """
    # Tolerate a trailing slash on the configured base URL so the
    # request path does not start with '//'.
    url = f"{self.admin_url.rstrip('/')}{path}"
    req = urllib.request.Request(url, method=method)
    # NOTE(review): assumes api_key is a Pulsar auth token that the
    # admin REST API accepts as a bearer token — confirm against the
    # cluster's authentication provider.
    if self.api_key:
        req.add_header('Authorization', f'Bearer {self.api_key}')
    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            if method == 'GET':
                return json.loads(resp.read().decode('utf-8'))
            return None
    except urllib.error.HTTPError as e:
        if e.code == 404:
            return None
        raise
def _delete_topic_sync(self, topic: str):
    """
    Delete a persistent topic together with all of its subscriptions.

    Subscriptions are removed first — Pulsar rejects topic deletion
    while subscriptions exist. Each subscription is deleted with
    force=true so that lingering consumers are disconnected.

    Best-effort: admin API failures are logged as warnings and never
    raised. If the subscription list cannot be fetched, nothing is
    deleted. Topics that map to a non-persistent:// URI are ignored.
    """
    uri = self.map_topic(topic)
    # Only persistent topics are handled here
    if uri.startswith('non-persistent://'):
        return

    base = self._admin_api_path(uri)

    try:
        names = self._admin_request('GET', f"{base}/subscriptions")
    except Exception as e:
        logger.warning(f"Failed to list subscriptions for {topic}: {e}")
        return

    # An empty or missing (404 -> None) list means no subscriptions
    for sub in names or ():
        quoted = urllib.parse.quote(sub, safe='')
        try:
            self._admin_request(
                'DELETE', f"{base}/subscription/{quoted}?force=true"
            )
            logger.info(f"Deleted subscription {sub} from {topic}")
        except Exception as e:
            logger.warning(
                f"Failed to delete subscription {sub} from {topic}: {e}"
            )

    # Attempt the topic itself even if some subscription deletes failed;
    # a failure here is only logged.
    try:
        self._admin_request('DELETE', base)
        logger.info(f"Deleted topic: {topic}")
    except Exception as e:
        logger.warning(f"Failed to delete topic {topic}: {e}")
def _topic_exists_sync(self, topic: str) -> bool:
    """
    Check topic existence via the admin REST API.

    Requests the topic's stats endpoint; a parsed response means the
    topic exists, while None (the admin helper's result for 404) means
    it does not.

    Returns:
        True if the stats request returned data. False if the topic
        maps to a non-persistent:// URI, the stats request returned
        None, or the request failed.
    """
    pulsar_uri = self.map_topic(topic)
    if pulsar_uri.startswith('non-persistent://'):
        return False

    api_path = self._admin_api_path(pulsar_uri)
    try:
        result = self._admin_request('GET', f"{api_path}/stats")
    except Exception as e:
        # Still reported as "does not exist", but leave a trace so an
        # unreachable or failing admin API is distinguishable from a
        # genuinely missing topic.
        logger.warning(
            f"Failed to check existence of topic {topic}: "
            f"{type(e).__name__}: {e}"
        )
        return False
    return result is not None
async def create_topic(self, topic: str) -> None:
"""No-op — Pulsar auto-creates topics on first use.
TODO: Use admin REST API for explicit persistent topic creation."""
"""No-op — Pulsar auto-creates topics on first use."""
pass
async def delete_topic(self, topic: str) -> None:
"""No-op — to be replaced with admin REST API calls.
TODO: Delete persistent topic via admin API."""
pass
"""
Delete a persistent topic and all its subscriptions via
the admin REST API.
Called by the flow controller during deliberate flow deletion.
Non-persistent topics are skipped. Idempotent.
"""
if not self.admin_url:
logger.warning(
f"Cannot delete topic {topic}: "
f"no admin URL configured"
)
return
await asyncio.to_thread(self._delete_topic_sync, topic)
async def topic_exists(self, topic: str) -> bool:
"""Returns True — Pulsar auto-creates on subscribe.
TODO: Use admin REST API for actual existence check."""
return True
"""Check whether a persistent topic exists via the admin API."""
if not self.admin_url:
return True
return await asyncio.to_thread(self._topic_exists_sync, topic)
async def ensure_topic(self, topic: str) -> None:
"""No-op — Pulsar auto-creates topics on first use.
TODO: Use admin REST API for explicit creation."""
"""No-op — Pulsar auto-creates topics on first use."""
pass
def close(self) -> None: