From a9228520177fc8c749e132c22f38778a42ec9275 Mon Sep 17 00:00:00 2001 From: cybermaggedon Date: Sat, 15 Mar 2025 12:38:10 +0000 Subject: [PATCH] Fix async/sync load issues with knowledge streaming APIs (#315) --- .../trustgraph/gateway/document_embeddings_load.py | 2 +- trustgraph-flow/trustgraph/gateway/graph_embeddings_load.py | 4 ++-- trustgraph-flow/trustgraph/gateway/requestor.py | 2 -- trustgraph-flow/trustgraph/gateway/socket.py | 4 ++-- trustgraph-flow/trustgraph/gateway/triples_load.py | 2 +- 5 files changed, 6 insertions(+), 8 deletions(-) diff --git a/trustgraph-flow/trustgraph/gateway/document_embeddings_load.py b/trustgraph-flow/trustgraph/gateway/document_embeddings_load.py index 82960966..6b4b4838 100644 --- a/trustgraph-flow/trustgraph/gateway/document_embeddings_load.py +++ b/trustgraph-flow/trustgraph/gateway/document_embeddings_load.py @@ -59,6 +59,6 @@ class DocumentEmbeddingsLoadEndpoint(SocketEndpoint): ], ) - await self.publisher.send(None, elt) + self.publisher.send(None, elt) running.stop() diff --git a/trustgraph-flow/trustgraph/gateway/graph_embeddings_load.py b/trustgraph-flow/trustgraph/gateway/graph_embeddings_load.py index 8b5328ce..c1354ce5 100644 --- a/trustgraph-flow/trustgraph/gateway/graph_embeddings_load.py +++ b/trustgraph-flow/trustgraph/gateway/graph_embeddings_load.py @@ -36,6 +36,7 @@ class GraphEmbeddingsLoadEndpoint(SocketEndpoint): async def listener(self, ws, running): async for msg in ws: + # On error, finish if msg.type == WSMsgType.ERROR: break @@ -59,7 +60,6 @@ class GraphEmbeddingsLoadEndpoint(SocketEndpoint): ] ) - await self.publisher.send(None, elt) - + self.publisher.send(None, elt) running.stop() diff --git a/trustgraph-flow/trustgraph/gateway/requestor.py b/trustgraph-flow/trustgraph/gateway/requestor.py index 68ab1b58..dc74667d 100644 --- a/trustgraph-flow/trustgraph/gateway/requestor.py +++ b/trustgraph-flow/trustgraph/gateway/requestor.py @@ -53,11 +53,9 @@ class ServiceRequestor: q = self.sub.subscribe(id) - print("BOUT TO SEDN") await asyncio.to_thread( self.pub.send, id, self.to_request(request) ) - print("SENT") while True: diff --git a/trustgraph-flow/trustgraph/gateway/socket.py b/trustgraph-flow/trustgraph/gateway/socket.py index 4adc336f..c32a28af 100644 --- a/trustgraph-flow/trustgraph/gateway/socket.py +++ b/trustgraph-flow/trustgraph/gateway/socket.py @@ -19,7 +19,7 @@ class SocketEndpoint: self.operation = "socket" async def listener(self, ws, running): - + async for msg in ws: # On error, finish if msg.type == WSMsgType.TEXT: @@ -53,7 +53,7 @@ class SocketEndpoint: try: await self.listener(ws, running) except Exception as e: - print(e, flush=True) + print("Socket exception:", e, flush=True) running.stop() diff --git a/trustgraph-flow/trustgraph/gateway/triples_load.py b/trustgraph-flow/trustgraph/gateway/triples_load.py index 0c672697..bc69975e 100644 --- a/trustgraph-flow/trustgraph/gateway/triples_load.py +++ b/trustgraph-flow/trustgraph/gateway/triples_load.py @@ -51,7 +51,7 @@ class TriplesLoadEndpoint(SocketEndpoint): triples=to_subgraph(data["triples"]), ) - await self.publisher.send(None, elt) + self.publisher.send(None, elt) running.stop()