diff --git a/trustgraph-base/trustgraph/base/consumer.py b/trustgraph-base/trustgraph/base/consumer.py index 8f262b83..162e10eb 100644 --- a/trustgraph-base/trustgraph/base/consumer.py +++ b/trustgraph-base/trustgraph/base/consumer.py @@ -45,7 +45,9 @@ class Consumer: if hasattr(self, "consumer"): if self.consumer: + self.consumer.unsubscribe() self.consumer.close() + self.consumer = None async def stop(self): @@ -108,11 +110,16 @@ class Consumer: except Exception as e: print("consumer loop exception:", e, flush=True) + self.consumer.unsubscribe() self.consumer.close() self.consumer = None await asyncio.sleep(self.reconnect_time) continue + if self.consumer: + self.consumer.unsubscribe() + self.consumer.close() + async def consume(self): while self.running: diff --git a/trustgraph-base/trustgraph/base/subscriber.py b/trustgraph-base/trustgraph/base/subscriber.py index dfc7f791..8467b0bf 100644 --- a/trustgraph-base/trustgraph/base/subscriber.py +++ b/trustgraph-base/trustgraph/base/subscriber.py @@ -21,10 +21,21 @@ class Subscriber: self.metrics = metrics self.task = None + self.consumer = None + def __del__(self): + self.running = False async def start(self): + + self.consumer = self.client.subscribe( + topic = self.topic, + subscription_name = self.subscription, + consumer_name = self.consumer_name, + schema = JsonSchema(self.schema), + ) + self.task = asyncio.create_task(self.run()) async def stop(self): @@ -41,8 +52,6 @@ class Subscriber: async def run(self): - consumer = None - while self.running: if self.metrics: @@ -50,15 +59,6 @@ class Subscriber: try: - # FIXME: Create consumer in start method so we know - # it is definitely running when start completes - consumer = self.client.subscribe( - topic = self.topic, - subscription_name = self.subscription, - consumer_name = self.consumer_name, - schema = JsonSchema(self.schema), - ) - if self.metrics: self.metrics.state("running") @@ -68,7 +68,7 @@ class Subscriber: try: msg = await asyncio.to_thread( - consumer.receive, + self.consumer.receive, timeout_millis=250 ) except _pulsar.Timeout: @@ -82,7 +82,7 @@ class Subscriber: self.metrics.received() # Acknowledge successful reception of the message - consumer.acknowledge(msg) + self.consumer.acknowledge(msg) try: id = msg.properties()["id"] @@ -124,9 +124,10 @@ class Subscriber: finally: - if consumer: - consumer.close() - consumer = None + if self.consumer: + self.consumer.unsubscribe() + self.consumer.close() + self.consumer = None if self.metrics: