diff --git a/test-api/test-agent-api b/test-api/test-agent-api index f36ba196..bba0e70d 100755 --- a/test-api/test-agent-api +++ b/test-api/test-agent-api @@ -4,7 +4,7 @@ import requests import json import sys -url = "http://localhost:8088/api/v1/" +url = "http://localhost:8088/api/v1/flow/0000/agent" ############################################################################ @@ -13,10 +13,11 @@ input = { } resp = requests.post( - f"{url}agent", + url, json=input, ) +print(resp.text) resp = resp.json() if "error" in resp: @@ -25,4 +26,3 @@ if "error" in resp: print(resp["answer"]) - diff --git a/test-api/test-embeddings-api b/test-api/test-embeddings-api index b1defd01..dd15af31 100755 --- a/test-api/test-embeddings-api +++ b/test-api/test-embeddings-api @@ -4,7 +4,7 @@ import requests import json import sys -url = "http://localhost:8088/api/v1/" +url = "http://localhost:8088/api/v1/flow/0000/embeddings" ############################################################################ @@ -13,7 +13,7 @@ input = { } resp = requests.post( - f"{url}embeddings", + url, json=input, ) @@ -25,4 +25,3 @@ if "error" in resp: print(resp["vectors"]) - diff --git a/test-api/test-graph-rag-api b/test-api/test-graph-rag-api index c329934c..886d0c15 100755 --- a/test-api/test-graph-rag-api +++ b/test-api/test-graph-rag-api @@ -4,7 +4,7 @@ import requests import json import sys -url = "http://localhost:8088/api/v1/" +url = "http://localhost:8088/api/v1/flow/0000/graph-rag" ############################################################################ @@ -13,7 +13,7 @@ input = { } resp = requests.post( - f"{url}graph-rag", + url, json=input, ) diff --git a/test-api/test-llm-api b/test-api/test-llm-api index 6bee2048..fa100b15 100755 --- a/test-api/test-llm-api +++ b/test-api/test-llm-api @@ -4,7 +4,7 @@ import requests import json import sys -url = "http://localhost:8088/api/v1/" +url = "http://localhost:8088/api/v1/flow/0000/text-completion" ############################################################################ @@ -15,7 +15,7 @@ input = { } resp = requests.post( - f"{url}text-completion", + url, json=input, ) diff --git a/test-api/test-prompt-api b/test-api/test-prompt-api index 4f69f09a..8cd6615b 100755 --- a/test-api/test-prompt-api +++ b/test-api/test-prompt-api @@ -4,7 +4,7 @@ import requests import json import sys -url = "http://localhost:8088/api/v1/" +url = "http://localhost:8088/api/v1/flow/0000/prompt" ############################################################################ @@ -16,7 +16,7 @@ input = { } resp = requests.post( - f"{url}prompt", + url, json=input, ) diff --git a/test-api/test-prompt2-api b/test-api/test-prompt2-api index 1e641439..8b9c55c7 100755 --- a/test-api/test-prompt2-api +++ b/test-api/test-prompt2-api @@ -4,7 +4,7 @@ import requests import json import sys -url = "http://localhost:8088/api/v1/" +url = "http://localhost:8088/api/v1/flow/0000/prompt" ############################################################################ @@ -16,7 +16,7 @@ input = { } resp = requests.post( - f"{url}prompt", + url, json=input, ) diff --git a/tests/test-load-pdf b/tests/test-load-pdf index 838a57ce..c57ebcc1 100755 --- a/tests/test-load-pdf +++ b/tests/test-load-pdf @@ -9,7 +9,7 @@ from trustgraph.schema import Document, Metadata client = pulsar.Client("pulsar://localhost:6650", listener_name="localhost") prod = client.create_producer( - topic="persistent://tg/flow/document-load:0000", + topic="persistent://tg/flow/document-load:0002", schema=JsonSchema(Document), chunking_enabled=True, ) diff --git a/trustgraph-base/trustgraph/base/publisher.py b/trustgraph-base/trustgraph/base/publisher.py index ce9e364e..bc302599 100644 --- a/trustgraph-base/trustgraph/base/publisher.py +++ b/trustgraph-base/trustgraph/base/publisher.py @@ -21,6 +21,7 @@ class Publisher: async def stop(self): self.running = False + await self.task async def join(self): await self.stop() @@ -42,7 +43,7 @@ class Publisher: try: id, item = await asyncio.wait_for( self.q.get(), - timeout=0.5 + timeout=0.25 ) except asyncio.TimeoutError: continue @@ -57,8 +58,11 @@ class Publisher: except Exception as e: print("Exception:", e, flush=True) + if not self.running: + return + # If handler drops out, sleep a retry - time.sleep(2) + await asyncio.sleep(1) async def send(self, id, item): await self.q.put((id, item)) diff --git a/trustgraph-base/trustgraph/base/subscriber.py b/trustgraph-base/trustgraph/base/subscriber.py index 127d2add..4f5d1455 100644 --- a/trustgraph-base/trustgraph/base/subscriber.py +++ b/trustgraph-base/trustgraph/base/subscriber.py @@ -28,6 +28,7 @@ class Subscriber: async def stop(self): self.running = False + await self.task async def join(self): await self.stop() @@ -35,6 +36,8 @@ class Subscriber: async def run(self): + consumer = None + while self.running: if self.metrics: @@ -59,7 +62,7 @@ class Subscriber: try: msg = await asyncio.to_thread( consumer.receive, - timeout_millis=2000 + timeout_millis=250 ) except _pulsar.Timeout: continue @@ -91,7 +94,7 @@ class Subscriber: # FIXME: Timeout means data goes missing await asyncio.wait_for( self.q[id].put(value), - timeout=2 + timeout=1 ) except Exception as e: @@ -103,7 +106,7 @@ class Subscriber: # FIXME: Timeout means data goes missing await asyncio.wait_for( q.put(value), - timeout=2 + timeout=1 ) except Exception as e: self.metrics.dropped() @@ -112,13 +115,21 @@ class Subscriber: except Exception as e: print("Subscriber exception:", e, flush=True) - consumer.close() + finally: + + if consumer: + consumer.close() + consumer = None + if self.metrics: self.metrics.state("stopped") + if not self.running: + return + # If handler drops out, sleep a retry - time.sleep(2) + await asyncio.sleep(1) async def subscribe(self, id): diff --git a/trustgraph-flow/trustgraph/gateway/agent.py b/trustgraph-flow/trustgraph/gateway/agent.py index 5a54931b..c810b2dd 100644 --- a/trustgraph-flow/trustgraph/gateway/agent.py +++ b/trustgraph-flow/trustgraph/gateway/agent.py @@ -1,20 +1,23 @@ from .. schema import AgentRequest, AgentResponse -from .. schema import agent_request_queue -from .. schema import agent_response_queue from . endpoint import ServiceEndpoint from . requestor import ServiceRequestor class AgentRequestor(ServiceRequestor): - def __init__(self, pulsar_client, timeout, auth): + def __init__( + self, pulsar_client, request_queue, response_queue, timeout, auth, + consumer, subscriber, + ): super(AgentRequestor, self).__init__( pulsar_client=pulsar_client, - request_queue=agent_request_queue, - response_queue=agent_response_queue, + request_queue=request_queue, + response_queue=response_queue, request_schema=AgentRequest, response_schema=AgentResponse, + subscription = subscriber, + consumer_name = consumer, timeout=timeout, ) diff --git a/trustgraph-flow/trustgraph/gateway/embeddings.py b/trustgraph-flow/trustgraph/gateway/embeddings.py index 42ed91a1..3bce6010 100644 --- a/trustgraph-flow/trustgraph/gateway/embeddings.py +++ b/trustgraph-flow/trustgraph/gateway/embeddings.py @@ -1,20 +1,23 @@ from .. schema import EmbeddingsRequest, EmbeddingsResponse -from .. schema import embeddings_request_queue -from .. schema import embeddings_response_queue from . endpoint import ServiceEndpoint from . requestor import ServiceRequestor class EmbeddingsRequestor(ServiceRequestor): - def __init__(self, pulsar_client, timeout, auth): + def __init__( + self, pulsar_client, request_queue, response_queue, timeout, auth, + consumer, subscriber, + ): super(EmbeddingsRequestor, self).__init__( pulsar_client=pulsar_client, - request_queue=embeddings_request_queue, - response_queue=embeddings_response_queue, + request_queue=request_queue, + response_queue=response_queue, request_schema=EmbeddingsRequest, response_schema=EmbeddingsResponse, + subscription = subscriber, + consumer_name = consumer, timeout=timeout, ) diff --git a/trustgraph-flow/trustgraph/gateway/flow_endpoint.py b/trustgraph-flow/trustgraph/gateway/flow_endpoint.py new file mode 100644 index 00000000..8c69af76 --- /dev/null +++ b/trustgraph-flow/trustgraph/gateway/flow_endpoint.py @@ -0,0 +1,75 @@ + +import asyncio +from aiohttp import web +import uuid +import logging + +logger = logging.getLogger("flow-endpoint") +logger.setLevel(logging.INFO) + +class FlowEndpoint: + + def __init__(self, endpoint_path, auth, requestors): + + self.path = endpoint_path + + self.auth = auth + self.operation = "service" + + self.requestors = requestors + + async def start(self): + pass + + def add_routes(self, app): + + pass + app.add_routes([ + web.post(self.path, self.handle), + ]) + + async def handle(self, request): + + print(request.path, "...") + + flow_id = request.match_info['flow'] + kind = request.match_info['kind'] + k = (flow_id, kind) + + if k not in self.requestors: + raise web.HTTPBadRequest() + + requestor = self.requestors[k] + + try: + ht = request.headers["Authorization"] + tokens = ht.split(" ", 2) + if tokens[0] != "Bearer": + return web.HTTPUnauthorized() + token = tokens[1] + except: + token = "" + + if not self.auth.permitted(token, self.operation): + return web.HTTPUnauthorized() + + try: + + data = await request.json() + + print(data) + + async def responder(x, fin): + print(x) + + resp = await requestor.process(data, responder) + + return web.json_response(resp) + + except Exception as e: + logging.error(f"Exception: {e}") + + return web.json_response( + { "error": str(e) } + ) + diff --git a/trustgraph-flow/trustgraph/gateway/graph_embeddings_query.py b/trustgraph-flow/trustgraph/gateway/graph_embeddings_query.py index 8df38e97..8cba4f9a 100644 --- a/trustgraph-flow/trustgraph/gateway/graph_embeddings_query.py +++ b/trustgraph-flow/trustgraph/gateway/graph_embeddings_query.py @@ -1,21 +1,24 @@ from .. schema import GraphEmbeddingsRequest, GraphEmbeddingsResponse -from .. schema import graph_embeddings_request_queue -from .. schema import graph_embeddings_response_queue from . endpoint import ServiceEndpoint from . requestor import ServiceRequestor from . serialize import serialize_value class GraphEmbeddingsQueryRequestor(ServiceRequestor): - def __init__(self, pulsar_client, timeout, auth): + def __init__( + self, pulsar_client, request_queue, response_queue, timeout, auth, + consumer, subscriber, + ): super(GraphEmbeddingsQueryRequestor, self).__init__( pulsar_client=pulsar_client, - request_queue=graph_embeddings_request_queue, - response_queue=graph_embeddings_response_queue, + request_queue=request_queue, + response_queue=response_queue, request_schema=GraphEmbeddingsRequest, response_schema=GraphEmbeddingsResponse, + subscription = subscriber, + consumer_name = consumer, timeout=timeout, ) diff --git a/trustgraph-flow/trustgraph/gateway/graph_rag.py b/trustgraph-flow/trustgraph/gateway/graph_rag.py index b2b69758..3dc9fcfe 100644 --- a/trustgraph-flow/trustgraph/gateway/graph_rag.py +++ b/trustgraph-flow/trustgraph/gateway/graph_rag.py @@ -1,20 +1,23 @@ from .. schema import GraphRagQuery, GraphRagResponse -from .. schema import graph_rag_request_queue -from .. schema import graph_rag_response_queue from . endpoint import ServiceEndpoint from . requestor import ServiceRequestor class GraphRagRequestor(ServiceRequestor): - def __init__(self, pulsar_client, timeout, auth): + def __init__( + self, pulsar_client, request_queue, response_queue, timeout, auth, + consumer, subscriber, + ): super(GraphRagRequestor, self).__init__( pulsar_client=pulsar_client, - request_queue=graph_rag_request_queue, - response_queue=graph_rag_response_queue, + request_queue=request_queue, + response_queue=response_queue, request_schema=GraphRagQuery, response_schema=GraphRagResponse, + subscription = subscriber, + consumer_name = consumer, timeout=timeout, ) diff --git a/trustgraph-flow/trustgraph/gateway/prompt.py b/trustgraph-flow/trustgraph/gateway/prompt.py index eb50ac73..86a9e788 100644 --- a/trustgraph-flow/trustgraph/gateway/prompt.py +++ b/trustgraph-flow/trustgraph/gateway/prompt.py @@ -2,21 +2,24 @@ import json from .. schema import PromptRequest, PromptResponse -from .. schema import prompt_request_queue -from .. schema import prompt_response_queue from . endpoint import ServiceEndpoint from . requestor import ServiceRequestor class PromptRequestor(ServiceRequestor): - def __init__(self, pulsar_client, timeout, auth): + def __init__( + self, pulsar_client, request_queue, response_queue, timeout, auth, + consumer, subscriber, + ): super(PromptRequestor, self).__init__( pulsar_client=pulsar_client, - request_queue=prompt_request_queue, - response_queue=prompt_response_queue, + request_queue=request_queue, + response_queue=response_queue, request_schema=PromptRequest, response_schema=PromptResponse, + subscription = subscriber, + consumer_name = consumer, timeout=timeout, ) diff --git a/trustgraph-flow/trustgraph/gateway/requestor.py b/trustgraph-flow/trustgraph/gateway/requestor.py index 63395203..04837518 100644 --- a/trustgraph-flow/trustgraph/gateway/requestor.py +++ b/trustgraph-flow/trustgraph/gateway/requestor.py @@ -34,10 +34,13 @@ class ServiceRequestor: self.timeout = timeout async def start(self): - await self.pub.start() await self.sub.start() + async def stop(self): + await self.pub.stop() + await self.sub.stop() + def to_request(self, request): raise RuntimeError("Not defined") diff --git a/trustgraph-flow/trustgraph/gateway/service.py b/trustgraph-flow/trustgraph/gateway/service.py index d7df3240..322d32d0 100755 --- a/trustgraph-flow/trustgraph/gateway/service.py +++ b/trustgraph-flow/trustgraph/gateway/service.py @@ -17,6 +17,8 @@ from aiohttp import web import logging import os import base64 +import uuid +import json import pulsar from prometheus_client import start_http_server @@ -26,15 +28,17 @@ from .. log_level import LogLevel from . serialize import to_subgraph from . running import Running -#from . text_completion import TextCompletionRequestor -#from . prompt import PromptRequestor -#from . graph_rag import GraphRagRequestor +from .. schema import ConfigPush, config_push_queue + +from . text_completion import TextCompletionRequestor +from . prompt import PromptRequestor +from . graph_rag import GraphRagRequestor #from . document_rag import DocumentRagRequestor -#from . triples_query import TriplesQueryRequestor -#from . graph_embeddings_query import GraphEmbeddingsQueryRequestor -#from . embeddings import EmbeddingsRequestor +from . triples_query import TriplesQueryRequestor +from . graph_embeddings_query import GraphEmbeddingsQueryRequestor +from . embeddings import EmbeddingsRequestor #from . encyclopedia import EncyclopediaRequestor -#from . agent import AgentRequestor +from . agent import AgentRequestor #from . dbpedia import DbpediaRequestor #from . internet_search import InternetSearchRequestor #from . librarian import LibrarianRequestor @@ -52,7 +56,10 @@ from . mux import MuxEndpoint from . metrics import MetricsEndpoint from . endpoint import ServiceEndpoint +from . flow_endpoint import FlowEndpoint from . auth import Authenticator +from .. base import Subscriber +from .. base import Consumer logger = logging.getLogger("api") logger.setLevel(logging.INFO) @@ -68,11 +75,6 @@ class Api: def __init__(self, **config): - self.app = web.Application( - middlewares=[], - client_max_size=256 * 1024 * 1024 - ) - self.port = int(config.get("port", default_port)) self.timeout = int(config.get("timeout", default_timeout)) self.pulsar_host = config.get("pulsar_host", default_pulsar_host) @@ -143,11 +145,11 @@ class Api: # pulsar_client=self.pulsar_client, timeout=self.timeout, # auth = self.auth, # ), - "config": ConfigRequestor( + (None, "config"): ConfigRequestor( pulsar_client=self.pulsar_client, timeout=self.timeout, auth = self.auth, ), - "flow": FlowRequestor( + (None, "flow"): FlowRequestor( pulsar_client=self.pulsar_client, timeout=self.timeout, auth = self.auth, ), @@ -211,11 +213,16 @@ class Api: # ), ServiceEndpoint( endpoint_path = "/api/v1/config", auth=self.auth, - requestor = self.services["config"], + requestor = self.services[(None, "config")], ), ServiceEndpoint( endpoint_path = "/api/v1/flow", auth=self.auth, - requestor = self.services["flow"], + requestor = self.services[(None, "flow")], + ), + FlowEndpoint( + endpoint_path = "/api/v1/flow/{flow}/{kind}", + auth=self.auth, + requestors = self.services, ), # ServiceEndpoint( # endpoint_path = "/api/v1/encyclopedia", auth=self.auth, @@ -273,10 +280,117 @@ class Api: ), ] - for ep in self.endpoints: - ep.add_routes(self.app) + self.flows = {} + + async def on_config(self, msg, proc, flow): + + try: + + v = msg.value() + + print(f"Config version", v.version) + + if "flows" in v.config: + + flows = v.config["flows"] + + wanted = list(flows.keys()) + current = list(self.flows.keys()) + + for k in wanted: + if k not in current: + self.flows[k] = json.loads(flows[k]) + await self.start_flow(k, self.flows[k]) + + for k in current: + if k not in wanted: + await self.stop_flow(k, self.flows[k]) + del self.flows[k] + + except Exception as e: + print(f"Exception: {e}", flush=True) + + async def start_flow(self, id, flow): + + print("Start flow", id) + intf = flow["interfaces"] + + kinds = { + "agent": AgentRequestor, + "text-completion": TextCompletionRequestor, + "prompt": PromptRequestor, + "graph-rag": GraphRagRequestor, + "embeddings": EmbeddingsRequestor, + "graph-embeddings": GraphEmbeddingsQueryRequestor, + "triples-query": TriplesQueryRequestor, + } + + for api_kind, requestor in kinds.items(): + + if api_kind in intf: + k = (id, api_kind) + if k in self.services: + await self.services[k].stop() + del self.services[k] + + self.services[k] = requestor( + pulsar_client=self.pulsar_client, timeout=self.timeout, + request_queue = intf[api_kind]["request"], + response_queue = intf[api_kind]["response"], + consumer = f"api-gateway-{id}-{api_kind}-request", + subscriber = f"api-gateway-{id}-{api_kind}-request", + auth = self.auth, + ) + await self.services[k].start() + + async def stop_flow(self, id, flow): + print("Stop flow", id) + intf = flow["interfaces"] + + svc_list = list(self.services.keys()) + + for k in svc_list: + + kid, kkind = k + + if id == kid: + await self.services[k].stop() + del self.services[k] + + async def config_loader(self): + + async with asyncio.TaskGroup() as tg: + + id = str(uuid.uuid4()) + + self.config_cons = Consumer( + taskgroup = tg, + flow = None, + client = self.pulsar_client, + subscriber = f"gateway-{id}", + topic = config_push_queue, + schema = ConfigPush, + handler = self.on_config, + start_of_messages = True, + ) + + await self.config_cons.start() + + print("Waiting...") + + print("Config consumer done. :/") async def app_factory(self): + + self.app = web.Application( + middlewares=[], + client_max_size=256 * 1024 * 1024 + ) + + asyncio.create_task(self.config_loader()) + + for ep in self.endpoints: + ep.add_routes(self.app) for ep in self.endpoints: await ep.start() diff --git a/trustgraph-flow/trustgraph/gateway/text_completion.py b/trustgraph-flow/trustgraph/gateway/text_completion.py index ec84e5d6..3c6d1c38 100644 --- a/trustgraph-flow/trustgraph/gateway/text_completion.py +++ b/trustgraph-flow/trustgraph/gateway/text_completion.py @@ -1,20 +1,23 @@ from .. schema import TextCompletionRequest, TextCompletionResponse -from .. schema import text_completion_request_queue -from .. schema import text_completion_response_queue from . endpoint import ServiceEndpoint from . requestor import ServiceRequestor class TextCompletionRequestor(ServiceRequestor): - def __init__(self, pulsar_client, timeout, auth): + def __init__( + self, pulsar_client, request_queue, response_queue, timeout, auth, + consumer, subscriber, + ): super(TextCompletionRequestor, self).__init__( pulsar_client=pulsar_client, - request_queue=text_completion_request_queue, - response_queue=text_completion_response_queue, + request_queue=request_queue, + response_queue=response_queue, request_schema=TextCompletionRequest, response_schema=TextCompletionResponse, + subscription = subscriber, + consumer_name = consumer, timeout=timeout, ) diff --git a/trustgraph-flow/trustgraph/gateway/triples_query.py b/trustgraph-flow/trustgraph/gateway/triples_query.py index 061bd4d8..3775b270 100644 --- a/trustgraph-flow/trustgraph/gateway/triples_query.py +++ b/trustgraph-flow/trustgraph/gateway/triples_query.py @@ -1,21 +1,24 @@ from .. schema import TriplesQueryRequest, TriplesQueryResponse, Triples -from .. schema import triples_request_queue -from .. schema import triples_response_queue from . endpoint import ServiceEndpoint from . requestor import ServiceRequestor from . serialize import to_value, serialize_subgraph class TriplesQueryRequestor(ServiceRequestor): - def __init__(self, pulsar_client, timeout, auth): + def __init__( + self, pulsar_client, request_queue, response_queue, timeout, auth, + consumer, subscriber, + ): super(TriplesQueryRequestor, self).__init__( pulsar_client=pulsar_client, - request_queue=triples_request_queue, - response_queue=triples_response_queue, + request_queue=request_queue, + response_queue=response_queue, request_schema=TriplesQueryRequest, response_schema=TriplesQueryResponse, + subscription = subscriber, + consumer_name = consumer, timeout=timeout, )