Configuration initialisation (#335)

* - Fixed error reporting in config
- Updated tg-init-pulsar to be able to load initial config to config-svc
- Tweaked API naming and added more config calls

* Tools to dump out prompts and agent tools
This commit is contained in:
cybermaggedon 2025-04-02 13:52:33 +01:00 committed by GitHub
parent a2c64cad4a
commit 1d222235d3
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 471 additions and 37 deletions

View file

@ -60,6 +60,12 @@ container: update-package-versions
${DOCKER} build -f containers/Containerfile.ocr \
-t ${CONTAINER_BASE}/trustgraph-ocr:${VERSION} .
basic-containers: update-package-versions
${DOCKER} build -f containers/Containerfile.base \
-t ${CONTAINER_BASE}/trustgraph-base:${VERSION} .
${DOCKER} build -f containers/Containerfile.flow \
-t ${CONTAINER_BASE}/trustgraph-flow:${VERSION} .
container.ocr:
${DOCKER} build -f containers/Containerfile.ocr \
-t ${CONTAINER_BASE}/trustgraph-ocr:${VERSION} .

View file

@ -18,6 +18,17 @@ class Triple:
p : str
o : str
@dataclasses.dataclass
class ConfigKey:
    """Address of a single configuration entry: a type plus a key."""

    # Configuration type the entry is filed under (e.g. "prompt", "agent")
    type: str

    # Name of the entry within that type
    key: str
@dataclasses.dataclass
class ConfigValue:
    """A configuration entry: its type, its key and the stored value."""

    # Configuration type the entry is filed under
    type: str

    # Name of the entry within that type
    key: str

    # Stored payload as a string
    # NOTE(review): callers appear to keep JSON text here - confirm
    value: str
class Api:
def __init__(self, url="http://localhost:8088/"):
@ -35,7 +46,7 @@ class Api:
try:
msg = response["error"]["message"]
tp = response["error"]["message"]
tp = response["error"]["type"]
except:
raise ApplicationException(
"Error, but the error object is broken"
@ -66,7 +77,7 @@ class Api:
except:
raise ProtocolException(f"Expected JSON response")
self.check_error(resp)
self.check_error(object)
try:
return object["response"]
@ -95,7 +106,7 @@ class Api:
except:
raise ProtocolException(f"Expected JSON response")
self.check_error(resp)
self.check_error(object)
try:
return object["answer"]
@ -134,7 +145,7 @@ class Api:
except:
raise ProtocolException(f"Expected JSON response")
self.check_error(resp)
self.check_error(object)
try:
return object["response"]
@ -169,7 +180,7 @@ class Api:
except:
raise ProtocolException(f"Expected JSON response")
self.check_error(resp)
self.check_error(object)
try:
return object["response"]
@ -198,7 +209,7 @@ class Api:
except:
raise ProtocolException(f"Expected JSON response")
self.check_error(resp)
self.check_error(object)
try:
return object["vectors"]
@ -228,7 +239,7 @@ class Api:
except:
raise ProtocolException("Expected JSON response")
self.check_error(resp)
self.check_error(object)
if "text" in object:
return object["text"]
@ -280,7 +291,7 @@ class Api:
except:
raise ProtocolException("Expected JSON response")
self.check_error(resp)
self.check_error(object)
if "response" not in object:
raise ProtocolException("Response not formatted correctly")
@ -382,7 +393,7 @@ class Api:
if resp.status_code != 200:
raise ProtocolException(f"Status code {resp.status_code}")
def get_config(self):
def config_all(self):
# The input consists of system and prompt strings
input = {
@ -404,10 +415,150 @@ class Api:
except:
raise ProtocolException(f"Expected JSON response")
self.check_error(resp)
self.check_error(object)
try:
return object["config"], object["version"]
except:
raise ProtocolException(f"Response not formatted correctly")
def config_get(self, keys):
    """Fetch specific configuration entries from the config endpoint.

    Posts a ``get`` operation to ``{self.url}config`` as JSON.

    Args:
        keys: Iterable of objects exposing ``type`` and ``key``
            attributes (for example ``ConfigKey``) naming the entries
            to fetch.

    Returns:
        A list of ``ConfigValue`` built from the ``values`` array of
        the response, in the order the response lists them.

    Raises:
        ProtocolException: If the status code is not 200, the body is
            not JSON, or the body lacks the expected ``values`` layout.
        Whatever ``self.check_error`` raises for an error response.
    """

    # Request body: the operation plus the (type, key) pairs wanted
    request = {
        "operation": "get",
        "keys": [
            {"type": k.type, "key": k.key}
            for k in keys
        ],
    }

    url = f"{self.url}config"

    # Invoke the API, the request is passed as JSON
    resp = requests.post(url, json=request)

    # Should be a 200 status code
    if resp.status_code != 200:
        raise ProtocolException(f"Status code {resp.status_code}")

    try:
        # Parse the response as JSON
        body = resp.json()
    except Exception as e:
        # 'Exception', not a bare except, so Ctrl-C / SystemExit still
        # propagate instead of being reported as a protocol error
        raise ProtocolException("Expected JSON response") from e

    self.check_error(body)

    try:
        return [
            ConfigValue(type=v["type"], key=v["key"], value=v["value"])
            for v in body["values"]
        ]
    except Exception as e:
        raise ProtocolException("Response not formatted correctly") from e
def config_put(self, values):
    """Store configuration entries through the config endpoint.

    Posts a ``put`` operation to ``{self.url}config`` as JSON.

    Args:
        values: Iterable of objects exposing ``type``, ``key`` and
            ``value`` attributes (for example ``ConfigValue``).

    Returns:
        None.

    Raises:
        ProtocolException: If the status code is not 200 or the body
            is not JSON.
        Whatever ``self.check_error`` raises for an error response.
    """

    # Request body: the operation plus the entries to write
    request = {
        "operation": "put",
        "values": [
            {"type": v.type, "key": v.key, "value": v.value}
            for v in values
        ],
    }

    url = f"{self.url}config"

    # Invoke the API, the request is passed as JSON
    resp = requests.post(url, json=request)

    # Should be a 200 status code
    if resp.status_code != 200:
        raise ProtocolException(f"Status code {resp.status_code}")

    try:
        # Parse the response as JSON
        body = resp.json()
    except Exception as e:
        # 'Exception', not a bare except, so Ctrl-C / SystemExit still
        # propagate instead of being reported as a protocol error
        raise ProtocolException("Expected JSON response") from e

    self.check_error(body)

    # Nothing is read from the body of a successful put, so there is
    # no response-format failure to guard against here
    return None
def config_list(self, type):
    """List the directory of a configuration type.

    Posts a ``list`` operation to ``{self.url}config`` as JSON.

    Args:
        type: Configuration type to list. The name shadows the builtin
            but is kept because callers may pass it by keyword.

    Returns:
        The ``directory`` field of the response, unmodified.

    Raises:
        ProtocolException: If the status code is not 200, the body is
            not JSON, or the body has no ``directory`` field.
        Whatever ``self.check_error`` raises for an error response.
    """

    # Request body: the operation plus the type to enumerate
    request = {
        "operation": "list",
        "type": type,
    }

    url = f"{self.url}config"

    # Invoke the API, the request is passed as JSON
    resp = requests.post(url, json=request)

    # Should be a 200 status code
    if resp.status_code != 200:
        raise ProtocolException(f"Status code {resp.status_code}")

    try:
        # Parse the response as JSON
        body = resp.json()
    except Exception as e:
        # 'Exception', not a bare except, so Ctrl-C / SystemExit still
        # propagate instead of being reported as a protocol error
        raise ProtocolException("Expected JSON response") from e

    self.check_error(body)

    try:
        return body["directory"]
    except Exception as e:
        raise ProtocolException("Response not formatted correctly") from e
def config_getvalues(self, type):
    """Fetch every configuration entry of one type.

    Posts a ``getvalues`` operation to ``{self.url}config`` as JSON.

    Args:
        type: Configuration type whose entries are wanted. The name
            shadows the builtin but is kept because callers may pass
            it by keyword.

    Returns:
        A list of ``ConfigValue`` built from the ``values`` array of
        the response, in the order the response lists them.

    Raises:
        ProtocolException: If the status code is not 200, the body is
            not JSON, or the body lacks the expected ``values`` layout.
        Whatever ``self.check_error`` raises for an error response.
    """

    # Request body: the operation plus the type to read
    request = {
        "operation": "getvalues",
        "type": type,
    }

    url = f"{self.url}config"

    # Invoke the API, the request is passed as JSON
    resp = requests.post(url, json=request)

    # Should be a 200 status code
    if resp.status_code != 200:
        raise ProtocolException(f"Status code {resp.status_code}")

    try:
        # Parse the response as JSON
        body = resp.json()
    except Exception as e:
        # 'Exception', not a bare except, so Ctrl-C / SystemExit still
        # propagate instead of being reported as a protocol error
        raise ProtocolException("Expected JSON response") from e

    self.check_error(body)

    try:
        return [
            ConfigValue(type=v["type"], key=v["key"], value=v["value"])
            for v in body["values"]
        ]
    except Exception as e:
        raise ProtocolException("Response not formatted correctly") from e

View file

@ -61,7 +61,7 @@ class ConfigClient(BaseClient):
listener=listener,
)
def request_get(self, keys, timeout=300):
def get(self, keys, timeout=300):
resp = self.call(
id=id,
@ -85,7 +85,7 @@ class ConfigClient(BaseClient):
for v in resp.values
]
def request_list(self, type, timeout=300):
def list(self, type, timeout=300):
resp = self.call(
id=id,
@ -96,7 +96,7 @@ class ConfigClient(BaseClient):
return resp.directory
def request_getvalues(self, type, timeout=300):
def getvalues(self, type, timeout=300):
resp = self.call(
id=id,
@ -114,7 +114,7 @@ class ConfigClient(BaseClient):
for v in resp.values
]
def request_delete(self, keys, timeout=300):
def delete(self, keys, timeout=300):
resp = self.call(
id=id,
@ -131,25 +131,25 @@ class ConfigClient(BaseClient):
return None
def request_put(self, value, timeout=300):
def put(self, values, timeout=300):
resp = self.call(
id=id,
operation="put",
values=[
ConfigValue(
type = k["type"],
key = k["key"],
value = k["value"]
type = v["type"],
key = v["key"],
value = v["value"]
)
for k in keys
for v in values
],
timeout=timeout
)
return None
def request_config(self, timeout=300):
def config(self, timeout=300):
resp = self.call(
id=id,
@ -157,5 +157,5 @@ class ConfigClient(BaseClient):
timeout=timeout
)
return resp.config
return resp.config, resp.version

View file

@ -7,8 +7,13 @@ Initialises Pulsar with Trustgraph tenant / namespaces & policy.
import requests
import time
import argparse
import json
from trustgraph.clients.config_client import ConfigClient
default_pulsar_admin_url = "http://pulsar:8080"
default_pulsar_host = "pulsar://pulsar:6650"
subscriber = "tg-init-pulsar"
def get_clusters(url):
@ -62,17 +67,68 @@ def ensure_namespace(url, tenant, namespace, config):
print(f"Namespace {tenant}/{namespace} created.", flush=True)
def init(url, tenant="tg"):
def ensure_config(config, pulsar_host, pulsar_api_key):
clusters = get_clusters(url)
cli = ConfigClient(
subscriber=subscriber,
pulsar_host=pulsar_host,
pulsar_api_key=pulsar_api_key,
)
ensure_tenant(url, tenant, clusters)
while True:
ensure_namespace(url, tenant, "flow", {})
try:
ensure_namespace(url, tenant, "request", {})
print("Get current config...", flush=True)
current, version = cli.config(timeout=5)
ensure_namespace(url, tenant, "response", {
except Exception as e:
print("Exception:", e, flush=True)
time.sleep(2)
print("Retrying...", flush=True)
continue
print("Current config version is", version, flush=True)
if version != 0:
print("Already updated, not updating config. Done.", flush=True)
return
print("Config is version 0, updating...", flush=True)
batch = []
for type in config:
for key in config[type]:
print(f"Adding {type}/{key} to update.", flush=True)
batch.append({
"type": type,
"key": key,
"value": json.dumps(config[type][key]),
})
try:
cli.put(batch, timeout=10)
print("Update succeeded.", flush=True)
break
except Exception as e:
print("Exception:", e, flush=True)
time.sleep(2)
print("Retrying...", flush=True)
continue
def init(pulsar_admin_url, pulsar_host, pulsar_api_key, config, tenant):
clusters = get_clusters(pulsar_admin_url)
ensure_tenant(pulsar_admin_url, tenant, clusters)
ensure_namespace(pulsar_admin_url, tenant, "flow", {})
ensure_namespace(pulsar_admin_url, tenant, "request", {})
ensure_namespace(pulsar_admin_url, tenant, "response", {
"retention_policies": {
"retentionSizeInMB": -1,
"retentionTimeInMinutes": 3,
@ -80,7 +136,7 @@ def init(url, tenant="tg"):
}
})
ensure_namespace(url, tenant, "config", {
ensure_namespace(pulsar_admin_url, tenant, "config", {
"retention_policies": {
"retentionSizeInMB": 10,
"retentionTimeInMinutes": -1,
@ -88,6 +144,21 @@ def init(url, tenant="tg"):
}
})
if config is not None:
try:
print("Decoding config...", flush=True)
dec = json.loads(config)
print("Decoded.", flush=True)
except Exception as e:
print("Exception:", e, flush=True)
raise e
ensure_config(dec, pulsar_host, pulsar_api_key)
else:
print("No config to update.", flush=True)
def main():
parser = argparse.ArgumentParser(
@ -101,6 +172,28 @@ def main():
help=f'Pulsar admin URL (default: {default_pulsar_admin_url})',
)
parser.add_argument(
'--pulsar-host',
default=default_pulsar_host,
help=f'Pulsar host (default: {default_pulsar_host})',
)
parser.add_argument(
'--pulsar-api-key',
help=f'Pulsar API key',
)
parser.add_argument(
'-c', '--config',
help=f'Initial configuration to load',
)
parser.add_argument(
'-t', '--tenant',
default="tg",
help=f'Tenant (default: tg)',
)
args = parser.parse_args()
while True:
@ -112,7 +205,7 @@ def main():
f"Initialising with Pulsar {args.pulsar_admin_url}...",
flush=True
)
init(args.pulsar_admin_url, "tg")
init(**vars(args))
print("Initialisation complete.", flush=True)
break

View file

@ -15,7 +15,7 @@ def show_config(url):
api = Api(url)
config, version = api.get_config()
config, version = api.config_all()
print("Version:", version)
print(json.dumps(config, indent=4))

View file

@ -0,0 +1,96 @@
#!/usr/bin/env python3
"""
Dumps out the current prompts
"""
import argparse
import os
from trustgraph.api import Api, ConfigKey
import json
import tabulate
import textwrap
default_url = os.getenv("TRUSTGRAPH_URL", 'http://localhost:8088/')
def show_config(url):
    """Print the system prompt and each prompt template as tables.

    Reads the ``prompt``/``system`` and ``prompt``/``template-index``
    entries through ``Api.config_get``, then requests one
    ``template.<name>`` entry per name found in the index. Every value
    is decoded with ``json.loads`` before being displayed.

    Args:
        url: Base URL handed to ``Api``.
    """

    client = Api(url)

    # First request: the system prompt and the index of template names
    header = client.config_get([
        ConfigKey(type="prompt", key="system"),
        ConfigKey(type="prompt", key="template-index"),
    ])

    system_prompt = json.loads(header[0].value)
    template_ids = json.loads(header[1].value)

    # Second request: one key per template named in the index
    template_keys = [
        ConfigKey(type="prompt", key=f"template.{template_id}")
        for template_id in template_ids
    ]
    templates = client.config_get(template_keys)

    print()
    print("System prompt:")
    print(tabulate.tabulate(
        [["prompt", system_prompt]],
        tablefmt="pretty",
        maxcolwidths=[None, 70],
        stralign="left"
    ))

    # Replies are matched to index entries by position
    for position, template_id in enumerate(template_ids):

        spec = json.loads(templates[position].value)

        rows = [("prompt", spec["prompt"])]

        # Optional fields are only shown when the template defines them
        optional_fields = (
            ("response", "response-type"),
            ("schema", "schema"),
        )
        for label, field in optional_fields:
            if field in spec:
                rows.append((label, spec[field]))

        print()
        print(template_id + ":")
        print(tabulate.tabulate(
            rows,
            tablefmt="pretty",
            maxcolwidths=[None, 70],
            stralign="left"
        ))

    # NOTE(review): indentation was lost in this view; the trailing blank
    # line is assumed to be printed once, after the loop - confirm
    print()
def main():
    """Command-line entry point: parse options and dump the prompts.

    Accepts ``-u`` / ``--api-url`` (defaulting to ``default_url``) and
    passes it to ``show_config``. Any ``Exception`` raised while showing
    the configuration is printed instead of propagated.
    """

    cli = argparse.ArgumentParser(
        prog='tg-show-prompts',
        description=__doc__,
    )

    cli.add_argument(
        '-u', '--api-url',
        default=default_url,
        help=f'API URL (default: {default_url})',
    )

    options = cli.parse_args()

    try:
        show_config(url=options.api_url)
    except Exception as err:
        # Report the failure on stdout; the process still exits normally
        print("Exception:", err, flush=True)
main()

View file

@ -0,0 +1,86 @@
#!/usr/bin/env python3
"""
Dumps out the current agent tools
"""
import argparse
import os
from trustgraph.api import Api, ConfigKey
import json
import tabulate
import textwrap
default_url = os.getenv("TRUSTGRAPH_URL", 'http://localhost:8088/')
def show_config(url):
    """Print one table per agent tool held in the configuration.

    Reads the ``agent``/``tool-index`` entry through ``Api.config_get``,
    then requests one ``tool.<name>`` entry per name found in the index.
    Every value is decoded with ``json.loads`` before being displayed.

    Args:
        url: Base URL handed to ``Api``.
    """

    client = Api(url)

    # First request: the index of tool names
    index_reply = client.config_get([
        ConfigKey(type="agent", key="tool-index"),
    ])

    tool_ids = json.loads(index_reply[0].value)

    # Second request: one key per tool named in the index
    tool_keys = [
        ConfigKey(type="agent", key=f"tool.{tool_id}")
        for tool_id in tool_ids
    ]
    tools = client.config_get(tool_keys)

    # Replies are matched to index entries by position
    for position, tool_id in enumerate(tool_ids):

        tool = json.loads(tools[position].value)

        rows = [
            ("id", tool["id"]),
            ("name", tool["name"]),
            ("description", tool["description"]),
        ]

        # One row per declared argument, numbered from zero
        rows.extend(
            (
                f"arg {arg_no}",
                f"{arg['name']}: {arg['type']}\n{arg['description']}"
            )
            for arg_no, arg in enumerate(tool["arguments"])
        )

        print()
        print(tool_id + ":")
        print(tabulate.tabulate(
            rows,
            tablefmt="pretty",
            maxcolwidths=[None, 70],
            stralign="left"
        ))

    # NOTE(review): indentation was lost in this view; the trailing blank
    # line is assumed to be printed once, after the loop - confirm
    print()
def main():
    """Command-line entry point: parse options and dump the agent tools.

    Accepts ``-u`` / ``--api-url`` (defaulting to ``default_url``) and
    passes it to ``show_config``. Any ``Exception`` raised while showing
    the configuration is printed instead of propagated.
    """

    parser = argparse.ArgumentParser(
        # Was 'tg-show-prompts' (copied from the sibling script), which
        # made --help and usage errors name the wrong command
        prog='tg-show-tools',
        description=__doc__,
    )

    parser.add_argument(
        '-u', '--api-url',
        default=default_url,
        help=f'API URL (default: {default_url})',
    )

    args = parser.parse_args()

    try:
        show_config(
            url=args.api_url,
        )
    except Exception as e:
        # Report the failure on stdout; the process still exits normally
        print("Exception:", e, flush=True)
main()

View file

@ -63,5 +63,7 @@ setuptools.setup(
"scripts/tg-save-kg-core",
"scripts/tg-save-doc-embeds",
"scripts/tg-show-config",
"scripts/tg-show-tools",
"scripts/tg-show-prompts",
]
)

View file

@ -80,7 +80,7 @@ class Processor(ConsumerProducer):
directory = None,
config = None,
error = Error(
code = "key-error",
type = "key-error",
message = f"Key error"
)
)
@ -112,8 +112,8 @@ class Processor(ConsumerProducer):
directory = None,
config = None,
error = Error(
code="key-error",
message="No such type",
type = "key-error",
message = "No such type",
),
)
@ -135,7 +135,7 @@ class Processor(ConsumerProducer):
directory = None,
config = None,
error = Error(
code = "key-error",
type = "key-error",
message = f"Key error"
)
)
@ -167,7 +167,7 @@ class Processor(ConsumerProducer):
directory = None,
config = None,
error = Error(
code = "key-error",
type = "key-error",
message = f"Key error"
)
)
@ -271,8 +271,8 @@ class Processor(ConsumerProducer):
directory=None,
values=None,
error=Error(
code="bad-operation",
message="Bad operation"
type = "bad-operation",
message = "Bad operation"
)
)