From 410636b409e3f199d00cbf0f19647a294dd74031 Mon Sep 17 00:00:00 2001 From: Cyber MacGeddon Date: Sat, 17 May 2025 13:01:52 +0100 Subject: [PATCH] Fix missing queue initialisation --- .../trustgraph/gateway/dispatch/document_embeddings_import.py | 3 +++ .../trustgraph/gateway/dispatch/graph_embeddings_import.py | 3 +++ trustgraph-flow/trustgraph/gateway/dispatch/manager.py | 2 ++ trustgraph-flow/trustgraph/gateway/dispatch/triples_import.py | 3 +++ 4 files changed, 11 insertions(+) diff --git a/trustgraph-flow/trustgraph/gateway/dispatch/document_embeddings_import.py b/trustgraph-flow/trustgraph/gateway/dispatch/document_embeddings_import.py index 1f459081..e486f613 100644 --- a/trustgraph-flow/trustgraph/gateway/dispatch/document_embeddings_import.py +++ b/trustgraph-flow/trustgraph/gateway/dispatch/document_embeddings_import.py @@ -22,6 +22,9 @@ class DocumentEmbeddingsImport: pulsar_client, topic = queue, schema = DocumentEmbeddings ) + async def start(self): + await self.publisher.start() + async def destroy(self): self.running.stop() diff --git a/trustgraph-flow/trustgraph/gateway/dispatch/graph_embeddings_import.py b/trustgraph-flow/trustgraph/gateway/dispatch/graph_embeddings_import.py index 70e78c87..85174460 100644 --- a/trustgraph-flow/trustgraph/gateway/dispatch/graph_embeddings_import.py +++ b/trustgraph-flow/trustgraph/gateway/dispatch/graph_embeddings_import.py @@ -22,6 +22,9 @@ class GraphEmbeddingsImport: pulsar_client, topic = queue, schema = GraphEmbeddings ) + async def start(self): + await self.publisher.start() + async def destroy(self): self.running.stop() diff --git a/trustgraph-flow/trustgraph/gateway/dispatch/manager.py b/trustgraph-flow/trustgraph/gateway/dispatch/manager.py index 7896d588..f0cbc234 100644 --- a/trustgraph-flow/trustgraph/gateway/dispatch/manager.py +++ b/trustgraph-flow/trustgraph/gateway/dispatch/manager.py @@ -160,6 +160,8 @@ class DispatcherManager: queue = qconfig, ) + await dispatcher.start() + return dispatcher async def process_flow_export(self, ws, running, params): diff --git a/trustgraph-flow/trustgraph/gateway/dispatch/triples_import.py b/trustgraph-flow/trustgraph/gateway/dispatch/triples_import.py index 9b59a0ed..687b424a 100644 --- a/trustgraph-flow/trustgraph/gateway/dispatch/triples_import.py +++ b/trustgraph-flow/trustgraph/gateway/dispatch/triples_import.py @@ -22,6 +22,9 @@ class TriplesImport: pulsar_client, topic = queue, schema = Triples ) + async def start(self): + await self.publisher.start() + async def destroy(self): self.running.stop()