diff --git a/trustgraph-flow/trustgraph/gateway/document_embeddings_load.py b/trustgraph-flow/trustgraph/gateway/document_embeddings_load.py index 82960966..6b4b4838 100644 --- a/trustgraph-flow/trustgraph/gateway/document_embeddings_load.py +++ b/trustgraph-flow/trustgraph/gateway/document_embeddings_load.py @@ -59,6 +59,6 @@ class DocumentEmbeddingsLoadEndpoint(SocketEndpoint): ], ) - await self.publisher.send(None, elt) + self.publisher.send(None, elt) running.stop() diff --git a/trustgraph-flow/trustgraph/gateway/graph_embeddings_load.py b/trustgraph-flow/trustgraph/gateway/graph_embeddings_load.py index 8b5328ce..c1354ce5 100644 --- a/trustgraph-flow/trustgraph/gateway/graph_embeddings_load.py +++ b/trustgraph-flow/trustgraph/gateway/graph_embeddings_load.py @@ -36,6 +36,7 @@ class GraphEmbeddingsLoadEndpoint(SocketEndpoint): async def listener(self, ws, running): async for msg in ws: + # On error, finish if msg.type == WSMsgType.ERROR: break @@ -59,7 +60,6 @@ class GraphEmbeddingsLoadEndpoint(SocketEndpoint): ] ) - await self.publisher.send(None, elt) - + self.publisher.send(None, elt) running.stop() diff --git a/trustgraph-flow/trustgraph/gateway/requestor.py b/trustgraph-flow/trustgraph/gateway/requestor.py index 68ab1b58..dc74667d 100644 --- a/trustgraph-flow/trustgraph/gateway/requestor.py +++ b/trustgraph-flow/trustgraph/gateway/requestor.py @@ -53,11 +53,9 @@ class ServiceRequestor: q = self.sub.subscribe(id) - print("BOUT TO SEDN") await asyncio.to_thread( self.pub.send, id, self.to_request(request) ) - print("SENT") while True: diff --git a/trustgraph-flow/trustgraph/gateway/socket.py b/trustgraph-flow/trustgraph/gateway/socket.py index 4adc336f..c32a28af 100644 --- a/trustgraph-flow/trustgraph/gateway/socket.py +++ b/trustgraph-flow/trustgraph/gateway/socket.py @@ -19,7 +19,7 @@ class SocketEndpoint: self.operation = "socket" async def listener(self, ws, running): - + async for msg in ws: # On error, finish if msg.type == WSMsgType.TEXT: @@ -53,7 +53,7 @@ class SocketEndpoint: try: await self.listener(ws, running) except Exception as e: - print(e, flush=True) + print("Socket exception:", e, flush=True) running.stop() diff --git a/trustgraph-flow/trustgraph/gateway/triples_load.py b/trustgraph-flow/trustgraph/gateway/triples_load.py index 0c672697..bc69975e 100644 --- a/trustgraph-flow/trustgraph/gateway/triples_load.py +++ b/trustgraph-flow/trustgraph/gateway/triples_load.py @@ -51,7 +51,7 @@ class TriplesLoadEndpoint(SocketEndpoint): triples=to_subgraph(data["triples"]), ) - await self.publisher.send(None, elt) + self.publisher.send(None, elt) running.stop()