trustgraph/trustgraph-base/trustgraph/base/async_processor.py
cybermaggedon 2f8d6a3ffb
Fix agent config handler registration, remove debug prints, disable RabbitMQ heartbeats (#764)
- Fix agent react and orchestrator services appending bare methods
  to config_handlers instead of using register_config_handler() —
  caused 'method object is not subscriptable' on config notify
- Add exc_info to config fetch retry logging for proper tracebacks
- Remove debug print statements from collection management
  dispatcher and translator
- Disable RabbitMQ heartbeats (heartbeat=0) to prevent broker
  closing idle producer connections that can't process heartbeat
  frames from BlockingConnection
2026-04-07 12:11:12 +01:00

401 lines
13 KiB
Python

# Base class for processors. Implements:
# - Pub/sub client, subscribe and consume basic
# - the async startup logic
# - Config notify handling with subscribe-then-fetch pattern
# - Initialising metrics
import asyncio
import argparse
import time
import uuid
import logging
import os
from prometheus_client import start_http_server, Info
from .. schema import ConfigPush, ConfigRequest, ConfigResponse
from .. schema import config_push_queue, config_request_queue
from .. schema import config_response_queue
from .. log_level import LogLevel
from . pubsub import get_pubsub, add_pubsub_args
from . producer import Producer
from . consumer import Consumer
from . subscriber import Subscriber
from . request_response_spec import RequestResponse
from . metrics import ProcessorMetrics, ConsumerMetrics, ProducerMetrics
from . metrics import SubscriberMetrics
from . logging import add_logging_args, setup_logging
default_config_queue = config_push_queue
# Module logger
logger = logging.getLogger(__name__)
# Async processor
class AsyncProcessor:

    def __init__(self, **params):
        """Set up the processor: pub/sub backend, metrics, and the
        consumer for config-change notifications.

        A 'taskgroup' parameter is mandatory; everything the processor
        does runs inside it.
        """

        # Identity of this processor instance
        self.id = params.get("id")

        # Pub/sub backend comes from the factory
        self.pubsub_backend = get_pubsub(**params)

        # Retained for callers still using the pulsar_host property
        self._pulsar_host = params.get("pulsar_host", "pulsar://pulsar:6650")

        # Record all construction parameters (except id) as metric info
        param_info = {k: str(v) for k, v in params.items() if k != "id"}
        ProcessorMetrics(processor=self.id).info(param_info)

        # All activity runs in the supplied taskgroup; refuse to start
        # without one
        self.taskgroup = params.get("taskgroup")
        if self.taskgroup is None:
            raise RuntimeError("Essential taskgroup missing")

        # Topic carrying config-change notifications
        self.config_push_queue = params.get(
            "config_push_queue", default_config_queue
        )

        # Registered configuration handlers; each entry is
        # { "handler": async_fn, "types": set_or_none }
        self.config_handlers = []

        # Current config version, used to deduplicate notifys
        self.config_version = 0

        # Random subscription ID for the configuration service
        config_subscriber_id = str(uuid.uuid4())

        config_consumer_metrics = ConsumerMetrics(
            processor=self.id, flow=None, name="config",
        )

        # Consumer subscribed to the config notify queue
        self.config_sub_task = Consumer(
            taskgroup=self.taskgroup,
            backend=self.pubsub_backend,
            subscriber=config_subscriber_id,
            flow=None,
            topic=self.config_push_queue,
            schema=ConfigPush,
            handler=self.on_config_notify,
            metrics=config_consumer_metrics,
            start_of_messages=False,
            consumer_type='exclusive',
        )

        self.running = True
def _create_config_client(self):
    """Build a short-lived request/response client for the config
    service, with its own producer/subscriber metrics."""

    rr_id = str(uuid.uuid4())

    req_metrics = ProducerMetrics(
        processor=self.id, flow=None, name="config-request",
    )
    resp_metrics = SubscriberMetrics(
        processor=self.id, flow=None, name="config-response",
    )

    return RequestResponse(
        backend=self.pubsub_backend,
        subscription=f"{self.id}--config--{rr_id}",
        consumer_name=self.id,
        request_topic=config_request_queue,
        request_schema=ConfigRequest,
        request_metrics=req_metrics,
        response_topic=config_response_queue,
        response_schema=ConfigResponse,
        response_metrics=resp_metrics,
    )
async def fetch_config(self):
    """Fetch the full configuration from the config service.

    Uses a short-lived request/response client which is always
    stopped, even on failure. Returns (config, version) or raises.
    """
    client = self._create_config_client()
    try:
        await client.start()

        resp = await client.request(
            ConfigRequest(operation="config"),
            timeout=10,
        )

        if resp.error:
            raise RuntimeError(f"Config error: {resp.error.message}")

        return resp.config, resp.version

    finally:
        # Tear the client down whether or not the request succeeded
        await client.stop()
# Starts dynamic behaviour. Implements the subscribe-then-fetch
# pattern to avoid race conditions:
#   1. start the notify consumer so incoming notifys are buffered
#   2. fetch the current config via request/response
#   3. buffered notifys newer than the fetched version are then
#      handled by on_config_notify, which performs the version check
async def start(self):
    await self.config_sub_task.start()
    await self.fetch_and_apply_config()
async def fetch_and_apply_config(self):
    """Fetch the full config and hand it to every registered handler.

    Retries every 2s until successful — the config service may not be
    ready yet. Gives up only when the processor is stopped. A failure
    anywhere in the fetch-and-apply sequence (including inside a
    handler) triggers a retry.
    """
    while self.running:
        try:
            config, version = await self.fetch_config()
            logger.info(f"Fetched config version {version}")
            self.config_version = version
            # Startup semantics: every handler receives the config,
            # regardless of its declared types
            for registration in self.config_handlers:
                await registration["handler"](config, version)
            return
        except Exception as e:
            logger.warning(
                f"Config fetch failed: {e}, retrying in 2s...",
                exc_info=True
            )
            await asyncio.sleep(2)
# This is called to stop all threads. An over-ride point for extra
# functionality
def stop(self):
self.pubsub_backend.close()
self.running = False
# Returns the pub/sub backend (new interface)
@property
def pubsub(self): return self.pubsub_backend
# Returns the pulsar host (backward compatibility)
@property
def pulsar_host(self): return self._pulsar_host
# Register a new event handler for configuration change
def register_config_handler(self, handler, types=None):
self.config_handlers.append({
"handler": handler,
"types": set(types) if types else None,
})
# Handles an incoming config notify message: dedup by version, skip
# if no handler cares about the affected types, otherwise fetch the
# full config and dispatch it to the interested handlers.
async def on_config_notify(self, message, consumer, flow):

    value = message.value()
    notify_version = value.version
    notify_types = set(value.types)

    # Dedup: already at this version or newer
    if notify_version <= self.config_version:
        logger.debug(
            f"Ignoring config notify v{notify_version}, "
            f"already at v{self.config_version}"
        )
        return

    # When the notify names affected types, proceed only if at least
    # one handler is interested (types of None means "match all")
    if notify_types:
        any_interested = any(
            entry["types"] is None or (notify_types & entry["types"])
            for entry in self.config_handlers
        )
        if not any_interested:
            logger.debug(
                f"Ignoring config notify v{notify_version}, "
                f"no handlers for types {notify_types}"
            )
            # Still advance the version so we don't re-check later
            self.config_version = notify_version
            return

    logger.info(
        f"Config notify v{notify_version} types={list(notify_types)}, "
        f"fetching config..."
    )

    # Fetch the full config with a short-lived client, then invoke
    # every handler whose type filter matches (or has none)
    try:
        config, version = await self.fetch_config()
        self.config_version = version
        for entry in self.config_handlers:
            handler_types = entry["types"]
            if (handler_types is None
                    or not notify_types
                    or notify_types & handler_types):
                await entry["handler"](config, version)
    except Exception as e:
        logger.error(
            f"Failed to fetch config on notify: {e}", exc_info=True
        )
# This is the 'main' body of the handler. It is a point to override
# if needed. By default does nothing. Processors are implemented
# by adding consumer/producer functionality so maybe nothing is needed
# in the run() body
async def run(self):
while self.running:
await asyncio.sleep(2)
# Startup fabric. Runs in 'async' mode: creates a taskgroup and runs
# the processor inside it.
@classmethod
async def launch_async(cls, args):
    """Construct the processor inside an asyncio.TaskGroup and run it.

    An unhandled exception cancels all tasks in the group; it is
    logged here with its traceback and re-raised so the retry loop in
    launch() can handle it.
    """
    try:
        async with asyncio.TaskGroup() as tg:
            # Create a processor instance, passing the taskgroup as a
            # parameter. The processor identity is used as:
            # - subscriber name
            # - an identifier for flow configuration
            p = cls(**args | { "taskgroup": tg })
            # Start the processor's dynamic behaviour
            await p.start()
            # Run the main body; the taskgroup then waits until all
            # tasks have stopped.
            # (Fixed: the task handle was previously bound to an
            # unused local variable.)
            tg.create_task(p.run())
    except Exception:
        # Bare raise preserves the original traceback (previously
        # 'raise e', which is non-idiomatic)
        logger.error("Exception, closing taskgroup", exc_info=True)
        raise
@classmethod
def setup_logging(cls, args):
    """Configure logging for the whole application by delegating to
    the shared setup_logging helper."""
    setup_logging(args)
# Startup fabric. launch calls launch_async in async mode.
@classmethod
def launch(cls, ident, doc):
    """Parse CLI arguments, configure logging and metrics, then run
    the processor forever, restarting 4s after any failure.

    Returns only on keyboard interrupt.
    """

    # Start assembling CLI arguments
    parser = argparse.ArgumentParser(
        prog=ident,
        description=doc
    )

    parser.add_argument(
        '--id',
        default=ident,
        help=f'Configuration identity (default: {ident})',
    )

    # Invoke the class-specific add_args, which manages adding all the
    # command-line arguments
    cls.add_args(parser)

    # Parse arguments
    args = vars(parser.parse_args())

    # Setup logging before anything else
    cls.setup_logging(args)

    # Debug
    logger.debug(f"Arguments: {args}")

    # Start the Prometheus metrics service if needed
    if args["metrics"]:
        start_http_server(args["metrics_port"])

    # Loop forever, exception handler
    while True:

        logger.info("Starting...")

        try:
            # Launch the processor in an asyncio handler
            asyncio.run(cls.launch_async(
                args
            ))
        except KeyboardInterrupt:
            # (Fixed: a second, unreachable KeyboardInterrupt clause
            # was removed here)
            logger.info("Keyboard interrupt.")
            return
        # Exceptions from a taskgroup come in as an exception group
        except ExceptionGroup as e:
            logger.error("Exception group:")
            for se in e.exceptions:
                logger.error(f" Type: {type(se)}")
                logger.error(f" Exception: {se}", exc_info=se)
        except Exception as e:
            logger.error(f"Type: {type(e)}")
            logger.error(f"Exception: {e}", exc_info=True)

        # Retry occurs here
        logger.warning("Will retry...")
        time.sleep(4)
        logger.info("Retrying...")
# The command-line arguments are built using a stack of add_args
# invocations
@staticmethod
def add_args(parser):
    """Add the base processor arguments (pub/sub, logging, config
    queue, metrics) to the given argparse parser."""

    add_pubsub_args(parser)
    add_logging_args(parser)

    parser.add_argument(
        '--config-push-queue',
        default=default_config_queue,
        help=f'Config push queue (default: {default_config_queue})',
    )

    parser.add_argument(
        '--metrics',
        action=argparse.BooleanOptionalAction,
        default=True,
        help='Metrics enabled (default: true)',
    )

    parser.add_argument(
        '-P', '--metrics-port',
        type=int,
        default=8000,
        # Fixed: help text previously said 'Pulsar host'
        help='Metrics port (default: 8000)',
    )