diff --git a/trustgraph-base/trustgraph/api/knowledge.py b/trustgraph-base/trustgraph/api/knowledge.py index c193b96b..724ac79c 100644 --- a/trustgraph-base/trustgraph/api/knowledge.py +++ b/trustgraph-base/trustgraph/api/knowledge.py @@ -39,3 +39,29 @@ class Knowledge: self.request(request = input) + def load_kg_core(self, id, user="trustgraph", flow="0000", + collection="default"): + + # The input consists of system and prompt strings + input = { + "operation": "load-kg-core", + "user": user, + "id": id, + "flow": flow, + "collection": collection, + } + + self.request(request = input) + + def unload_kg_core(self, id, user="trustgraph", flow="0000"): + + # The input consists of system and prompt strings + input = { + "operation": "unload-kg-core", + "user": user, + "id": id, + "flow": flow, + } + + self.request(request = input) + diff --git a/trustgraph-base/trustgraph/schema/knowledge.py b/trustgraph-base/trustgraph/schema/knowledge.py index 9eb7d8b6..21217153 100644 --- a/trustgraph-base/trustgraph/schema/knowledge.py +++ b/trustgraph-base/trustgraph/schema/knowledge.py @@ -25,14 +25,22 @@ from . graph import Triples, GraphEmbeddings class KnowledgeRequest(Record): # get-kg-core, delete-kg-core, list-kg-cores, put-kg-core + # load-kg-core, unload-kg-core operation = String() # list-kg-cores, delete-kg-core, put-kg-core user = String() - # get-kg-core, list-kg-cores, delete-kg-core, put-kg-core + # get-kg-core, list-kg-cores, delete-kg-core, put-kg-core, + # load-kg-core, unload-kg-core id = String() + # load-kg-core + flow = String() + + # load-kg-core + collection = String() + # put-kg-core triples = Triples() graph_embeddings = GraphEmbeddings() diff --git a/trustgraph-cli/scripts/tg-load-kg-core b/trustgraph-cli/scripts/tg-load-kg-core index cd083e4f..2ecdc588 100755 --- a/trustgraph-cli/scripts/tg-load-kg-core +++ b/trustgraph-cli/scripts/tg-load-kg-core @@ -1,303 +1,80 @@ #!/usr/bin/env python3 """ -This utility takes a knowledge core and loads it into a running TrustGraph -through the API. The knowledge core should be in msgpack format, which is the -default format produce by tg-save-kg-core. +Starts a load operation on a knowledge core which is already stored by +the knowledge manager. You could load a core with tg-put-kg-core and then +run this utility. """ -import aiohttp -import asyncio -import msgpack -import json -import sys import argparse import os -import signal +import tabulate +from trustgraph.api import Api +import json -class Running: - def __init__(self): self.running = True - def get(self): return self.running - def stop(self): self.running = False +default_url = os.getenv("TRUSTGRAPH_URL", 'http://localhost:8088/') +default_flow = "0000" +default_collection = "default" -ge_counts = 0 -t_counts = 0 +def load_kg_core(url, user, id, flow, collection): -async def load_ge(running, queue, url): + api = Api(url).knowledge() - global ge_counts + class_names = api.load_kg_core(user = user, id = id, flow=flow, + collection=collection) - async with aiohttp.ClientSession() as session: +def main(): - async with session.ws_connect(url) as ws: - - while running.get(): - - try: - msg = await asyncio.wait_for(queue.get(), 1) - - # End of load - if msg is None: - break - - except: - # Hopefully it's TimeoutError. Annoying to match since - # it changed in 3.11. - continue - - msg = { - "metadata": { - "id": msg["m"]["i"], - "metadata": msg["m"]["m"], - "user": msg["m"]["u"], - "collection": msg["m"]["c"], - }, - "entities": [ - { - "entity": ent["e"], - "vectors": ent["v"], - } - for ent in msg["e"] - ], - } - - try: - await ws.send_json(msg) - except Exception as e: - print(e) - - ge_counts += 1 - -async def load_triples(running, queue, url): - - global t_counts - - async with aiohttp.ClientSession() as session: - - async with session.ws_connect(url) as ws: - - while running.get(): - - try: - msg = await asyncio.wait_for(queue.get(), 1) - - # End of load - if msg is None: - break - - except: - # Hopefully it's TimeoutError. Annoying to match since - # it changed in 3.11. - continue - - msg ={ - "metadata": { - "id": msg["m"]["i"], - "metadata": msg["m"]["m"], - "user": msg["m"]["u"], - "collection": msg["m"]["c"], - }, - "triples": msg["t"], - } - - try: - await ws.send_json(msg) - except Exception as e: - print(e) - - t_counts += 1 - -async def stats(running): - - global t_counts - global ge_counts - - while running.get(): - - await asyncio.sleep(2) - - print( - f"Graph embeddings: {ge_counts:10d} Triples: {t_counts:10d}" - ) - -async def loader(running, ge_queue, t_queue, path, format, user, collection): - - if format == "json": - - raise RuntimeError("Not implemented") - - else: - - with open(path, "rb") as f: - - unpacker = msgpack.Unpacker(f, raw=False) - - while running.get(): - - try: - unpacked = unpacker.unpack() - except: - break - - if user: - unpacked["metadata"]["user"] = user - - if collection: - unpacked["metadata"]["collection"] = collection - - if unpacked[0] == "t": - qtype = t_queue - else: - if unpacked[0] == "ge": - qtype = ge_queue - - while running.get(): - - try: - await asyncio.wait_for(qtype.put(unpacked[1]), 0.5) - - # Successful put message, move on - break - - except: - # Hopefully it's TimeoutError. Annoying to match since - # it changed in 3.11. - continue - - if not running.get(): break - - # Put 'None' on end of queue to finish - while running.get(): - - try: - await asyncio.wait_for(t_queue.put(None), 1) - - # Successful put message, move on - break - - except: - # Hopefully it's TimeoutError. Annoying to match since - # it changed in 3.11. - continue - - # Put 'None' on end of queue to finish - while running.get(): - - try: - await asyncio.wait_for(ge_queue.put(None), 1) - - # Successful put message, move on - break - - except: - # Hopefully it's TimeoutError. Annoying to match since - # it changed in 3.11. - continue - -async def run(running, **args): - - # Maxsize on queues reduces back-pressure so tg-load-kg-core doesn't - # grow to eat all memory - ge_q = asyncio.Queue(maxsize=10) - t_q = asyncio.Queue(maxsize=10) - - flow_id = args["flow_id"] - url = args["url"] - - load_task = asyncio.create_task( - loader( - running=running, - ge_queue=ge_q, t_queue=t_q, - path=args["input_file"], format=args["format"], - user=args["user"], collection=args["collection"], - ) - - ) - - ge_task = asyncio.create_task( - load_ge( - running = running, - queue = ge_q, - url = f"{url}api/v1/flow/{flow_id}/import/graph-embeddings" - ) - ) - - triples_task = asyncio.create_task( - load_triples( - running = running, - queue = t_q, - url = f"{url}api/v1/flow/{flow_id}/import/triples" - ) - ) - - stats_task = asyncio.create_task(stats(running)) - - await triples_task - await ge_task - - running.stop() - - await load_task - await stats_task - -async def main(running): - parser = argparse.ArgumentParser( - prog='tg-load-kg-core', + prog='tg-delete-flow-class', description=__doc__, ) - default_url = os.getenv("TRUSTGRAPH_API", "http://localhost:8088/") - default_user = "trustgraph" - collection = "default" - parser.add_argument( - '-u', '--url', + '-u', '--api-url', default=default_url, - help=f'TrustGraph API URL (default: {default_url})', + help=f'API URL (default: {default_url})', ) parser.add_argument( - '-i', '--input-file', - # Make it mandatory, difficult to over-write an existing file + '-U', '--user', + default="trustgraph", + help='API URL (default: trustgraph)', + ) + + parser.add_argument( + '--id', '--identifier', required=True, - help=f'Output file' + help=f'Knowledge core ID', ) parser.add_argument( '-f', '--flow-id', - default="0000", - help=f'Flow ID (default: 0000)' + default=default_flow, + help=f'Flow ID (default: {default_flow}', ) parser.add_argument( - '--format', - default="msgpack", - choices=["msgpack", "json"], - help=f'Output format (default: msgpack)', - ) - - parser.add_argument( - '--user', - help=f'User ID to load as (default: from input)' - ) - - parser.add_argument( - '--collection', - help=f'Collection ID to load as (default: from input)' + '-c', '--collection', + default=default_collection, + help=f'Collection ID (default: {default_collection}', ) args = parser.parse_args() - await run(running, **vars(args)) + try: -running = Running() + load_kg_core( + url=args.api_url, + user=args.user, + id=args.id, + flow=args.flow_id, + collection=args.collection, + ) -def interrupt(sig, frame): - running.stop() - print('Interrupt') + except Exception as e: -signal.signal(signal.SIGINT, interrupt) + print("Exception:", e, flush=True) -asyncio.run(main(running)) +main() diff --git a/trustgraph-cli/scripts/tg-save-kg-core b/trustgraph-cli/scripts/tg-save-kg-core deleted file mode 100755 index 05e3adae..00000000 --- a/trustgraph-cli/scripts/tg-save-kg-core +++ /dev/null @@ -1,259 +0,0 @@ -#!/usr/bin/env python3 - -""" -This utility connects to a running TrustGraph through the API and creates -a knowledge core from the data streaming through the processing queues. -For completeness of data, tg-save-kg-core should be initiated before data -loading takes place. The default output format, msgpack should be used. -JSON output format is also available - msgpack produces a more compact -representation, which is also more performant to load. -""" - -import aiohttp -import asyncio -import msgpack -import json -import sys -import argparse -import os -import signal - -class Running: - def __init__(self): self.running = True - def get(self): return self.running - def stop(self): self.running = False - -async def fetch_ge(running, queue, user, collection, url): - - async with aiohttp.ClientSession() as session: - - async with session.ws_connect(url) as ws: - - while running.get(): - - try: - msg = await asyncio.wait_for(ws.receive(), 1) - except: - continue - - if msg.type == aiohttp.WSMsgType.TEXT: - - data = msg.json() - - if user: - if data["metadata"]["user"] != user: - continue - - if collection: - if data["metadata"]["collection"] != collection: - continue - - await queue.put([ - "ge", - { - "m": { - "i": data["metadata"]["id"], - "m": data["metadata"]["metadata"], - "u": data["metadata"]["user"], - "c": data["metadata"]["collection"], - }, - "e": [ - { - "e": ent["entity"], - "v": ent["vectors"], - } - for ent in data["entities"] - ] - } - ]) - if msg.type == aiohttp.WSMsgType.ERROR: - print("Error") - break - -async def fetch_triples(running, queue, user, collection, url): - - async with aiohttp.ClientSession() as session: - - async with session.ws_connect(url) as ws: - - while running.get(): - - try: - msg = await asyncio.wait_for(ws.receive(), 1) - except: - continue - - if msg.type == aiohttp.WSMsgType.TEXT: - - data = msg.json() - - if user: - if data["metadata"]["user"] != user: - continue - - if collection: - if data["metadata"]["collection"] != collection: - continue - - await queue.put(( - "t", - { - "m": { - "i": data["metadata"]["id"], - "m": data["metadata"]["metadata"], - "u": data["metadata"]["user"], - "c": data["metadata"]["collection"], - }, - "t": data["triples"], - } - )) - if msg.type == aiohttp.WSMsgType.ERROR: - print("Error") - break - -ge_counts = 0 -t_counts = 0 - -async def stats(running): - - global t_counts - global ge_counts - - while running.get(): - - await asyncio.sleep(2) - - print( - f"Graph embeddings: {ge_counts:10d} Triples: {t_counts:10d}" - ) - -async def output(running, queue, path, format): - - global t_counts - global ge_counts - - with open(path, "wb") as f: - - while running.get(): - - try: - msg = await asyncio.wait_for(queue.get(), 0.5) - except: - # Hopefully it's TimeoutError. Annoying to match since - # it changed in 3.11. - continue - - if format == "msgpack": - f.write(msgpack.packb(msg, use_bin_type=True)) - else: - f.write(json.dumps(msg).encode("utf-8")) - - if msg[0] == "t": - t_counts += 1 - else: - if msg[0] == "ge": - ge_counts += 1 - - print("Output file closed") - -async def run(running, **args): - - q = asyncio.Queue() - - flow_id = args["flow_id"] - url = args["url"] - - ge_task = asyncio.create_task( - fetch_ge( - running=running, - queue=q, user=args["user"], collection=args["collection"], - url = f"{url}api/v1/flow/{flow_id}/export/graph-embeddings" - ) - ) - - triples_task = asyncio.create_task( - fetch_triples( - running=running, queue=q, - user=args["user"], collection=args["collection"], - url = f"{url}api/v1/flow/{flow_id}/export/triples" - ) - ) - - output_task = asyncio.create_task( - output( - running=running, queue=q, - path=args["output_file"], format=args["format"], - ) - - ) - - stats_task = asyncio.create_task(stats(running)) - - await output_task - await triples_task - await ge_task - await stats_task - - print("Exiting") - -async def main(running): - - parser = argparse.ArgumentParser( - prog='tg-save-kg-core', - description=__doc__, - ) - - default_url = os.getenv("TRUSTGRAPH_API", "http://localhost:8088/") - default_user = "trustgraph" - collection = "default" - - parser.add_argument( - '-u', '--url', - default=default_url, - help=f'TrustGraph API URL (default: {default_url})', - ) - - parser.add_argument( - '-o', '--output-file', - # Make it mandatory, difficult to over-write an existing file - required=True, - help=f'Output file' - ) - - parser.add_argument( - '--format', - default="msgpack", - choices=["msgpack", "json"], - help=f'Output format (default: msgpack)', - ) - - parser.add_argument( - '-f', '--flow-id', - default="0000", - help=f'Flow ID (default: 0000)' - ) - - parser.add_argument( - '--user', - help=f'User ID to filter on (default: no filter)' - ) - - parser.add_argument( - '--collection', - help=f'Collection ID to filter on (default: no filter)' - ) - - args = parser.parse_args() - - await run(running, **vars(args)) - -running = Running() - -def interrupt(sig, frame): - running.stop() - print('Interrupt') - -signal.signal(signal.SIGINT, interrupt) - -asyncio.run(main(running)) - diff --git a/trustgraph-cli/scripts/tg-unload-kg-core b/trustgraph-cli/scripts/tg-unload-kg-core new file mode 100755 index 00000000..7227942d --- /dev/null +++ b/trustgraph-cli/scripts/tg-unload-kg-core @@ -0,0 +1,72 @@ +#!/usr/bin/env python3 + +""" +Starts a load operation on a knowledge core which is already stored by +the knowledge manager. You could load a core with tg-put-kg-core and then +run this utility. +""" + +import argparse +import os +import tabulate +from trustgraph.api import Api +import json + +default_url = os.getenv("TRUSTGRAPH_URL", 'http://localhost:8088/') +default_flow = "0000" +default_collection = "default" + +def unload_kg_core(url, user, id, flow): + + api = Api(url).knowledge() + + class_names = api.unload_kg_core(user = user, id = id, flow=flow) + +def main(): + + parser = argparse.ArgumentParser( + prog='tg-delete-flow-class', + description=__doc__, + ) + + parser.add_argument( + '-u', '--api-url', + default=default_url, + help=f'API URL (default: {default_url})', + ) + + parser.add_argument( + '-U', '--user', + default="trustgraph", + help='API URL (default: trustgraph)', + ) + + parser.add_argument( + '--id', '--identifier', + required=True, + help=f'Knowledge core ID', + ) + + parser.add_argument( + '-f', '--flow-id', + default=default_flow, + help=f'Flow ID (default: {default_flow}', + ) + + args = parser.parse_args() + + try: + + unload_kg_core( + url=args.api_url, + user=args.user, + id=args.id, + flow=args.flow_id, + ) + + except Exception as e: + + print("Exception:", e, flush=True) + +main() + diff --git a/trustgraph-cli/setup.py b/trustgraph-cli/setup.py index 84bcc88a..b555eb92 100644 --- a/trustgraph-cli/setup.py +++ b/trustgraph-cli/setup.py @@ -67,7 +67,6 @@ setuptools.setup( "scripts/tg-put-kg-core", "scripts/tg-remove-library-document", "scripts/tg-save-doc-embeds", - "scripts/tg-save-kg-core", "scripts/tg-set-prompt", "scripts/tg-set-token-costs", "scripts/tg-show-config", @@ -83,6 +82,7 @@ setuptools.setup( "scripts/tg-show-token-costs", "scripts/tg-show-tools", "scripts/tg-start-flow", + "scripts/tg-unload-kg-core", "scripts/tg-start-library-processing", "scripts/tg-stop-flow", "scripts/tg-stop-library-processing", diff --git a/trustgraph-flow/trustgraph/config/service/config.py b/trustgraph-flow/trustgraph/config/service/config.py index 00afae1f..de684ec2 100644 --- a/trustgraph-flow/trustgraph/config/service/config.py +++ b/trustgraph-flow/trustgraph/config/service/config.py @@ -120,31 +120,31 @@ class Configuration: async def handle_getvalues(self, v): - if v.type not in self: + # if v.type not in self: - return ConfigResponse( - version = None, - values = None, - directory = None, - config = None, - error = Error( - type = "key-error", - message = f"Key error" - ) - ) + # return ConfigResponse( + # version = None, + # values = None, + # directory = None, + # config = None, + # error = Error( + # type = "key-error", + # message = f"Key error" + # ) + # ) - v = await self.table_store.get_values(v.type) + vals = await self.table_store.get_values(v.type) values = map( lambda x: ConfigValue( type = v.type, key = x[0], value = x[1] ), - v + vals ) return ConfigResponse( version = await self.get_version(), - values = values, + values = list(values), directory = None, config = None, error = None, diff --git a/trustgraph-flow/trustgraph/cores/knowledge.py b/trustgraph-flow/trustgraph/cores/knowledge.py index 34ffa542..8c082601 100644 --- a/trustgraph-flow/trustgraph/cores/knowledge.py +++ b/trustgraph-flow/trustgraph/cores/knowledge.py @@ -1,23 +1,29 @@ -from .. schema import KnowledgeResponse, Error +from .. schema import KnowledgeResponse, Error, Triples, GraphEmbeddings from .. knowledge import hash from .. exceptions import RequestError from .. tables.knowledge import KnowledgeTableStore -import base64 +from .. base import Publisher +import base64 +import asyncio import uuid class KnowledgeManager: def __init__( self, cassandra_host, cassandra_user, cassandra_password, - keyspace, + keyspace, flow_config, ): self.table_store = KnowledgeTableStore( cassandra_host, cassandra_user, cassandra_password, keyspace ) + self.loader_queue = asyncio.Queue(maxsize=20) + self.background_task = None + self.flow_config = flow_config + async def delete_kg_core(self, request, respond): print("Deleting core...", flush=True) @@ -122,3 +128,158 @@ class KnowledgeManager: ) ) + async def load_kg_core(self, request, respond): + + if self.background_task is None: + self.background_task = asyncio.create_task( + self.core_loader() + ) + # Wait for it to start (yuck) +# await asyncio.sleep(0.5) + + await self.loader_queue.put((request, respond)) + + # Not sending a response, the loader thread can do that + + async def unload_kg_core(self, request, respond): + + await respond( + KnowledgeResponse( + error = Error( + type = "not-implemented", + message = "Not implemented" + ), + ids = None, + eos = False, + triples = None, + graph_embeddings = None + ) + ) + + async def core_loader(self): + + print("Running...", flush=True) + while True: + + print("Wait for next load...", flush=True) + request, respond = await self.loader_queue.get() + + print("Loading...", request.id, flush=True) + + try: + + if request.id is None: + raise RuntimeError("Core ID must be specified") + + if request.flow is None: + raise RuntimeError("Flow ID must be specified") + + if request.flow not in self.flow_config.flows: + raise RuntimeError("Invalid flow") + + flow = self.flow_config.flows[request.flow] + + if "interfaces" not in flow: + raise RuntimeError("No defined interfaces") + + if "triples-store" not in flow["interfaces"]: + raise RuntimeError("Flow has no triples-store") + + if "graph-embeddings-store" not in flow["interfaces"]: + raise RuntimeError("Flow has no graph-embeddings-store") + + t_q = flow["interfaces"]["triples-store"] + ge_q = flow["interfaces"]["graph-embeddings-store"] + + # Got this far, it should all work + await respond( + KnowledgeResponse( + error = None, + ids = None, + eos = False, + triples = None, + graph_embeddings = None + ) + ) + + except Exception as e: + + print("Exception:", e, flush=True) + await respond( + KnowledgeResponse( + error = Error( + type = "load-error", + message = str(e), + ), + ids = None, + eos = False, + triples = None, + graph_embeddings = None + ) + ) + + + print("Going to start loading...", flush=True) + + try: + + t_pub = None + ge_pub = None + + print(t_q, flush=True) + print(ge_q, flush=True) + + t_pub = Publisher( + self.flow_config.pulsar_client, t_q, + schema=Triples, + ) + ge_pub = Publisher( + self.flow_config.pulsar_client, ge_q, + schema=GraphEmbeddings + ) + + print("Start publishers...", flush=True) + + await t_pub.start() + await ge_pub.start() + + async def publish_triples(t): + await t_pub.send(None, t) + + print("Publish triples...", flush=True) + + # Remove doc table row + await self.table_store.get_triples( + request.user, + request.id, + publish_triples, + ) + + async def publish_ge(g): + await ge_pub.send(None, g) + + print("Publish GEs...", flush=True) + + # Remove doc table row + await self.table_store.get_graph_embeddings( + request.user, + request.id, + publish_ge, + ) + + print("Completed that.", flush=True) + + except Exception as e: + + print("Exception:", e, flush=True) + + finally: + + print("Stopping publishers...", flush=True) + + if t_pub: await t_pub.stop() + if ge_pub: await ge_pub.stop() + + print("Done", flush=True) + + continue diff --git a/trustgraph-flow/trustgraph/cores/service.py b/trustgraph-flow/trustgraph/cores/service.py index 8eccb202..810d159d 100755 --- a/trustgraph-flow/trustgraph/cores/service.py +++ b/trustgraph-flow/trustgraph/cores/service.py @@ -89,6 +89,7 @@ class Processor(AsyncProcessor): cassandra_user = cassandra_user, cassandra_password = cassandra_password, keyspace = keyspace, + flow_config = self, ) self.register_config_handler(self.on_knowledge_config) @@ -128,6 +129,8 @@ class Processor(AsyncProcessor): "get-kg-core": self.knowledge.get_kg_core, "delete-kg-core": self.knowledge.delete_kg_core, "put-kg-core": self.knowledge.put_kg_core, + "load-kg-core": self.knowledge.load_kg_core, + "unload-kg-core": self.knowledge.unload_kg_core, } if v.operation not in impls: diff --git a/trustgraph-flow/trustgraph/gateway/dispatch/knowledge.py b/trustgraph-flow/trustgraph/gateway/dispatch/knowledge.py index 01043a80..a35ee4f0 100644 --- a/trustgraph-flow/trustgraph/gateway/dispatch/knowledge.py +++ b/trustgraph-flow/trustgraph/gateway/dispatch/knowledge.py @@ -61,29 +61,26 @@ class KnowledgeRequestor(ServiceRequestor): operation = body.get("operation", None), user = body.get("user", None), id = body.get("id", None), + flow = body.get("flow", None), + collection = body.get("collection", None), triples = triples, graph_embeddings = ge, ) def from_response(self, message): - print("Processing message") - # Response to list, if message.ids is not None: - print("-> IDS") return { "ids": message.ids }, True if message.triples: - print("-> triples") return { "triples": serialize_triples(message.triples) }, False if message.graph_embeddings: - print("-> ge") return { "graph-embeddings": serialize_graph_embeddings( message.graph_embeddings @@ -91,7 +88,6 @@ class KnowledgeRequestor(ServiceRequestor): }, False if message.eos is True: - print("-> eos") return { "eos": True }, True diff --git a/trustgraph-flow/trustgraph/tables/config.py b/trustgraph-flow/trustgraph/tables/config.py index 3b9c2eb9..45dfc4d9 100644 --- a/trustgraph-flow/trustgraph/tables/config.py +++ b/trustgraph-flow/trustgraph/tables/config.py @@ -161,6 +161,7 @@ class ConfigTableStore: except Exception as e: print("Exception:", type(e)) + raise e print(f"{e}, retry...", flush=True) await asyncio.sleep(1) @@ -180,6 +181,7 @@ class ConfigTableStore: except Exception as e: print("Exception:", type(e)) + raise e print(f"{e}, retry...", flush=True) await asyncio.sleep(1) @@ -204,6 +206,7 @@ class ConfigTableStore: except Exception as e: print("Exception:", type(e)) + raise e print(f"{e}, retry...", flush=True) await asyncio.sleep(1) @@ -228,6 +231,7 @@ class ConfigTableStore: except Exception as e: print("Exception:", type(e)) + raise e print(f"{e}, retry...", flush=True) await asyncio.sleep(1) @@ -251,6 +255,7 @@ class ConfigTableStore: except Exception as e: print("Exception:", type(e)) + raise e print(f"{e}, retry...", flush=True) await asyncio.sleep(1) @@ -275,6 +280,7 @@ class ConfigTableStore: except Exception as e: print("Exception:", type(e)) + raise e print(f"{e}, retry...", flush=True) await asyncio.sleep(1) @@ -297,6 +303,7 @@ class ConfigTableStore: except Exception as e: print("Exception:", type(e)) + raise e print(f"{e}, retry...", flush=True) await asyncio.sleep(1) diff --git a/trustgraph-flow/trustgraph/tables/knowledge.py b/trustgraph-flow/trustgraph/tables/knowledge.py index 3996b5a7..36414dc4 100644 --- a/trustgraph-flow/trustgraph/tables/knowledge.py +++ b/trustgraph-flow/trustgraph/tables/knowledge.py @@ -232,6 +232,7 @@ class KnowledgeTableStore: except Exception as e: print("Exception:", type(e)) + raise e print(f"{e}, retry...", flush=True) await asyncio.sleep(1) @@ -276,6 +277,7 @@ class KnowledgeTableStore: except Exception as e: print("Exception:", type(e)) + raise e print(f"{e}, retry...", flush=True) await asyncio.sleep(1) @@ -320,6 +322,7 @@ class KnowledgeTableStore: except Exception as e: print("Exception:", type(e)) + raise e print(f"{e}, retry...", flush=True) await asyncio.sleep(1) @@ -340,6 +343,7 @@ class KnowledgeTableStore: except Exception as e: print("Exception:", type(e)) + raise e print(f"{e}, retry...", flush=True) await asyncio.sleep(1) @@ -370,6 +374,7 @@ class KnowledgeTableStore: except Exception as e: print("Exception:", type(e)) + raise e print(f"{e}, retry...", flush=True) await asyncio.sleep(1) @@ -386,6 +391,7 @@ class KnowledgeTableStore: except Exception as e: print("Exception:", type(e)) + raise e print(f"{e}, retry...", flush=True) await asyncio.sleep(1) @@ -406,6 +412,7 @@ class KnowledgeTableStore: except Exception as e: print("Exception:", type(e)) + raise e print(f"{e}, retry...", flush=True) await asyncio.sleep(1) @@ -463,6 +470,7 @@ class KnowledgeTableStore: except Exception as e: print("Exception:", type(e)) + raise e print(f"{e}, retry...", flush=True) await asyncio.sleep(1) diff --git a/trustgraph-flow/trustgraph/tables/library.py b/trustgraph-flow/trustgraph/tables/library.py index 4168fd2b..c8cdb027 100644 --- a/trustgraph-flow/trustgraph/tables/library.py +++ b/trustgraph-flow/trustgraph/tables/library.py @@ -232,6 +232,7 @@ class LibraryTableStore: except Exception as e: print("Exception:", type(e)) + raise e print(f"{e}, retry...", flush=True) await asyncio.sleep(1) @@ -267,6 +268,7 @@ class LibraryTableStore: except Exception as e: print("Exception:", type(e)) + raise e print(f"{e}, retry...", flush=True) await asyncio.sleep(1) @@ -292,6 +294,7 @@ class LibraryTableStore: except Exception as e: print("Exception:", type(e)) + raise e print(f"{e}, retry...", flush=True) await asyncio.sleep(1) @@ -314,6 +317,7 @@ class LibraryTableStore: except Exception as e: print("Exception:", type(e)) + raise e print(f"{e}, retry...", flush=True) await asyncio.sleep(1) @@ -361,6 +365,7 @@ class LibraryTableStore: except Exception as e: print("Exception:", type(e)) + raise e print(f"{e}, retry...", flush=True) await asyncio.sleep(1) @@ -407,6 +412,7 @@ class LibraryTableStore: except Exception as e: print("Exception:", type(e)) + raise e print(f"{e}, retry...", flush=True) await asyncio.sleep(1) @@ -455,6 +461,7 @@ class LibraryTableStore: except Exception as e: print("Exception:", type(e)) + raise e print(f"{e}, retry...", flush=True) await asyncio.sleep(1) @@ -480,6 +487,7 @@ class LibraryTableStore: except Exception as e: print("Exception:", type(e)) + raise e print(f"{e}, retry...", flush=True) await asyncio.sleep(1) @@ -502,6 +510,7 @@ class LibraryTableStore: except Exception as e: print("Exception:", type(e)) + raise e print(f"{e}, retry...", flush=True) await asyncio.sleep(1)