From 7e78aa6d91aba84fcfc66db157a89b76847b1586 Mon Sep 17 00:00:00 2001 From: cybermaggedon Date: Tue, 3 Dec 2024 14:13:40 +0000 Subject: [PATCH] Reduc pulsar connections (#189) --- .../trustgraph/api/gateway/endpoint.py | 11 +++-- .../api/gateway/graph_embeddings_load.py | 4 +- .../api/gateway/graph_embeddings_stream.py | 4 +- .../trustgraph/api/gateway/publisher.py | 26 ++++++------ .../trustgraph/api/gateway/service.py | 33 ++++++++++++--- .../trustgraph/api/gateway/socket.py | 6 +++ .../trustgraph/api/gateway/subscriber.py | 42 +++++++++---------- .../trustgraph/api/gateway/triples_load.py | 4 +- .../trustgraph/api/gateway/triples_stream.py | 4 +- 9 files changed, 82 insertions(+), 52 deletions(-) diff --git a/trustgraph-flow/trustgraph/api/gateway/endpoint.py b/trustgraph-flow/trustgraph/api/gateway/endpoint.py index dc380f4b..c7cd6b04 100644 --- a/trustgraph-flow/trustgraph/api/gateway/endpoint.py +++ b/trustgraph-flow/trustgraph/api/gateway/endpoint.py @@ -41,10 +41,15 @@ class ServiceEndpoint: self.operation = "service" - async def start(self): + async def start(self, client): - self.pub_task = asyncio.create_task(self.pub.run()) - self.sub_task = asyncio.create_task(self.sub.run()) + self.pub_task = asyncio.create_task(self.pub.run(client)) + self.sub_task = asyncio.create_task(self.sub.run(client)) + + async def join(self): + + await self.pub_task + await self.sub_task def add_routes(self, app): diff --git a/trustgraph-flow/trustgraph/api/gateway/graph_embeddings_load.py b/trustgraph-flow/trustgraph/api/gateway/graph_embeddings_load.py index 15efdf5b..764e7210 100644 --- a/trustgraph-flow/trustgraph/api/gateway/graph_embeddings_load.py +++ b/trustgraph-flow/trustgraph/api/gateway/graph_embeddings_load.py @@ -29,10 +29,10 @@ class GraphEmbeddingsLoadEndpoint(SocketEndpoint): schema=JsonSchema(GraphEmbeddings) ) - async def start(self): + async def start(self, client): self.task = asyncio.create_task( - self.publisher.run() + self.publisher.run(client) ) async def listener(self, ws, running): diff --git a/trustgraph-flow/trustgraph/api/gateway/graph_embeddings_stream.py b/trustgraph-flow/trustgraph/api/gateway/graph_embeddings_stream.py index 7f3e5e18..12647547 100644 --- a/trustgraph-flow/trustgraph/api/gateway/graph_embeddings_stream.py +++ b/trustgraph-flow/trustgraph/api/gateway/graph_embeddings_stream.py @@ -28,10 +28,10 @@ class GraphEmbeddingsStreamEndpoint(SocketEndpoint): schema=JsonSchema(GraphEmbeddings) ) - async def start(self): + async def start(self, client): self.task = asyncio.create_task( - self.subscriber.run() + self.subscriber.run(client) ) async def async_thread(self, ws, running): diff --git a/trustgraph-flow/trustgraph/api/gateway/publisher.py b/trustgraph-flow/trustgraph/api/gateway/publisher.py index 1bff44dd..2bbf05d9 100644 --- a/trustgraph-flow/trustgraph/api/gateway/publisher.py +++ b/trustgraph-flow/trustgraph/api/gateway/publisher.py @@ -1,6 +1,5 @@ import asyncio -import aiopulsar class Publisher: @@ -12,24 +11,23 @@ class Publisher: self.q = asyncio.Queue(maxsize=max_size) self.chunking_enabled = chunking_enabled - async def run(self): + async def run(self, client): while True: try: - async with aiopulsar.connect(self.pulsar_host) as client: - async with client.create_producer( - topic=self.topic, - schema=self.schema, - chunking_enabled=self.chunking_enabled, - ) as producer: - while True: - id, item = await self.q.get() + async with client.create_producer( + topic=self.topic, + schema=self.schema, + chunking_enabled=self.chunking_enabled, + ) as producer: + while True: + id, item = await self.q.get() - if id: - await producer.send(item, { "id": id }) - else: - await producer.send(item) + if id: + await producer.send(item, { "id": id }) + else: + await producer.send(item) except Exception as e: print("Exception:", e, flush=True) diff --git a/trustgraph-flow/trustgraph/api/gateway/service.py b/trustgraph-flow/trustgraph/api/gateway/service.py index a25dd9dc..38a86a51 100755 --- a/trustgraph-flow/trustgraph/api/gateway/service.py +++ b/trustgraph-flow/trustgraph/api/gateway/service.py @@ -17,6 +17,7 @@ from aiohttp import web import logging import os import base64 +import aiopulsar import pulsar from pulsar.schema import JsonSchema @@ -237,13 +238,35 @@ class Api: { "error": str(e) } ) + async def run_endpoints(self): + + async with aiopulsar.connect(self.pulsar_host) as client: + + for ep in self.endpoints: + await ep.start(client) + + self.doc_ingest_pub_task = asyncio.create_task( + self.document_out.run(client) + ) + + self.text_ingest_pub_task = asyncio.create_task( + self.text_out.run(client) + ) + + print("Endpoints are running...") + + # They never exit + for ep in self.endpoints: + await ep.join() + + await self.doc_ingest_pub_task + await self.text_ingest_pub_task + + print("Endpoints are stopped.") + async def app_factory(self): - for ep in self.endpoints: - await ep.start() - - self.doc_ingest_pub_task = asyncio.create_task(self.document_out.run()) - self.text_ingest_pub_task = asyncio.create_task(self.text_out.run()) + self.endpoint_task = asyncio.create_task(self.run_endpoints()) return self.app diff --git a/trustgraph-flow/trustgraph/api/gateway/socket.py b/trustgraph-flow/trustgraph/api/gateway/socket.py index 869792b7..a4cb0feb 100644 --- a/trustgraph-flow/trustgraph/api/gateway/socket.py +++ b/trustgraph-flow/trustgraph/api/gateway/socket.py @@ -76,6 +76,12 @@ class SocketEndpoint: async def start(self): pass + async def join(self): + + # Nothing to wait for + while True: + await asyncio.sleep(100) + def add_routes(self, app): app.add_routes([ diff --git a/trustgraph-flow/trustgraph/api/gateway/subscriber.py b/trustgraph-flow/trustgraph/api/gateway/subscriber.py index 3d8840f6..ba53bab6 100644 --- a/trustgraph-flow/trustgraph/api/gateway/subscriber.py +++ b/trustgraph-flow/trustgraph/api/gateway/subscriber.py @@ -1,6 +1,5 @@ import asyncio -import aiopulsar class Subscriber: @@ -14,33 +13,32 @@ class Subscriber: self.q = {} self.full = {} - async def run(self): + async def run(self, client): while True: try: - async with aiopulsar.connect(self.pulsar_host) as client: - async with client.subscribe( - topic=self.topic, - subscription_name=self.subscription, - consumer_name=self.consumer_name, - schema=self.schema, - ) as consumer: - while True: - msg = await consumer.receive() + async with client.subscribe( + topic=self.topic, + subscription_name=self.subscription, + consumer_name=self.consumer_name, + schema=self.schema, + ) as consumer: + while True: + msg = await consumer.receive() - # Acknowledge successful reception of the message - await consumer.acknowledge(msg) + # Acknowledge successful reception of the message + await consumer.acknowledge(msg) - try: - id = msg.properties()["id"] - except: - id = None + try: + id = msg.properties()["id"] + except: + id = None - value = msg.value() - if id in self.q: - await self.q[id].put(value) + value = msg.value() + if id in self.q: + await self.q[id].put(value) - for q in self.full.values(): - await q.put(value) + for q in self.full.values(): + await q.put(value) except Exception as e: print("Exception:", e, flush=True) diff --git a/trustgraph-flow/trustgraph/api/gateway/triples_load.py b/trustgraph-flow/trustgraph/api/gateway/triples_load.py index 7f4561b1..0460d1e4 100644 --- a/trustgraph-flow/trustgraph/api/gateway/triples_load.py +++ b/trustgraph-flow/trustgraph/api/gateway/triples_load.py @@ -27,10 +27,10 @@ class TriplesLoadEndpoint(SocketEndpoint): schema=JsonSchema(Triples) ) - async def start(self): + async def start(self, client): self.task = asyncio.create_task( - self.publisher.run() + self.publisher.run(client) ) async def listener(self, ws, running): diff --git a/trustgraph-flow/trustgraph/api/gateway/triples_stream.py b/trustgraph-flow/trustgraph/api/gateway/triples_stream.py index 6ecd2bdb..571d5e61 100644 --- a/trustgraph-flow/trustgraph/api/gateway/triples_stream.py +++ b/trustgraph-flow/trustgraph/api/gateway/triples_stream.py @@ -26,10 +26,10 @@ class TriplesStreamEndpoint(SocketEndpoint): schema=JsonSchema(Triples) ) - async def start(self): + async def start(self, client): self.task = asyncio.create_task( - self.subscriber.run() + self.subscriber.run(client) ) async def async_thread(self, ws, running):