Feature/track processor state (#78)

* Add a Prom metric to consumers & consumer/producers to track the running
state.

* New script, gets processor state using prometheus

* Bump version, add tg-processor-state to package

* Update templates
This commit is contained in:
cybermaggedon 2024-09-29 23:50:57 +01:00 committed by GitHub
parent efc364583b
commit 74a14639bd
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
23 changed files with 384 additions and 336 deletions

View file

@ -1,6 +1,6 @@
from pulsar.schema import JsonSchema
from prometheus_client import start_http_server, Histogram, Info, Counter
from prometheus_client import Histogram, Info, Counter, Enum
import time
from . base_processor import BaseProcessor
@ -10,6 +10,15 @@ class Consumer(BaseProcessor):
def __init__(self, **params):
if not hasattr(__class__, "state_metric"):
__class__.state_metric = Enum(
'processor_state', 'Processor state',
states=['starting', 'running', 'stopped']
)
__class__.state_metric.state('starting')
__class__.state_metric.state('starting')
super(Consumer, self).__init__(**params)
input_queue = params.get("input_queue")
@ -47,6 +56,8 @@ class Consumer(BaseProcessor):
def run(self):
__class__.state_metric.state('running')
while True:
msg = self.consumer.receive()

View file

@ -1,6 +1,6 @@
from pulsar.schema import JsonSchema
from prometheus_client import Histogram, Info, Counter
from prometheus_client import Histogram, Info, Counter, Enum
import time
from . base_processor import BaseProcessor
@ -12,6 +12,15 @@ class ConsumerProducer(BaseProcessor):
def __init__(self, **params):
if not hasattr(__class__, "state_metric"):
__class__.state_metric = Enum(
'processor_state', 'Processor state',
states=['starting', 'running', 'stopped']
)
__class__.state_metric.state('starting')
__class__.state_metric.state('starting')
input_queue = params.get("input_queue")
output_queue = params.get("output_queue")
subscriber = params.get("subscriber")
@ -66,6 +75,8 @@ class ConsumerProducer(BaseProcessor):
def run(self):
__class__.state_metric.state('running')
while True:
msg = self.consumer.receive()