diff --git a/trustgraph-base/trustgraph/base/consumer.py b/trustgraph-base/trustgraph/base/consumer.py index eeaf83a1..f346f1bc 100644 --- a/trustgraph-base/trustgraph/base/consumer.py +++ b/trustgraph-base/trustgraph/base/consumer.py @@ -1,5 +1,6 @@ from pulsar.schema import JsonSchema +import pulsar from prometheus_client import Histogram, Info, Counter, Enum import time @@ -51,6 +52,7 @@ class Consumer(BaseProcessor): self.consumer = self.client.subscribe( input_queue, subscriber, + consumer_type=pulsar.ConsumerType.Shared, schema=JsonSchema(input_schema), ) diff --git a/trustgraph-base/trustgraph/base/consumer_producer.py b/trustgraph-base/trustgraph/base/consumer_producer.py index 31441cda..6d386894 100644 --- a/trustgraph-base/trustgraph/base/consumer_producer.py +++ b/trustgraph-base/trustgraph/base/consumer_producer.py @@ -1,5 +1,6 @@ from pulsar.schema import JsonSchema +import pulsar from prometheus_client import Histogram, Info, Counter, Enum import time @@ -71,6 +72,7 @@ class ConsumerProducer(BaseProcessor): self.consumer = self.client.subscribe( input_queue, subscriber, + consumer_type=pulsar.ConsumerType.Shared, schema=JsonSchema(input_schema), )