tg-put-kg-core command (#369)

This commit is contained in:
cybermaggedon 2025-05-07 11:13:21 +01:00 committed by GitHub
parent 8080b54328
commit f7123ac57f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 298 additions and 22 deletions

View file

@ -7,7 +7,7 @@ from . metadata import Metadata
from . documents import Document, TextDocument
from . graph import Triples, GraphEmbeddings
# fetch-kg-core
# get-kg-core
# -> (???)
# <- ()
# <- (error)
@ -24,15 +24,19 @@ from . graph import Triples, GraphEmbeddings
class KnowledgeRequest(Record):
# Request record for the knowledge service. The comment above each field
# lists the operations that make use of that field.
# Operation selector; one of:
# get-kg-core, delete-kg-core, list-kg-cores, put-kg-core
operation = String()
# Used by: list-kg-cores, delete-kg-core, put-kg-core
user = String()
# Used by: get-kg-core, list-kg-cores, delete-kg-core, put-kg-core
id = String()
# Payload fields, used by put-kg-core only
triples = Triples()
graph_embeddings = GraphEmbeddings()
class KnowledgeResponse(Record):
error = Error()
ids = Array(String())

View file

@ -1,7 +1,8 @@
#!/usr/bin/env python3
"""
Uses the agent service to answer a question
Uses the knowledge service to fetch a knowledge core which is saved
to a local file in msgpack format.
"""
import argparse
@ -16,6 +17,42 @@ import msgpack
default_url = os.getenv("TRUSTGRAPH_URL", 'ws://localhost:8088/')
default_user = 'trustgraph'
def write_triple(f, data):
    """Append one triples message to *f* in the compact msgpack core format.

    The record is written as the pair ``("t", payload)``.  ``payload``
    carries the message metadata under ``"m"`` (with the short keys
    ``i``/``m``/``u``/``c`` for id, metadata, user and collection) and
    the triples themselves under ``"t"``.

    Args:
        f: Writable file object; presumably opened in binary mode, since
            packed msgpack data is written to it.
        data: Mapping with a ``"metadata"`` mapping (keys ``id``,
            ``metadata``, ``user``, ``collection``) and a ``"triples"``
            entry.

    Raises:
        KeyError: If any of the expected keys is missing from *data*.
    """
    meta = data["metadata"]

    payload = {
        "m": {
            "i": meta["id"],
            "m": meta["metadata"],
            "u": meta["user"],
            "c": meta["collection"],
        },
        "t": data["triples"],
    }

    packed = msgpack.packb(("t", payload), use_bin_type=True)
    f.write(packed)
def write_ge(f, data):
    """Append one graph-embeddings message to *f* in the msgpack core format.

    The record is written as the pair ``("ge", payload)``.  ``payload``
    carries the message metadata under ``"m"`` (short keys
    ``i``/``m``/``u``/``c`` for id, metadata, user and collection) and a
    list of ``{"e": entity, "v": vectors}`` entries under ``"e"``.

    Args:
        f: Writable file object; presumably opened in binary mode, since
            packed msgpack data is written to it.
        data: Mapping with a ``"metadata"`` mapping (keys ``id``,
            ``metadata``, ``user``, ``collection``) and an ``"entities"``
            iterable of mappings holding ``entity`` and ``vectors``.

    Raises:
        KeyError: If any of the expected keys is missing from *data*.
    """
    meta = data["metadata"]

    header = {
        "i": meta["id"],
        "m": meta["metadata"],
        "u": meta["user"],
        "c": meta["collection"],
    }

    embeddings = []
    for ent in data["entities"]:
        embeddings.append({"e": ent["entity"], "v": ent["vectors"]})

    payload = {"m": header, "e": embeddings}

    packed = msgpack.packb(("ge", payload), use_bin_type=True)
    f.write(packed)
async def fetch(url, user, id, output):
if not url.endswith("/"):
@ -61,21 +98,20 @@ async def fetch(url, user, id, output):
if "triples" in obj["response"]:
t += 1
msg = obj["response"]["triples"]
f.write(msgpack.packb(msg, use_bin_type=True))
write_triple(f, obj["response"]["triples"])
if "graph-embeddings" in obj["response"]:
ge += 1
msg = obj["response"]["graph-embeddings"]
f.write(msgpack.packb(msg, use_bin_type=True))
write_ge(f, obj["response"]["graph-embeddings"])
print(f"Got: {t} triple, {ge} GE messages.")
print(f"Wrote: {t} triple, {ge} GE messages.")
await ws.close()
def main():
parser = argparse.ArgumentParser(
prog='tg-invoke-agent',
prog='tg-get-kg-core',
description=__doc__,
)

View file

@ -0,0 +1,183 @@
#!/usr/bin/env python3
"""
Uses the knowledge service to store a knowledge core which is read
from a local file in msgpack format.
"""
import argparse
import os
import textwrap
import uuid
import asyncio
import json
from websockets.asyncio.client import connect
import msgpack
default_url = os.getenv("TRUSTGRAPH_URL", 'ws://localhost:8088/')
default_user = 'trustgraph'
def read_message(unpacked, id, user):
    """Convert one unpacked core-file record into a put-kg-core payload.

    A record is a two-element sequence ``(kind, body)`` where ``kind`` is
    ``"t"`` (triples) or ``"ge"`` (graph embeddings) and ``body`` uses the
    short keys of the file format (``m`` for metadata, ``t`` for triples,
    ``e`` for entities, each entity having ``e`` and ``v``).

    The id and user stored in the file are not used: the returned
    metadata carries the *id* and *user* passed in, so a core can be
    loaded under a different identifier or owner.

    Args:
        unpacked: The ``(kind, body)`` record (tuple or list).
        id: Knowledge core ID to stamp into the metadata.
        user: User ID to stamp into the metadata.

    Returns:
        A ``(kind, message)`` tuple, where ``message`` is a dict with a
        ``"metadata"`` entry plus ``"triples"`` (kind ``"t"``) or
        ``"entities"`` (kind ``"ge"``) using long key names.

    Raises:
        RuntimeError: If the record kind is neither ``"t"`` nor ``"ge"``.
        KeyError: If the record body lacks an expected key.
    """
    kind = unpacked[0]

    # Reject unknown kinds before touching the body.
    if kind not in ("ge", "t"):
        raise RuntimeError("Unpacked unexpected message type", kind)

    msg = unpacked[1]

    # Both record kinds share the same metadata layout.
    metadata = {
        "id": id,
        "metadata": msg["m"]["m"],
        "user": user,
        # The collection stored in the file is ignored in favour of a
        # fixed value.  NOTE(review): the original author marked this as
        # possibly not used by the receiver - confirm.
        "collection": "default",
    }

    if kind == "ge":
        return "ge", {
            "metadata": metadata,
            "entities": [
                {
                    "entity": ent["e"],
                    "vectors": ent["v"],
                }
                for ent in msg["e"]
            ],
        }

    return "t", {
        "metadata": metadata,
        "triples": msg["t"],
    }
async def put(url, user, id, input):
    """Upload a msgpack knowledge-core file through the websocket API.

    Every record in the file is converted with ``read_message`` and sent
    as its own ``put-kg-core`` request to the ``knowledge`` service.  The
    next record is only sent once the response matching the request ID
    has been received.

    Args:
        url: Base API URL; ``api/v1/socket`` is appended (a trailing
            slash is added first when missing).
        user: User ID placed in every request.
        id: Knowledge core ID placed in every request.
        input: Path of the msgpack file to read.

    Raises:
        RuntimeError: If a response carries an error, or a record has an
            unexpected kind.
        Exception: Errors from opening/reading the file, decoding the
            msgpack stream, or the websocket connection propagate.
    """

    if not url.endswith("/"):
        url += "/"

    url = url + "api/v1/socket"

    async with connect(url) as ws:

        ge = 0
        t = 0

        with open(input, "rb") as f:

            unpacker = msgpack.Unpacker(f, raw=False)

            # Iterating the Unpacker stops when the stream runs out of
            # data, while decoding and I/O errors propagate.  The previous
            # bare `except:` treated *any* failure as end-of-file, so a
            # corrupt file produced a silently truncated upload.
            for unpacked in unpacker:

                kind, msg = read_message(unpacked, id, user)

                # Name of the request field that carries this record kind
                if kind == "ge":
                    ge += 1
                    field = "graph-embeddings"
                elif kind == "t":
                    t += 1
                    field = "triples"
                else:
                    raise RuntimeError("Unexpected message kind", kind)

                # Request ID used to pair the response with this request
                mid = str(uuid.uuid4())

                req = json.dumps({
                    "id": mid,
                    "service": "knowledge",
                    "request": {
                        "operation": "put-kg-core",
                        "user": user,
                        "id": id,
                        field: msg,
                    }
                })

                await ws.send(req)

                # Retry loop, wait for right response to come back;
                # messages carrying a different ID are skipped.
                while True:

                    reply = json.loads(await ws.recv())

                    if reply["id"] != mid:
                        continue

                    if "response" in reply:
                        if "error" in reply["response"]:
                            raise RuntimeError(reply["response"]["error"])

                    break

        print(f"Put: {t} triple, {ge} GE messages.")

        await ws.close()
def main():
    """Command-line entry point for tg-put-kg-core.

    Parses the API URL, user, knowledge core ID and input file from the
    command line and runs ``put`` with them.  Any exception raised while
    doing so is printed rather than propagated.
    """

    parser = argparse.ArgumentParser(
        prog='tg-put-kg-core',
        description=__doc__,
    )

    url_help = f'API URL (default: {default_url})'
    parser.add_argument('-u', '--url', default=default_url, help=url_help)

    user_help = f'User ID (default: {default_user})'
    parser.add_argument('-U', '--user', default=default_user, help=user_help)

    parser.add_argument(
        '--id', '--identifier', required=True, help='Knowledge core ID',
    )

    parser.add_argument('-i', '--input', required=True, help='Input file')

    args = parser.parse_args()

    try:
        upload = put(
            url=args.url,
            user=args.user,
            id=args.id,
            input=args.input,
        )
        asyncio.run(upload)
    except Exception as e:
        print("Exception:", e, flush=True)
main()

View file

@ -48,8 +48,8 @@ setuptools.setup(
"scripts/tg-delete-flow-class",
"scripts/tg-delete-kg-core",
"scripts/tg-dump-msgpack",
"scripts/tg-fetch-kg-core",
"scripts/tg-get-flow-class",
"scripts/tg-get-kg-core",
"scripts/tg-graph-to-turtle",
"scripts/tg-init-trustgraph",
"scripts/tg-invoke-agent",
@ -64,6 +64,7 @@ setuptools.setup(
"scripts/tg-load-text",
"scripts/tg-load-turtle",
"scripts/tg-put-flow-class",
"scripts/tg-put-kg-core",
"scripts/tg-remove-library-document",
"scripts/tg-save-doc-embeds",
"scripts/tg-save-kg-core",

View file

@ -36,9 +36,9 @@ class KnowledgeManager:
)
)
async def fetch_kg_core(self, request, respond):
async def get_kg_core(self, request, respond):
print("Fetch core...", flush=True)
print("Get core...", flush=True)
async def publish_triples(t):
await respond(
@ -76,7 +76,7 @@ class KnowledgeManager:
publish_ge,
)
print("Fetch complete", flush=True)
print("Get complete", flush=True)
await respond(
KnowledgeResponse(
@ -102,3 +102,23 @@ class KnowledgeManager:
)
)
async def put_kg_core(self, request, respond):
    """Store the payload of a put-kg-core request and acknowledge it.

    Triples and graph embeddings are each handed to the table store only
    when the corresponding request field is truthy.  Afterwards an empty
    KnowledgeResponse (no error, no data, ``eos`` false) is sent through
    *respond*.
    """

    triples = request.triples
    if triples:
        await self.table_store.add_triples(triples)

    embeddings = request.graph_embeddings
    if embeddings:
        await self.table_store.add_graph_embeddings(embeddings)

    ack = KnowledgeResponse(
        error=None,
        ids=None,
        eos=False,
        triples=None,
        graph_embeddings=None,
    )

    await respond(ack)

View file

@ -125,8 +125,9 @@ class Processor(AsyncProcessor):
impls = {
"list-kg-cores": self.knowledge.list_kg_cores,
"fetch-kg-core": self.knowledge.fetch_kg_core,
"get-kg-core": self.knowledge.get_kg_core,
"delete-kg-core": self.knowledge.delete_kg_core,
"put-kg-core": self.knowledge.put_kg_core,
}
if v.operation not in impls:
@ -150,12 +151,10 @@ class Processor(AsyncProcessor):
try:
# We don't send a response back here, the processing
# implementation sends whatever it needs to send.
await self.process_request(v, id)
# await self.knowledge_response_producer.send(
# resp, properties={"id": id}
# )
return
except RequestError as e:

View file

@ -1,13 +1,14 @@
import base64
from ... schema import KnowledgeRequest, KnowledgeResponse
from ... schema import KnowledgeRequest, KnowledgeResponse, Triples
from ... schema import GraphEmbeddings, Metadata, EntityEmbeddings
from ... schema import knowledge_request_queue
from ... schema import knowledge_response_queue
from . requestor import ServiceRequestor
from . serialize import serialize_graph_embeddings
from . serialize import serialize_triples
from . serialize import serialize_triples, to_subgraph, to_value
from . serialize import to_document_metadata, to_processing_metadata
class KnowledgeRequestor(ServiceRequestor):
@ -26,10 +27,42 @@ class KnowledgeRequestor(ServiceRequestor):
def to_request(self, body):
    """Build a KnowledgeRequest from a decoded JSON request body.

    ``operation``, ``user`` and ``id`` are copied from *body* (None when
    absent).  When *body* has a ``"triples"`` or ``"graph-embeddings"``
    entry, it is converted into the matching schema object; otherwise
    the corresponding request field is None.
    """

    triples = None
    ge = None

    if "triples" in body:
        src = body["triples"]
        meta = src["metadata"]
        triples = Triples(
            metadata=Metadata(
                id=meta["id"],
                metadata=to_subgraph(meta["metadata"]),
                user=meta["user"],
            ),
            triples=to_subgraph(src["triples"]),
        )

    if "graph-embeddings" in body:
        src = body["graph-embeddings"]
        meta = src["metadata"]
        ge = GraphEmbeddings(
            metadata=Metadata(
                id=meta["id"],
                metadata=to_subgraph(meta["metadata"]),
                user=meta["user"],
            ),
            entities=[
                EntityEmbeddings(
                    entity=to_value(ent["entity"]),
                    vectors=ent["vectors"],
                )
                for ent in src["entities"]
            ],
        )

    return KnowledgeRequest(
        operation=body.get("operation"),
        user=body.get("user"),
        id=body.get("id"),
        triples=triples,
        graph_embeddings=ge,
    )
def from_response(self, message):