From 1d222235d3ae6498856f83657d315f50cd767778 Mon Sep 17 00:00:00 2001 From: cybermaggedon Date: Wed, 2 Apr 2025 13:52:33 +0100 Subject: [PATCH] Configuration initialisation (#335) * - Fixed error reporting in config - Updated tg-init-pulsar to be able to load initial config to config-svc - Tweaked API naming and added more config calls * Tools to dump out prompts and agent tools --- Makefile | 6 + trustgraph-base/trustgraph/api/api.py | 171 +++++++++++++++++- .../trustgraph/clients/config_client.py | 22 +-- trustgraph-cli/scripts/tg-init-pulsar | 109 ++++++++++- trustgraph-cli/scripts/tg-show-config | 2 +- trustgraph-cli/scripts/tg-show-prompts | 96 ++++++++++ trustgraph-cli/scripts/tg-show-tools | 86 +++++++++ trustgraph-cli/setup.py | 2 + .../trustgraph/config/service/service.py | 14 +- 9 files changed, 471 insertions(+), 37 deletions(-) create mode 100755 trustgraph-cli/scripts/tg-show-prompts create mode 100755 trustgraph-cli/scripts/tg-show-tools diff --git a/Makefile b/Makefile index 1fae97f6..4f4de9d2 100644 --- a/Makefile +++ b/Makefile @@ -60,6 +60,12 @@ container: update-package-versions ${DOCKER} build -f containers/Containerfile.ocr \ -t ${CONTAINER_BASE}/trustgraph-ocr:${VERSION} . +basic-containers: update-package-versions + ${DOCKER} build -f containers/Containerfile.base \ + -t ${CONTAINER_BASE}/trustgraph-base:${VERSION} . + ${DOCKER} build -f containers/Containerfile.flow \ + -t ${CONTAINER_BASE}/trustgraph-flow:${VERSION} . + container.ocr: ${DOCKER} build -f containers/Containerfile.ocr \ -t ${CONTAINER_BASE}/trustgraph-ocr:${VERSION} . diff --git a/trustgraph-base/trustgraph/api/api.py b/trustgraph-base/trustgraph/api/api.py index 08742ee9..ddc6b2c3 100644 --- a/trustgraph-base/trustgraph/api/api.py +++ b/trustgraph-base/trustgraph/api/api.py @@ -18,6 +18,17 @@ class Triple: p : str o : str +@dataclasses.dataclass +class ConfigKey: + type : str + key : str + +@dataclasses.dataclass +class ConfigValue: + type : str + key : str + value : str + class Api: def __init__(self, url="http://localhost:8088/"): @@ -35,7 +46,7 @@ class Api: try: msg = response["error"]["message"] - tp = response["error"]["message"] + tp = response["error"]["type"] except: raise ApplicationException( "Error, but the error object is broken" @@ -66,7 +77,7 @@ class Api: except: raise ProtocolException(f"Expected JSON response") - self.check_error(resp) + self.check_error(object) try: return object["response"] @@ -95,7 +106,7 @@ class Api: except: raise ProtocolException(f"Expected JSON response") - self.check_error(resp) + self.check_error(object) try: return object["answer"] @@ -134,7 +145,7 @@ class Api: except: raise ProtocolException(f"Expected JSON response") - self.check_error(resp) + self.check_error(object) try: return object["response"] @@ -169,7 +180,7 @@ class Api: except: raise ProtocolException(f"Expected JSON response") - self.check_error(resp) + self.check_error(object) try: return object["response"] @@ -198,7 +209,7 @@ class Api: except: raise ProtocolException(f"Expected JSON response") - self.check_error(resp) + self.check_error(object) try: return object["vectors"] @@ -228,7 +239,7 @@ class Api: except: raise ProtocolException("Expected JSON response") - self.check_error(resp) + self.check_error(object) if "text" in object: return object["text"] @@ -280,7 +291,7 @@ class Api: except: raise ProtocolException("Expected JSON response") - self.check_error(resp) + self.check_error(object) if "response" not in object: raise ProtocolException("Response not formatted correctly") @@ -382,7 +393,7 @@ class Api: if resp.status_code != 200: raise ProtocolException(f"Status code {resp.status_code}") - def get_config(self): + def config_all(self): # The input consists of system and prompt strings input = { @@ -404,10 +415,150 @@ class Api: except: raise ProtocolException(f"Expected JSON response") - self.check_error(resp) + self.check_error(object) try: return object["config"], object["version"] except: raise ProtocolException(f"Response not formatted correctly") + def config_get(self, keys): + + # The input consists of system and prompt strings + input = { + "operation": "get", + "keys": [ + { "type": k.type, "key": k.key } + for k in keys + ] + } + + url = f"{self.url}config" + + # Invoke the API, input is passed as JSON + resp = requests.post(url, json=input) + + # Should be a 200 status code + if resp.status_code != 200: + raise ProtocolException(f"Status code {resp.status_code}") + + try: + # Parse the response as JSON + object = resp.json() + except: + raise ProtocolException(f"Expected JSON response") + + self.check_error(object) + + try: + return [ + ConfigValue( + type = v["type"], + key = v["key"], + value = v["value"] + ) + for v in object["values"] + ] + except: + raise ProtocolException(f"Response not formatted correctly") + + def config_put(self, values): + + # The input consists of system and prompt strings + input = { + "operation": "put", + "values": [ + { "type": v.type, "key": v.key, "value": v.value } + for v in values + ] + } + + url = f"{self.url}config" + + # Invoke the API, input is passed as JSON + resp = requests.post(url, json=input) + + # Should be a 200 status code + if resp.status_code != 200: + raise ProtocolException(f"Status code {resp.status_code}") + + try: + # Parse the response as JSON + object = resp.json() + except: + raise ProtocolException(f"Expected JSON response") + + self.check_error(object) + + try: + return None + except: + raise ProtocolException(f"Response not formatted correctly") + + def config_list(self, type): + + # The input consists of system and prompt strings + input = { + "operation": "list", + "type": type, + } + + url = f"{self.url}config" + + # Invoke the API, input is passed as JSON + resp = requests.post(url, json=input) + + # Should be a 200 status code + if resp.status_code != 200: + raise ProtocolException(f"Status code {resp.status_code}") + + try: + # Parse the response as JSON + object = resp.json() + except: + raise ProtocolException(f"Expected JSON response") + + self.check_error(object) + + try: + return object["directory"] + except: + raise ProtocolException(f"Response not formatted correctly") + + def config_getvalues(self, type): + + # The input consists of system and prompt strings + input = { + "operation": "getvalues", + "type": type, + } + + url = f"{self.url}config" + + # Invoke the API, input is passed as JSON + resp = requests.post(url, json=input) + + # Should be a 200 status code + if resp.status_code != 200: + raise ProtocolException(f"Status code {resp.status_code}") + + try: + # Parse the response as JSON + object = resp.json() + except: + raise ProtocolException(f"Expected JSON response") + + self.check_error(object) + + try: + return [ + ConfigValue( + type = v["type"], + key = v["key"], + value = v["value"] + ) + for v in object["values"] + ] + except: + raise ProtocolException(f"Response not formatted correctly") + diff --git a/trustgraph-base/trustgraph/clients/config_client.py b/trustgraph-base/trustgraph/clients/config_client.py index eb797071..ed8c704a 100644 --- a/trustgraph-base/trustgraph/clients/config_client.py +++ b/trustgraph-base/trustgraph/clients/config_client.py @@ -61,7 +61,7 @@ class ConfigClient(BaseClient): listener=listener, ) - def request_get(self, keys, timeout=300): + def get(self, keys, timeout=300): resp = self.call( id=id, @@ -85,7 +85,7 @@ class ConfigClient(BaseClient): for v in resp.values ] - def request_list(self, type, timeout=300): + def list(self, type, timeout=300): resp = self.call( id=id, @@ -96,7 +96,7 @@ class ConfigClient(BaseClient): return resp.directory - def request_getvalues(self, type, timeout=300): + def getvalues(self, type, timeout=300): resp = self.call( id=id, @@ -114,7 +114,7 @@ class ConfigClient(BaseClient): for v in resp.values ] - def request_delete(self, keys, timeout=300): + def delete(self, keys, timeout=300): resp = self.call( id=id, @@ -131,25 +131,25 @@ class ConfigClient(BaseClient): return None - def request_put(self, value, timeout=300): + def put(self, values, timeout=300): resp = self.call( id=id, operation="put", values=[ ConfigValue( - type = k["type"], - key = k["key"], - value = k["value"] + type = v["type"], + key = v["key"], + value = v["value"] ) - for k in keys + for v in values ], timeout=timeout ) return None - def request_config(self, timeout=300): + def config(self, timeout=300): resp = self.call( id=id, @@ -157,5 +157,5 @@ class ConfigClient(BaseClient): timeout=timeout ) - return resp.config + return resp.config, resp.version diff --git a/trustgraph-cli/scripts/tg-init-pulsar b/trustgraph-cli/scripts/tg-init-pulsar index 69a13411..c7d447bd 100755 --- a/trustgraph-cli/scripts/tg-init-pulsar +++ b/trustgraph-cli/scripts/tg-init-pulsar @@ -7,8 +7,13 @@ Initialises Pulsar with Trustgraph tenant / namespaces & policy. import requests import time import argparse +import json + +from trustgraph.clients.config_client import ConfigClient default_pulsar_admin_url = "http://pulsar:8080" +default_pulsar_host = "pulsar://pulsar:6650" +subscriber = "tg-init-pulsar" def get_clusters(url): @@ -62,17 +67,68 @@ def ensure_namespace(url, tenant, namespace, config): print(f"Namespace {tenant}/{namespace} created.", flush=True) -def init(url, tenant="tg"): +def ensure_config(config, pulsar_host, pulsar_api_key): - clusters = get_clusters(url) + cli = ConfigClient( + subscriber=subscriber, + pulsar_host=pulsar_host, + pulsar_api_key=pulsar_api_key, + ) - ensure_tenant(url, tenant, clusters) + while True: - ensure_namespace(url, tenant, "flow", {}) + try: - ensure_namespace(url, tenant, "request", {}) + print("Get current config...", flush=True) + current, version = cli.config(timeout=5) - ensure_namespace(url, tenant, "response", { + except Exception as e: + + print("Exception:", e, flush=True) + time.sleep(2) + print("Retrying...", flush=True) + continue + + print("Current config version is", version, flush=True) + + if version != 0: + print("Already updated, not updating config. Done.", flush=True) + return + + print("Config is version 0, updating...", flush=True) + + batch = [] + + for type in config: + for key in config[type]: + print(f"Adding {type}/{key} to update.", flush=True) + batch.append({ + "type": type, + "key": key, + "value": json.dumps(config[type][key]), + }) + + try: + cli.put(batch, timeout=10) + print("Update succeeded.", flush=True) + break + except Exception as e: + print("Exception:", e, flush=True) + time.sleep(2) + print("Retrying...", flush=True) + continue + +def init(pulsar_admin_url, pulsar_host, pulsar_api_key, config, tenant): + + clusters = get_clusters(pulsar_admin_url) + + ensure_tenant(pulsar_admin_url, tenant, clusters) + + ensure_namespace(pulsar_admin_url, tenant, "flow", {}) + + ensure_namespace(pulsar_admin_url, tenant, "request", {}) + + ensure_namespace(pulsar_admin_url, tenant, "response", { "retention_policies": { "retentionSizeInMB": -1, "retentionTimeInMinutes": 3, @@ -80,7 +136,7 @@ def init(url, tenant="tg"): } }) - ensure_namespace(url, tenant, "config", { + ensure_namespace(pulsar_admin_url, tenant, "config", { "retention_policies": { "retentionSizeInMB": 10, "retentionTimeInMinutes": -1, @@ -88,6 +144,21 @@ def init(url, tenant="tg"): } }) + if config is not None: + + try: + print("Decoding config...", flush=True) + dec = json.loads(config) + print("Decoded.", flush=True) + except Exception as e: + print("Exception:", e, flush=True) + raise e + + ensure_config(dec, pulsar_host, pulsar_api_key) + + else: + print("No config to update.", flush=True) + def main(): parser = argparse.ArgumentParser( @@ -101,6 +172,28 @@ def main(): help=f'Pulsar admin URL (default: {default_pulsar_admin_url})', ) + parser.add_argument( + '--pulsar-host', + default=default_pulsar_host, + help=f'Pulsar host (default: {default_pulsar_host})', + ) + + parser.add_argument( + '--pulsar-api-key', + help=f'Pulsar API key', + ) + + parser.add_argument( + '-c', '--config', + help=f'Initial configuration to load', + ) + + parser.add_argument( + '-t', '--tenant', + default="tg", + help=f'Tenant (default: tg)', + ) + args = parser.parse_args() while True: @@ -112,7 +205,7 @@ def main(): f"Initialising with Pulsar {args.pulsar_admin_url}...", flush=True ) - init(args.pulsar_admin_url, "tg") + init(**vars(args)) print("Initialisation complete.", flush=True) break diff --git a/trustgraph-cli/scripts/tg-show-config b/trustgraph-cli/scripts/tg-show-config index 43d2d100..2fb5ef94 100755 --- a/trustgraph-cli/scripts/tg-show-config +++ b/trustgraph-cli/scripts/tg-show-config @@ -15,7 +15,7 @@ def show_config(url): api = Api(url) - config, version = api.get_config() + config, version = api.config_all() print("Version:", version) print(json.dumps(config, indent=4)) diff --git a/trustgraph-cli/scripts/tg-show-prompts b/trustgraph-cli/scripts/tg-show-prompts new file mode 100755 index 00000000..bb1cc6b3 --- /dev/null +++ b/trustgraph-cli/scripts/tg-show-prompts @@ -0,0 +1,96 @@ +#!/usr/bin/env python3 + +""" +Dumps out the current prompts +""" + +import argparse +import os +from trustgraph.api import Api, ConfigKey +import json +import tabulate +import textwrap + +default_url = os.getenv("TRUSTGRAPH_URL", 'http://localhost:8088/') + +def show_config(url): + + api = Api(url) + + values = api.config_get([ + ConfigKey(type="prompt", key="system"), + ConfigKey(type="prompt", key="template-index") + ]) + + system = json.loads(values[0].value) + ix = json.loads(values[1].value) + + values = api.config_get([ + ConfigKey(type="prompt", key=f"template.{v}") + for v in ix + ]) + + print() + + print("System prompt:") + + print(tabulate.tabulate( + [["prompt", system]], + tablefmt="pretty", + maxcolwidths=[None, 70], + stralign="left" + )) + + for n, key in enumerate(ix): + + data = json.loads(values[n].value) + + table = [] + + table.append(("prompt", data["prompt"])) + + if "response-type" in data: + table.append(("response", data["response-type"])) + + if "schema" in data: + table.append(("schema", data["schema"])) + + print() + print(key + ":") + + print(tabulate.tabulate( + table, + tablefmt="pretty", + maxcolwidths=[None, 70], + stralign="left" + )) + + print() + +def main(): + + parser = argparse.ArgumentParser( + prog='tg-show-prompts', + description=__doc__, + ) + + parser.add_argument( + '-u', '--api-url', + default=default_url, + help=f'API URL (default: {default_url})', + ) + + args = parser.parse_args() + + try: + + show_config( + url=args.api_url, + ) + + except Exception as e: + + print("Exception:", e, flush=True) + +main() + diff --git a/trustgraph-cli/scripts/tg-show-tools b/trustgraph-cli/scripts/tg-show-tools new file mode 100755 index 00000000..389abbc6 --- /dev/null +++ b/trustgraph-cli/scripts/tg-show-tools @@ -0,0 +1,86 @@ +#!/usr/bin/env python3 + +""" +Dumps out the current agent tools +""" + +import argparse +import os +from trustgraph.api import Api, ConfigKey +import json +import tabulate +import textwrap + +default_url = os.getenv("TRUSTGRAPH_URL", 'http://localhost:8088/') + +def show_config(url): + + api = Api(url) + + values = api.config_get([ + ConfigKey(type="agent", key="tool-index") + ]) + + ix = json.loads(values[0].value) + + values = api.config_get([ + ConfigKey(type="agent", key=f"tool.{v}") + for v in ix + ]) + + for n, key in enumerate(ix): + + data = json.loads(values[n].value) + + table = [] + + table.append(("id", data["id"])) + table.append(("name", data["name"])) + table.append(("description", data["description"])) + + for n, arg in enumerate(data["arguments"]): + table.append(( + f"arg {n}", + f"{arg['name']}: {arg['type']}\n{arg['description']}" + )) + + + print() + print(key + ":") + + print(tabulate.tabulate( + table, + tablefmt="pretty", + maxcolwidths=[None, 70], + stralign="left" + )) + + print() + +def main(): + + parser = argparse.ArgumentParser( + prog='tg-show-prompts', + description=__doc__, + ) + + parser.add_argument( + '-u', '--api-url', + default=default_url, + help=f'API URL (default: {default_url})', + ) + + args = parser.parse_args() + + try: + + show_config( + url=args.api_url, + ) + + except Exception as e: + + print("Exception:", e, flush=True) + +main() + diff --git a/trustgraph-cli/setup.py b/trustgraph-cli/setup.py index 406e2c24..0ef523ea 100644 --- a/trustgraph-cli/setup.py +++ b/trustgraph-cli/setup.py @@ -63,5 +63,7 @@ setuptools.setup( "scripts/tg-save-kg-core", "scripts/tg-save-doc-embeds", "scripts/tg-show-config", + "scripts/tg-show-tools", + "scripts/tg-show-prompts", ] ) diff --git a/trustgraph-flow/trustgraph/config/service/service.py b/trustgraph-flow/trustgraph/config/service/service.py index 13c28a96..ee0c960e 100644 --- a/trustgraph-flow/trustgraph/config/service/service.py +++ b/trustgraph-flow/trustgraph/config/service/service.py @@ -80,7 +80,7 @@ class Processor(ConsumerProducer): directory = None, config = None, error = Error( - code = "key-error", + type = "key-error", message = f"Key error" ) ) @@ -112,8 +112,8 @@ class Processor(ConsumerProducer): directory = None, config = None, error = Error( - code="key-error", - message="No such type", + type = "key-error", + message = "No such type", ), ) @@ -135,7 +135,7 @@ class Processor(ConsumerProducer): directory = None, config = None, error = Error( - code = "key-error", + type = "key-error", message = f"Key error" ) ) @@ -167,7 +167,7 @@ class Processor(ConsumerProducer): directory = None, config = None, error = Error( - code = "key-error", + type = "key-error", message = f"Key error" ) ) @@ -271,8 +271,8 @@ class Processor(ConsumerProducer): directory=None, values=None, error=Error( - code="bad-operation", - message="Bad operation" + type = "bad-operation", + message = "Bad operation" ) )