Core stream in/out (#404)

This commit is contained in:
cybermaggedon 2025-05-29 16:33:21 +01:00 committed by GitHub
parent 1ed228d8a3
commit 6f964e478e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 296 additions and 2 deletions

View file

@ -0,0 +1,97 @@
import asyncio
import json
import uuid
import msgpack
from . knowledge import KnowledgeRequestor
class CoreExport:
    """Stream a knowledge core to an HTTP client as msgpack records.

    Each record written to the response is a msgpack-encoded pair
    ``(tag, payload)`` where ``tag`` is ``"ge"`` (graph embeddings) or
    ``"t"`` (triples), using abbreviated single-letter keys.
    """

    def __init__(self, pulsar_client):
        self.pulsar_client = pulsar_client

    @staticmethod
    def _short_metadata(meta):
        """Map a full metadata dict onto the abbreviated wire keys."""
        return {
            "i": meta["id"],
            "m": meta["metadata"],
            "u": meta["user"],
            "c": meta["collection"],
        }

    async def process(self, data, error, ok, params):
        """Fetch the core named by ``params`` and stream it to the client.

        ``params`` must provide ``"id"`` and ``"user"``.  ``ok`` is awaited
        to obtain the response object that records are written to; the
        ``data`` and ``error`` arguments are not used by this method.

        The response is opened before the knowledge service is contacted,
        so a failure while exporting can only be printed: the stream is
        still terminated with ``write_eof()`` and returned.
        """
        core_id = params["id"]
        user = params["user"]

        response = await ok()

        # Unique consumer/subscriber names per export request.
        requestor = KnowledgeRequestor(
            pulsar_client=self.pulsar_client,
            consumer=f"api-gateway-core-export-{uuid.uuid4()}",
            subscriber=f"api-gateway-core-export-{uuid.uuid4()}",
        )

        async def responder(resp, fin):
            # ``fin`` is accepted but not consulted here; presumably it
            # marks the final message -- confirm against KnowledgeRequestor.
            if "graph-embeddings" in resp:
                embeddings = resp["graph-embeddings"]
                record = (
                    "ge",
                    {
                        "m": self._short_metadata(embeddings["metadata"]),
                        "e": [
                            {"e": ent["entity"], "v": ent["vectors"]}
                            for ent in embeddings["entities"]
                        ],
                    },
                )
                await response.write(msgpack.packb(record))

            # Independent check (not elif): one message may carry both.
            if "triples" in resp:
                triples = resp["triples"]
                record = (
                    "t",
                    {
                        "m": self._short_metadata(triples["metadata"]),
                        "t": triples["triples"],
                    },
                )
                await response.write(msgpack.packb(record))

        request = {
            "operation": "get-kg-core",
            "user": user,
            "id": core_id,
        }

        try:
            await requestor.start()
            await requestor.process(request, responder)
        except Exception as e:
            print("Exception:", e)
        finally:
            await requestor.stop()

        await response.write_eof()
        return response

View file

@ -0,0 +1,94 @@
import asyncio
import json
import uuid
import msgpack
from . knowledge import KnowledgeRequestor
class CoreImport:
    """Load a knowledge core from a msgpack-encoded HTTP request body.

    The body is a concatenation of msgpack records, each a pair
    ``(tag, payload)`` with ``tag`` equal to ``"t"`` (triples) or ``"ge"``
    (graph embeddings).  Every recognised record is forwarded to the
    knowledge service as a ``put-kg-core`` operation; records with any
    other tag are ignored.
    """

    def __init__(self, pulsar_client):
        self.pulsar_client = pulsar_client

    @staticmethod
    def _metadata(core_id, user, payload):
        """Build the full metadata dict for one imported record.

        The id and user come from the request path rather than from the
        uploaded record, and the collection is always ``"default"``.
        """
        return {
            "id": core_id,
            "metadata": payload["m"]["m"],
            "user": user,
            "collection": "default",  # Not used?
        }

    async def process(self, data, error, ok, params):
        """Consume the request stream and store its records.

        Args:
            data: stream whose awaitable ``read(n)`` yields body chunks and
                an empty value at end of input.
            error: awaitable callback taking an error string; its result is
                returned to the caller when the import fails.
            ok: awaitable callback producing the success response.
            params: mapping providing ``"id"`` and ``"user"``.

        Returns:
            The response from ``ok()`` (after ``write_eof()``) on success,
            or whatever ``error(...)`` returns on failure.
        """
        core_id = params["id"]
        user = params["user"]

        kr = KnowledgeRequestor(
            pulsar_client = self.pulsar_client,
            consumer = "api-gateway-core-import-" + str(uuid.uuid4()),
            subscriber = "api-gateway-core-import-" + str(uuid.uuid4()),
        )

        await kr.start()

        try:

            # The streaming unpacker copes with records that straddle
            # chunk boundaries: it only yields complete objects.
            unpacker = msgpack.Unpacker()

            while True:

                buf = await data.read(128*1024)
                if not buf:
                    break

                unpacker.feed(buf)

                for unpacked in unpacker:

                    tag = unpacked[0]
                    payload = unpacked[1]

                    if tag == "t":
                        await kr.process({
                            "operation": "put-kg-core",
                            "user": user,
                            "id": core_id,
                            "triples": {
                                "metadata": self._metadata(
                                    core_id, user, payload
                                ),
                                "triples": payload["t"],
                            },
                        })

                    elif tag == "ge":
                        await kr.process({
                            "operation": "put-kg-core",
                            "user": user,
                            "id": core_id,
                            "graph-embeddings": {
                                "metadata": self._metadata(
                                    core_id, user, payload
                                ),
                                "entities": [
                                    {
                                        "entity": ent["e"],
                                        "vectors": ent["v"],
                                    }
                                    for ent in payload["e"]
                                ],
                            },
                        })

        except Exception as e:

            print("Exception:", e)

            # Hand the error response back to the caller.  Previously the
            # result was discarded and execution fell through to ok(), so
            # a failed import was reported to the client as a success.
            return await error(str(e))

        finally:
            # Runs on both the success and the error path.
            await kr.stop()

        print("All done.")

        response = await ok()
        await response.write_eof()

        return response

View file

@ -1,6 +1,9 @@
import asyncio
from aiohttp import web
import uuid
import json
import msgpack
from . config import ConfigRequestor
from . flow import FlowRequestor
@ -30,6 +33,9 @@ from . graph_embeddings_import import GraphEmbeddingsImport
from . document_embeddings_import import DocumentEmbeddingsImport
from . entity_contexts_import import EntityContextsImport
from . core_export import CoreExport
from . core_import import CoreImport
from . mux import Mux
request_response_dispatchers = {
@ -98,6 +104,22 @@ class DispatcherManager:
def dispatch_global_service(self):
    """Return a DispatcherWrapper around ``process_global_service``."""
    handler = self.process_global_service
    return DispatcherWrapper(handler)
def dispatch_core_export(self):
    """Return a DispatcherWrapper around ``process_core_export``."""
    handler = self.process_core_export
    return DispatcherWrapper(handler)
def dispatch_core_import(self):
    """Return a DispatcherWrapper around ``process_core_import``."""
    handler = self.process_core_import
    return DispatcherWrapper(handler)
async def process_core_import(self, data, error, ok, params):
    """Run a core import with a fresh CoreImport on this manager's client.

    All four arguments are passed through unchanged and the importer's
    result is returned as-is.
    """
    importer = CoreImport(self.pulsar_client)
    outcome = await importer.process(data, error, ok, params)
    return outcome
async def process_core_export(self, data, error, ok, params):
    """Run a core export with a fresh CoreExport on this manager's client.

    All four arguments are passed through unchanged and the exporter's
    result is returned as-is.
    """
    exporter = CoreExport(self.pulsar_client)
    outcome = await exporter.process(data, error, ok, params)
    return outcome
async def process_global_service(self, data, responder, params):
kind = params.get("kind")

View file

@ -3,7 +3,7 @@ import asyncio
from aiohttp import web
from . constant_endpoint import ConstantEndpoint
from . stream_endpoint import StreamEndpoint
from . variable_endpoint import VariableEndpoint
from . socket import SocketEndpoint
from . metrics import MetricsEndpoint
@ -52,6 +52,16 @@ class EndpointManager:
auth = auth,
dispatcher = dispatcher_manager.dispatch_flow_export()
),
StreamEndpoint(
endpoint_path = "/api/v1/import-core/{user}/{id:.*}",
auth = auth,
dispatcher = dispatcher_manager.dispatch_core_import(),
),
StreamEndpoint(
endpoint_path = "/api/v1/export-core/{user}/{id:.*}",
auth = auth,
dispatcher = dispatcher_manager.dispatch_core_export(),
),
]
def add_routes(self, app):

View file

@ -0,0 +1,72 @@
import asyncio
from aiohttp import web
import logging
logger = logging.getLogger("endpoint")
logger.setLevel(logging.INFO)
class StreamEndpoint:
    """HTTP POST endpoint that hands the raw request stream to a dispatcher.

    The dispatcher receives the request's content stream together with
    ``error`` and ``ok`` callbacks for building the response, plus the
    route's match info, and returns the response to send.
    """

    def __init__(self, endpoint_path, auth, dispatcher):
        """Record the route path, authenticator and dispatcher.

        ``auth`` must offer ``permitted(token, operation)`` and
        ``dispatcher`` an awaitable ``process(data, error, ok, params)``.
        """
        self.path = endpoint_path
        self.auth = auth
        self.operation = "service"
        self.dispatcher = dispatcher

    async def start(self):
        """No start-up work is needed for this endpoint."""
        pass

    def add_routes(self, app):
        """Register ``handle`` as the POST handler for ``self.path``."""
        app.add_routes([
            web.post(self.path, self.handle),
        ])

    async def handle(self, request):
        """Authenticate the request and delegate it to the dispatcher.

        Returns the dispatcher's response, ``HTTPUnauthorized`` when the
        Authorization scheme is not ``Bearer`` or the token is not
        permitted, or a JSON ``{"error": ...}`` body if the dispatcher
        raises.
        """

        print(request.path, "...")

        try:
            ht = request.headers["Authorization"]
            tokens = ht.split(" ", 2)
            if tokens[0] != "Bearer":
                return web.HTTPUnauthorized()
            token = tokens[1]
        except (KeyError, IndexError):
            # Header absent (KeyError) or scheme with no token
            # (IndexError): fall back to the empty token and let the
            # authenticator decide.  Anything else is a real bug and is
            # no longer silently swallowed.
            token = ""

        if not self.auth.permitted(token, self.operation):
            return web.HTTPUnauthorized()

        try:

            data = request.content

            async def error(err):
                return web.HTTPInternalServerError(text = err)

            async def ok(status=200, reason="OK", type="application/octet-stream"):
                response = web.StreamResponse(
                    status = status, reason = reason,
                    headers = {"Content-Type": type}
                )
                # Headers are sent here, so the status cannot be changed
                # once the dispatcher has called ok().
                await response.prepare(request)
                return response

            resp = await self.dispatcher.process(
                data, error, ok, request.match_info
            )

            return resp

        except Exception as e:
            # Use this module's logger rather than the root logger.
            logger.error("Exception: %s", e)

            # NOTE(review): this reports the failure with the default
            # HTTP 200 status, unlike error() above which yields a 500.
            # Left unchanged in case clients rely on it -- confirm.
            return web.json_response(
                { "error": str(e) }
            )

View file

@ -1,7 +1,6 @@
import asyncio
from aiohttp import web
import uuid
import logging
logger = logging.getLogger("endpoint")