diff --git a/trustgraph-base/trustgraph/base/flow.py b/trustgraph-base/trustgraph/base/flow.py index 0f42bbe2..3b928d3e 100644 --- a/trustgraph-base/trustgraph/base/flow.py +++ b/trustgraph-base/trustgraph/base/flow.py @@ -34,6 +34,8 @@ class Flow: async def stop(self): for c in self.consumer.values(): await c.stop() + for p in self.producer.values(): + await p.stop() if self.librarian: await self.librarian.stop() diff --git a/trustgraph-base/trustgraph/base/producer.py b/trustgraph-base/trustgraph/base/producer.py index 20b4b0d6..9af9d22e 100644 --- a/trustgraph-base/trustgraph/base/producer.py +++ b/trustgraph-base/trustgraph/base/producer.py @@ -34,6 +34,9 @@ class Producer: async def stop(self): self.running = False + if self.producer: + self.producer.close() + self.producer = None async def send(self, msg, properties={}):