diff --git a/trustgraph-flow/trustgraph/gateway/dispatch/core_export.py b/trustgraph-flow/trustgraph/gateway/dispatch/core_export.py new file mode 100644 index 00000000..506a4d7a --- /dev/null +++ b/trustgraph-flow/trustgraph/gateway/dispatch/core_export.py @@ -0,0 +1,97 @@ + +import asyncio +import json +import uuid +import msgpack +from . knowledge import KnowledgeRequestor + +class CoreExport: + + def __init__(self, pulsar_client): + self.pulsar_client = pulsar_client + + async def process(self, data, error, ok, params): + + id = params["id"] + user = params["user"] + + response = await ok() + + kr = KnowledgeRequestor( + pulsar_client = self.pulsar_client, + consumer = "api-gateway-core-export-" + str(uuid.uuid4()), + subscriber = "api-gateway-core-export-" + str(uuid.uuid4()), + ) + + try: + + await kr.start() + + async def responder(resp, fin): + + if "graph-embeddings" in resp: + + data = resp["graph-embeddings"] + + msg = ( + "ge", + { + "m": { + "i": data["metadata"]["id"], + "m": data["metadata"]["metadata"], + "u": data["metadata"]["user"], + "c": data["metadata"]["collection"], + }, + "e": [ + { + "e": ent["entity"], + "v": ent["vectors"], + } + for ent in data["entities"] + ] + } + ) + + enc = msgpack.packb(msg) + await response.write(enc) + + if "triples" in resp: + + data = resp["triples"] + msg = ( + "t", + { + "m": { + "i": data["metadata"]["id"], + "m": data["metadata"]["metadata"], + "u": data["metadata"]["user"], + "c": data["metadata"]["collection"], + }, + "t": data["triples"], + } + ) + + enc = msgpack.packb(msg) + await response.write(enc) + + await kr.process( + { + "operation": "get-kg-core", + "user": user, + "id": id, + }, + responder + ) + + except Exception as e: + + print("Exception:", e) + + finally: + + await kr.stop() + + await response.write_eof() + + return response + diff --git a/trustgraph-flow/trustgraph/gateway/dispatch/core_import.py b/trustgraph-flow/trustgraph/gateway/dispatch/core_import.py new file mode 100644 index 00000000..8fbaa9fb --- /dev/null +++ b/trustgraph-flow/trustgraph/gateway/dispatch/core_import.py @@ -0,0 +1,94 @@ + +import asyncio +import json +import uuid +import msgpack +from . knowledge import KnowledgeRequestor + +class CoreImport: + + def __init__(self, pulsar_client): + self.pulsar_client = pulsar_client + + async def process(self, data, error, ok, params): + + id = params["id"] + user = params["user"] + + kr = KnowledgeRequestor( + pulsar_client = self.pulsar_client, + consumer = "api-gateway-core-import-" + str(uuid.uuid4()), + subscriber = "api-gateway-core-import-" + str(uuid.uuid4()), + ) + + await kr.start() + + try: + + unpacker = msgpack.Unpacker() + + while True: + buf = await data.read(128*1024) + if not buf: break + + unpacker.feed(buf) + + for unpacked in unpacker: + + if unpacked[0] == "t": + msg = unpacked[1] + msg = { + "operation": "put-kg-core", + "user": user, + "id": id, + "triples": { + "metadata": { + "id": id, + "metadata": msg["m"]["m"], + "user": user, + "collection": "default", # Not used? + }, + "triples": msg["t"], + } + } + + await kr.process(msg) + + elif unpacked[0] == "ge": + msg = unpacked[1] + msg = { + "operation": "put-kg-core", + "user": user, + "id": id, + "graph-embeddings": { + "metadata": { + "id": id, + "metadata": msg["m"]["m"], + "user": user, + "collection": "default", # Not used? + }, + "entities": [ + { + "entity": ent["e"], + "vectors": ent["v"], + } + for ent in msg["e"] + ] + } + } + + await kr.process(msg) + + except Exception as e: + print("Exception:", e) + await error(str(e)) + + finally: + + await kr.stop() + + print("All done.") + response = await ok() + await response.write_eof() + + return response diff --git a/trustgraph-flow/trustgraph/gateway/dispatch/manager.py b/trustgraph-flow/trustgraph/gateway/dispatch/manager.py index 8223461a..05251a6c 100644 --- a/trustgraph-flow/trustgraph/gateway/dispatch/manager.py +++ b/trustgraph-flow/trustgraph/gateway/dispatch/manager.py @@ -1,6 +1,9 @@ import asyncio +from aiohttp import web import uuid +import json +import msgpack from . config import ConfigRequestor from . flow import FlowRequestor @@ -30,6 +33,9 @@ from . graph_embeddings_import import GraphEmbeddingsImport from . document_embeddings_import import DocumentEmbeddingsImport from . entity_contexts_import import EntityContextsImport +from . core_export import CoreExport +from . core_import import CoreImport + from . mux import Mux request_response_dispatchers = { @@ -98,6 +104,22 @@ class DispatcherManager: def dispatch_global_service(self): return DispatcherWrapper(self.process_global_service) + def dispatch_core_export(self): + return DispatcherWrapper(self.process_core_export) + + def dispatch_core_import(self): + return DispatcherWrapper(self.process_core_import) + + async def process_core_import(self, data, error, ok, params): + + ci = CoreImport(self.pulsar_client) + return await ci.process(data, error, ok, params) + + async def process_core_export(self, data, error, ok, params): + + ce = CoreExport(self.pulsar_client) + return await ce.process(data, error, ok, params) + async def process_global_service(self, data, responder, params): kind = params.get("kind") diff --git a/trustgraph-flow/trustgraph/gateway/endpoint/manager.py b/trustgraph-flow/trustgraph/gateway/endpoint/manager.py index 75a39766..53c5aa3a 100644 --- a/trustgraph-flow/trustgraph/gateway/endpoint/manager.py +++ b/trustgraph-flow/trustgraph/gateway/endpoint/manager.py @@ -3,7 +3,7 @@ import asyncio from aiohttp import web -from . constant_endpoint import ConstantEndpoint +from . stream_endpoint import StreamEndpoint from . variable_endpoint import VariableEndpoint from . socket import SocketEndpoint from . metrics import MetricsEndpoint @@ -52,6 +52,16 @@ class EndpointManager: auth = auth, dispatcher = dispatcher_manager.dispatch_flow_export() ), + StreamEndpoint( + endpoint_path = "/api/v1/import-core/{user}/{id:.*}", + auth = auth, + dispatcher = dispatcher_manager.dispatch_core_import(), + ), + StreamEndpoint( + endpoint_path = "/api/v1/export-core/{user}/{id:.*}", + auth = auth, + dispatcher = dispatcher_manager.dispatch_core_export(), + ), ] def add_routes(self, app): diff --git a/trustgraph-flow/trustgraph/gateway/endpoint/stream_endpoint.py b/trustgraph-flow/trustgraph/gateway/endpoint/stream_endpoint.py new file mode 100644 index 00000000..733ad08e --- /dev/null +++ b/trustgraph-flow/trustgraph/gateway/endpoint/stream_endpoint.py @@ -0,0 +1,72 @@ + +import asyncio +from aiohttp import web +import logging + +logger = logging.getLogger("endpoint") +logger.setLevel(logging.INFO) + +class StreamEndpoint: + + def __init__(self, endpoint_path, auth, dispatcher): + + self.path = endpoint_path + + self.auth = auth + self.operation = "service" + + self.dispatcher = dispatcher + + async def start(self): + pass + + def add_routes(self, app): + + app.add_routes([ + web.post(self.path, self.handle), + ]) + + async def handle(self, request): + + print(request.path, "...") + + try: + ht = request.headers["Authorization"] + tokens = ht.split(" ", 2) + if tokens[0] != "Bearer": + return web.HTTPUnauthorized() + token = tokens[1] + except: + token = "" + + if not self.auth.permitted(token, self.operation): + return web.HTTPUnauthorized() + + try: + + data = request.content + + async def error(err): + return web.HTTPInternalServerError(text = err) + + async def ok(status=200, reason="OK", type="application/octet-stream"): + response = web.StreamResponse( + status = status, reason = reason, + headers = {"Content-Type": type} + ) + await response.prepare(request) + return response + + resp = await self.dispatcher.process( + data, error, ok, request.match_info + ) + + return resp + + except Exception as e: + logging.error(f"Exception: {e}") + + return web.json_response( + { "error": str(e) } + ) + diff --git a/trustgraph-flow/trustgraph/gateway/endpoint/variable_endpoint.py b/trustgraph-flow/trustgraph/gateway/endpoint/variable_endpoint.py index 4a131d9d..ae0ae8fb 100644 --- a/trustgraph-flow/trustgraph/gateway/endpoint/variable_endpoint.py +++ b/trustgraph-flow/trustgraph/gateway/endpoint/variable_endpoint.py @@ -1,7 +1,6 @@ import asyncio from aiohttp import web -import uuid import logging logger = logging.getLogger("endpoint")