From cd9a2084321ee202345e87488a30eff035e6e272 Mon Sep 17 00:00:00 2001 From: cybermaggedon Date: Sat, 11 Jan 2025 18:10:04 +0000 Subject: [PATCH] Processors use shared queues, means there can be more than process on a queue to share load (#265) --- trustgraph-base/trustgraph/base/consumer.py | 2 ++ trustgraph-base/trustgraph/base/consumer_producer.py | 2 ++ 2 files changed, 4 insertions(+) diff --git a/trustgraph-base/trustgraph/base/consumer.py b/trustgraph-base/trustgraph/base/consumer.py index eeaf83a1..f346f1bc 100644 --- a/trustgraph-base/trustgraph/base/consumer.py +++ b/trustgraph-base/trustgraph/base/consumer.py @@ -1,5 +1,6 @@ from pulsar.schema import JsonSchema +import pulsar from prometheus_client import Histogram, Info, Counter, Enum import time @@ -51,6 +52,7 @@ class Consumer(BaseProcessor): self.consumer = self.client.subscribe( input_queue, subscriber, + consumer_type=pulsar.ConsumerType.Shared, schema=JsonSchema(input_schema), ) diff --git a/trustgraph-base/trustgraph/base/consumer_producer.py b/trustgraph-base/trustgraph/base/consumer_producer.py index 31441cda..6d386894 100644 --- a/trustgraph-base/trustgraph/base/consumer_producer.py +++ b/trustgraph-base/trustgraph/base/consumer_producer.py @@ -1,5 +1,6 @@ from pulsar.schema import JsonSchema +import pulsar from prometheus_client import Histogram, Info, Counter, Enum import time @@ -71,6 +72,7 @@ class ConsumerProducer(BaseProcessor): self.consumer = self.client.subscribe( input_queue, subscriber, + consumer_type=pulsar.ConsumerType.Shared, schema=JsonSchema(input_schema), )