mirror of
https://github.com/trustgraph-ai/trustgraph.git
synced 2026-04-25 08:26:21 +02:00
Subscriber resilience: recreate consumer after connection failure - Move consumer creation from Subscriber.start() into the run() loop, matching the pattern used by Consumer. If the connection drops and the consumer is closed in the finally block, the loop now recreates it on the next iteration instead of spinning forever on a None consumer. Consumer thread safety: - Dedicated ThreadPoolExecutor per consumer so all pika operations (create, receive, acknowledge, negative_acknowledge) run on the same thread — pika BlockingConnection is not thread-safe - Applies to both Consumer and Subscriber classes Config handler type audit — fix four mismatched type registrations: - librarian: was ["librarian"] (non-existent type), now ["flow", "active-flow"] (matches config["flow"] that the handler reads) - cores/service: was ["kg-core"], now ["flow"] (reads config["flow"]) - metering/counter: was ["token-costs"], now ["token-cost"] (singular) - agent/mcp_tool: was ["mcp-tool"], now ["mcp"] (reads config["mcp"]) Update tests
400 lines
13 KiB
Python
|
|
# Base class for processors. Implements:
|
|
# - Pub/sub client, subscribe and consume basic
|
|
# - the async startup logic
|
|
# - Config notify handling with subscribe-then-fetch pattern
|
|
# - Initialising metrics
|
|
|
|
import asyncio
|
|
import argparse
|
|
import time
|
|
import uuid
|
|
import logging
|
|
import os
|
|
from prometheus_client import start_http_server, Info
|
|
|
|
from .. schema import ConfigPush, ConfigRequest, ConfigResponse
|
|
from .. schema import config_push_queue, config_request_queue
|
|
from .. schema import config_response_queue
|
|
from .. log_level import LogLevel
|
|
from . pubsub import get_pubsub, add_pubsub_args
|
|
from . producer import Producer
|
|
from . consumer import Consumer
|
|
from . subscriber import Subscriber
|
|
from . request_response_spec import RequestResponse
|
|
from . metrics import ProcessorMetrics, ConsumerMetrics, ProducerMetrics
|
|
from . metrics import SubscriberMetrics
|
|
from . logging import add_logging_args, setup_logging
|
|
|
|
default_config_queue = config_push_queue
|
|
|
|
# Module logger
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Async processor
|
|
class AsyncProcessor:
|
|
|
|
def __init__(self, **params):
|
|
|
|
# Store the identity
|
|
self.id = params.get("id")
|
|
|
|
# Create pub/sub backend via factory
|
|
self.pubsub_backend = get_pubsub(**params)
|
|
|
|
# Store pulsar_host for backward compatibility
|
|
self._pulsar_host = params.get("pulsar_host", "pulsar://pulsar:6650")
|
|
|
|
# Initialise metrics, records the parameters
|
|
ProcessorMetrics(processor = self.id).info({
|
|
k: str(params[k])
|
|
for k in params
|
|
if k != "id"
|
|
})
|
|
|
|
# The processor runs all activity in a taskgroup, it's mandatory
|
|
# that this is provded
|
|
self.taskgroup = params.get("taskgroup")
|
|
if self.taskgroup is None:
|
|
raise RuntimeError("Essential taskgroup missing")
|
|
|
|
# Get the configuration topic
|
|
self.config_push_queue = params.get(
|
|
"config_push_queue", default_config_queue
|
|
)
|
|
|
|
# This records registered configuration handlers, each entry is:
|
|
# { "handler": async_fn, "types": set_or_none }
|
|
self.config_handlers = []
|
|
|
|
# Track the current config version for dedup
|
|
self.config_version = 0
|
|
|
|
# Create a random ID for this subscription to the configuration
|
|
# service
|
|
config_subscriber_id = str(uuid.uuid4())
|
|
|
|
config_consumer_metrics = ConsumerMetrics(
|
|
processor = self.id, flow = None, name = "config",
|
|
)
|
|
|
|
# Subscribe to config notify queue
|
|
self.config_sub_task = Consumer(
|
|
|
|
taskgroup = self.taskgroup,
|
|
backend = self.pubsub_backend,
|
|
subscriber = config_subscriber_id,
|
|
flow = None,
|
|
|
|
topic = self.config_push_queue,
|
|
schema = ConfigPush,
|
|
|
|
handler = self.on_config_notify,
|
|
|
|
metrics = config_consumer_metrics,
|
|
|
|
start_of_messages = False,
|
|
)
|
|
|
|
self.running = True
|
|
|
|
def _create_config_client(self):
|
|
"""Create a short-lived config request/response client."""
|
|
config_rr_id = str(uuid.uuid4())
|
|
|
|
config_req_metrics = ProducerMetrics(
|
|
processor = self.id, flow = None, name = "config-request",
|
|
)
|
|
config_resp_metrics = SubscriberMetrics(
|
|
processor = self.id, flow = None, name = "config-response",
|
|
)
|
|
|
|
return RequestResponse(
|
|
backend = self.pubsub_backend,
|
|
subscription = f"{self.id}--config--{config_rr_id}",
|
|
consumer_name = self.id,
|
|
request_topic = config_request_queue,
|
|
request_schema = ConfigRequest,
|
|
request_metrics = config_req_metrics,
|
|
response_topic = config_response_queue,
|
|
response_schema = ConfigResponse,
|
|
response_metrics = config_resp_metrics,
|
|
)
|
|
|
|
async def fetch_config(self):
|
|
"""Fetch full config from config service using a short-lived
|
|
request/response client. Returns (config, version) or raises."""
|
|
client = self._create_config_client()
|
|
try:
|
|
await client.start()
|
|
resp = await client.request(
|
|
ConfigRequest(operation="config"),
|
|
timeout=10,
|
|
)
|
|
if resp.error:
|
|
raise RuntimeError(f"Config error: {resp.error.message}")
|
|
return resp.config, resp.version
|
|
finally:
|
|
await client.stop()
|
|
|
|
# This is called to start dynamic behaviour.
|
|
# Implements the subscribe-then-fetch pattern to avoid race conditions.
|
|
async def start(self):
|
|
|
|
# 1. Start the notify consumer (begins buffering incoming notifys)
|
|
await self.config_sub_task.start()
|
|
|
|
# 2. Fetch current config via request/response
|
|
await self.fetch_and_apply_config()
|
|
|
|
# 3. Any buffered notifys with version > fetched version will be
|
|
# processed by on_config_notify, which does the version check
|
|
|
|
async def fetch_and_apply_config(self):
|
|
"""Fetch full config from config service and apply to all handlers.
|
|
Retries until successful — config service may not be ready yet."""
|
|
|
|
while self.running:
|
|
|
|
try:
|
|
config, version = await self.fetch_config()
|
|
|
|
logger.info(f"Fetched config version {version}")
|
|
|
|
self.config_version = version
|
|
|
|
# Apply to all handlers (startup = invoke all)
|
|
for entry in self.config_handlers:
|
|
await entry["handler"](config, version)
|
|
|
|
return
|
|
|
|
except Exception as e:
|
|
logger.warning(
|
|
f"Config fetch failed: {e}, retrying in 2s...",
|
|
exc_info=True
|
|
)
|
|
await asyncio.sleep(2)
|
|
|
|
# This is called to stop all threads. An over-ride point for extra
|
|
# functionality
|
|
def stop(self):
|
|
self.pubsub_backend.close()
|
|
self.running = False
|
|
|
|
# Returns the pub/sub backend (new interface)
|
|
@property
|
|
def pubsub(self): return self.pubsub_backend
|
|
|
|
# Returns the pulsar host (backward compatibility)
|
|
@property
|
|
def pulsar_host(self): return self._pulsar_host
|
|
|
|
# Register a new event handler for configuration change
|
|
def register_config_handler(self, handler, types=None):
|
|
self.config_handlers.append({
|
|
"handler": handler,
|
|
"types": set(types) if types else None,
|
|
})
|
|
|
|
# Called when a config notify message arrives
|
|
async def on_config_notify(self, message, consumer, flow):
|
|
|
|
notify_version = message.value().version
|
|
notify_types = set(message.value().types)
|
|
|
|
# Skip if we already have this version or newer
|
|
if notify_version <= self.config_version:
|
|
logger.debug(
|
|
f"Ignoring config notify v{notify_version}, "
|
|
f"already at v{self.config_version}"
|
|
)
|
|
return
|
|
|
|
# Check if any handler cares about the affected types
|
|
if notify_types:
|
|
any_interested = False
|
|
for entry in self.config_handlers:
|
|
handler_types = entry["types"]
|
|
if handler_types is None or notify_types & handler_types:
|
|
any_interested = True
|
|
break
|
|
|
|
if not any_interested:
|
|
logger.debug(
|
|
f"Ignoring config notify v{notify_version}, "
|
|
f"no handlers for types {notify_types}"
|
|
)
|
|
self.config_version = notify_version
|
|
return
|
|
|
|
logger.info(
|
|
f"Config notify v{notify_version} types={list(notify_types)}, "
|
|
f"fetching config..."
|
|
)
|
|
|
|
# Fetch full config using short-lived client
|
|
try:
|
|
config, version = await self.fetch_config()
|
|
|
|
self.config_version = version
|
|
|
|
# Invoke handlers that care about the affected types
|
|
for entry in self.config_handlers:
|
|
handler_types = entry["types"]
|
|
if handler_types is None:
|
|
await entry["handler"](config, version)
|
|
elif not notify_types or notify_types & handler_types:
|
|
await entry["handler"](config, version)
|
|
|
|
except Exception as e:
|
|
logger.error(
|
|
f"Failed to fetch config on notify: {e}", exc_info=True
|
|
)
|
|
|
|
# This is the 'main' body of the handler. It is a point to override
|
|
# if needed. By default does nothing. Processors are implemented
|
|
# by adding consumer/producer functionality so maybe nothing is needed
|
|
# in the run() body
|
|
async def run(self):
|
|
while self.running:
|
|
await asyncio.sleep(2)
|
|
|
|
# Startup fabric. This runs in 'async' mode, creates a taskgroup and
|
|
# runs the producer.
|
|
@classmethod
|
|
async def launch_async(cls, args):
|
|
|
|
try:
|
|
|
|
# Create a taskgroup. This seems complicated, when an exception
|
|
# occurs, unhandled it looks like it cancels all threads in the
|
|
# taskgroup. Needs the exception to be caught in the right
|
|
# place.
|
|
async with asyncio.TaskGroup() as tg:
|
|
|
|
|
|
# Create a processor instance, and include the taskgroup
|
|
# as a paramter. A processor identity ident is used as
|
|
# - subscriber name
|
|
# - an identifier for flow configuration
|
|
p = cls(**args | { "taskgroup": tg })
|
|
|
|
# Start the processor
|
|
await p.start()
|
|
|
|
# Run the processor
|
|
task = tg.create_task(p.run())
|
|
|
|
# The taskgroup causes everything to wait until
|
|
# all threads have stopped
|
|
|
|
# This is here to output a debug message, shouldn't be needed.
|
|
except Exception as e:
|
|
logger.error("Exception, closing taskgroup", exc_info=True)
|
|
raise e
|
|
|
|
    @classmethod
    def setup_logging(cls, args):
        """Configure logging for the entire application.

        Thin override point that delegates to the module-level
        setup_logging helper; *args* is the parsed-arguments dict.
        """
        setup_logging(args)
|
|
|
|
# Startup fabric. launch calls launch_async in async mode.
|
|
@classmethod
|
|
def launch(cls, ident, doc):
|
|
|
|
# Start assembling CLI arguments
|
|
parser = argparse.ArgumentParser(
|
|
prog=ident,
|
|
description=doc
|
|
)
|
|
|
|
parser.add_argument(
|
|
'--id',
|
|
default=ident,
|
|
help=f'Configuration identity (default: {ident})',
|
|
)
|
|
|
|
# Invoke the class-specific add_args, which manages adding all the
|
|
# command-line arguments
|
|
cls.add_args(parser)
|
|
|
|
# Parse arguments
|
|
args = parser.parse_args()
|
|
args = vars(args)
|
|
|
|
# Setup logging before anything else
|
|
cls.setup_logging(args)
|
|
|
|
# Debug
|
|
logger.debug(f"Arguments: {args}")
|
|
|
|
# Start the Prometheus metrics service if needed
|
|
if args["metrics"]:
|
|
start_http_server(args["metrics_port"])
|
|
|
|
# Loop forever, exception handler
|
|
while True:
|
|
|
|
logger.info("Starting...")
|
|
|
|
try:
|
|
|
|
# Launch the processor in an asyncio handler
|
|
asyncio.run(cls.launch_async(
|
|
args
|
|
))
|
|
|
|
except KeyboardInterrupt:
|
|
logger.info("Keyboard interrupt.")
|
|
return
|
|
|
|
except KeyboardInterrupt:
|
|
logger.info("Interrupted.")
|
|
return
|
|
|
|
# Exceptions from a taskgroup come in as an exception group
|
|
except ExceptionGroup as e:
|
|
|
|
logger.error("Exception group:")
|
|
|
|
for se in e.exceptions:
|
|
logger.error(f" Type: {type(se)}")
|
|
logger.error(f" Exception: {se}", exc_info=se)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Type: {type(e)}")
|
|
logger.error(f"Exception: {e}", exc_info=True)
|
|
|
|
# Retry occurs here
|
|
logger.warning("Will retry...")
|
|
time.sleep(4)
|
|
logger.info("Retrying...")
|
|
|
|
# The command-line arguments are built using a stack of add_args
|
|
# invocations
|
|
@staticmethod
|
|
def add_args(parser):
|
|
|
|
add_pubsub_args(parser)
|
|
add_logging_args(parser)
|
|
|
|
parser.add_argument(
|
|
'--config-push-queue',
|
|
default=default_config_queue,
|
|
help=f'Config push queue (default: {default_config_queue})',
|
|
)
|
|
|
|
parser.add_argument(
|
|
'--metrics',
|
|
action=argparse.BooleanOptionalAction,
|
|
default=True,
|
|
help=f'Metrics enabled (default: true)',
|
|
)
|
|
|
|
parser.add_argument(
|
|
'-P', '--metrics-port',
|
|
type=int,
|
|
default=8000,
|
|
help=f'Pulsar host (default: 8000)',
|
|
)
|