Merge pull request #381 from trustgraph-ai/fix/import-queues-not-working

Fix missing queue initialisation
This commit is contained in:
cybermaggedon 2025-05-17 13:02:50 +01:00 committed by GitHub
commit 7d90696ec1
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 11 additions and 0 deletions

View file

@ -22,6 +22,9 @@ class DocumentEmbeddingsImport:
pulsar_client, topic = queue, schema = DocumentEmbeddings pulsar_client, topic = queue, schema = DocumentEmbeddings
) )
async def start(self):
await self.publisher.start()
async def destroy(self): async def destroy(self):
self.running.stop() self.running.stop()

View file

@ -22,6 +22,9 @@ class GraphEmbeddingsImport:
pulsar_client, topic = queue, schema = GraphEmbeddings pulsar_client, topic = queue, schema = GraphEmbeddings
) )
async def start(self):
await self.publisher.start()
async def destroy(self): async def destroy(self):
self.running.stop() self.running.stop()

View file

@ -160,6 +160,8 @@ class DispatcherManager:
queue = qconfig, queue = qconfig,
) )
await dispatcher.start()
return dispatcher return dispatcher
async def process_flow_export(self, ws, running, params): async def process_flow_export(self, ws, running, params):

View file

@ -22,6 +22,9 @@ class TriplesImport:
pulsar_client, topic = queue, schema = Triples pulsar_client, topic = queue, schema = Triples
) )
async def start(self):
await self.publisher.start()
async def destroy(self): async def destroy(self):
self.running.stop() self.running.stop()