Pub/sub abstraction: decouple from Pulsar (#751)

Remove Pulsar-specific concepts from application code so that
the pub/sub backend is swappable via configuration.

Rename translators:
- to_pulsar/from_pulsar → decode/encode across all translator
  classes, dispatch handlers, and tests (55+ files)
- from_response_with_completion → encode_with_completion
- Remove pulsar.schema.Record from translator base class

Queue naming (CLASS:TOPICSPACE:TOPIC):
- Replace topic() helper with queue() using new format:
  flow:tg:name, request:tg:name, response:tg:name, state:tg:name
- Queue class implies persistence/TTL (no QoS in names)
- Update Pulsar backend map_topic() to parse new format
- Librarian queues use flow class (persistent, for chunking)
- Config push uses state class (persistent, last-value)
- Remove 15 dead topic imports from schema files
- Update init_trustgraph.py namespace: config → state

Confine Pulsar to pulsar_backend.py:
- Delete legacy PulsarClient class from pubsub.py
- Move add_args to add_pubsub_args() with standalone flag
  for CLI tools (defaults to localhost)
- PulsarBackendConsumer.receive() catches _pulsar.Timeout,
  raises standard TimeoutError
- Remove Pulsar imports from: async_processor, flow_processor,
  log_level, all 11 client files, 4 storage writers, gateway
  service, gateway config receiver
- Remove log_level/LoggerLevel from client API
- Rewrite tg-monitor-prompts to use backend abstraction
- Update tg-dump-queues to use add_pubsub_args

Also: pubsub-abstraction.md tech spec covering problem statement,
design goals, as-is requirements, candidate broker assessment,
approach, and implementation order.
This commit is contained in:
cybermaggedon 2026-04-01 20:16:53 +01:00 committed by GitHub
parent dbf8daa74a
commit 4fb0b4d8e8
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
106 changed files with 1269 additions and 788 deletions

View file

@ -1,5 +1,5 @@
from . pubsub import PulsarClient, get_pubsub
from . pubsub import get_pubsub, add_pubsub_args
from . async_processor import AsyncProcessor
from . consumer import Consumer
from . producer import Producer

View file

@ -6,7 +6,6 @@
import asyncio
import argparse
import _pulsar
import time
import uuid
import logging
@ -15,7 +14,7 @@ from prometheus_client import start_http_server, Info
from .. schema import ConfigPush, config_push_queue
from .. log_level import LogLevel
from . pubsub import PulsarClient, get_pubsub
from . pubsub import get_pubsub, add_pubsub_args
from . producer import Producer
from . consumer import Consumer
from . metrics import ProcessorMetrics, ConsumerMetrics
@ -223,8 +222,8 @@ class AsyncProcessor:
logger.info("Keyboard interrupt.")
return
except _pulsar.Interrupted:
logger.info("Pulsar Interrupted.")
except KeyboardInterrupt:
logger.info("Interrupted.")
return
# Exceptions from a taskgroup come in as an exception group
@ -250,15 +249,7 @@ class AsyncProcessor:
@staticmethod
def add_args(parser):
# Pub/sub backend selection
parser.add_argument(
'--pubsub-backend',
default=os.getenv('PUBSUB_BACKEND', 'pulsar'),
choices=['pulsar', 'mqtt'],
help='Pub/sub backend (default: pulsar, env: PUBSUB_BACKEND)',
)
PulsarClient.add_args(parser)
add_pubsub_args(parser)
add_logging_args(parser)
parser.add_argument(

View file

@ -6,8 +6,6 @@
import json
import logging
from pulsar.schema import JsonSchema
from .. schema import Error
from .. schema import config_request_queue, config_response_queue
from .. schema import config_push_queue

View file

@ -1,110 +1,72 @@
import os
import pulsar
import _pulsar
import uuid
from pulsar.schema import JsonSchema
import logging
from .. log_level import LogLevel
from .pulsar_backend import PulsarBackend
logger = logging.getLogger(__name__)
# Default connection settings from environment
DEFAULT_PULSAR_HOST = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650')
DEFAULT_PULSAR_API_KEY = os.getenv("PULSAR_API_KEY", None)
def get_pubsub(**config):
"""
Factory function to create a pub/sub backend based on configuration.
Args:
config: Configuration dictionary from command-line args
Must include 'pubsub_backend' key
config: Configuration dictionary from command-line args.
Key 'pubsub_backend' selects the backend (default: 'pulsar').
Returns:
Backend instance (PulsarBackend, MQTTBackend, etc.)
Example:
backend = get_pubsub(
pubsub_backend='pulsar',
pulsar_host='pulsar://localhost:6650'
)
Backend instance implementing the PubSubBackend protocol.
"""
backend_type = config.get('pubsub_backend', 'pulsar')
if backend_type == 'pulsar':
from .pulsar_backend import PulsarBackend
return PulsarBackend(
host=config.get('pulsar_host', PulsarClient.default_pulsar_host),
api_key=config.get('pulsar_api_key', PulsarClient.default_pulsar_api_key),
host=config.get('pulsar_host', DEFAULT_PULSAR_HOST),
api_key=config.get('pulsar_api_key', DEFAULT_PULSAR_API_KEY),
listener=config.get('pulsar_listener'),
)
elif backend_type == 'mqtt':
# TODO: Implement MQTT backend
raise NotImplementedError("MQTT backend not yet implemented")
else:
raise ValueError(f"Unknown pub/sub backend: {backend_type}")
class PulsarClient:
STANDALONE_PULSAR_HOST = 'pulsar://localhost:6650'
default_pulsar_host = os.getenv("PULSAR_HOST", 'pulsar://pulsar:6650')
default_pulsar_api_key = os.getenv("PULSAR_API_KEY", None)
def __init__(self, **params):
def add_pubsub_args(parser, standalone=False):
"""Add pub/sub CLI arguments to an argument parser.
self.client = None
Args:
parser: argparse.ArgumentParser
standalone: If True, default host is localhost (for CLI tools
that run outside containers)
"""
host = STANDALONE_PULSAR_HOST if standalone else DEFAULT_PULSAR_HOST
listener_default = 'localhost' if standalone else None
pulsar_host = params.get("pulsar_host", self.default_pulsar_host)
pulsar_listener = params.get("pulsar_listener", None)
pulsar_api_key = params.get(
"pulsar_api_key",
self.default_pulsar_api_key
)
# Hard-code Pulsar logging to ERROR level to minimize noise
parser.add_argument(
'--pubsub-backend',
default=os.getenv('PUBSUB_BACKEND', 'pulsar'),
help='Pub/sub backend (default: pulsar, env: PUBSUB_BACKEND)',
)
self.pulsar_host = pulsar_host
self.pulsar_api_key = pulsar_api_key
parser.add_argument(
'-p', '--pulsar-host',
default=host,
help=f'Pulsar host (default: {host})',
)
if pulsar_api_key:
auth = pulsar.AuthenticationToken(pulsar_api_key)
self.client = pulsar.Client(
pulsar_host,
authentication=auth,
logger=pulsar.ConsoleLogger(_pulsar.LoggerLevel.Error)
)
else:
self.client = pulsar.Client(
pulsar_host,
listener_name=pulsar_listener,
logger=pulsar.ConsoleLogger(_pulsar.LoggerLevel.Error)
)
parser.add_argument(
'--pulsar-api-key',
default=DEFAULT_PULSAR_API_KEY,
help='Pulsar API key',
)
self.pulsar_listener = pulsar_listener
def close(self):
self.client.close()
def __del__(self):
if hasattr(self, "client"):
if self.client:
self.client.close()
@staticmethod
def add_args(parser):
parser.add_argument(
'-p', '--pulsar-host',
default=__class__.default_pulsar_host,
help=f'Pulsar host (default: {__class__.default_pulsar_host})',
)
parser.add_argument(
'--pulsar-api-key',
default=__class__.default_pulsar_api_key,
help=f'Pulsar API key',
)
parser.add_argument(
'--pulsar-listener',
help=f'Pulsar listener (default: none)',
)
parser.add_argument(
'--pulsar-listener',
default=listener_default,
help=f'Pulsar listener (default: {listener_default or "none"})',
)

View file

@ -181,8 +181,11 @@ class PulsarBackendConsumer:
self._schema_cls = schema_cls
def receive(self, timeout_millis: int = 2000) -> Message:
"""Receive a message."""
pulsar_msg = self._consumer.receive(timeout_millis=timeout_millis)
"""Receive a message. Raises TimeoutError if no message available."""
try:
pulsar_msg = self._consumer.receive(timeout_millis=timeout_millis)
except _pulsar.Timeout:
raise TimeoutError("No message received within timeout")
return PulsarMessage(pulsar_msg, self._schema_cls)
def acknowledge(self, message: Message) -> None:
@ -237,38 +240,44 @@ class PulsarBackend:
self.client = pulsar.Client(**client_args)
logger.info(f"Pulsar client connected to {host}")
def map_topic(self, generic_topic: str) -> str:
def map_topic(self, queue_id: str) -> str:
"""
Map generic topic format to Pulsar URI.
Map queue identifier to Pulsar URI.
Format: qos/tenant/namespace/queue
Example: q1/tg/flow/my-queue -> persistent://tg/flow/my-queue
Format: class:topicspace:topic
Example: flow:tg:text-completion-request -> persistent://tg/flow/text-completion-request
Args:
generic_topic: Generic topic string or already-formatted Pulsar URI
queue_id: Queue identifier string or already-formatted Pulsar URI
Returns:
Pulsar topic URI
"""
# If already a Pulsar URI, return as-is
if '://' in generic_topic:
return generic_topic
if '://' in queue_id:
return queue_id
parts = generic_topic.split('/', 3)
if len(parts) != 4:
raise ValueError(f"Invalid topic format: {generic_topic}, expected qos/tenant/namespace/queue")
parts = queue_id.split(':', 2)
if len(parts) != 3:
raise ValueError(
f"Invalid queue format: {queue_id}, "
f"expected class:topicspace:topic"
)
qos, tenant, namespace, queue = parts
cls, topicspace, topic = parts
# Map QoS to persistence
if qos == 'q0':
persistence = 'non-persistent'
elif qos in ['q1', 'q2']:
# Map class to Pulsar persistence and namespace
if cls in ('flow', 'state'):
persistence = 'persistent'
elif cls in ('request', 'response'):
persistence = 'non-persistent'
else:
raise ValueError(f"Invalid QoS level: {qos}, expected q0, q1, or q2")
raise ValueError(
f"Invalid queue class: {cls}, "
f"expected flow, request, response, or state"
)
return f"{persistence}://{tenant}/{namespace}/{queue}"
return f"{persistence}://{topicspace}/{cls}/{topic}"
def create_producer(self, topic: str, schema: type, **options) -> BackendProducer:
"""