Feature/flow management cli (#346)

Flow management API + various flow management commands

trustgraph-cli/scripts/tg-delete-flow-class
trustgraph-cli/scripts/tg-get-flow-class
trustgraph-cli/scripts/tg-put-flow-class
trustgraph-cli/scripts/tg-show-flow-classes
trustgraph-cli/scripts/tg-show-flows
trustgraph-cli/scripts/tg-start-flow
trustgraph-cli/scripts/tg-stop-flow
This commit is contained in:
cybermaggedon 2025-04-24 18:57:33 +01:00 committed by GitHub
parent a9197d11ee
commit 3b021720c5
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
39 changed files with 1706 additions and 335 deletions

View file

@@ -17,7 +17,7 @@ from .. exceptions import TooManyRequests
from . pubsub import PulsarClient
from . producer import Producer
from . consumer import Consumer
from . metrics import ProcessorMetrics
from . metrics import ProcessorMetrics, ConsumerMetrics
default_config_queue = config_push_queue
@@ -30,10 +30,10 @@ class AsyncProcessor:
self.id = params.get("id")
# Register a pulsar client
self.pulsar_client = PulsarClient(**params)
self.pulsar_client_object = PulsarClient(**params)
# Initialise metrics, records the parameters
ProcessorMetrics(id=self.id).info({
ProcessorMetrics(processor = self.id).info({
k: str(params[k])
for k in params
if k != "id"
@@ -57,11 +57,15 @@ class AsyncProcessor:
# service
config_subscriber_id = str(uuid.uuid4())
config_consumer_metrics = ConsumerMetrics(
processor = self.id, flow = None, name = "config",
)
# Subscribe to config queue
self.config_sub_task = Consumer(
taskgroup = self.taskgroup,
client = self.client,
client = self.pulsar_client,
subscriber = config_subscriber_id,
flow = None,
@@ -70,6 +74,8 @@ class AsyncProcessor:
handler = self.on_config_change,
metrics = config_consumer_metrics,
# This causes new subscriptions to view the entire history of
# configuration
start_of_messages = True
@@ -85,31 +91,28 @@ class AsyncProcessor:
# This is called to stop all threads. An over-ride point for extra
# functionality
def stop(self):
self.client.close()
self.pulsar_client.close()
self.running = False
# Returns the pulsar host
@property
def pulsar_host(self): return self.client.pulsar_host
def pulsar_host(self): return self.pulsar_client_object.pulsar_host
# Returns the pulsar client
@property
def client(self): return self.pulsar_client.client
def pulsar_client(self): return self.pulsar_client_object.client
# Register a new event handler for configuration change
def register_config_handler(self, handler):
self.config_handlers.append(handler)
# Called when a new configuration message push occurs
async def on_config_change(self, message, consumer):
async def on_config_change(self, message, consumer, flow):
# Get configuration data and version number
config = message.value().config
version = message.value().version
# Acknowledge the message
consumer.acknowledge(message)
# Invoke message handlers
print("Config change event", config, version, flush=True)
for ch in self.config_handlers:
@@ -234,7 +237,7 @@ class AsyncProcessor:
PulsarClient.add_args(parser)
parser.add_argument(
'--config-push-queue',
'--config-queue',
default=default_config_queue,
help=f'Config push queue {default_config_queue}',
)

View file

@@ -156,7 +156,7 @@ class Consumer:
await self.handler(msg, self, self.flow)
else:
await self.handler(msg, self.consumer)
await self.handler(msg, self, self.flow)
print("Handled.", flush=True)

View file

@@ -12,13 +12,13 @@ class ConsumerSpec(Spec):
def add(self, flow, processor, definition):
consumer_metrics = ConsumerMetrics(
flow.id, f"{flow.name}-{self.name}"
processor = flow.id, flow = flow.name, name = self.name,
)
consumer = Consumer(
taskgroup = processor.taskgroup,
flow = flow,
client = processor.client,
client = processor.pulsar_client,
topic = definition[self.name],
subscriber = processor.id + "--" + self.name,
schema = self.schema,

View file

@@ -4,79 +4,133 @@ from prometheus_client import Counter
class ConsumerMetrics:
def __init__(self, id, flow=None):
def __init__(self, processor, flow, name):
self.id = id
self.processor = processor
self.flow = flow
self.name = name
if not hasattr(__class__, "state_metric"):
__class__.state_metric = Enum(
'consumer_state', 'Consumer state',
["id", "flow"],
["processor", "flow", "name"],
states=['stopped', 'running']
)
if not hasattr(__class__, "request_metric"):
__class__.request_metric = Histogram(
'request_latency', 'Request latency (seconds)',
["id", "flow"],
["processor", "flow", "name"],
)
if not hasattr(__class__, "processing_metric"):
__class__.processing_metric = Counter(
'processing_count', 'Processing count',
["id", "flow", "status"]
["processor", "flow", "name", "status"],
)
if not hasattr(__class__, "rate_limit_metric"):
__class__.rate_limit_metric = Counter(
'rate_limit_count', 'Rate limit event count',
["id", "flow"]
["processor", "flow", "name"],
)
def process(self, status):
__class__.processing_metric.labels(
id=self.id, flow=self.flow, status=status
processor = self.processor, flow = self.flow, name = self.name,
status=status
).inc()
def rate_limit(self):
__class__.rate_limit_metric.labels(
id=self.id, flow=self.flow
processor = self.processor, flow = self.flow, name = self.name,
).inc()
def state(self, state):
__class__.state_metric.labels(
id=self.id, flow=self.flow
processor = self.processor, flow = self.flow, name = self.name,
).state(state)
def record_time(self):
return __class__.request_metric.labels(
id=self.id, flow=self.flow
processor = self.processor, flow = self.flow, name = self.name,
).time()
class ProducerMetrics:
def __init__(self, id, flow=None):
self.id = id
def __init__(self, processor, flow, name):
self.processor = processor
self.flow = flow
self.name = name
if not hasattr(__class__, "output_metric"):
__class__.output_metric = Counter(
'output_count', 'Output items created',
["id", "flow"]
if not hasattr(__class__, "producer_metric"):
__class__.producer_metric = Counter(
'producer_count', 'Output items produced',
["processor", "flow", "name"],
)
def inc(self):
__class__.output_metric.labels(id=self.id, flow=self.flow).inc()
__class__.producer_metric.labels(
processor = self.processor, flow = self.flow, name = self.name
).inc()
class ProcessorMetrics:
def __init__(self, id):
def __init__(self, processor):
self.id = id
self.processor = processor
if not hasattr(__class__, "processor_metric"):
__class__.processor_metric = Info(
'processor', 'Processor configuration',
["id"]
["processor"]
)
def info(self, info):
__class__.processor_metric.labels(id=self.id).info(info)
__class__.processor_metric.labels(
processor = self.processor
).info(info)
class SubscriberMetrics:
def __init__(self, processor, flow, name):
self.processor = processor
self.flow = flow
self.name = name
if not hasattr(__class__, "state_metric"):
__class__.state_metric = Enum(
'subscriber_state', 'Subscriber state',
["processor", "flow", "name"],
states=['stopped', 'running']
)
if not hasattr(__class__, "received_metric"):
__class__.received_metric = Counter(
'received_count', 'Received count',
["processor", "flow", "name"],
)
if not hasattr(__class__, "dropped_metric"):
__class__.dropped_metric = Counter(
'dropped_count', 'Dropped messages count',
["processor", "flow", "name"],
)
def received(self):
__class__.received_metric.labels(
processor = self.processor, flow = self.flow, name = self.name,
).inc()
def state(self, state):
__class__.state_metric.labels(
processor = self.processor, flow = self.flow, name = self.name,
).state(state)
def dropped(self, state):
__class__.dropped_metric.labels(
processor = self.processor, flow = self.flow, name = self.name,
).inc()

View file

@@ -11,11 +11,11 @@ class ProducerSpec(Spec):
def add(self, flow, processor, definition):
producer_metrics = ProducerMetrics(
flow.id, f"{flow.name}-{self.name}"
processor = flow.id, flow = flow.name, name = self.name
)
producer = Producer(
client = processor.client,
client = processor.pulsar_client,
topic = definition[self.name],
schema = self.schema,
metrics = producer_metrics,

View file

@@ -5,7 +5,7 @@ import asyncio
from . subscriber import Subscriber
from . producer import Producer
from . spec import Spec
from . metrics import ConsumerMetrics, ProducerMetrics
from . metrics import ConsumerMetrics, ProducerMetrics, SubscriberMetrics
class RequestResponse(Subscriber):
@@ -23,6 +23,7 @@ class RequestResponse(Subscriber):
consumer_name = consumer_name,
topic = response_topic,
schema = response_schema,
metrics = response_metrics,
)
self.producer = Producer(
@@ -116,20 +117,24 @@ class RequestResponseSpec(Spec):
def add(self, flow, processor, definition):
producer_metrics = ProducerMetrics(
flow.id, f"{flow.name}-{self.response_name}"
request_metrics = ProducerMetrics(
processor = flow.id, flow = flow.name, name = self.request_name
)
response_metrics = SubscriberMetrics(
processor = flow.id, flow = flow.name, name = self.request_name
)
rr = self.impl(
client = processor.client,
client = processor.pulsar_client,
subscription = flow.id,
consumer_name = flow.id,
request_topic = definition[self.request_name],
request_schema = self.request_schema,
request_metrics = producer_metrics,
request_metrics = request_metrics,
response_topic = definition[self.response_name],
response_schema = self.response_schema,
response_metrics = None,
response_metrics = response_metrics,
)
flow.consumer[self.request_name] = rr

View file

@@ -7,7 +7,7 @@ import time
class Subscriber:
def __init__(self, client, topic, subscription, consumer_name,
schema=None, max_size=100):
schema=None, max_size=100, metrics=None):
self.client = client
self.topic = topic
self.subscription = subscription
@@ -18,6 +18,7 @@ class Subscriber:
self.max_size = max_size
self.lock = asyncio.Lock()
self.running = True
self.metrics = metrics
async def __del__(self):
self.running = False
@@ -36,6 +37,9 @@ class Subscriber:
while self.running:
if self.metrics:
self.metrics.state("stopped")
try:
consumer = self.client.subscribe(
@@ -45,6 +49,9 @@ class Subscriber:
schema = JsonSchema(self.schema),
)
if self.metrics:
self.metrics.state("running")
print("Subscriber running...", flush=True)
while self.running:
@@ -61,6 +68,9 @@ class Subscriber:
print(type(e))
raise e
if self.metrics:
self.metrics.received()
# Acknowledge successful reception of the message
consumer.acknowledge(msg)
@@ -83,7 +93,9 @@ class Subscriber:
self.q[id].put(value),
timeout=2
)
except Exception as e:
self.metrics.dropped()
print("Q Put:", e, flush=True)
for q in self.full.values():
@@ -94,6 +106,7 @@ class Subscriber:
timeout=2
)
except Exception as e:
self.metrics.dropped()
print("Q Put:", e, flush=True)
except Exception as e:
@@ -101,6 +114,9 @@ class Subscriber:
consumer.close()
if self.metrics:
self.metrics.state("stopped")
# If handler drops out, sleep a retry
time.sleep(2)

View file

@@ -1,5 +1,5 @@
from . metrics import ConsumerMetrics
from . metrics import SubscriberMetrics
from . subscriber import Subscriber
from . spec import Spec
@@ -11,17 +11,17 @@ class SubscriberSpec(Spec):
def add(self, flow, processor, definition):
# FIXME: Metrics not used
subscriber_metrics = ConsumerMetrics(
flow.id, f"{flow.name}-{self.name}"
subscriber_metrics = SubscriberMetrics(
processor = flow.id, flow = flow.name, name = self.name
)
subscriber = Subscriber(
client = processor.client,
client = processor.pulsar_client,
topic = definition[self.name],
subscription = flow.id,
consumer_name = flow.id,
schema = self.schema,
metrics = subscriber_metrics,
)
# Put it in the consumer map, does that work?