Processors use shared queues, means there can be more than process on a queue to share load (#265)

This commit is contained in:
cybermaggedon 2025-01-11 18:10:04 +00:00 committed by GitHub
parent c603caa3cc
commit cd9a208432
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 4 additions and 0 deletions

View file

@ -1,5 +1,6 @@
from pulsar.schema import JsonSchema
import pulsar
from prometheus_client import Histogram, Info, Counter, Enum
import time
@ -51,6 +52,7 @@ class Consumer(BaseProcessor):
self.consumer = self.client.subscribe(
input_queue, subscriber,
consumer_type=pulsar.ConsumerType.Shared,
schema=JsonSchema(input_schema),
)

View file

@ -1,5 +1,6 @@
from pulsar.schema import JsonSchema
import pulsar
from prometheus_client import Histogram, Info, Counter, Enum
import time
@ -71,6 +72,7 @@ class ConsumerProducer(BaseProcessor):
self.consumer = self.client.subscribe(
input_queue, subscriber,
consumer_type=pulsar.ConsumerType.Shared,
schema=JsonSchema(input_schema),
)