diff --git a/trustgraph-flow/trustgraph/gateway/dispatch/document_embeddings_import.py b/trustgraph-flow/trustgraph/gateway/dispatch/document_embeddings_import.py index 1f459081..e486f613 100644 --- a/trustgraph-flow/trustgraph/gateway/dispatch/document_embeddings_import.py +++ b/trustgraph-flow/trustgraph/gateway/dispatch/document_embeddings_import.py @@ -22,6 +22,9 @@ class DocumentEmbeddingsImport: pulsar_client, topic = queue, schema = DocumentEmbeddings ) + async def start(self): + await self.publisher.start() + async def destroy(self): self.running.stop() diff --git a/trustgraph-flow/trustgraph/gateway/dispatch/graph_embeddings_import.py b/trustgraph-flow/trustgraph/gateway/dispatch/graph_embeddings_import.py index 70e78c87..85174460 100644 --- a/trustgraph-flow/trustgraph/gateway/dispatch/graph_embeddings_import.py +++ b/trustgraph-flow/trustgraph/gateway/dispatch/graph_embeddings_import.py @@ -22,6 +22,9 @@ class GraphEmbeddingsImport: pulsar_client, topic = queue, schema = GraphEmbeddings ) + async def start(self): + await self.publisher.start() + async def destroy(self): self.running.stop() diff --git a/trustgraph-flow/trustgraph/gateway/dispatch/manager.py b/trustgraph-flow/trustgraph/gateway/dispatch/manager.py index 7896d588..f0cbc234 100644 --- a/trustgraph-flow/trustgraph/gateway/dispatch/manager.py +++ b/trustgraph-flow/trustgraph/gateway/dispatch/manager.py @@ -160,6 +160,8 @@ class DispatcherManager: queue = qconfig, ) + await dispatcher.start() + return dispatcher async def process_flow_export(self, ws, running, params): diff --git a/trustgraph-flow/trustgraph/gateway/dispatch/triples_import.py b/trustgraph-flow/trustgraph/gateway/dispatch/triples_import.py index 9b59a0ed..687b424a 100644 --- a/trustgraph-flow/trustgraph/gateway/dispatch/triples_import.py +++ b/trustgraph-flow/trustgraph/gateway/dispatch/triples_import.py @@ -22,6 +22,9 @@ class TriplesImport: pulsar_client, topic = queue, schema = Triples ) + async def start(self): + await self.publisher.start() + async def destroy(self): self.running.stop()