diff --git a/trustgraph-base/trustgraph/api/api.py b/trustgraph-base/trustgraph/api/api.py index fbff8d34..cff2b36a 100644 --- a/trustgraph-base/trustgraph/api/api.py +++ b/trustgraph-base/trustgraph/api/api.py @@ -29,6 +29,18 @@ class ConfigValue: key : str value : str +def check_error(response): + + if "error" in response: + + try: + msg = response["error"]["message"] + tp = response["error"]["type"] + except: + raise ApplicationException(response["error"]) + + raise ApplicationException(f"{tp}: {msg}") + class Api: def __init__(self, url="http://localhost:8088/"): @@ -40,19 +52,200 @@ class Api: self.url += "api/v1/" - def check_error(self, response): + def flow(self, flow="0000"): + return Flow(api=self, flow=flow) - if "error" in response: + def request(self, path, request): - try: - msg = response["error"]["message"] - tp = response["error"]["type"] - except: - raise ApplicationException( - "Error, but the error object is broken" + url = f"{self.url}{path}" + + # Invoke the API, input is passed as JSON + resp = requests.post(url, json=request) + + # Should be a 200 status code + if resp.status_code != 200: + raise ProtocolException(f"Status code {resp.status_code}") + + try: + # Parse the response as JSON + object = resp.json() + except: + raise ProtocolException(f"Expected JSON response") + + check_error(object) + + return object + + def config_all(self): + + # The input consists of system and prompt strings + input = { + "operation": "config" + } + + object = self.request("config", input) + + try: + return object["config"], object["version"] + except: + raise ProtocolException(f"Response not formatted correctly") + + def config_get(self, keys): + + # The input consists of system and prompt strings + input = { + "operation": "get", + "keys": [ + { "type": k.type, "key": k.key } + for k in keys + ] + } + + object = self.request("config", input) + + try: + return [ + ConfigValue( + type = v["type"], + key = v["key"], + value = v["value"] ) + for v in object["values"] + ] + except: + raise ProtocolException(f"Response not formatted correctly") - raise ApplicationException(f"{tp}: {msg}") + def config_put(self, values): + + # The input consists of system and prompt strings + input = { + "operation": "put", + "values": [ + { "type": v.type, "key": v.key, "value": v.value } + for v in values + ] + } + + self.request("config", input) + + def config_list(self, type): + + # The input consists of system and prompt strings + input = { + "operation": "list", + "type": type, + } + + return self.request("config", input)["directory"] + + def config_getvalues(self, type): + + # The input consists of system and prompt strings + input = { + "operation": "getvalues", + "type": type, + } + + object = self.request("config", input)["directory"] + + try: + return [ + ConfigValue( + type = v["type"], + key = v["key"], + value = v["value"] + ) + for v in object["values"] + ] + except: + raise ProtocolException(f"Response not formatted correctly") + + def flow_list_classes(self): + + # The input consists of system and prompt strings + input = { + "operation": "list-classes", + } + + return self.request("flow", input)["class-names"] + + def flow_get_class(self, class_name): + + # The input consists of system and prompt strings + input = { + "operation": "get-class", + "class-name": class_name, + } + + return json.loads(self.request("flow", input)["class-definition"]) + + def flow_put_class(self, class_name, definition): + + # The input consists of system and prompt strings + input = { + "operation": "put-class", + "class-name": class_name, + "class-definition": json.dumps(definition), + } + + self.request("flow", input) + + def flow_delete_class(self, class_name): + + # The input consists of system and prompt strings + input = { + "operation": "delete-class", + "class-name": class_name, + } + + self.request("flow", input) + + def flow_list(self): + + # The input consists of system and prompt strings + input = { + "operation": "list-flows", + } + + return self.request("flow", input)["flow-ids"] + + def flow_get(self, id): + + # The input consists of system and prompt strings + input = { + "operation": "get-flow", + "flow-id": id, + } + + return json.loads(self.request("flow", input)["flow"]) + + def flow_start(self, class_name, id, description): + + # The input consists of system and prompt strings + input = { + "operation": "start-flow", + "flow-id": id, + "class-name": class_name, + "description": description, + } + + self.request("flow", input) + + def flow_stop(self, id): + + # The input consists of system and prompt strings + input = { + "operation": "stop-flow", + "flow-id": id, + } + + self.request("flow", input) + +class Flow: + + def __init__(self, api, flow): + self.api = api + self.flow = flow def text_completion(self, system, prompt): @@ -62,27 +255,10 @@ class Api: "prompt": prompt } - url = f"{self.url}text-completion" - - # Invoke the API, input is passed as JSON - resp = requests.post(url, json=input) - - # Should be a 200 status code - if resp.status_code != 200: - raise ProtocolException(f"Status code {resp.status_code}") - - try: - # Parse the response as JSON - object = resp.json() - except: - raise ProtocolException(f"Expected JSON response") - - self.check_error(object) - - try: - return object["response"] - except: - raise ProtocolException(f"Response not formatted correctly") + return self.api.request( + f"flow/{self.flow}/service/text-completion", + input + )["response"] def agent(self, question): @@ -91,27 +267,10 @@ class Api: "question": question } - url = f"{self.url}agent" - - # Invoke the API, input is passed as JSON - resp = requests.post(url, json=input) - - # Should be a 200 status code - if resp.status_code != 200: - raise ProtocolException(f"Status code {resp.status_code}") - - try: - # Parse the response as JSON - object = resp.json() - except: - raise ProtocolException(f"Expected JSON response") - - self.check_error(object) - - try: - return object["answer"] - except: - raise ProtocolException(f"Response not formatted correctly") + return self.api.request( + f"flow/{self.flow}/service/agent", + input + )["answer"] def graph_rag( self, question, user="trustgraph", collection="default", @@ -130,27 +289,10 @@ class Api: "max-path-length": max_path_length, } - url = f"{self.url}graph-rag" - - # Invoke the API, input is passed as JSON - resp = requests.post(url, json=input) - - # Should be a 200 status code - if resp.status_code != 200: - raise ProtocolException(f"Status code {resp.status_code}") - - try: - # Parse the response as JSON - object = resp.json() - except: - raise ProtocolException(f"Expected JSON response") - - self.check_error(object) - - try: - return object["response"] - except: - raise ProtocolException(f"Response not formatted correctly") + return self.api.request( + f"flow/{self.flow}/service/graph-rag", + input + )["response"] def document_rag( self, question, user="trustgraph", collection="default", @@ -165,27 +307,10 @@ class Api: "doc-limit": doc_limit, } - url = f"{self.url}document-rag" - - # Invoke the API, input is passed as JSON - resp = requests.post(url, json=input) - - # Should be a 200 status code - if resp.status_code != 200: - raise ProtocolException(f"Status code {resp.status_code}") - - try: - # Parse the response as JSON - object = resp.json() - except: - raise ProtocolException(f"Expected JSON response") - - self.check_error(object) - - try: - return object["response"] - except: - raise ProtocolException(f"Response not formatted correctly") + return self.api.request( + f"flow/{self.flow}/service/document-rag", + input + )["response"] def embeddings(self, text): @@ -194,27 +319,10 @@ class Api: "text": text } - url = f"{self.url}embeddings" - - # Invoke the API, input is passed as JSON - resp = requests.post(url, json=input) - - # Should be a 200 status code - if resp.status_code != 200: - raise ProtocolException(f"Status code {resp.status_code}") - - try: - # Parse the response as JSON - object = resp.json() - except: - raise ProtocolException(f"Expected JSON response") - - self.check_error(object) - - try: - return object["vectors"] - except: - raise ProtocolException(f"Response not formatted correctly") + return self.api.request( + f"flow/{self.flow}/service/embeddings", + input + )["vectors"] def prompt(self, id, variables): @@ -224,22 +332,10 @@ class Api: "variables": variables } - url = f"{self.url}prompt" - - # Invoke the API, input is passed as JSON - resp = requests.post(url, json=input) - - # Should be a 200 status code - if resp.status_code != 200: - raise ProtocolException(f"Status code {resp.status_code}") - - try: - # Parse the response as JSON - object = resp.json() - except: - raise ProtocolException("Expected JSON response") - - self.check_error(object) + object = self.api.request( + f"flow/{self.flow}/service/prompt", + input + ) if "text" in object: return object["text"] @@ -254,13 +350,22 @@ class Api: raise ProtocolException("Response not formatted correctly") - def triples_query(self, s=None, p=None, o=None, limit=10000): + def triples_query( + self, s=None, p=None, o=None, + user=None, collection=None, limit=10000 + ): # The input consists of system and prompt strings input = { "limit": limit } + if user: + input["user"] = user + + if collection: + input["collection"] = collection + if s: if not isinstance(s, Uri): raise RuntimeError("s must be Uri") @@ -276,25 +381,10 @@ class Api: raise RuntimeError("o must be Uri or Literal") input["o"] = { "v": str(o), "e": isinstance(o, Uri), } - url = f"{self.url}triples-query" - - # Invoke the API, input is passed as JSON - resp = requests.post(url, json=input) - - # Should be a 200 status code - if resp.status_code != 200: - raise ProtocolException(f"Status code {resp.status_code}") - - try: - # Parse the response as JSON - object = resp.json() - except: - raise ProtocolException("Expected JSON response") - - self.check_error(object) - - if "response" not in object: - raise ProtocolException("Response not formatted correctly") + object = self.api.request( + f"flow/{self.flow}/service/triples", + input + ) def to_value(x): if x["e"]: return Uri(x["v"]) @@ -309,9 +399,10 @@ class Api: for t in object["response"] ] - return object["response"] - - def load_document(self, document, id=None, metadata=None): + def load_document( + self, document, id=None, metadata=None, user=None, + collection=None, + ): if id is None: @@ -344,16 +435,21 @@ class Api: "data": base64.b64encode(document).decode("utf-8"), } - url = f"{self.url}load/document" + if user: + input["user"] = user - # Invoke the API, input is passed as JSON - resp = requests.post(url, json=input) + if collection: + input["collection"] = collection - # Should be a 200 status code - if resp.status_code != 200: - raise ProtocolException(f"Status code {resp.status_code}") + return self.api.request( + f"flow/{self.flow}/service/document-load", + input + ) - def load_text(self, text, id=None, metadata=None, charset="utf-8"): + def load_text( + self, text, id=None, metadata=None, charset="utf-8", + user=None, collection=None, + ): if id is None: @@ -384,411 +480,14 @@ class Api: "text": base64.b64encode(text).decode("utf-8"), } - url = f"{self.url}load/text" + if user: + input["user"] = user - # Invoke the API, input is passed as JSON - resp = requests.post(url, json=input) + if collection: + input["collection"] = collection - # Should be a 200 status code - if resp.status_code != 200: - raise ProtocolException(f"Status code {resp.status_code}") - - def config_all(self): - - # The input consists of system and prompt strings - input = { - "operation": "config" - } - - url = f"{self.url}config" - - # Invoke the API, input is passed as JSON - resp = requests.post(url, json=input) - - # Should be a 200 status code - if resp.status_code != 200: - raise ProtocolException(f"Status code {resp.status_code}") - - try: - # Parse the response as JSON - object = resp.json() - except: - raise ProtocolException(f"Expected JSON response") - - self.check_error(object) - - try: - return object["config"], object["version"] - except: - raise ProtocolException(f"Response not formatted correctly") - - def config_get(self, keys): - - # The input consists of system and prompt strings - input = { - "operation": "get", - "keys": [ - { "type": k.type, "key": k.key } - for k in keys - ] - } - - url = f"{self.url}config" - - # Invoke the API, input is passed as JSON - resp = requests.post(url, json=input) - - # Should be a 200 status code - if resp.status_code != 200: - raise ProtocolException(f"Status code {resp.status_code}") - - try: - # Parse the response as JSON - object = resp.json() - except: - raise ProtocolException(f"Expected JSON response") - - self.check_error(object) - - try: - return [ - ConfigValue( - type = v["type"], - key = v["key"], - value = v["value"] - ) - for v in object["values"] - ] - except: - raise ProtocolException(f"Response not formatted correctly") - - def config_put(self, values): - - # The input consists of system and prompt strings - input = { - "operation": "put", - "values": [ - { "type": v.type, "key": v.key, "value": v.value } - for v in values - ] - } - - url = f"{self.url}config" - - # Invoke the API, input is passed as JSON - resp = requests.post(url, json=input) - - # Should be a 200 status code - if resp.status_code != 200: - raise ProtocolException(f"Status code {resp.status_code}") - - try: - # Parse the response as JSON - object = resp.json() - except: - raise ProtocolException(f"Expected JSON response") - - self.check_error(object) - - try: - return None - except: - raise ProtocolException(f"Response not formatted correctly") - - def config_list(self, type): - - # The input consists of system and prompt strings - input = { - "operation": "list", - "type": type, - } - - url = f"{self.url}config" - - # Invoke the API, input is passed as JSON - resp = requests.post(url, json=input) - - # Should be a 200 status code - if resp.status_code != 200: - raise ProtocolException(f"Status code {resp.status_code}") - - try: - # Parse the response as JSON - object = resp.json() - except: - raise ProtocolException(f"Expected JSON response") - - self.check_error(object) - - try: - return object["directory"] - except: - raise ProtocolException(f"Response not formatted correctly") - - def config_getvalues(self, type): - - # The input consists of system and prompt strings - input = { - "operation": "getvalues", - "type": type, - } - - url = f"{self.url}config" - - # Invoke the API, input is passed as JSON - resp = requests.post(url, json=input) - - # Should be a 200 status code - if resp.status_code != 200: - raise ProtocolException(f"Status code {resp.status_code}") - - try: - # Parse the response as JSON - object = resp.json() - except: - raise ProtocolException(f"Expected JSON response") - - self.check_error(object) - - try: - return [ - ConfigValue( - type = v["type"], - key = v["key"], - value = v["value"] - ) - for v in object["values"] - ] - except: - raise ProtocolException(f"Response not formatted correctly") - - def flow_list_classes(self): - - # The input consists of system and prompt strings - input = { - "operation": "list-classes", - } - - url = f"{self.url}flow" - - # Invoke the API, input is passed as JSON - resp = requests.post(url, json=input) - - # Should be a 200 status code - if resp.status_code != 200: - raise ProtocolException(f"Status code {resp.status_code}") - - try: - # Parse the response as JSON - object = resp.json() - except: - raise ProtocolException(f"Expected JSON response") - - self.check_error(object) - - try: - return object["class-names"] - except: - raise ProtocolException(f"Response not formatted correctly") - - def flow_get_class(self, class_name): - - # The input consists of system and prompt strings - input = { - "operation": "get-class", - "class-name": class_name, - } - - url = f"{self.url}flow" - - # Invoke the API, input is passed as JSON - resp = requests.post(url, json=input) - - # Should be a 200 status code - if resp.status_code != 200: - raise ProtocolException(f"Status code {resp.status_code}") - - try: - # Parse the response as JSON - object = resp.json() - except: - raise ProtocolException(f"Expected JSON response") - - self.check_error(object) - - try: - return json.loads(object["class-definition"]) - except Exception as e: - print(e) - raise ProtocolException(f"Response not formatted correctly") - - def flow_put_class(self, class_name, definition): - - # The input consists of system and prompt strings - input = { - "operation": "put-class", - "class-name": class_name, - "class-definition": json.dumps(definition), - } - - url = f"{self.url}flow" - - # Invoke the API, input is passed as JSON - resp = requests.post(url, json=input) - - # Should be a 200 status code - if resp.status_code != 200: - raise ProtocolException(f"Status code {resp.status_code}") - - try: - # Parse the response as JSON - object = resp.json() - except: - raise ProtocolException(f"Expected JSON response") - - self.check_error(object) - - return - - def flow_delete_class(self, class_name): - - # The input consists of system and prompt strings - input = { - "operation": "delete-class", - "class-name": class_name, - } - - url = f"{self.url}flow" - - # Invoke the API, input is passed as JSON - resp = requests.post(url, json=input) - - # Should be a 200 status code - if resp.status_code != 200: - raise ProtocolException(f"Status code {resp.status_code}") - - try: - # Parse the response as JSON - object = resp.json() - except: - raise ProtocolException(f"Expected JSON response") - - self.check_error(object) - - return - - def flow_list(self): - - # The input consists of system and prompt strings - input = { - "operation": "list-flows", - } - - url = f"{self.url}flow" - - # Invoke the API, input is passed as JSON - resp = requests.post(url, json=input) - - # Should be a 200 status code - if resp.status_code != 200: - raise ProtocolException(f"Status code {resp.status_code}") - - try: - # Parse the response as JSON - object = resp.json() - except: - raise ProtocolException(f"Expected JSON response") - - self.check_error(object) - - try: - return object["flow-ids"] - except: - raise ProtocolException(f"Response not formatted correctly") - - def flow_get(self, id): - - # The input consists of system and prompt strings - input = { - "operation": "get-flow", - "flow-id": id, - } - - url = f"{self.url}flow" - - # Invoke the API, input is passed as JSON - resp = requests.post(url, json=input) - - # Should be a 200 status code - if resp.status_code != 200: - raise ProtocolException(f"Status code {resp.status_code}") - - try: - # Parse the response as JSON - object = resp.json() - except: - raise ProtocolException(f"Expected JSON response") - - self.check_error(object) - - try: - return json.loads(object["flow"]) - except: - raise ProtocolException(f"Response not formatted correctly") - - def flow_start(self, class_name, id, description): - - # The input consists of system and prompt strings - input = { - "operation": "start-flow", - "flow-id": id, - "class-name": class_name, - "description": description, - } - - url = f"{self.url}flow" - - # Invoke the API, input is passed as JSON - resp = requests.post(url, json=input) - - # Should be a 200 status code - if resp.status_code != 200: - raise ProtocolException(f"Status code {resp.status_code}") - - try: - # Parse the response as JSON - object = resp.json() - except: - raise ProtocolException(f"Expected JSON response") - - self.check_error(object) - - return - - def flow_stop(self, id): - - # The input consists of system and prompt strings - input = { - "operation": "stop-flow", - "flow-id": id, - } - - url = f"{self.url}flow" - - # Invoke the API, input is passed as JSON - resp = requests.post(url, json=input) - - # Should be a 200 status code - if resp.status_code != 200: - raise ProtocolException(f"Status code {resp.status_code}") - - try: - # Parse the response as JSON - object = resp.json() - except: - raise ProtocolException(f"Expected JSON response") - - self.check_error(object) - - return + return self.api.request( + f"flow/{self.flow}/service/text-load", + input + ) diff --git a/trustgraph-cli/scripts/tg-graph-show b/trustgraph-cli/scripts/tg-graph-show index a3d10283..73a67d72 100755 --- a/trustgraph-cli/scripts/tg-graph-show +++ b/trustgraph-cli/scripts/tg-graph-show @@ -12,12 +12,12 @@ default_url = os.getenv("TRUSTGRAPH_URL", 'http://localhost:8088/') default_user = 'trustgraph' default_collection = 'default' -def show_graph(url, user, collection): +def show_graph(url, flow_id, user, collection): api = Api(url) - rows = api.triples_query( -# user=user, collection=collection, + rows = api.flow(flow_id).triples_query( + user=user, collection=collection, s=None, p=None, o=None, limit=10_000, ) @@ -37,6 +37,12 @@ def main(): help=f'API URL (default: {default_url})', ) + parser.add_argument( + '-f', '--flow-id', + default="0000", + help=f'Flow ID (default: 0000)' + ) + parser.add_argument( '-U', '--user', default=default_user, @@ -54,9 +60,10 @@ def main(): try: show_graph( - url=args.api_url, - user=args.user, - collection=args.collection, + url = args.api_url, + flow_id = args.flow_id, + user = args.user, + collection = args.collection, ) except Exception as e: diff --git a/trustgraph-cli/scripts/tg-graph-to-turtle b/trustgraph-cli/scripts/tg-graph-to-turtle index fc17ddd0..da8a5f56 100755 --- a/trustgraph-cli/scripts/tg-graph-to-turtle +++ b/trustgraph-cli/scripts/tg-graph-to-turtle @@ -17,14 +17,14 @@ default_url = os.getenv("TRUSTGRAPH_URL", 'http://localhost:8088/') default_user = 'trustgraph' default_collection = 'default' -def show_graph(url, user, collection): +def show_graph(url, flow_id, user, collection): api = Api(url) - rows = api.triples_query( + rows = api.flow(flow_id).triples_query( s=None, p=None, o=None, + user=user, collection=collection, limit=10_000) -# user=user, collection=collection, g = rdflib.Graph() @@ -69,6 +69,12 @@ def main(): help=f'API URL (default: {default_url})', ) + parser.add_argument( + '-f', '--flow-id', + default="0000", + help=f'Flow ID (default: 0000)' + ) + parser.add_argument( '-U', '--user', default=default_user, @@ -86,9 +92,10 @@ def main(): try: show_graph( - url=args.api_url, - user=args.user, - collection=args.collection + url = args.api_url, + flow_id = args.flow_id, + user = args.user, + collection = args.collection, ) except Exception as e: diff --git a/trustgraph-cli/scripts/tg-invoke-agent b/trustgraph-cli/scripts/tg-invoke-agent index 5e213447..79760fec 100755 --- a/trustgraph-cli/scripts/tg-invoke-agent +++ b/trustgraph-cli/scripts/tg-invoke-agent @@ -30,7 +30,7 @@ def output(text, prefix="> ", width=78): print(out) async def question( - url, question, user, collection, + url, question, flow_id, user, collection, plan=None, state=None, verbose=False ): @@ -60,6 +60,7 @@ async def question( req = json.dumps({ "id": mid, "service": "agent", + "flow": flow_id, "request": { "question": question, } @@ -74,6 +75,9 @@ async def question( obj = json.loads(msg) + if obj["error"]: + raise RuntimeError(obj["error"]) + if obj["id"] != mid: print("Ignore message") continue @@ -104,6 +108,12 @@ def main(): help=f'API URL (default: {default_url})', ) + parser.add_argument( + '-f', '--flow-id', + default="0000", + help=f'Flow ID (default: 0000)' + ) + parser.add_argument( '-q', '--question', required=True, @@ -137,12 +147,6 @@ def main(): action="store_true", help=f'Output thinking/observations' ) - - # parser.add_argument( - # '--pulsar-api-key', - # default=default_pulsar_api_key, - # help=f'Pulsar API key', - # ) args = parser.parse_args() @@ -150,13 +154,14 @@ def main(): asyncio.run( question( - url=args.url, - question=args.question, - user=args.user, - collection=args.collection, - plan=args.plan, - state=args.state, - verbose=args.verbose, + url = args.url, + flow_id = args.flow_id, + question = args.question, + user = args.user, + collection = args.collection, + plan = args.plan, + state = args.state, + verbose = args.verbose, ) ) diff --git a/trustgraph-cli/scripts/tg-invoke-document-rag b/trustgraph-cli/scripts/tg-invoke-document-rag index 759d4200..fd8d61ed 100755 --- a/trustgraph-cli/scripts/tg-invoke-document-rag +++ b/trustgraph-cli/scripts/tg-invoke-document-rag @@ -13,11 +13,11 @@ default_user = 'trustgraph' default_collection = 'default' default_doc_limit = 10 -def question(url, question, user, collection, doc_limit): +def question(url, flow_id, question, user, collection, doc_limit): - rag = Api(url) + api = Api(url) - resp = rag.document_rag( + resp = api.flow(flow_id).document_rag( question=question, user=user, collection=collection, doc_limit=doc_limit, ) @@ -37,11 +37,11 @@ def main(): help=f'API URL (default: {default_url})', ) - # parser.add_argument( - # '--pulsar-api-key', - # default=default_pulsar_api_key, - # help=f'Pulsar API key', - # ) + parser.add_argument( + '-f', '--flow-id', + default="0000", + help=f'Flow ID (default: 0000)' + ) parser.add_argument( '-q', '--question', @@ -73,6 +73,7 @@ def main(): question( url=args.url, + flow_id = args.flow_id, question=args.question, user=args.user, collection=args.collection, diff --git a/trustgraph-cli/scripts/tg-invoke-graph-rag b/trustgraph-cli/scripts/tg-invoke-graph-rag index 5bbe5f59..9cccf680 100755 --- a/trustgraph-cli/scripts/tg-invoke-graph-rag +++ b/trustgraph-cli/scripts/tg-invoke-graph-rag @@ -17,13 +17,13 @@ default_max_subgraph_size = 150 default_max_path_length = 2 def question( - url, question, user, collection, entity_limit, triple_limit, + url, flow_id, question, user, collection, entity_limit, triple_limit, max_subgraph_size, max_path_length ): - rag = Api(url) + api = Api(url) - resp = rag.graph_rag( + resp = api.flow(flow_id).graph_rag( question=question, user=user, collection=collection, entity_limit=entity_limit, triple_limit=triple_limit, max_subgraph_size=max_subgraph_size, @@ -45,6 +45,12 @@ def main(): help=f'API URL (default: {default_url})', ) + parser.add_argument( + '-f', '--flow-id', + default="0000", + help=f'Flow ID (default: 0000)' + ) + parser.add_argument( '-q', '--question', required=True, @@ -93,6 +99,7 @@ def main(): question( url=args.url, + flow_id = args.flow_id, question=args.question, user=args.user, collection=args.collection, diff --git a/trustgraph-cli/scripts/tg-invoke-llm b/trustgraph-cli/scripts/tg-invoke-llm index eb469b6e..21915a00 100755 --- a/trustgraph-cli/scripts/tg-invoke-llm +++ b/trustgraph-cli/scripts/tg-invoke-llm @@ -12,11 +12,11 @@ from trustgraph.api import Api default_url = os.getenv("TRUSTGRAPH_URL", 'http://localhost:8088/') -def query(url, system, prompt): +def query(url, flow_id, system, prompt): api = Api(url) - resp = api.text_completion(system=system, prompt=prompt) + resp = api.flow(flow_id).text_completion(system=system, prompt=prompt) print(resp) @@ -44,12 +44,12 @@ def main(): nargs=1, help='LLM prompt e.g. What is 2 + 2?', ) - - # parser.add_argument( - # '--pulsar-api-key', - # default=default_pulsar_api_key, - # help=f'Pulsar API key', - # ) + + parser.add_argument( + '-f', '--flow-id', + default="0000", + help=f'Flow ID (default: 0000)' + ) args = parser.parse_args() @@ -57,6 +57,7 @@ def main(): query( url=args.url, + flow = args.flow_id, system=args.system[0], prompt=args.prompt[0], ) diff --git a/trustgraph-cli/scripts/tg-invoke-prompt b/trustgraph-cli/scripts/tg-invoke-prompt index 426fe1ee..b86ef105 100755 --- a/trustgraph-cli/scripts/tg-invoke-prompt +++ b/trustgraph-cli/scripts/tg-invoke-prompt @@ -16,11 +16,11 @@ from trustgraph.api import Api default_url = os.getenv("TRUSTGRAPH_URL", 'http://localhost:8088/') -def query(url, template_id, variables): +def query(url, flow_id, template_id, variables): api = Api(url) - resp = api.prompt(id=template_id, variables=variables) + resp = api.flow(flow_id).prompt(id=template_id, variables=variables) if isinstance(resp, str): print(resp) @@ -40,6 +40,12 @@ def main(): help=f'API URL (default: {default_url})', ) + parser.add_argument( + '-f', '--flow-id', + default="0000", + help=f'Flow ID (default: 0000)' + ) + parser.add_argument( 'id', metavar='template-id', @@ -77,6 +83,7 @@ specified multiple times''', query( url=args.url, + flow_id=args.flow_id, template_id=args.id[0], variables=variables, ) diff --git a/trustgraph-cli/scripts/tg-load-doc-embeds b/trustgraph-cli/scripts/tg-load-doc-embeds index d445ec5a..c242bb74 100755 --- a/trustgraph-cli/scripts/tg-load-doc-embeds +++ b/trustgraph-cli/scripts/tg-load-doc-embeds @@ -27,7 +27,7 @@ async def load_de(running, queue, url): async with aiohttp.ClientSession() as session: - async with session.ws_connect(f"{url}load/document-embeddings") as ws: + async with session.ws_connect(url) as ws: while running.get(): @@ -141,6 +141,9 @@ async def run(running, **args): # grow to eat all memory de_q = asyncio.Queue(maxsize=10) + url = args["url"] + flow_id = args["flow_id"] + load_task = asyncio.create_task( loader( running=running, @@ -154,7 +157,8 @@ async def run(running, **args): de_task = asyncio.create_task( load_de( running=running, - queue=de_q, url=args["url"] + "api/v1/" + queue=de_q, + url = f"{url}api/v1/flow/{flow_id}/import/document-embeddings" ) ) @@ -184,6 +188,12 @@ async def main(running): help=f'TrustGraph API URL (default: {default_url})', ) + parser.add_argument( + '-f', '--flow-id', + default="0000", + help=f'Flow ID (default: 0000)' + ) + parser.add_argument( '-i', '--input-file', # Make it mandatory, difficult to over-write an existing file diff --git a/trustgraph-cli/scripts/tg-load-pdf b/trustgraph-cli/scripts/tg-load-pdf index 3e960c67..77105315 100755 --- a/trustgraph-cli/scripts/tg-load-pdf +++ b/trustgraph-cli/scripts/tg-load-pdf @@ -27,13 +27,14 @@ class Loader: def __init__( self, url, + flow_id, user, collection, metadata, pulsar_api_key=None, ): - self.api = Api(url) + self.api = Api(url).flow(flow_id) self.user = user self.collection = collection @@ -60,14 +61,15 @@ class Loader: self.api.load_document( document=data, id=id, metadata=self.metadata, -# user=self.user, -# collection=self.collection, + user=self.user, + collection=self.collection, ) print(f"{file}: Loaded successfully.") except Exception as e: print(f"{file}: Failed: {str(e)}", flush=True) + raise e def main(): @@ -82,6 +84,12 @@ def main(): help=f'API URL (default: {default_url})', ) + parser.add_argument( + '-f', '--flow-id', + default="0000", + help=f'Flow ID (default: 0000)' + ) + parser.add_argument( '-U', '--user', default=default_user, @@ -149,53 +157,46 @@ def main(): args = parser.parse_args() - while True: + try: - try: + document = DigitalDocument( + id, + name=args.name, + description=args.description, + copyright_notice=args.copyright_notice, + copyright_holder=args.copyright_holder, + copyright_year=args.copyright_year, + license=args.license, + url=args.document_url, + keywords=args.keyword, + ) - document = DigitalDocument( - id, - name=args.name, - description=args.description, - copyright_notice=args.copyright_notice, - copyright_holder=args.copyright_holder, - copyright_year=args.copyright_year, - license=args.license, - url=args.document_url, - keywords=args.keyword, + if args.publication_organization: + org = Organization( + id=to_uri(PREF_ORG, hash(args.publication_organization)), + name=args.publication_organization, + ) + document.publication = PublicationEvent( + id = to_uri(PREF_PUBEV, str(uuid.uuid4())), + organization=org, + description=args.publication_description, + start_date=args.publication_date, + end_date=args.publication_date, ) - if args.publication_organization: - org = Organization( - id=to_uri(PREF_ORG, hash(args.publication_organization)), - name=args.publication_organization, - ) - document.publication = PublicationEvent( - id = to_uri(PREF_PUBEV, str(uuid.uuid4())), - organization=org, - description=args.publication_description, - start_date=args.publication_date, - end_date=args.publication_date, - ) + p = Loader( + url=args.url, + flow_id = args.flow_id, + user=args.user, + collection=args.collection, + metadata=document, + ) - p = Loader( - url=args.url, - user=args.user, - collection=args.collection, - metadata=document, - ) + p.load(args.files) - p.load(args.files) + except Exception as e: - print("All done.") - break - - except Exception as e: - - print("Exception:", e, flush=True) - print("Will retry...", flush=True) - - time.sleep(10) + print("Exception:", e, flush=True) main() diff --git a/trustgraph-cli/scripts/tg-load-text b/trustgraph-cli/scripts/tg-load-text index 0cc221a5..94cb5ade 100755 --- a/trustgraph-cli/scripts/tg-load-text +++ b/trustgraph-cli/scripts/tg-load-text @@ -27,12 +27,13 @@ class Loader: def __init__( self, url, + flow_id, user, collection, metadata, ): - self.api = Api(url) + self.api = Api(url).flow(flow_id) self.user = user self.collection = collection @@ -59,14 +60,15 @@ class Loader: self.api.load_text( text=data, id=id, metadata=self.metadata, -# user=self.user, -# collection=self.collection, + user=self.user, + collection=self.collection, ) print(f"{file}: Loaded successfully.") except Exception as e: print(f"{file}: Failed: {str(e)}", flush=True) + raise e def main(): @@ -80,12 +82,12 @@ def main(): default=default_url, help=f'API URL (default: {default_url})', ) - - # parser.add_argument( - # '--pulsar-api-key', - # default=default_pulsar_api_key, - # help=f'Pulsar API key', - # ) + + parser.add_argument( + '-f', '--flow-id', + default="0000", + help=f'Flow ID (default: 0000)' + ) parser.add_argument( '-U', '--user', @@ -154,53 +156,50 @@ def main(): args = parser.parse_args() - while True: - try: + try: - document = DigitalDocument( - id, - name=args.name, - description=args.description, - copyright_notice=args.copyright_notice, - copyright_holder=args.copyright_holder, - copyright_year=args.copyright_year, - license=args.license, - url=args.document_url, - keywords=args.keyword, + document = DigitalDocument( + id, + name=args.name, + description=args.description, + copyright_notice=args.copyright_notice, + copyright_holder=args.copyright_holder, + copyright_year=args.copyright_year, + license=args.license, + url=args.document_url, + keywords=args.keyword, + ) + + if args.publication_organization: + org = Organization( + id=to_uri(PREF_ORG, hash(args.publication_organization)), + name=args.publication_organization, + ) + document.publication = PublicationEvent( + id = to_uri(PREF_PUBEV, str(uuid.uuid4())), + organization=org, + description=args.publication_description, + start_date=args.publication_date, + end_date=args.publication_date, ) - if args.publication_organization: - org = Organization( - id=to_uri(PREF_ORG, hash(args.publication_organization)), - name=args.publication_organization, - ) - document.publication = PublicationEvent( - id = to_uri(PREF_PUBEV, str(uuid.uuid4())), - organization=org, - description=args.publication_description, - start_date=args.publication_date, - end_date=args.publication_date, - ) + p = Loader( + url = args.url, + flow_id = args.flow_id, + user = args.user, + collection = args.collection, + metadata = document, + ) - p = Loader( - url=args.url, - user=args.user, - collection=args.collection, - metadata=document, - ) + p.load(args.files) - p.load(args.files) + print("All done.") - print("All done.") - break + except Exception as e: - except Exception as e: - - print("Exception:", e, flush=True) - print("Will retry...", flush=True) - - time.sleep(10) + print("Exception:", e, flush=True) main() + diff --git a/trustgraph-cli/scripts/tg-load-turtle b/trustgraph-cli/scripts/tg-load-turtle index 3417a87d..12238c7b 100755 --- a/trustgraph-cli/scripts/tg-load-turtle +++ b/trustgraph-cli/scripts/tg-load-turtle @@ -7,7 +7,6 @@ Loads Graph embeddings into TrustGraph processing. import pulsar from pulsar.schema import JsonSchema from trustgraph.schema import Triples, Triple, Value, Metadata -from trustgraph.schema import triples_store_queue import argparse import os import time @@ -109,6 +108,12 @@ def main(): default=default_pulsar_host, help=f'Pulsar host (default: {default_pulsar_host})', ) + + parser.add_argument( + '-f', '--flow-id', + default="0000", + help=f'Flow ID (default: 0000)' + ) parser.add_argument( '--pulsar-api-key', @@ -174,5 +179,6 @@ def main(): time.sleep(10) -main() +print("Not implemented.") +#main() diff --git a/trustgraph-cli/scripts/tg-processor-state b/trustgraph-cli/scripts/tg-processor-state deleted file mode 100755 index cfab00c8..00000000 --- a/trustgraph-cli/scripts/tg-processor-state +++ /dev/null @@ -1,59 +0,0 @@ -#!/usr/bin/env python3 - -""" -Dump out TrustGraph processor states. -""" - -import requests -import argparse -import tabulate - -default_prometheus_url = "http://localhost:9090" - -def dump_status(prom): - - url = f"{prom}/api/v1/query?query=processor_state%7Bprocessor_state%3D%22running%22%7D" - - resp = requests.get(url) - - obj = resp.json() - - tbl = [ - [ - m["metric"]["job"], - "running" if int(m["value"][1]) > 0 else "down" - ] - for m in obj["data"]["result"] - ] - - print(tabulate.tabulate( - tbl, tablefmt="pretty", headers=["processor", "state"], - stralign="left" - )) - - -def main(): - - parser = argparse.ArgumentParser( - prog='tg-processor-state', - description=__doc__, - ) - - parser.add_argument( - '-p', '--prometheus-url', - default=default_prometheus_url, - help=f'Prometheus URL (default: {default_prometheus_url})', - ) - - args = parser.parse_args() - - try: - - dump_status(args.prometheus_url) - - except Exception as e: - - print("Exception:", e, flush=True) - -main() - diff --git a/trustgraph-cli/scripts/tg-save-doc-embeds b/trustgraph-cli/scripts/tg-save-doc-embeds index 95f8b748..ad6e92f7 100755 --- a/trustgraph-cli/scripts/tg-save-doc-embeds +++ b/trustgraph-cli/scripts/tg-save-doc-embeds @@ -27,9 +27,7 @@ async def fetch_de(running, queue, user, collection, url): async with aiohttp.ClientSession() as session: - de_url = f"{url}stream/document-embeddings" - - async with session.ws_connect(de_url) as ws: + async with session.ws_connect(url) as ws: while running.get(): @@ -117,11 +115,14 @@ async def run(running, **args): q = asyncio.Queue() + url = args["url"] + flow_id = args["flow_id"] + de_task = asyncio.create_task( fetch_de( running=running, queue=q, user=args["user"], collection=args["collection"], - url=args["url"] + "api/v1/" + url = f"{url}api/v1/flow/{flow_id}/export/document-embeddings" ) ) @@ -158,6 +159,12 @@ async def main(running): help=f'TrustGraph API URL (default: {default_url})', ) + parser.add_argument( + '-f', '--flow-id', + default="0000", + help=f'Flow ID (default: 0000)' + ) + parser.add_argument( '-o', '--output-file', # Make it mandatory, difficult to over-write an existing file diff --git a/trustgraph-cli/scripts/tg-show-flow-state b/trustgraph-cli/scripts/tg-show-flow-state new file mode 100755 index 00000000..c9a8035a --- /dev/null +++ b/trustgraph-cli/scripts/tg-show-flow-state @@ -0,0 +1,93 @@ +#!/usr/bin/env python3 + +""" +Dump out TrustGraph processor states. +""" + +import requests +import argparse +from trustgraph.api import Api +import os + +default_metrics_url = "http://localhost:8088/api/metrics" +default_url = os.getenv("TRUSTGRAPH_URL", 'http://localhost:8088/') + +def dump_status(metrics_url, api_url, flow_id): + + api = Api(api_url) + + flow = api.flow_get(flow_id) + class_name = flow["class-name"] + + print() + print(f"Flow {flow_id}") + show_processors(metrics_url, flow_id) + + print() + print(f"Class {class_name}") + show_processors(metrics_url, class_name) + + print() + +def show_processors(metrics_url, flow_label): + + url = f"{metrics_url}/query" + + expr = f"consumer_state=\"running\",flow=\"{flow_label}\"" + + params = { + "query": "consumer_state{" + expr + "}" + } + + resp = requests.get(url, params=params) + + obj = resp.json() + + tbl = [ + [ + m["metric"]["job"], + "\U0001f49a" if int(m["value"][1]) > 0 else "\U0000274c" + ] + for m in obj["data"]["result"] + ] + + for row in tbl: + print(f"- {row[0]:30} {row[1]}") + +def main(): + + parser = argparse.ArgumentParser( + prog='tg-show-flow-state', + description=__doc__, + ) + + parser.add_argument( + '-f', '--flow-id', + default="0000", + help=f'Flow ID (default: 0000)' + ) + + parser.add_argument( + '-u', '--api-url', + default=default_url, + help=f'API URL (default: {default_url})', + ) + + parser.add_argument( + '-m', '--metrics-url', + default=default_metrics_url, + help=f'Metrics URL (default: {default_metrics_url})', + ) + + args = parser.parse_args() + + try: + + dump_status(args.metrics_url, args.api_url, args.flow_id) + + except Exception as e: + + print("Exception:", e, flush=True) + +main() + diff --git a/trustgraph-cli/setup.py b/trustgraph-cli/setup.py index 47258648..5e50d06c 100644 --- a/trustgraph-cli/setup.py +++ b/trustgraph-cli/setup.py @@ -44,8 +44,8 @@ setuptools.setup( "websockets", ], scripts=[ - "scripts/tg-dump-msgpack", "scripts/tg-delete-flow-class", + "scripts/tg-dump-msgpack", "scripts/tg-get-flow-class", "scripts/tg-graph-show", "scripts/tg-graph-to-turtle", @@ -61,7 +61,6 @@ setuptools.setup( "scripts/tg-load-pdf", "scripts/tg-load-text", "scripts/tg-load-turtle", - "scripts/tg-processor-state", "scripts/tg-put-flow-class", "scripts/tg-save-doc-embeds", "scripts/tg-save-kg-core", @@ -69,7 +68,9 @@ setuptools.setup( "scripts/tg-set-token-costs", "scripts/tg-show-config", "scripts/tg-show-flow-classes", + "scripts/tg-show-flow-state", "scripts/tg-show-flows", + "scripts/tg-show-processor-state", "scripts/tg-show-prompts", "scripts/tg-show-token-costs", "scripts/tg-show-tools", diff --git a/trustgraph-flow/trustgraph/gateway/dispatch/config.py b/trustgraph-flow/trustgraph/gateway/dispatch/config.py index 4b6a0439..3aeedb6f 100644 --- a/trustgraph-flow/trustgraph/gateway/dispatch/config.py +++ b/trustgraph-flow/trustgraph/gateway/dispatch/config.py @@ -6,10 +6,12 @@ from ... schema import config_response_queue from . requestor import ServiceRequestor class ConfigRequestor(ServiceRequestor): - def __init__(self, pulsar_client, timeout=120): + def __init__(self, pulsar_client, consumer, subscriber, timeout=120): super(ConfigRequestor, self).__init__( pulsar_client=pulsar_client, + consumer_name = consumer, + subscription = subscriber, request_queue=config_request_queue, response_queue=config_response_queue, request_schema=ConfigRequest, diff --git a/trustgraph-flow/trustgraph/gateway/dispatch/flow.py b/trustgraph-flow/trustgraph/gateway/dispatch/flow.py index 7a2f8a39..0b38e9be 100644 --- a/trustgraph-flow/trustgraph/gateway/dispatch/flow.py +++ b/trustgraph-flow/trustgraph/gateway/dispatch/flow.py @@ -6,10 +6,12 @@ from ... schema import flow_response_queue from . requestor import ServiceRequestor class FlowRequestor(ServiceRequestor): - def __init__(self, pulsar_client, timeout=120): + def __init__(self, pulsar_client, consumer, subscriber, timeout=120): super(FlowRequestor, self).__init__( pulsar_client=pulsar_client, + consumer_name = consumer, + subscription = subscriber, request_queue=flow_request_queue, response_queue=flow_response_queue, request_schema=FlowRequest, diff --git a/trustgraph-flow/trustgraph/gateway/dispatch/librarian.py b/trustgraph-flow/trustgraph/gateway/dispatch/librarian.py index 8705614a..f280b392 100644 --- a/trustgraph-flow/trustgraph/gateway/dispatch/librarian.py +++ b/trustgraph-flow/trustgraph/gateway/dispatch/librarian.py @@ -8,10 +8,12 @@ from . serialize import serialize_document_package, serialize_document_info from . serialize import to_document_package, to_document_info, to_criteria class LibrarianRequestor(ServiceRequestor): - def __init__(self, pulsar_client, timeout=120): + def __init__(self, pulsar_client, consumer, subscriber, timeout=120): super(LibrarianRequestor, self).__init__( pulsar_client=pulsar_client, + consumer_name = consumer, + subscription = subscriber, request_queue=librarian_request_queue, response_queue=librarian_response_queue, request_schema=LibrarianRequest, diff --git a/trustgraph-flow/trustgraph/gateway/dispatch/manager.py b/trustgraph-flow/trustgraph/gateway/dispatch/manager.py index f277e255..ddd396a1 100644 --- a/trustgraph-flow/trustgraph/gateway/dispatch/manager.py +++ b/trustgraph-flow/trustgraph/gateway/dispatch/manager.py @@ -27,6 +27,8 @@ from . triples_import import TriplesImport from . graph_embeddings_import import GraphEmbeddingsImport from . document_embeddings_import import DocumentEmbeddingsImport +from . mux import Mux + request_response_dispatchers = { "agent": AgentRequestor, "text-completion": TextCompletionRequestor, @@ -35,7 +37,13 @@ request_response_dispatchers = { "document-rag": DocumentRagRequestor, "embeddings": EmbeddingsRequestor, "graph-embeddings": GraphEmbeddingsQueryRequestor, - "triples-query": TriplesQueryRequestor, + "triples": TriplesQueryRequestor, +} + +global_dispatchers = { + "config": ConfigRequestor, + "flow": FlowRequestor, + "librarian": LibrarianRequestor, } sender_dispatchers = { @@ -56,14 +64,10 @@ import_dispatchers = { } class DispatcherWrapper: - def __init__(self, mgr, name, impl): - self.mgr = mgr - self.name = name - self.impl = impl - async def process(self, data, responder): - return await self.mgr.process_impl( - data, responder, self.name, self.impl - ) + def __init__(self, handler): + self.handler = handler + async def process(self, *args): + return await self.handler(*args) class DispatcherManager: @@ -85,24 +89,26 @@ class DispatcherManager: del self.flows[id] return - def dispatch_config(self): - return DispatcherWrapper(self, "config", ConfigRequestor) + def dispatch_global_service(self): + return DispatcherWrapper(self.process_global_service) - def dispatch_flow(self): - return DispatcherWrapper(self, "flow", FlowRequestor) + async def process_global_service(self, data, responder, params): - def dispatch_librarian(self): - return DispatcherWrapper(self, "librarian", LibrarianRequestor) + kind = params.get("kind") + return await self.invoke_global_service(data, responder, kind) - async def process_impl(self, data, responder, name, impl): + async def invoke_global_service(self, data, responder, kind): - key = (None, name) + key = (None, kind) if key in self.dispatchers: return await self.dispatchers[key].process(data, responder) - dispatcher = impl( - pulsar_client = self.pulsar_client + dispatcher = global_dispatchers[kind]( + pulsar_client = self.pulsar_client, + timeout = 120, + consumer = f"api-gateway-{kind}-request", + subscriber = f"api-gateway-{kind}-request", ) await dispatcher.start() @@ -111,16 +117,19 @@ class DispatcherManager: return await dispatcher.process(data, responder) - def dispatch_service(self): - return self + def dispatch_flow_import(self): + return self.process_flow_import - def dispatch_import(self): - return self.invoke_import + def dispatch_flow_export(self): + return self.process_flow_export - def dispatch_export(self): - return self.invoke_export + def dispatch_socket(self): + return self.process_socket - async def invoke_import(self, ws, running, params): + def dispatch_flow_service(self): + return DispatcherWrapper(self.process_flow_service) + + async def process_flow_import(self, ws, running, params): flow = params.get("flow") kind = params.get("kind") @@ -151,7 +160,7 @@ class DispatcherManager: return dispatcher - async def invoke_export(self, ws, running, params): + async def process_flow_export(self, ws, running, params): flow = params.get("flow") kind = params.get("kind") @@ -184,11 +193,21 @@ class DispatcherManager: return dispatcher - async def process(self, data, responder, params): + async def process_socket(self, ws, running, params): + + dispatcher = Mux(self, ws, running) + + return dispatcher + + async def process_flow_service(self, data, responder, params): flow = params.get("flow") kind = params.get("kind") + return await self.invoke_flow_service(data, responder, flow, kind) + + async def invoke_flow_service(self, data, responder, flow, kind): + if flow not in self.flows: raise RuntimeError("Invalid flow") diff --git a/trustgraph-flow/trustgraph/gateway/dispatch/mux.py b/trustgraph-flow/trustgraph/gateway/dispatch/mux.py new file mode 100644 index 00000000..e2c5a921 --- /dev/null +++ b/trustgraph-flow/trustgraph/gateway/dispatch/mux.py @@ -0,0 +1,167 @@ + +import asyncio +import queue +import uuid + +MAX_OUTSTANDING_REQUESTS = 15 +WORKER_CLOSE_WAIT = 0.01 +START_REQUEST_WAIT = 0.1 + +# This buffers requests until task start, so short-lived +MAX_QUEUE_SIZE = 10 + +class Mux: + + def __init__(self, dispatcher_manager, ws, running): + + self.dispatcher_manager = dispatcher_manager + self.ws = ws + self.running = running + + self.q = asyncio.Queue(maxsize=MAX_QUEUE_SIZE) + + async def destroy(self): + + self.running.stop() + + if self.ws: + await self.ws.close() + + async def receive(self, msg): + + try: + + data = msg.json() + + if "request" not in data: + raise RuntimeError("Bad message") + + if "id" not in data: + raise RuntimeError("Bad message") + + await self.q.put(( + data["id"], data.get("flow"), + data["service"], + data["request"] + )) + + except Exception as e: + print("receive exception:", str(e), flush=True) + await self.ws.send_json({"error": str(e)}) + + async def maybe_tidy_workers(self, workers): + + while True: + + try: + + await asyncio.wait_for( + asyncio.shield(workers[0]), + WORKER_CLOSE_WAIT + ) + + # worker[0] now stopped + # FIXME: Delete reference??? + + workers.pop(0) + + if len(workers) == 0: + break + + # Loop iterates to try the next worker + + except TimeoutError: + # worker[0] still running, move on + break + + async def start_request_task(self, ws, id, flow, svc, request, workers): + + # Wait for outstanding requests to go below MAX_OUTSTANDING_REQUESTS + while len(workers) > MAX_OUTSTANDING_REQUESTS: + + # Fixes deadlock + # FIXME: Put it in its own loop + await asyncio.sleep(START_REQUEST_WAIT) + + await self.maybe_tidy_workers(workers) + + async def responder(resp, fin): + await self.ws.send_json({ + "id": id, + "response": resp, + "complete": fin, + }) + + worker = asyncio.create_task( + self.request_task(request, responder, flow, svc) + ) + + workers.append(worker) + + async def request_task(self, request, responder, flow, svc): + + try: + + if flow: + + await self.dispatcher_manager.invoke_flow_service( + request, responder, flow, svc + ) + + else: + + await self.dispatcher_manager.invoke_global_service( + request, responder, svc + ) + + except Exception as e: + await self.ws.send_json({"error": str(e)}) + + async def run(self): + + # Worker threads, servicing + workers = [] + + while self.running.get(): + + try: + + if len(workers) > 0: + await self.maybe_tidy_workers(workers) + + # Get next request on queue + item = await asyncio.wait_for(self.q.get(), 1) + id, flow, svc, request = item + + except TimeoutError: + continue + + except Exception as e: + # This is an internal working error, may not be recoverable + print("run prepare exception:", e) + await self.ws.send_json({"id": id, "error": str(e)}) + self.running.stop() + + if self.ws: + self.ws.close() + self.ws = None + + break + + try: + print(id, svc, request) + + await self.start_request_task( + self.ws, id, flow, svc, request, workers + ) + + except Exception as e: + print("Exception2:", e) + await self.ws.send_json({"error": str(e)}) + + self.running.stop() + + if self.ws: + self.ws.close() + self.ws = None + diff --git a/trustgraph-flow/trustgraph/gateway/endpoint/manager.py b/trustgraph-flow/trustgraph/gateway/endpoint/manager.py index cee904a5..75a39766 100644 --- a/trustgraph-flow/trustgraph/gateway/endpoint/manager.py +++ b/trustgraph-flow/trustgraph/gateway/endpoint/manager.py @@ -23,37 +23,34 @@ class EndpointManager: } self.endpoints = [ - ConstantEndpoint( - endpoint_path = "/api/v1/librarian", auth = auth, - dispatcher = dispatcher_manager.dispatch_librarian(), - ), - ConstantEndpoint( - endpoint_path = "/api/v1/config", auth = auth, - dispatcher = dispatcher_manager.dispatch_config(), - ), - ConstantEndpoint( - endpoint_path = "/api/v1/flow", auth = auth, - dispatcher = dispatcher_manager.dispatch_flow(), - ), MetricsEndpoint( - endpoint_path = "/api/v1/metrics", + endpoint_path = "/api/metrics", prometheus_url = prometheus_url, auth = auth, ), + VariableEndpoint( + endpoint_path = "/api/v1/{kind}", auth = auth, + dispatcher = dispatcher_manager.dispatch_global_service(), + ), + SocketEndpoint( + endpoint_path = "/api/v1/socket", + auth = auth, + dispatcher = dispatcher_manager.dispatch_socket() + ), VariableEndpoint( endpoint_path = "/api/v1/flow/{flow}/service/{kind}", auth = auth, - dispatcher = dispatcher_manager.dispatch_service(), + dispatcher = dispatcher_manager.dispatch_flow_service(), ), SocketEndpoint( endpoint_path = "/api/v1/flow/{flow}/import/{kind}", auth = auth, - dispatcher = dispatcher_manager.dispatch_import() + dispatcher = dispatcher_manager.dispatch_flow_import() ), SocketEndpoint( endpoint_path = "/api/v1/flow/{flow}/export/{kind}", auth = auth, - dispatcher = dispatcher_manager.dispatch_export() + dispatcher = dispatcher_manager.dispatch_flow_export() ), ] diff --git a/trustgraph-flow/trustgraph/gateway/endpoint/mux.py b/trustgraph-flow/trustgraph/gateway/endpoint/mux.py deleted file mode 100644 index 1afc3225..00000000 --- a/trustgraph-flow/trustgraph/gateway/endpoint/mux.py +++ /dev/null @@ -1,167 +0,0 @@ - -import asyncio -import queue -import uuid -from aiohttp import web, WSMsgType - -from . socket import SocketEndpoint - -MAX_OUTSTANDING_REQUESTS = 15 -WORKER_CLOSE_WAIT = 0.01 -START_REQUEST_WAIT = 0.1 - -# This buffers requests until task start, so short-lived -MAX_QUEUE_SIZE = 10 - -class MuxEndpoint(SocketEndpoint): - - def __init__( - self, pulsar_client, auth, - services, - path="/api/v1/socket", - ): - - super(MuxEndpoint, self).__init__( - endpoint_path=path, auth=auth, - ) - - self.services = services - - async def start(self): - pass - - async def maybe_tidy_workers(self, workers): - - while True: - - try: - - await asyncio.wait_for( - asyncio.shield(workers[0]), - WORKER_CLOSE_WAIT - ) - - # worker[0] now stopped - # FIXME: Delete reference??? - - workers.pop(0) - - if len(workers) == 0: - break - - # Loop iterates to try the next worker - - except TimeoutError: - # worker[0] still running, move on - break - - async def start_request_task(self, ws, id, svc, request, workers): - - if svc not in self.services: - await ws.send_json({"id": id, "error": "Service not recognised"}) - return - - requestor = self.services[svc] - - async def responder(resp, fin): - await ws.send_json({ - "id": id, - "response": resp, - "complete": fin, - }) - - # Wait for outstanding requests to go below MAX_OUTSTANDING_REQUESTS - while len(workers) > MAX_OUTSTANDING_REQUESTS: - - # Fixes deadlock - # FIXME: Put it in its own loop - await asyncio.sleep(START_REQUEST_WAIT) - - await self.maybe_tidy_workers(workers) - - worker = asyncio.create_task( - requestor.process(request, responder) - ) - - workers.append(worker) - - async def async_thread(self, ws, running, q): - - # Worker threads, servicing - workers = [] - - while running.get(): - - try: - - if len(workers) > 0: - await self.maybe_tidy_workers(workers) - - # Get next request on queue - id, svc, request = await asyncio.wait_for(q.get(), 1) - - except TimeoutError: - continue - - except Exception as e: - # This is an internal working error, may not be recoverable - print("Exception:", e) - await ws.send_json({"id": id, "error": str(e)}) - break - - try: - print(id, svc, request) - await self.start_request_task(ws, id, svc, request, workers) - - except Exception as e: - print("Exception2:", e) - await ws.send_json({"error": str(e)}) - - running.stop() - - async def listener(self, ws, running): - - # The outstanding request queue, max size is MAX_QUEUE_SIZE - q = asyncio.Queue(maxsize=MAX_QUEUE_SIZE) - - async_task = asyncio.create_task(self.async_thread( - ws, running, q - )) - - async for msg in ws: - - # On error, finish - if msg.type == WSMsgType.TEXT: - - try: - - data = msg.json() - - if data["service"] not in self.services: - raise RuntimeError("Bad service") - - if "request" not in data: - raise RuntimeError("Bad message") - - if "id" not in data: - raise RuntimeError("Bad message") - - await q.put( - (data["id"], data["service"], data["request"]) - ) - - except Exception as e: - - await ws.send_json({"error": str(e)}) - continue - - elif msg.type == WSMsgType.ERROR: - break - elif msg.type == WSMsgType.CLOSE: - break - else: - break - - running.stop() - - await async_task