trustgraph/trustgraph-base/trustgraph/base/async_processor.py
cybermaggedon 34eb083836
Messaging fabric plugins (#592)
* Plugin architecture for messaging fabric

* Schemas use a technology neutral expression

* Schemas strictness has uncovered some incorrect schema use which is fixed
2025-12-17 21:40:43 +00:00

283 lines
8.4 KiB
Python

# Base class for processors. Implements:
# - Pulsar client, subscribe and consume basic
# - the async startup logic
# - Initialising metrics
import asyncio
import argparse
import _pulsar
import time
import uuid
import logging
import os
from prometheus_client import start_http_server, Info
from .. schema import ConfigPush, config_push_queue
from .. log_level import LogLevel
from . pubsub import PulsarClient, get_pubsub
from . producer import Producer
from . consumer import Consumer
from . metrics import ProcessorMetrics, ConsumerMetrics
from . logging import add_logging_args, setup_logging
default_config_queue = config_push_queue
# Module logger
logger = logging.getLogger(__name__)
# Async processor
class AsyncProcessor:
def __init__(self, **params):
# Store the identity
self.id = params.get("id")
# Create pub/sub backend via factory
self.pubsub_backend = get_pubsub(**params)
# Store pulsar_host for backward compatibility
self._pulsar_host = params.get("pulsar_host", "pulsar://pulsar:6650")
# Initialise metrics, records the parameters
ProcessorMetrics(processor = self.id).info({
k: str(params[k])
for k in params
if k != "id"
})
# The processor runs all activity in a taskgroup, it's mandatory
# that this is provded
self.taskgroup = params.get("taskgroup")
if self.taskgroup is None:
raise RuntimeError("Essential taskgroup missing")
# Get the configuration topic
self.config_push_queue = params.get(
"config_push_queue", default_config_queue
)
# This records registered configuration handlers
self.config_handlers = []
# Create a random ID for this subscription to the configuration
# service
config_subscriber_id = str(uuid.uuid4())
config_consumer_metrics = ConsumerMetrics(
processor = self.id, flow = None, name = "config",
)
# Subscribe to config queue
self.config_sub_task = Consumer(
taskgroup = self.taskgroup,
backend = self.pubsub_backend, # Changed from client to backend
subscriber = config_subscriber_id,
flow = None,
topic = self.config_push_queue,
schema = ConfigPush,
handler = self.on_config_change,
metrics = config_consumer_metrics,
# This causes new subscriptions to view the entire history of
# configuration
start_of_messages = True
)
self.running = True
# This is called to start dynamic behaviour. An over-ride point for
# extra functionality
async def start(self):
await self.config_sub_task.start()
# This is called to stop all threads. An over-ride point for extra
# functionality
def stop(self):
self.pubsub_backend.close()
self.running = False
# Returns the pub/sub backend (new interface)
@property
def pubsub(self): return self.pubsub_backend
# Returns the pulsar host (backward compatibility)
@property
def pulsar_host(self): return self._pulsar_host
# Register a new event handler for configuration change
def register_config_handler(self, handler):
self.config_handlers.append(handler)
# Called when a new configuration message push occurs
async def on_config_change(self, message, consumer, flow):
# Get configuration data and version number
config = message.value().config
version = message.value().version
# Invoke message handlers
logger.info(f"Config change event: version={version}")
for ch in self.config_handlers:
await ch(config, version)
# This is the 'main' body of the handler. It is a point to override
# if needed. By default does nothing. Processors are implemented
# by adding consumer/producer functionality so maybe nothing is needed
# in the run() body
async def run(self):
while self.running:
await asyncio.sleep(2)
# Startup fabric. This runs in 'async' mode, creates a taskgroup and
# runs the producer.
@classmethod
async def launch_async(cls, args):
try:
# Create a taskgroup. This seems complicated, when an exception
# occurs, unhandled it looks like it cancels all threads in the
# taskgroup. Needs the exception to be caught in the right
# place.
async with asyncio.TaskGroup() as tg:
# Create a processor instance, and include the taskgroup
# as a paramter. A processor identity ident is used as
# - subscriber name
# - an identifier for flow configuration
p = cls(**args | { "taskgroup": tg })
# Start the processor
await p.start()
# Run the processor
task = tg.create_task(p.run())
# The taskgroup causes everything to wait until
# all threads have stopped
# This is here to output a debug message, shouldn't be needed.
except Exception as e:
logger.error("Exception, closing taskgroup", exc_info=True)
raise e
@classmethod
def setup_logging(cls, args):
"""Configure logging for the entire application"""
setup_logging(args)
# Startup fabric. launch calls launch_async in async mode.
@classmethod
def launch(cls, ident, doc):
# Start assembling CLI arguments
parser = argparse.ArgumentParser(
prog=ident,
description=doc
)
parser.add_argument(
'--id',
default=ident,
help=f'Configuration identity (default: {ident})',
)
# Invoke the class-specific add_args, which manages adding all the
# command-line arguments
cls.add_args(parser)
# Parse arguments
args = parser.parse_args()
args = vars(args)
# Setup logging before anything else
cls.setup_logging(args)
# Debug
logger.debug(f"Arguments: {args}")
# Start the Prometheus metrics service if needed
if args["metrics"]:
start_http_server(args["metrics_port"])
# Loop forever, exception handler
while True:
logger.info("Starting...")
try:
# Launch the processor in an asyncio handler
asyncio.run(cls.launch_async(
args
))
except KeyboardInterrupt:
logger.info("Keyboard interrupt.")
return
except _pulsar.Interrupted:
logger.info("Pulsar Interrupted.")
return
# Exceptions from a taskgroup come in as an exception group
except ExceptionGroup as e:
logger.error("Exception group:")
for se in e.exceptions:
logger.error(f" Type: {type(se)}")
logger.error(f" Exception: {se}", exc_info=se)
except Exception as e:
logger.error(f"Type: {type(e)}")
logger.error(f"Exception: {e}", exc_info=True)
# Retry occurs here
logger.warning("Will retry...")
time.sleep(4)
logger.info("Retrying...")
# The command-line arguments are built using a stack of add_args
# invocations
@staticmethod
def add_args(parser):
# Pub/sub backend selection
parser.add_argument(
'--pubsub-backend',
default=os.getenv('PUBSUB_BACKEND', 'pulsar'),
choices=['pulsar', 'mqtt'],
help='Pub/sub backend (default: pulsar, env: PUBSUB_BACKEND)',
)
PulsarClient.add_args(parser)
add_logging_args(parser)
parser.add_argument(
'--config-push-queue',
default=default_config_queue,
help=f'Config push queue (default: {default_config_queue})',
)
parser.add_argument(
'--metrics',
action=argparse.BooleanOptionalAction,
default=True,
help=f'Metrics enabled (default: true)',
)
parser.add_argument(
'-P', '--metrics-port',
type=int,
default=8000,
help=f'Pulsar host (default: 8000)',
)