mirror of
https://github.com/trustgraph-ai/trustgraph.git
synced 2026-04-26 08:56:21 +02:00
Feature/rework kg core (#171)
* Knowledge cores with msgpack * Put it in the cli package * Tidy up msgpack dumper * Created a loader
This commit is contained in:
parent
319f9ac04a
commit
340d7a224f
6 changed files with 700 additions and 4 deletions
34
trustgraph-cli/scripts/tg-dump-msgpack
Executable file
34
trustgraph-cli/scripts/tg-dump-msgpack
Executable file
|
|
@ -0,0 +1,34 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
import msgpack
|
||||
import sys
|
||||
import argparse
|
||||
|
||||
def run(input_file):
    """Decode a msgpack stream from *input_file* and print each object."""

    with open(input_file, 'rb') as stream:
        # raw=False decodes msgpack strings to str rather than bytes.
        for record in msgpack.Unpacker(stream, raw=False):
            print(record)
|
||||
|
||||
def main():
    """Parse command-line arguments and dump the given msgpack file."""

    parser = argparse.ArgumentParser(
        # FIX: was 'tg-load-pdf', a copy/paste slip from another script.
        prog='tg-dump-msgpack',
        description=__doc__,
    )

    parser.add_argument(
        '-i', '--input-file',
        required=True,
        help='Input file',  # was an f-string with no placeholders
    )

    args = parser.parse_args()

    run(**vars(args))
|
||||
|
||||
# Guard the entry point so importing this module doesn't trigger the dump.
if __name__ == "__main__":
    main()
|
||||
|
||||
179
trustgraph-cli/scripts/tg-load-kg-core
Executable file
179
trustgraph-cli/scripts/tg-load-kg-core
Executable file
|
|
@ -0,0 +1,179 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
import aiohttp
|
||||
import asyncio
|
||||
import msgpack
|
||||
import json
|
||||
import sys
|
||||
import argparse
|
||||
import os
|
||||
|
||||
async def load_ge(queue, url):
    """Forward graph-embedding records from *queue* to the websocket API.

    Each queue item uses the compact core keys ("m"/"v"/"e"); it is
    expanded to the API's metadata/vectors/entity shape before sending.
    Loops forever; never returns.
    """

    async with aiohttp.ClientSession() as session:
        async with session.ws_connect(f"{url}load/graph-embeddings") as ws:
            while True:
                item = await queue.get()
                meta = item["m"]
                payload = {
                    "metadata": {
                        "id": meta["i"],
                        "metadata": meta["m"],
                        "user": meta["u"],
                        "collection": meta["c"],
                    },
                    "vectors": item["v"],
                    "entity": item["e"],
                }
                await ws.send_json(payload)
|
||||
|
||||
async def load_triples(queue, url):
    """Forward triples records from *queue* to the websocket API.

    Each queue item uses the compact core keys ("m"/"t"); it is expanded
    to the API's metadata/triples shape before sending. Loops forever.
    """

    async with aiohttp.ClientSession() as session:
        async with session.ws_connect(f"{url}load/triples") as ws:
            while True:
                item = await queue.get()
                meta = item["m"]
                payload = {
                    "metadata": {
                        "id": meta["i"],
                        "metadata": meta["m"],
                        "user": meta["u"],
                        "collection": meta["c"],
                    },
                    "triples": item["t"],
                }
                await ws.send_json(payload)
|
||||
|
||||
# Shared progress counters: incremented by loader(), reported by stats().
ge_counts = 0
t_counts = 0
|
||||
|
||||
async def stats():
    """Report the running load counts every five seconds, forever."""

    global t_counts
    global ge_counts

    while True:
        await asyncio.sleep(5)
        report = f"Graph embeddings: {ge_counts:10d} Triples: {t_counts:10d}"
        print(report)
|
||||
|
||||
async def loader(ge_queue, t_queue, path, format, user, collection):
    """Read a knowledge-core dump and dispatch records onto the queues.

    Each msgpack record is a [kind, payload] pair where kind is "t"
    (triples) or "ge" (graph embeddings) and payload uses the compact
    keys written by tg-save-kg-core ("m" metadata holding "i"/"m"/"u"/"c").

    Args:
        ge_queue: queue receiving graph-embedding payloads.
        t_queue: queue receiving triples payloads.
        path: input file path.
        format: "msgpack" is the only supported format.
        user: if set, overrides the user ID on every record.
        collection: if set, overrides the collection ID on every record.

    Raises:
        RuntimeError: if format == "json" (not implemented).
    """

    global t_counts
    global ge_counts

    if format == "json":
        raise RuntimeError("Not implemented")

    else:

        with open(path, "rb") as f:

            unpacker = msgpack.Unpacker(f, raw=False)

            for kind, payload in unpacker:

                # BUG FIX: the overrides previously did
                # unpacked["metadata"]["user"] = user, but each record is
                # a [kind, payload] pair whose metadata uses the compact
                # "m"/"u"/"c" keys, so any --user/--collection override
                # raised TypeError. Patch the compact keys on the payload.
                if user:
                    payload["m"]["u"] = user

                if collection:
                    payload["m"]["c"] = collection

                if kind == "t":
                    await t_queue.put(payload)
                    t_counts += 1
                elif kind == "ge":
                    await ge_queue.put(payload)
                    ge_counts += 1
|
||||
|
||||
async def run(**args):
    """Wire up the queues and worker tasks, then wait on them.

    NOTE(review): load_ge/load_triples/stats loop forever, so this
    coroutine never completes even once the input file is fully read;
    the process must be interrupted. Behavior preserved as-is.
    """

    ge_q = asyncio.Queue()
    t_q = asyncio.Queue()

    api_url = args["url"] + "api/v1/"

    load_task = asyncio.create_task(loader(
        ge_queue=ge_q, t_queue=t_q,
        path=args["input_file"], format=args["format"],
        user=args["user"], collection=args["collection"],
    ))

    ge_task = asyncio.create_task(load_ge(queue=ge_q, url=api_url))

    triples_task = asyncio.create_task(load_triples(queue=t_q, url=api_url))

    stats_task = asyncio.create_task(stats())

    await load_task
    await triples_task
    await ge_task
    await stats_task
|
||||
|
||||
async def main():
    """Parse command-line arguments and run the knowledge-core loader."""

    parser = argparse.ArgumentParser(
        # FIX: was 'tg-load-pdf', a copy/paste slip from another script.
        prog='tg-load-kg-core',
        description=__doc__,
    )

    default_url = os.getenv("TRUSTGRAPH_API", "http://localhost:8088/")

    parser.add_argument(
        '-u', '--url',
        default=default_url,
        help=f'TrustGraph API URL (default: {default_url})',
    )

    parser.add_argument(
        '-i', '--input-file',
        required=True,
        help='Input file',  # FIX: was mislabelled 'Output file'
    )

    parser.add_argument(
        '--format',
        default="msgpack",
        choices=["msgpack", "json"],
        help='Input format (default: msgpack)',  # FIX: said 'Output format'
    )

    parser.add_argument(
        '--user',
        help='User ID to load as (default: from input)',
    )

    parser.add_argument(
        '--collection',
        help='Collection ID to load as (default: from input)',
    )

    args = parser.parse_args()

    await run(**vars(args))
|
||||
|
||||
# Guard the entry point so importing this module doesn't start the loader.
if __name__ == "__main__":
    asyncio.run(main())
|
||||
|
||||
190
trustgraph-cli/scripts/tg-save-kg-core
Executable file
190
trustgraph-cli/scripts/tg-save-kg-core
Executable file
|
|
@ -0,0 +1,190 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
import aiohttp
|
||||
import asyncio
|
||||
import msgpack
|
||||
import json
|
||||
import sys
|
||||
import argparse
|
||||
import os
|
||||
|
||||
async def fetch_ge(queue, user, collection, url):
    """Stream graph-embedding messages from the API onto *queue*.

    Messages failing the optional user/collection filters are dropped.
    Kept messages are queued as ["ge", payload] with the compact core
    keys ("m"/"v"/"e"). Runs until the websocket reports an error.
    """
    async with aiohttp.ClientSession() as session:
        async with session.ws_connect(f"{url}stream/graph-embeddings") as ws:
            async for msg in ws:
                if msg.type == aiohttp.WSMsgType.TEXT:

                    data = msg.json()
                    meta = data["metadata"]

                    # Optional filters: skip non-matching records.
                    if user and meta["user"] != user:
                        continue
                    if collection and meta["collection"] != collection:
                        continue

                    record = {
                        "m": {
                            "i": meta["id"],
                            "m": meta["metadata"],
                            "u": meta["user"],
                            "c": meta["collection"],
                        },
                        "v": data["vectors"],
                        "e": data["entity"],
                    }
                    await queue.put(["ge", record])

                if msg.type == aiohttp.WSMsgType.ERROR:
                    print("Error")
                    break
|
||||
|
||||
async def fetch_triples(queue, user, collection, url):
    """Stream triples messages from the API onto *queue*.

    Messages failing the optional user/collection filters are dropped.
    Kept messages are queued as ("t", payload) with the compact core
    keys ("m"/"t"). Runs until the websocket reports an error.
    """
    async with aiohttp.ClientSession() as session:
        async with session.ws_connect(f"{url}stream/triples") as ws:
            async for msg in ws:
                if msg.type == aiohttp.WSMsgType.TEXT:

                    data = msg.json()
                    meta = data["metadata"]

                    # Optional filters: skip non-matching records.
                    if user and meta["user"] != user:
                        continue
                    if collection and meta["collection"] != collection:
                        continue

                    record = {
                        "m": {
                            "i": meta["id"],
                            "m": meta["metadata"],
                            "u": meta["user"],
                            "c": meta["collection"],
                        },
                        "t": data["triples"],
                    }
                    await queue.put(("t", record))

                if msg.type == aiohttp.WSMsgType.ERROR:
                    print("Error")
                    break
|
||||
|
||||
# Shared progress counters: incremented by output(), reported by stats().
ge_counts = 0
t_counts = 0
|
||||
|
||||
async def stats():
    """Report the running save counts every five seconds, forever."""

    global t_counts
    global ge_counts

    while True:
        await asyncio.sleep(5)
        report = f"Graph embeddings: {ge_counts:10d} Triples: {t_counts:10d}"
        print(report)
|
||||
|
||||
async def output(queue, path, format):
    """Consume records from *queue* and write them to *path*, forever.

    Args:
        queue: yields ("t"|"ge", payload) records.
        path: output file path, opened for binary writing.
        format: "msgpack" for a raw msgpack stream; anything else for
            newline-delimited JSON.
    """

    global t_counts
    global ge_counts

    with open(path, "wb") as f:

        while True:

            msg = await queue.get()

            if format == "msgpack":
                f.write(msgpack.packb(msg, use_bin_type=True))
            else:
                # BUG FIX: JSON records were written back-to-back with no
                # separator, producing an unparseable file. Emit
                # newline-delimited JSON (one record per line) instead.
                f.write(json.dumps(msg).encode("utf-8"))
                f.write(b"\n")

            if msg[0] == "t":
                t_counts += 1
            elif msg[0] == "ge":
                ge_counts += 1
|
||||
|
||||
async def run(**args):
    """Start the fetchers, the file writer, and the stats reporter.

    NOTE(review): every task loops forever, so this coroutine only ends
    when the process is interrupted. Behavior preserved as-is.
    """

    q = asyncio.Queue()

    api_url = args["url"] + "api/v1/"

    ge_task = asyncio.create_task(fetch_ge(
        queue=q, user=args["user"], collection=args["collection"],
        url=api_url,
    ))

    triples_task = asyncio.create_task(fetch_triples(
        queue=q, user=args["user"], collection=args["collection"],
        url=api_url,
    ))

    output_task = asyncio.create_task(output(
        queue=q, path=args["output_file"], format=args["format"],
    ))

    stats_task = asyncio.create_task(stats())

    await output_task
    await triples_task
    await ge_task
    await stats_task
|
||||
|
||||
async def main():
    """Parse command-line arguments and run the knowledge-core saver."""

    parser = argparse.ArgumentParser(
        # FIX: was 'tg-load-pdf', a copy/paste slip from another script.
        prog='tg-save-kg-core',
        description=__doc__,
    )

    default_url = os.getenv("TRUSTGRAPH_API", "http://localhost:8088/")

    parser.add_argument(
        '-u', '--url',
        default=default_url,
        help=f'TrustGraph API URL (default: {default_url})',
    )

    parser.add_argument(
        '-o', '--output-file',
        # Mandatory so an existing file isn't silently overwritten by a
        # default path.
        required=True,
        help='Output file',
    )

    parser.add_argument(
        '--format',
        default="msgpack",
        choices=["msgpack", "json"],
        help='Output format (default: msgpack)',
    )

    parser.add_argument(
        '--user',
        help='User ID to filter on (default: no filter)',
    )

    parser.add_argument(
        '--collection',
        help='Collection ID to filter on (default: no filter)',
    )

    args = parser.parse_args()

    await run(**vars(args))
|
||||
|
||||
# Guard the entry point so importing this module doesn't start the saver.
if __name__ == "__main__":
    asyncio.run(main())
|
||||
|
||||
Loading…
Add table
Add a link
Reference in a new issue