diff --git a/templates/components/trustgraph.jsonnet b/templates/components/trustgraph.jsonnet index e178cc27..37c05dae 100644 --- a/templates/components/trustgraph.jsonnet +++ b/templates/components/trustgraph.jsonnet @@ -5,9 +5,51 @@ local prompt = import "prompt-template.jsonnet"; { + "api-gateway-port":: 8088, + "api-gateway-timeout":: 600, + "chunk-size":: 250, "chunk-overlap":: 15, + "api-gateway" +: { + + create:: function(engine) + + local port = $["api-gateway-port"]; + + local container = + engine.container("api-gateway") + .with_image(images.trustgraph) + .with_command([ + "api-gateway", + "-p", + url.pulsar, + "--timeout", + std.toString($["api-gateway-timeout"]), + "--port", + std.toString(port), + ]) + .with_limits("0.5", "256M") + .with_reservations("0.1", "256M") + .with_port(8000, 8000, "metrics") + .with_port(port, port, "api"); + + local containerSet = engine.containers( + "api-gateway", [ container ] + ); + + local service = + engine.internalService(containerSet) + .with_port(8000, 8000, "metrics") + .with_port(port, port, "api"); + + engine.resources([ + containerSet, + service, + ]) + + }, + "chunker" +: { create:: function(engine) diff --git a/trustgraph-flow/scripts/api-gateway b/trustgraph-flow/scripts/api-gateway index 748b5c7d..dd7d54ac 100755 --- a/trustgraph-flow/scripts/api-gateway +++ b/trustgraph-flow/scripts/api-gateway @@ -1,10 +1,17 @@ #!/usr/bin/env python3 +# FIXME: Subscribes to Pulsar unnecessarily, should only do it when there +# are active listeners + +# FIXME: Connection errors in publishers / subscribers cause those threads +# to fail and are not failed or retried + import asyncio from aiohttp import web import json import logging import uuid +import os import pulsar from pulsar.asyncio import Client @@ -42,7 +49,7 @@ from trustgraph.schema import embeddings_response_queue logger = logging.getLogger("api") logger.setLevel(logging.INFO) -pulsar_host = "pulsar://localhost:6650" +pulsar_host = os.getenv("PULSAR_HOST", "pulsar://pulsar:6650") TIME_OUT = 600 class Publisher: @@ -54,15 +61,18 @@ class Publisher: self.q = asyncio.Queue(maxsize=max_size) async def run(self): - async with aiopulsar.connect(self.pulsar_host) as client: - async with client.create_producer( - topic=self.topic, - schema=self.schema, - ) as producer: - while True: - id, item = await self.q.get() - await producer.send(item, { "id": id }) -# print("message out") + try: + async with aiopulsar.connect(self.pulsar_host) as client: + async with client.create_producer( + topic=self.topic, + schema=self.schema, + ) as producer: + while True: + id, item = await self.q.get() + await producer.send(item, { "id": id }) + # print("message out") + except Exception as e: + print("Exception:", e, flush=True) async def send(self, id, msg): await self.q.put((id, msg)) @@ -79,20 +89,23 @@ class Subscriber: self.q = {} async def run(self): - async with aiopulsar.connect(pulsar_host) as client: - async with client.subscribe( - topic=self.topic, - subscription_name=self.subscription, - consumer_name=self.consumer_name, - schema=self.schema, - ) as consumer: - while True: - msg = await consumer.receive() -# print("message in", self.topic) - id = msg.properties()["id"] - value = msg.value() - if id in self.q: - await self.q[id].put(value) + try: + async with aiopulsar.connect(pulsar_host) as client: + async with client.subscribe( + topic=self.topic, + subscription_name=self.subscription, + consumer_name=self.consumer_name, + schema=self.schema, + ) as consumer: + while True: + msg = await consumer.receive() + # print("message in", self.topic) + id = msg.properties()["id"] + value = msg.value() + if id in self.q: + await self.q[id].put(value) + except Exception as e: + print("Exception:", e, flush=True) async def subscribe(self, id): q = asyncio.Queue() diff --git a/trustgraph-flow/setup.py b/trustgraph-flow/setup.py index 4f7b3383..44901119 100644 --- a/trustgraph-flow/setup.py +++ b/trustgraph-flow/setup.py @@ -59,8 +59,10 @@ setuptools.setup( "ibis", "jsonschema", "aiohttp", + "aiopulsar-py", ], scripts=[ + "scripts/api-gateway", "scripts/agent-manager-react", "scripts/chunker-recursive", "scripts/chunker-token",