- Change flawed _client timeout logic which was causing major lags

- Moved clients to trustgraph.clients to tidy the parent directory
- Version bump
This commit is contained in:
Cyber MacGeddon 2024-08-20 17:54:11 +01:00
parent bdf4bc2bf5
commit 20f983eec9
34 changed files with 232 additions and 238 deletions

View file

@ -126,52 +126,3 @@ class ConsumerProducer(BaseProcessor):
help=f'Output queue (default: {default_output_queue})'
)
class Producer(BaseProcessor):
def __init__(self, **params):
output_queue = params.get("output_queue")
output_schema = params.get("output_schema")
if not hasattr(__class__, "output_metric"):
__class__.output_metric = Counter(
'output_count', 'Output items created'
)
if not hasattr(__class__, "pubsub_metric"):
__class__.pubsub_metric = Info(
'pubsub', 'Pub/sub configuration'
)
__class__.pubsub_metric.info({
"output_queue": output_queue,
"output_schema": output_schema.__name__,
})
super(Producer, self).__init__(**params)
if output_schema == None:
raise RuntimeError("output_schema must be specified")
self.producer = self.client.create_producer(
topic=output_queue,
schema=JsonSchema(output_schema),
)
def send(self, msg, properties={}):
self.producer.send(msg, properties)
__class__.output_metric.inc()
@staticmethod
def add_args(
parser, default_input_queue, default_subscriber,
default_output_queue,
):
BaseProcessor.add_args(parser)
parser.add_argument(
'-o', '--output-queue',
default=default_output_queue,
help=f'Output queue (default: {default_output_queue})'
)