diff --git a/test-api/test-llm-api b/test-api/test-llm-api index c33c6634..6bee2048 100755 --- a/test-api/test-llm-api +++ b/test-api/test-llm-api @@ -19,6 +19,9 @@ resp = requests.post( json=input, ) +if resp.status_code != 200: + raise RuntimeError(f"Status code: {resp.status_code}") + resp = resp.json() if "error" in resp: diff --git a/trustgraph-cli/scripts/tg-dump-msgpack b/trustgraph-cli/scripts/tg-dump-msgpack index 2be950db..18819649 100755 --- a/trustgraph-cli/scripts/tg-dump-msgpack +++ b/trustgraph-cli/scripts/tg-dump-msgpack @@ -10,7 +10,7 @@ import msgpack import sys import argparse -def run(input_file): +def dump(input_file, action): with open(input_file, 'rb') as f: @@ -19,6 +19,43 @@ def run(input_file): for unpacked in unpacker: print(unpacked) +def summary(input_file, action): + + vector_dim = None + + triples = set() + + max_records = 1000000 + + with open(input_file, 'rb') as f: + + unpacker = msgpack.Unpacker(f, raw=False) + + rec_count = 0 + + for msg in unpacker: + + if msg[0] == "ge": + vector_dim = len(msg[1]["v"][0]) + + if msg[0] == "t": + + for elt in msg[1]["m"]["m"]: + triples.add(( + elt["s"]["v"], + elt["p"]["v"], + elt["o"]["v"], + )) + + if rec_count > max_records: break + rec_count += 1 + + print("Vector dimension:", vector_dim) + + for t in triples: + if t[1] == "http://www.w3.org/2000/01/rdf-schema#label": + print("-", t[2]) + def main(): parser = argparse.ArgumentParser( @@ -32,9 +69,24 @@ def main(): help=f'Input file' ) + parser.add_argument( + '-s', '--summary', action="store_const", const="summary", + dest="action", + help=f'Show a summary' + ) + + parser.add_argument( + '-r', '--records', action="store_const", const="records", + dest="action", + help=f'Dump individual records' + ) + args = parser.parse_args() - run(**vars(args)) + if args.action == "summary": + summary(**vars(args)) + else: + dump(**vars(args)) main() diff --git a/trustgraph-cli/scripts/tg-load-kg-core b/trustgraph-cli/scripts/tg-load-kg-core index 
4e207cf1..5c2ae140 100755 --- a/trustgraph-cli/scripts/tg-load-kg-core +++ b/trustgraph-cli/scripts/tg-load-kg-core @@ -12,16 +12,25 @@ import json import sys import argparse import os +import signal -async def load_ge(queue, url): +class Running: + def __init__(self): self.running = True + def get(self): return self.running + def stop(self): self.running = False + +async def load_ge(running, queue, url): async with aiohttp.ClientSession() as session: async with session.ws_connect(f"{url}load/graph-embeddings") as ws: - while True: + while running.get(): - msg = await queue.get() + try: + msg = await asyncio.wait_for(queue.get(), 1) + except asyncio.TimeoutError: + continue msg = { "metadata": { @@ -36,13 +45,18 @@ async def load_ge(queue, url): await ws.send_json(msg) -async def load_triples(queue, url): +async def load_triples(running, queue, url): + async with aiohttp.ClientSession() as session: + async with session.ws_connect(f"{url}load/triples") as ws: - while True: + while running.get(): - msg = await queue.get() + try: + msg = await asyncio.wait_for(queue.get(), 1) + except asyncio.TimeoutError: + continue msg ={ "metadata": { @@ -59,18 +73,18 @@ ge_counts = 0 t_counts = 0 -async def stats(): +async def stats(running): global t_counts global ge_counts - while True: - await asyncio.sleep(5) + while running.get(): + await asyncio.sleep(2) print( f"Graph embeddings: {ge_counts:10d} Triples: {t_counts:10d}" ) -async def loader(ge_queue, t_queue, path, format, user, collection): +async def loader(running, ge_queue, t_queue, path, format, user, collection): global t_counts global ge_counts @@ -85,7 +99,12 @@ async def loader(ge_queue, t_queue, path, format, user, collection): unpacker = msgpack.Unpacker(f, raw=False) - for unpacked in unpacker: + while running.get(): + + try: + unpacked = unpacker.unpack() + except msgpack.OutOfData: + break if user: unpacked["metadata"]["user"] = user @@ -94,14 +113,25 @@ async def loader(ge_queue, t_queue, path, format, user, 
collection): unpacked["metadata"]["collection"] = collection if unpacked[0] == "t": - await t_queue.put(unpacked[1]) + qtype = t_queue t_counts += 1 else: if unpacked[0] == "ge": - await ge_queue.put(unpacked[1]) + qtype = ge_queue ge_counts += 1 -async def run(**args): + while running.get(): + + try: + await asyncio.wait_for(qtype.put(unpacked[1]), 0.5) + break + except asyncio.TimeoutError: + continue + if not running.get(): break + + running.stop() + +async def run(running, **args): # Maxsize on queues reduces back-pressure so tg-load-kg-core doesn't # grow to eat all memory @@ -110,6 +140,7 @@ async def run(**args): load_task = asyncio.create_task( loader( + running=running, ge_queue=ge_q, t_queue=t_q, path=args["input_file"], format=args["format"], user=args["user"], collection=args["collection"], @@ -119,24 +150,26 @@ async def run(**args): ge_task = asyncio.create_task( load_ge( + running=running, queue=ge_q, url=args["url"] + "api/v1/" ) ) triples_task = asyncio.create_task( load_triples( + running=running, queue=t_q, url=args["url"] + "api/v1/" ) ) - stats_task = asyncio.create_task(stats()) + stats_task = asyncio.create_task(stats(running)) await load_task await triples_task await ge_task await stats_task -async def main(): +async def main(running): parser = argparse.ArgumentParser( prog='tg-load-kg-core', @@ -179,7 +212,15 @@ async def main(): args = parser.parse_args() - await run(**vars(args)) + await run(running, **vars(args)) -asyncio.run(main()) +running = Running() + +def interrupt(sig, frame): + running.stop() + print('Interrupt') + +signal.signal(signal.SIGINT, interrupt) + +asyncio.run(main(running)) diff --git a/trustgraph-cli/scripts/tg-save-kg-core b/trustgraph-cli/scripts/tg-save-kg-core index 3c03383f..f2509dba 100755 --- a/trustgraph-cli/scripts/tg-save-kg-core +++ b/trustgraph-cli/scripts/tg-save-kg-core @@ -16,11 +16,26 @@ import json import sys import argparse import os +import signal + +class Running: + def __init__(self): self.running = True + def 
get(self): return self.running + def stop(self): self.running = False + +async def fetch_ge(running, queue, user, collection, url): -async def fetch_ge(queue, user, collection, url): async with aiohttp.ClientSession() as session: + async with session.ws_connect(f"{url}stream/graph-embeddings") as ws: - async for msg in ws: + + while running.get(): + + try: + msg = await asyncio.wait_for(ws.receive(), 1) + except asyncio.TimeoutError: + continue + if msg.type == aiohttp.WSMsgType.TEXT: data = msg.json() @@ -50,10 +65,19 @@ async def fetch_ge(queue, user, collection, url): print("Error") break -async def fetch_triples(queue, user, collection, url): +async def fetch_triples(running, queue, user, collection, url): + async with aiohttp.ClientSession() as session: + async with session.ws_connect(f"{url}stream/triples") as ws: - async for msg in ws: + + while running.get(): + + try: + msg = await asyncio.wait_for(ws.receive(), 1) + except asyncio.TimeoutError: + continue + if msg.type == aiohttp.WSMsgType.TEXT: data = msg.json() @@ -85,27 +109,32 @@ async def fetch_triples(queue, user, collection, url): ge_counts = 0 t_counts = 0 -async def stats(): +async def stats(running): global t_counts global ge_counts - while True: - await asyncio.sleep(5) + while running.get(): + + await asyncio.sleep(2) + print( f"Graph embeddings: {ge_counts:10d} Triples: {t_counts:10d}" ) -async def output(queue, path, format): +async def output(running, queue, path, format): global t_counts global ge_counts with open(path, "wb") as f: - while True: + while running.get(): - msg = await queue.get() + try: + msg = await asyncio.wait_for(queue.get(), 0.5) + except asyncio.TimeoutError: + continue if format == "msgpack": f.write(msgpack.packb(msg, use_bin_type=True)) @@ -118,12 +147,15 @@ async def output(queue, path, format): if msg[0] == "ge": ge_counts += 1 -async def run(**args): + print("Output file closed") + +async def run(running, **args): q = asyncio.Queue() ge_task = asyncio.create_task( fetch_ge( + running=running, queue=q, 
user=args["user"], collection=args["collection"], url=args["url"] + "api/v1/" ) @@ -131,26 +163,30 @@ async def run(**args): triples_task = asyncio.create_task( fetch_triples( - queue=q, user=args["user"], collection=args["collection"], + running=running, queue=q, + user=args["user"], collection=args["collection"], url=args["url"] + "api/v1/" ) ) output_task = asyncio.create_task( output( - queue=q, path=args["output_file"], format=args["format"], + running=running, queue=q, + path=args["output_file"], format=args["format"], ) ) - stats_task = asyncio.create_task(stats()) + stats_task = asyncio.create_task(stats(running)) await output_task await triples_task await ge_task await stats_task -async def main(): + print("Exiting") + +async def main(running): parser = argparse.ArgumentParser( prog='tg-save-kg-core', @@ -193,7 +229,15 @@ async def main(): args = parser.parse_args() - await run(**vars(args)) + await run(running, **vars(args)) -asyncio.run(main()) +running = Running() + +def interrupt(sig, frame): + running.stop() + print('Interrupt') + +signal.signal(signal.SIGINT, interrupt) + +asyncio.run(main(running))