diff --git a/trustgraph-base/trustgraph/schema/knowledge.py b/trustgraph-base/trustgraph/schema/knowledge.py index 88892f8c..9eb7d8b6 100644 --- a/trustgraph-base/trustgraph/schema/knowledge.py +++ b/trustgraph-base/trustgraph/schema/knowledge.py @@ -7,7 +7,7 @@ from . metadata import Metadata from . documents import Document, TextDocument from . graph import Triples, GraphEmbeddings -# fetch-kg-core +# get-kg-core # -> (???) # <- () # <- (error) @@ -24,15 +24,19 @@ from . graph import Triples, GraphEmbeddings class KnowledgeRequest(Record): - # fetch-kg-core, delete-kg-core, list-kg-cores + # get-kg-core, delete-kg-core, list-kg-cores, put-kg-core operation = String() - # list-kg-cores, delete-kg-core + # list-kg-cores, delete-kg-core, put-kg-core user = String() - # fetch-kg-core, list-kg-cores, delete-kg-core + # get-kg-core, list-kg-cores, delete-kg-core, put-kg-core id = String() + # put-kg-core + triples = Triples() + graph_embeddings = GraphEmbeddings() + class KnowledgeResponse(Record): error = Error() ids = Array(String()) diff --git a/trustgraph-cli/scripts/tg-fetch-kg-core b/trustgraph-cli/scripts/tg-get-kg-core similarity index 64% rename from trustgraph-cli/scripts/tg-fetch-kg-core rename to trustgraph-cli/scripts/tg-get-kg-core index 44045b5b..41ba8533 100755 --- a/trustgraph-cli/scripts/tg-fetch-kg-core +++ b/trustgraph-cli/scripts/tg-get-kg-core @@ -1,7 +1,8 @@ #!/usr/bin/env python3 """ -Uses the agent service to answer a question +Uses the knowledge service to fetch a knowledge core which is saved +to a local file in msgpack format. """ import argparse @@ -16,6 +17,42 @@ import msgpack default_url = os.getenv("TRUSTGRAPH_URL", 'ws://localhost:8088/') default_user = 'trustgraph' +def write_triple(f, data): + msg = ( + "t", + { + "m": { + "i": data["metadata"]["id"], + "m": data["metadata"]["metadata"], + "u": data["metadata"]["user"], + "c": data["metadata"]["collection"], + }, + "t": data["triples"], + } + ) + f.write(msgpack.packb(msg, use_bin_type=True)) + +def write_ge(f, data): + msg = ( + "ge", + { + "m": { + "i": data["metadata"]["id"], + "m": data["metadata"]["metadata"], + "u": data["metadata"]["user"], + "c": data["metadata"]["collection"], + }, + "e": [ + { + "e": ent["entity"], + "v": ent["vectors"], + } + for ent in data["entities"] + ] + } + ) + f.write(msgpack.packb(msg, use_bin_type=True)) + async def fetch(url, user, id, output): if not url.endswith("/"): @@ -61,21 +98,20 @@ async def fetch(url, user, id, output): if "triples" in obj["response"]: t += 1 - msg = obj["response"]["triples"] - f.write(msgpack.packb(msg, use_bin_type=True)) + write_triple(f, obj["response"]["triples"]) if "graph-embeddings" in obj["response"]: ge += 1 - msg = obj["response"]["graph-embeddings"] - f.write(msgpack.packb(msg, use_bin_type=True)) + write_ge(f, obj["response"]["graph-embeddings"]) + + print(f"Got: {t} triple, {ge} GE messages.") - print(f"Wrote: {t} triple, {ge} GE messages.") await ws.close() def main(): parser = argparse.ArgumentParser( - prog='tg-invoke-agent', + prog='tg-get-kg-core', description=__doc__, ) diff --git a/trustgraph-cli/scripts/tg-put-kg-core b/trustgraph-cli/scripts/tg-put-kg-core new file mode 100755 index 00000000..1184d6f7 --- /dev/null +++ b/trustgraph-cli/scripts/tg-put-kg-core @@ -0,0 +1,183 @@ +#!/usr/bin/env python3 + +""" +Uses the agent service to answer a question +""" + +import argparse +import os +import textwrap +import uuid +import asyncio +import json +from websockets.asyncio.client import connect +import msgpack + +default_url = os.getenv("TRUSTGRAPH_URL", 'ws://localhost:8088/') +default_user = 'trustgraph' + +def read_message(unpacked, id, user): + + if unpacked[0] == "ge": + msg = unpacked[1] + return "ge", { + "metadata": { + "id": id, + "metadata": msg["m"]["m"], + "user": user, + "collection": "default", # Not used? + }, + "entities": [ + { + "entity": ent["e"], + "vectors": ent["v"], + } + for ent in msg["e"] + ], + } + elif unpacked[0] == "t": + msg = unpacked[1] + return "t", { + "metadata": { + "id": id, + "metadata": msg["m"]["m"], + "user": user, + "collection": "default", # Not used by receiver? + }, + "triples": msg["t"], + } + else: + raise RuntimeError("Unpacked unexpected messsage type", unpacked[0]) + +async def put(url, user, id, input): + + if not url.endswith("/"): + url += "/" + + url = url + "api/v1/socket" + + async with connect(url) as ws: + + + ge = 0 + t = 0 + + with open(input, "rb") as f: + + unpacker = msgpack.Unpacker(f, raw=False) + + while True: + + try: + unpacked = unpacker.unpack() + except: + break + + kind, msg = read_message(unpacked, id, user) + + mid = str(uuid.uuid4()) + + if kind == "ge": + + ge += 1 + + req = json.dumps({ + "id": mid, + "service": "knowledge", + "request": { + "operation": "put-kg-core", + "user": user, + "id": id, + "graph-embeddings": msg + } + }) + + elif kind == "t": + + t += 1 + + req = json.dumps({ + "id": mid, + "service": "knowledge", + "request": { + "operation": "put-kg-core", + "user": user, + "id": id, + "triples": msg + } + }) + + else: + + raise RuntimeError("Unexpected message kind", kind) + + await ws.send(req) + + # Retry loop, wait for right response to come back + while True: + + msg = await ws.recv() + msg = json.loads(msg) + + if msg["id"] != mid: + continue + + if "response" in msg: + if "error" in msg["response"]: + raise RuntimeError(msg["response"]["error"]) + + break + + print(f"Put: {t} triple, {ge} GE messages.") + + await ws.close() + +def main(): + + parser = argparse.ArgumentParser( + prog='tg-put-kg-core', + description=__doc__, + ) + + parser.add_argument( + '-u', '--url', + default=default_url, + help=f'API URL (default: {default_url})', + ) + parser.add_argument( + '-U', '--user', + default=default_user, + help=f'User ID (default: {default_user})' + ) + + parser.add_argument( + '--id', '--identifier', + required=True, + help=f'Knowledge core ID', + ) + + parser.add_argument( + '-i', '--input', + required=True, + help=f'Input file' + ) + + args = parser.parse_args() + + try: + + asyncio.run( + put( + url = args.url, + user = args.user, + id = args.id, + input = args.input, + ) + ) + + except Exception as e: + + print("Exception:", e, flush=True) + +main() + diff --git a/trustgraph-cli/setup.py b/trustgraph-cli/setup.py index 03224ce5..84bcc88a 100644 --- a/trustgraph-cli/setup.py +++ b/trustgraph-cli/setup.py @@ -48,8 +48,8 @@ setuptools.setup( "scripts/tg-delete-flow-class", "scripts/tg-delete-kg-core", "scripts/tg-dump-msgpack", - "scripts/tg-fetch-kg-core", "scripts/tg-get-flow-class", + "scripts/tg-get-kg-core", "scripts/tg-graph-to-turtle", "scripts/tg-init-trustgraph", "scripts/tg-invoke-agent", @@ -64,6 +64,7 @@ setuptools.setup( "scripts/tg-load-text", "scripts/tg-load-turtle", "scripts/tg-put-flow-class", + "scripts/tg-put-kg-core", "scripts/tg-remove-library-document", "scripts/tg-save-doc-embeds", "scripts/tg-save-kg-core", diff --git a/trustgraph-flow/trustgraph/cores/knowledge.py b/trustgraph-flow/trustgraph/cores/knowledge.py index adf9b429..34ffa542 100644 --- a/trustgraph-flow/trustgraph/cores/knowledge.py +++ b/trustgraph-flow/trustgraph/cores/knowledge.py @@ -36,9 +36,9 @@ class KnowledgeManager: ) ) - async def fetch_kg_core(self, request, respond): + async def get_kg_core(self, request, respond): - print("Fetch core...", flush=True) + print("Get core...", flush=True) async def publish_triples(t): await respond( @@ -76,7 +76,7 @@ class KnowledgeManager: publish_ge, ) - print("Fetch complete", flush=True) + print("Get complete", flush=True) await respond( KnowledgeResponse( @@ -102,3 +102,23 @@ class KnowledgeManager: ) ) + async def put_kg_core(self, request, respond): + + if request.triples: + await self.table_store.add_triples(request.triples) + + if request.graph_embeddings: + await self.table_store.add_graph_embeddings( + request.graph_embeddings + ) + + await respond( + KnowledgeResponse( + error = None, + ids = None, + eos = False, + triples = None, + graph_embeddings = None + ) + ) + diff --git a/trustgraph-flow/trustgraph/cores/service.py b/trustgraph-flow/trustgraph/cores/service.py index 93e84d1e..8eccb202 100755 --- a/trustgraph-flow/trustgraph/cores/service.py +++ b/trustgraph-flow/trustgraph/cores/service.py @@ -125,8 +125,9 @@ class Processor(AsyncProcessor): impls = { "list-kg-cores": self.knowledge.list_kg_cores, - "fetch-kg-core": self.knowledge.fetch_kg_core, + "get-kg-core": self.knowledge.get_kg_core, "delete-kg-core": self.knowledge.delete_kg_core, + "put-kg-core": self.knowledge.put_kg_core, } if v.operation not in impls: @@ -150,12 +151,10 @@ class Processor(AsyncProcessor): try: + # We don't send a response back here, the processing + # implementation sends whatever it needs to send. await self.process_request(v, id) -# await self.knowledge_response_producer.send( -# resp, properties={"id": id} -# ) - return except RequestError as e: diff --git a/trustgraph-flow/trustgraph/gateway/dispatch/knowledge.py b/trustgraph-flow/trustgraph/gateway/dispatch/knowledge.py index 2e1ae43a..01043a80 100644 --- a/trustgraph-flow/trustgraph/gateway/dispatch/knowledge.py +++ b/trustgraph-flow/trustgraph/gateway/dispatch/knowledge.py @@ -1,13 +1,14 @@ import base64 -from ... schema import KnowledgeRequest, KnowledgeResponse +from ... schema import KnowledgeRequest, KnowledgeResponse, Triples +from ... schema import GraphEmbeddings, Metadata, EntityEmbeddings from ... schema import knowledge_request_queue from ... schema import knowledge_response_queue from . requestor import ServiceRequestor from . serialize import serialize_graph_embeddings -from . serialize import serialize_triples +from . serialize import serialize_triples, to_subgraph, to_value from . serialize import to_document_metadata, to_processing_metadata class KnowledgeRequestor(ServiceRequestor): @@ -26,10 +27,42 @@ class KnowledgeRequestor(ServiceRequestor): def to_request(self, body): + if "triples" in body: + triples = Triples( + metadata=Metadata( + id = body["triples"]["metadata"]["id"], + metadata = to_subgraph(body["triples"]["metadata"]["metadata"]), + user = body["triples"]["metadata"]["user"], + ), + triples = to_subgraph(body["triples"]["triples"]), + ) + else: + triples = None + + if "graph-embeddings" in body: + ge = GraphEmbeddings( + metadata = Metadata( + id = body["graph-embeddings"]["metadata"]["id"], + metadata = to_subgraph(body["graph-embeddings"]["metadata"]["metadata"]), + user = body["graph-embeddings"]["metadata"]["user"], + ), + entities=[ + EntityEmbeddings( + entity = to_value(ent["entity"]), + vectors = ent["vectors"], + ) + for ent in body["graph-embeddings"]["entities"] + ] + ) + else: + ge = None + return KnowledgeRequest( operation = body.get("operation", None), user = body.get("user", None), id = body.get("id", None), + triples = triples, + graph_embeddings = ge, ) def from_response(self, message):