Migrate from setup.py to pyproject.toml (#440)

* Converted setup.py to pyproject.toml

* Modern package infrastructure as recommended by py docs
This commit is contained in:
cybermaggedon 2025-07-23 21:22:08 +01:00 committed by GitHub
parent d83e4e3d59
commit 98022d6af4
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
145 changed files with 561 additions and 1159 deletions

View file

@ -0,0 +1 @@
# TrustGraph CLI modules

View file

@ -0,0 +1,204 @@
"""
Loads a document into the library
"""
import hashlib
import argparse
import os
import time
import uuid
from trustgraph.api import Api
from trustgraph.knowledge import hash, to_uri
from trustgraph.knowledge import PREF_PUBEV, PREF_DOC, PREF_ORG
from trustgraph.knowledge import Organization, PublicationEvent
from trustgraph.knowledge import DigitalDocument
default_url = os.getenv("TRUSTGRAPH_URL", 'http://localhost:8088/')
default_user = 'trustgraph'
class Loader:
def __init__(
self, id, url, user, metadata, title, comments, kind, tags
):
self.api = Api(url).library()
self.user = user
self.metadata = metadata
self.title = title
self.comments = comments
self.kind = kind
self.identifier = id
if tags:
self.tags = tags.split(",")
else:
self.tags = []
def load(self, files):
for file in files:
self.load_file(file)
def load_file(self, file):
try:
path = file
data = open(path, "rb").read()
# Create a SHA256 hash from the data
if self.identifier:
id = self.identifier
else:
id = hash(data)
id = to_uri(PREF_DOC, id)
self.metadata.id = id
self.api.add_document(
document=data, id=id, metadata=self.metadata,
user=self.user, kind=self.kind, title=self.title,
comments=self.comments, tags=self.tags
)
print(f"{file}: Loaded successfully.")
except Exception as e:
print(f"{file}: Failed: {str(e)}", flush=True)
raise e
def main():
parser = argparse.ArgumentParser(
prog='tg-add-library-document',
description=__doc__,
)
parser.add_argument(
'-u', '--url',
default=default_url,
help=f'API URL (default: {default_url})',
)
parser.add_argument(
'-U', '--user',
default=default_user,
help=f'User ID (default: {default_user})'
)
parser.add_argument(
'--name', help=f'Document name'
)
parser.add_argument(
'--description', help=f'Document description'
)
parser.add_argument(
'--copyright-notice', help=f'Copyright notice'
)
parser.add_argument(
'--copyright-holder', help=f'Copyright holder'
)
parser.add_argument(
'--copyright-year', help=f'Copyright year'
)
parser.add_argument(
'--license', help=f'Copyright license'
)
parser.add_argument(
'--publication-organization', help=f'Publication organization'
)
parser.add_argument(
'--publication-description', help=f'Publication description'
)
parser.add_argument(
'--publication-date', help=f'Publication date'
)
parser.add_argument(
'--document-url', help=f'Document URL'
)
parser.add_argument(
'--keyword', nargs='+', help=f'Keyword'
)
parser.add_argument(
'--identifier', '--id', help=f'Document ID'
)
parser.add_argument(
'-k', '--kind',
required=True,
help=f'Document MIME type'
)
parser.add_argument(
'--tags',
help=f'Tags, command separated'
)
parser.add_argument(
'files', nargs='+',
help=f'File to load'
)
args = parser.parse_args()
try:
document = DigitalDocument(
args.identifier,
name=args.name,
description=args.description,
copyright_notice=args.copyright_notice,
copyright_holder=args.copyright_holder,
copyright_year=args.copyright_year,
license=args.license,
url=args.document_url,
keywords=args.keyword,
)
if args.publication_organization:
org = Organization(
id=to_uri(PREF_ORG, hash(args.publication_organization)),
name=args.publication_organization,
)
document.publication = PublicationEvent(
id = to_uri(PREF_PUBEV, str(uuid.uuid4())),
organization=org,
description=args.publication_description,
start_date=args.publication_date,
end_date=args.publication_date,
)
p = Loader(
id=args.identifier,
url=args.url,
user=args.user,
metadata=document,
title=args.name,
comments=args.description,
kind=args.kind,
tags=args.tags,
)
p.load(args.files)
except Exception as e:
print("Exception:", e, flush=True)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,51 @@
"""
Deletes a flow class
"""
import argparse
import os
import tabulate
from trustgraph.api import Api
import json
default_url = os.getenv("TRUSTGRAPH_URL", 'http://localhost:8088/')
def delete_flow_class(url, class_name):
api = Api(url).flow()
class_names = api.delete_class(class_name)
def main():
parser = argparse.ArgumentParser(
prog='tg-delete-flow-class',
description=__doc__,
)
parser.add_argument(
'-u', '--api-url',
default=default_url,
help=f'API URL (default: {default_url})',
)
parser.add_argument(
'-n', '--class-name',
help=f'Flow class name',
)
args = parser.parse_args()
try:
delete_flow_class(
url=args.api_url,
class_name=args.class_name,
)
except Exception as e:
print("Exception:", e, flush=True)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,59 @@
"""
Deletes a flow class
"""
import argparse
import os
import tabulate
from trustgraph.api import Api
import json
default_url = os.getenv("TRUSTGRAPH_URL", 'http://localhost:8088/')
def delete_kg_core(url, user, id):
api = Api(url).knowledge()
class_names = api.delete_kg_core(user = user, id = id)
def main():
parser = argparse.ArgumentParser(
prog='tg-delete-flow-class',
description=__doc__,
)
parser.add_argument(
'-u', '--api-url',
default=default_url,
help=f'API URL (default: {default_url})',
)
parser.add_argument(
'-U', '--user',
default="trustgraph",
help='API URL (default: trustgraph)',
)
parser.add_argument(
'--id', '--identifier',
required=True,
help=f'Knowledge core ID',
)
args = parser.parse_args()
try:
delete_kg_core(
url=args.api_url,
user=args.user,
id=args.id,
)
except Exception as e:
print("Exception:", e, flush=True)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,93 @@
"""
Deletes MCP (Model Control Protocol) tools from the TrustGraph system.
Removes MCP tool configurations by ID from the 'mcp' configuration group.
"""
import argparse
import os
from trustgraph.api import Api, ConfigKey
import textwrap
default_url = os.getenv("TRUSTGRAPH_URL", 'http://localhost:8088/')
def delete_mcp_tool(
url : str,
id : str,
):
api = Api(url).config()
# Check if the tool exists first
try:
values = api.get([
ConfigKey(type="mcp", key=id)
])
if not values or not values[0].value:
print(f"MCP tool '{id}' not found.")
return False
except Exception as e:
print(f"MCP tool '{id}' not found.")
return False
# Delete the MCP tool configuration from the 'mcp' group
try:
api.delete([
ConfigKey(type="mcp", key=id)
])
print(f"MCP tool '{id}' deleted successfully.")
return True
except Exception as e:
print(f"Error deleting MCP tool '{id}': {e}")
return False
def main():
parser = argparse.ArgumentParser(
prog='tg-delete-mcp-tool',
description=__doc__,
epilog=textwrap.dedent('''
This utility removes MCP tool configurations from the TrustGraph system.
Once deleted, the tool will no longer be available for use.
Examples:
%(prog)s --id weather
%(prog)s --id calculator
%(prog)s --api-url http://localhost:9000/ --id file-reader
''').strip(),
formatter_class=argparse.RawDescriptionHelpFormatter
)
parser.add_argument(
'-u', '--api-url',
default=default_url,
help=f'API URL (default: {default_url})',
)
parser.add_argument(
'--id',
required=True,
help='MCP tool ID to delete',
)
args = parser.parse_args()
try:
if not args.id:
raise RuntimeError("Must specify --id for MCP tool to delete")
delete_mcp_tool(
url=args.api_url,
id=args.id
)
except Exception as e:
print("Exception:", e, flush=True)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,98 @@
"""
Deletes tools from the TrustGraph system.
Removes tool configurations by ID from the agent configuration
and updates the tool index accordingly.
"""
import argparse
import os
from trustgraph.api import Api, ConfigKey, ConfigValue
import json
import textwrap
default_url = os.getenv("TRUSTGRAPH_URL", 'http://localhost:8088/')
def delete_tool(
url : str,
id : str,
):
api = Api(url).config()
# Check if the tool configuration exists
try:
tool_values = api.get([
ConfigKey(type="tool", key=id)
])
if not tool_values or not tool_values[0].value:
print(f"Tool configuration for '{id}' not found.")
return False
except Exception as e:
print(f"Tool configuration for '{id}' not found.")
return False
# Delete the tool configuration and update the index
try:
# Delete the tool configuration
api.delete([
ConfigKey(type="tool", key=id)
])
print(f"Tool '{id}' deleted successfully.")
return True
except Exception as e:
print(f"Error deleting tool '{id}': {e}")
return False
def main():
parser = argparse.ArgumentParser(
prog='tg-delete-tool',
description=__doc__,
epilog=textwrap.dedent('''
This utility removes tool configurations from the TrustGraph system.
It removes the tool from both the tool index and deletes the tool
configuration. Once deleted, the tool will no longer be available for use.
Examples:
%(prog)s --id weather
%(prog)s --id calculator
%(prog)s --api-url http://localhost:9000/ --id file-reader
''').strip(),
formatter_class=argparse.RawDescriptionHelpFormatter
)
parser.add_argument(
'-u', '--api-url',
default=default_url,
help=f'API URL (default: {default_url})',
)
parser.add_argument(
'--id',
required=True,
help='Tool ID to delete',
)
args = parser.parse_args()
try:
if not args.id:
raise RuntimeError("Must specify --id for tool to delete")
delete_tool(
url=args.api_url,
id=args.id
)
except Exception as e:
print("Exception:", e, flush=True)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,91 @@
"""
This utility reads a knowledge core in msgpack format and outputs its
contents in JSON form to standard output. This is useful only as a
diagnostic utility.
"""
import msgpack
import sys
import argparse
import json
def dump(input_file, action):
with open(input_file, 'rb') as f:
unpacker = msgpack.Unpacker(f, raw=False)
for unpacked in unpacker:
print(json.dumps(unpacked))
def summary(input_file, action):
vector_dim = None
triples = set()
max_records = 1000000
with open(input_file, 'rb') as f:
unpacker = msgpack.Unpacker(f, raw=False)
rec_count = 0
for msg in unpacker:
if msg[0] == "ge":
vector_dim = len(msg[1]["v"][0])
if msg[0] == "t":
for elt in msg[1]["m"]["m"]:
triples.add((
elt["s"]["v"],
elt["p"]["v"],
elt["o"]["v"],
))
if rec_count > max_records: break
rec_count += 1
print("Vector dimension:", vector_dim)
for t in triples:
if t[1] == "http://www.w3.org/2000/01/rdf-schema#label":
print("-", t[2])
def main():
parser = argparse.ArgumentParser(
prog='tg-dump-msgpack',
description=__doc__,
)
parser.add_argument(
'-i', '--input-file',
required=True,
help=f'Input file'
)
parser.add_argument(
'-s', '--summary', action="store_const", const="summary",
dest="action",
help=f'Show a summary'
)
parser.add_argument(
'-r', '--records', action="store_const", const="records",
dest="action",
help=f'Dump individual records'
)
args = parser.parse_args()
if args.action == "summary":
summary(**vars(args))
else:
dump(**vars(args))
if __name__ == "__main__":
main()

View file

@ -0,0 +1,54 @@
"""
Outputs a flow class definition in JSON format.
"""
import argparse
import os
import tabulate
from trustgraph.api import Api
import json
default_url = os.getenv("TRUSTGRAPH_URL", 'http://localhost:8088/')
def get_flow_class(url, class_name):
api = Api(url).flow()
cls = api.get_class(class_name)
print(json.dumps(cls, indent=4))
def main():
parser = argparse.ArgumentParser(
prog='tg-get-flow-class',
description=__doc__,
)
parser.add_argument(
'-u', '--api-url',
default=default_url,
help=f'API URL (default: {default_url})',
)
parser.add_argument(
'-n', '--class-name',
required=True,
help=f'Flow class name',
)
args = parser.parse_args()
try:
get_flow_class(
url=args.api_url,
class_name=args.class_name,
)
except Exception as e:
print("Exception:", e, flush=True)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,159 @@
"""
Uses the knowledge service to fetch a knowledge core which is saved
to a local file in msgpack format.
"""
import argparse
import os
import textwrap
import uuid
import asyncio
import json
from websockets.asyncio.client import connect
import msgpack
default_url = os.getenv("TRUSTGRAPH_URL", 'ws://localhost:8088/')
default_user = 'trustgraph'
def write_triple(f, data):
msg = (
"t",
{
"m": {
"i": data["metadata"]["id"],
"m": data["metadata"]["metadata"],
"u": data["metadata"]["user"],
"c": data["metadata"]["collection"],
},
"t": data["triples"],
}
)
f.write(msgpack.packb(msg, use_bin_type=True))
def write_ge(f, data):
msg = (
"ge",
{
"m": {
"i": data["metadata"]["id"],
"m": data["metadata"]["metadata"],
"u": data["metadata"]["user"],
"c": data["metadata"]["collection"],
},
"e": [
{
"e": ent["entity"],
"v": ent["vectors"],
}
for ent in data["entities"]
]
}
)
f.write(msgpack.packb(msg, use_bin_type=True))
async def fetch(url, user, id, output):
if not url.endswith("/"):
url += "/"
url = url + "api/v1/socket"
mid = str(uuid.uuid4())
async with connect(url) as ws:
req = json.dumps({
"id": mid,
"service": "knowledge",
"request": {
"operation": "get-kg-core",
"user": user,
"id": id,
}
})
await ws.send(req)
ge = 0
t = 0
with open(output, "wb") as f:
while True:
msg = await ws.recv()
obj = json.loads(msg)
if "response" not in obj:
raise RuntimeError("No response?")
response = obj["response"]
if "error" in response:
raise RuntimeError(obj["error"])
if "eos" in response:
if response["eos"]: break
if "triples" in response:
t += 1
write_triple(f, response["triples"])
if "graph-embeddings" in response:
ge += 1
write_ge(f, response["graph-embeddings"])
print(f"Got: {t} triple, {ge} GE messages.")
await ws.close()
def main():
parser = argparse.ArgumentParser(
prog='tg-get-kg-core',
description=__doc__,
)
parser.add_argument(
'-u', '--url',
default=default_url,
help=f'API URL (default: {default_url})',
)
parser.add_argument(
'-U', '--user',
default=default_user,
help=f'User ID (default: {default_user})'
)
parser.add_argument(
'--id', '--identifier',
required=True,
help=f'Knowledge core ID',
)
parser.add_argument(
'-o', '--output',
required=True,
help=f'Output file'
)
args = parser.parse_args()
try:
asyncio.run(
fetch(
url = args.url,
user = args.user,
id = args.id,
output = args.output,
)
)
except Exception as e:
print("Exception:", e, flush=True)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,104 @@
"""
Connects to the graph query service and dumps all graph edges in Turtle
format.
"""
import rdflib
import io
import sys
import argparse
import os
from trustgraph.api import Api, Uri
default_url = os.getenv("TRUSTGRAPH_URL", 'http://localhost:8088/')
default_user = 'trustgraph'
default_collection = 'default'
def show_graph(url, flow_id, user, collection):
api = Api(url).flow().id(flow_id)
rows = api.triples_query(
s=None, p=None, o=None,
user=user, collection=collection,
limit=10_000)
g = rdflib.Graph()
for row in rows:
sv = rdflib.term.URIRef(row.s)
pv = rdflib.term.URIRef(row.p)
if isinstance(row.o, Uri):
# Skip malformed URLs with spaces in
if " " in row.o:
continue
ov = rdflib.term.URIRef(row.o)
else:
ov = rdflib.term.Literal(row.o)
g.add((sv, pv, ov))
g.serialize(destination="output.ttl", format="turtle")
buf = io.BytesIO()
g.serialize(destination=buf, format="turtle")
sys.stdout.write(buf.getvalue().decode("utf-8"))
def main():
parser = argparse.ArgumentParser(
prog='tg-graph-to-turtle',
description=__doc__,
)
parser.add_argument(
'-u', '--api-url',
default=default_url,
help=f'API URL (default: {default_url})',
)
parser.add_argument(
'-f', '--flow-id',
default="default",
help=f'Flow ID (default: default)'
)
parser.add_argument(
'-U', '--user',
default=default_user,
help=f'User ID (default: {default_user})'
)
parser.add_argument(
'-C', '--collection',
default=default_collection,
help=f'Collection ID (default: {default_collection})'
)
args = parser.parse_args()
try:
show_graph(
url = args.api_url,
flow_id = args.flow_id,
user = args.user,
collection = args.collection,
)
except Exception as e:
print("Exception:", e, flush=True)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,11 @@
#!/usr/bin/env bash
CSRF_TOKEN=$(curl http://localhost:7750/pulsar-manager/csrf-token)
curl \
-H "X-XSRF-TOKEN: $CSRF_TOKEN" \
-H "Cookie: XSRF-TOKEN=$CSRF_TOKEN;" \
-H 'Content-Type: application/json' \
-X PUT \
http://localhost:7750/pulsar-manager/users/superuser \
-d '{"name": "admin", "password": "apachepulsar", "description": "test", "email": "username@test.org"}'

View file

@ -0,0 +1,239 @@
"""
Initialises Pulsar with Trustgraph tenant / namespaces & policy.
"""
import requests
import time
import argparse
import json
from trustgraph.clients.config_client import ConfigClient
default_pulsar_admin_url = "http://pulsar:8080"
default_pulsar_host = "pulsar://pulsar:6650"
subscriber = "tg-init-pulsar"
def get_clusters(url):
print("Get clusters...", flush=True)
resp = requests.get(f"{url}/admin/v2/clusters")
if resp.status_code != 200: raise RuntimeError("Could not fetch clusters")
return resp.json()
def ensure_tenant(url, tenant, clusters):
resp = requests.get(f"{url}/admin/v2/tenants/{tenant}")
if resp.status_code == 200:
print(f"Tenant {tenant} already exists.", flush=True)
return
resp = requests.put(
f"{url}/admin/v2/tenants/{tenant}",
json={
"adminRoles": [],
"allowedClusters": clusters,
}
)
if resp.status_code != 204:
print(resp.text, flush=True)
raise RuntimeError("Tenant creation failed.")
print(f"Tenant {tenant} created.", flush=True)
def ensure_namespace(url, tenant, namespace, config):
resp = requests.get(f"{url}/admin/v2/namespaces/{tenant}/{namespace}")
if resp.status_code == 200:
print(f"Namespace {tenant}/{namespace} already exists.", flush=True)
return
resp = requests.put(
f"{url}/admin/v2/namespaces/{tenant}/{namespace}",
json=config,
)
if resp.status_code != 204:
print(resp.status_code, flush=True)
print(resp.text, flush=True)
raise RuntimeError(f"Namespace {tenant}/{namespace} creation failed.")
print(f"Namespace {tenant}/{namespace} created.", flush=True)
def ensure_config(config, pulsar_host, pulsar_api_key):
cli = ConfigClient(
subscriber=subscriber,
pulsar_host=pulsar_host,
pulsar_api_key=pulsar_api_key,
)
while True:
try:
print("Get current config...", flush=True)
current, version = cli.config(timeout=5)
except Exception as e:
print("Exception:", e, flush=True)
time.sleep(2)
print("Retrying...", flush=True)
continue
print("Current config version is", version, flush=True)
if version != 0:
print("Already updated, not updating config. Done.", flush=True)
return
print("Config is version 0, updating...", flush=True)
batch = []
for type in config:
for key in config[type]:
print(f"Adding {type}/{key} to update.", flush=True)
batch.append({
"type": type,
"key": key,
"value": json.dumps(config[type][key]),
})
try:
cli.put(batch, timeout=10)
print("Update succeeded.", flush=True)
break
except Exception as e:
print("Exception:", e, flush=True)
time.sleep(2)
print("Retrying...", flush=True)
continue
def init(
pulsar_admin_url, pulsar_host, pulsar_api_key, tenant,
config, config_file,
):
clusters = get_clusters(pulsar_admin_url)
ensure_tenant(pulsar_admin_url, tenant, clusters)
ensure_namespace(pulsar_admin_url, tenant, "flow", {})
ensure_namespace(pulsar_admin_url, tenant, "request", {})
ensure_namespace(pulsar_admin_url, tenant, "response", {
"retention_policies": {
"retentionSizeInMB": -1,
"retentionTimeInMinutes": 3,
"subscriptionExpirationTimeMinutes": 30,
}
})
ensure_namespace(pulsar_admin_url, tenant, "config", {
"retention_policies": {
"retentionSizeInMB": 10,
"retentionTimeInMinutes": -1,
"subscriptionExpirationTimeMinutes": 5,
}
})
if config is not None:
try:
print("Decoding config...", flush=True)
dec = json.loads(config)
print("Decoded.", flush=True)
except Exception as e:
print("Exception:", e, flush=True)
raise e
ensure_config(dec, pulsar_host, pulsar_api_key)
elif config_file is not None:
try:
print("Decoding config...", flush=True)
dec = json.load(open(config_file))
print("Decoded.", flush=True)
except Exception as e:
print("Exception:", e, flush=True)
raise e
ensure_config(dec, pulsar_host, pulsar_api_key)
else:
print("No config to update.", flush=True)
def main():
parser = argparse.ArgumentParser(
prog='tg-init-trustgraph',
description=__doc__,
)
parser.add_argument(
'-p', '--pulsar-admin-url',
default=default_pulsar_admin_url,
help=f'Pulsar admin URL (default: {default_pulsar_admin_url})',
)
parser.add_argument(
'--pulsar-host',
default=default_pulsar_host,
help=f'Pulsar host (default: {default_pulsar_host})',
)
parser.add_argument(
'--pulsar-api-key',
help=f'Pulsar API key',
)
parser.add_argument(
'-c', '--config',
help=f'Initial configuration to load',
)
parser.add_argument(
'-C', '--config-file',
help=f'Initial configuration to load from file',
)
parser.add_argument(
'-t', '--tenant',
default="tg",
help=f'Tenant (default: tg)',
)
args = parser.parse_args()
while True:
try:
print(flush=True)
print(
f"Initialising with Pulsar {args.pulsar_admin_url}...",
flush=True
)
init(**vars(args))
print("Initialisation complete.", flush=True)
break
except Exception as e:
print("Exception:", e, flush=True)
print("Sleeping...", flush=True)
time.sleep(2)
print("Will retry...", flush=True)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,171 @@
"""
Uses the agent service to answer a question
"""
import argparse
import os
import textwrap
import uuid
import asyncio
import json
from websockets.asyncio.client import connect
default_url = os.getenv("TRUSTGRAPH_URL", 'ws://localhost:8088/')
default_user = 'trustgraph'
default_collection = 'default'
def wrap(text, width=75):
if text is None: text = "n/a"
out = textwrap.wrap(
text, width=width
)
return "\n".join(out)
def output(text, prefix="> ", width=78):
out = textwrap.indent(
text, prefix=prefix
)
print(out)
async def question(
url, question, flow_id, user, collection,
plan=None, state=None, verbose=False
):
if not url.endswith("/"):
url += "/"
url = url + "api/v1/socket"
if verbose:
output(wrap(question), "\U00002753 ")
print()
def think(x):
if verbose:
output(wrap(x), "\U0001f914 ")
print()
def observe(x):
if verbose:
output(wrap(x), "\U0001f4a1 ")
print()
mid = str(uuid.uuid4())
async with connect(url) as ws:
req = json.dumps({
"id": mid,
"service": "agent",
"flow": flow_id,
"request": {
"question": question,
}
})
await ws.send(req)
while True:
msg = await ws.recv()
obj = json.loads(msg)
if "error" in obj:
raise RuntimeError(obj["error"])
if obj["id"] != mid:
print("Ignore message")
continue
if "thought" in obj["response"]:
think(obj["response"]["thought"])
if "observation" in obj["response"]:
observe(obj["response"]["observation"])
if "answer" in obj["response"]:
print(obj["response"]["answer"])
if obj["complete"]: break
await ws.close()
def main():
parser = argparse.ArgumentParser(
prog='tg-invoke-agent',
description=__doc__,
)
parser.add_argument(
'-u', '--url',
default=default_url,
help=f'API URL (default: {default_url})',
)
parser.add_argument(
'-f', '--flow-id',
default="default",
help=f'Flow ID (default: default)'
)
parser.add_argument(
'-q', '--question',
required=True,
help=f'Question to answer',
)
parser.add_argument(
'-U', '--user',
default=default_user,
help=f'User ID (default: {default_user})'
)
parser.add_argument(
'-C', '--collection',
default=default_collection,
help=f'Collection ID (default: {default_collection})'
)
parser.add_argument(
'-l', '--plan',
help=f'Agent plan (default: unspecified)'
)
parser.add_argument(
'-s', '--state',
help=f'Agent initial state (default: unspecified)'
)
parser.add_argument(
'-v', '--verbose',
action="store_true",
help=f'Output thinking/observations'
)
args = parser.parse_args()
try:
asyncio.run(
question(
url = args.url,
flow_id = args.flow_id,
question = args.question,
user = args.user,
collection = args.collection,
plan = args.plan,
state = args.state,
verbose = args.verbose,
)
)
except Exception as e:
print("Exception:", e, flush=True)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,86 @@
"""
Uses the DocumentRAG service to answer a question
"""
import argparse
import os
from trustgraph.api import Api
default_url = os.getenv("TRUSTGRAPH_URL", 'http://localhost:8088/')
default_user = 'trustgraph'
default_collection = 'default'
default_doc_limit = 10
def question(url, flow_id, question, user, collection, doc_limit):
api = Api(url).flow().id(flow_id)
resp = api.document_rag(
question=question, user=user, collection=collection,
doc_limit=doc_limit,
)
print(resp)
def main():
parser = argparse.ArgumentParser(
prog='tg-invoke-document-rag',
description=__doc__,
)
parser.add_argument(
'-u', '--url',
default=default_url,
help=f'API URL (default: {default_url})',
)
parser.add_argument(
'-f', '--flow-id',
default="default",
help=f'Flow ID (default: default)'
)
parser.add_argument(
'-q', '--question',
required=True,
help=f'Question to answer',
)
parser.add_argument(
'-U', '--user',
default=default_user,
help=f'User ID (default: {default_user})'
)
parser.add_argument(
'-C', '--collection',
default=default_collection,
help=f'Collection ID (default: {default_collection})'
)
parser.add_argument(
'-d', '--doc-limit',
default=default_doc_limit,
help=f'Document limit (default: {default_doc_limit})'
)
args = parser.parse_args()
try:
question(
url=args.url,
flow_id = args.flow_id,
question=args.question,
user=args.user,
collection=args.collection,
doc_limit=args.doc_limit,
)
except Exception as e:
print("Exception:", e, flush=True)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,115 @@
"""
Uses the GraphRAG service to answer a question
"""
import argparse
import os
from trustgraph.api import Api
default_url = os.getenv("TRUSTGRAPH_URL", 'http://localhost:8088/')
default_user = 'trustgraph'
default_collection = 'default'
default_entity_limit = 50
default_triple_limit = 30
default_max_subgraph_size = 150
default_max_path_length = 2
def question(
url, flow_id, question, user, collection, entity_limit, triple_limit,
max_subgraph_size, max_path_length
):
api = Api(url).flow().id(flow_id)
resp = api.graph_rag(
question=question, user=user, collection=collection,
entity_limit=entity_limit, triple_limit=triple_limit,
max_subgraph_size=max_subgraph_size,
max_path_length=max_path_length
)
print(resp)
def main():
parser = argparse.ArgumentParser(
prog='tg-invoke-graph-rag',
description=__doc__,
)
parser.add_argument(
'-u', '--url',
default=default_url,
help=f'API URL (default: {default_url})',
)
parser.add_argument(
'-f', '--flow-id',
default="default",
help=f'Flow ID (default: default)'
)
parser.add_argument(
'-q', '--question',
required=True,
help=f'Question to answer',
)
parser.add_argument(
'-U', '--user',
default=default_user,
help=f'User ID (default: {default_user})'
)
parser.add_argument(
'-C', '--collection',
default=default_collection,
help=f'Collection ID (default: {default_collection})'
)
parser.add_argument(
'-e', '--entity-limit',
default=default_entity_limit,
help=f'Entity limit (default: {default_entity_limit})'
)
parser.add_argument(
'-t', '--triple-limit',
default=default_triple_limit,
help=f'Triple limit (default: {default_triple_limit})'
)
parser.add_argument(
'-s', '--max-subgraph-size',
default=default_max_subgraph_size,
help=f'Max subgraph size (default: {default_max_subgraph_size})'
)
parser.add_argument(
'-p', '--max-path-length',
default=default_max_path_length,
help=f'Max path length (default: {default_max_path_length})'
)
args = parser.parse_args()
try:
question(
url=args.url,
flow_id = args.flow_id,
question=args.question,
user=args.user,
collection=args.collection,
entity_limit=args.entity_limit,
triple_limit=args.triple_limit,
max_subgraph_size=args.max_subgraph_size,
max_path_length=args.max_path_length,
)
except Exception as e:
print("Exception:", e, flush=True)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,68 @@
"""
Invokes the text completion service by specifying an LLM system prompt
and user prompt. Both arguments are required.
"""
import argparse
import os
import json
from trustgraph.api import Api
default_url = os.getenv("TRUSTGRAPH_URL", 'http://localhost:8088/')
def query(url, flow_id, system, prompt):
api = Api(url).flow().id(flow_id)
resp = api.text_completion(system=system, prompt=prompt)
print(resp)
def main():
parser = argparse.ArgumentParser(
prog='tg-invoke-llm',
description=__doc__,
)
parser.add_argument(
'-u', '--url',
default=default_url,
help=f'API URL (default: {default_url})',
)
parser.add_argument(
'system',
nargs=1,
help='LLM system prompt e.g. You are a helpful assistant',
)
parser.add_argument(
'prompt',
nargs=1,
help='LLM prompt e.g. What is 2 + 2?',
)
parser.add_argument(
'-f', '--flow-id',
default="default",
help=f'Flow ID (default: default)'
)
args = parser.parse_args()
try:
query(
url=args.url,
flow_id = args.flow_id,
system=args.system[0],
prompt=args.prompt[0],
)
except Exception as e:
print("Exception:", e, flush=True)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,78 @@
"""
Invokes MCP (Model Control Protocol) tools through the TrustGraph API.
Allows calling MCP tools by specifying the tool name and providing
parameters as a JSON-encoded dictionary. The tool is executed within
the context of a specified flow.
"""
import argparse
import os
import json
from trustgraph.api import Api
default_url = os.getenv("TRUSTGRAPH_URL", 'http://localhost:8088/')
def query(url, flow_id, name, parameters):
api = Api(url).flow().id(flow_id)
resp = api.mcp_tool(name=name, parameters=parameters)
if isinstance(resp, str):
print(resp)
else:
print(json.dumps(resp, indent=4))
def main():
parser = argparse.ArgumentParser(
prog='tg-invoke-mcp-tool',
description=__doc__,
)
parser.add_argument(
'-u', '--url',
default=default_url,
help=f'API URL (default: {default_url})',
)
parser.add_argument(
'-f', '--flow-id',
default="default",
help=f'Flow ID (default: default)'
)
parser.add_argument(
'-n', '--name',
metavar='tool-name',
help=f'MCP tool name',
)
parser.add_argument(
'-P', '--parameters',
help='''Tool parameters, should be JSON-encoded dict.''',
)
args = parser.parse_args()
if args.parameters:
parameters = json.loads(args.parameters)
else:
parameters = {}
try:
query(
url = args.url,
flow_id = args.flow_id,
name = args.name,
parameters = parameters,
)
except Exception as e:
print("Exception:", e, flush=True)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,88 @@
"""
Invokes the LLM prompt service by specifying the prompt template to use
and values for the variables in the prompt template. The
prompt template is identified by its template identifier e.g.
question, extract-definitions. Template variable values are specified
using key=value arguments on the command line, and these replace
{{key}} placeholders in the template.
"""
import argparse
import os
import json
from trustgraph.api import Api
default_url = os.getenv("TRUSTGRAPH_URL", 'http://localhost:8088/')
def query(url, flow_id, template_id, variables):
api = Api(url).flow().id(flow_id)
resp = api.prompt(id=template_id, variables=variables)
if isinstance(resp, str):
print(resp)
else:
print(json.dumps(resp, indent=4))
def main():
parser = argparse.ArgumentParser(
prog='tg-invoke-prompt',
description=__doc__,
)
parser.add_argument(
'-u', '--url',
default=default_url,
help=f'API URL (default: {default_url})',
)
parser.add_argument(
'-f', '--flow-id',
default="default",
help=f'Flow ID (default: default)'
)
parser.add_argument(
'id',
metavar='template-id',
nargs=1,
help=f'Prompt identifier e.g. question, extract-definitions',
)
parser.add_argument(
'variable',
nargs='*',
metavar="variable=value",
help='''Prompt template terms of the form variable=value, can be
specified multiple times''',
)
args = parser.parse_args()
variables = {}
for variable in args.variable:
toks = variable.split("=", 1)
if len(toks) != 2:
raise RuntimeError(f"Malformed variable: {variable}")
variables[toks[0]] = toks[1]
try:
query(
url=args.url,
flow_id=args.flow_id,
template_id=args.id[0],
variables=variables,
)
except Exception as e:
print("Exception:", e, flush=True)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,233 @@
"""
This utility takes a document embeddings core and loads it into a running
TrustGraph through the API. The document embeddings core should be in msgpack
format, which is the default format produce by tg-save-doc-embeds.
"""
import aiohttp
import asyncio
import msgpack
import json
import sys
import argparse
import os
import signal
class Running:
def __init__(self): self.running = True
def get(self): return self.running
def stop(self): self.running = False
de_counts = 0
async def load_de(running, queue, url):
global de_counts
async with aiohttp.ClientSession() as session:
async with session.ws_connect(url) as ws:
while running.get():
try:
msg = await asyncio.wait_for(queue.get(), 1)
# End of load
if msg is None:
break
except:
# Hopefully it's TimeoutError. Annoying to match since
# it changed in 3.11.
continue
msg = {
"metadata": {
"id": msg["m"]["i"],
"metadata": msg["m"]["m"],
"user": msg["m"]["u"],
"collection": msg["m"]["c"],
},
"chunks": [
{
"chunk": chunk["c"],
"vectors": chunk["v"],
}
for chunk in msg["c"]
],
}
try:
await ws.send_json(msg)
except Exception as e:
print(e)
de_counts += 1
async def stats(running):
global de_counts
while running.get():
await asyncio.sleep(2)
print(
f"Graph embeddings: {de_counts:10d}"
)
async def loader(running, de_queue, path, format, user, collection):
if format == "json":
raise RuntimeError("Not implemented")
else:
with open(path, "rb") as f:
unpacker = msgpack.Unpacker(f, raw=False)
while running.get():
try:
unpacked = unpacker.unpack()
except:
break
if user:
unpacked["metadata"]["user"] = user
if collection:
unpacked["metadata"]["collection"] = collection
if unpacked[0] == "de":
qtype = de_queue
while running.get():
try:
await asyncio.wait_for(qtype.put(unpacked[1]), 0.5)
# Successful put message, move on
break
except:
# Hopefully it's TimeoutError. Annoying to match since
# it changed in 3.11.
continue
if not running.get(): break
# Put 'None' on end of queue to finish
while running.get():
try:
await asyncio.wait_for(de_queue.put(None), 1)
# Successful put message, move on
break
except:
# Hopefully it's TimeoutError. Annoying to match since
# it changed in 3.11.
continue
async def run(running, **args):
# Maxsize on queues reduces back-pressure so tg-load-kg-core doesn't
# grow to eat all memory
de_q = asyncio.Queue(maxsize=10)
url = args["url"]
flow_id = args["flow_id"]
load_task = asyncio.create_task(
loader(
running=running,
de_queue=de_q,
path=args["input_file"], format=args["format"],
user=args["user"], collection=args["collection"],
)
)
de_task = asyncio.create_task(
load_de(
running=running,
queue=de_q,
url = f"{url}api/v1/flow/{flow_id}/import/document-embeddings"
)
)
stats_task = asyncio.create_task(stats(running))
await de_task
running.stop()
await load_task
await stats_task
async def main(running):
parser = argparse.ArgumentParser(
prog='tg-load-doc-embeds',
description=__doc__,
)
default_url = os.getenv("TRUSTGRAPH_API", "http://localhost:8088/")
default_user = "trustgraph"
collection = "default"
parser.add_argument(
'-u', '--url',
default=default_url,
help=f'TrustGraph API URL (default: {default_url})',
)
parser.add_argument(
'-f', '--flow-id',
default="default",
help=f'Flow ID (default: default)'
)
parser.add_argument(
'-i', '--input-file',
# Make it mandatory, difficult to over-write an existing file
required=True,
help=f'Output file'
)
parser.add_argument(
'--format',
default="msgpack",
choices=["msgpack", "json"],
help=f'Output format (default: msgpack)',
)
parser.add_argument(
'--user',
help=f'User ID to load as (default: from input)'
)
parser.add_argument(
'--collection',
help=f'Collection ID to load as (default: from input)'
)
args = parser.parse_args()
await run(running, **vars(args))
running = Running()
def interrupt(sig, frame):
running.stop()
print('Interrupt')
signal.signal(signal.SIGINT, interrupt)
asyncio.run(main(running))

View file

@ -0,0 +1,78 @@
"""
Starts a load operation on a knowledge core which is already stored by
the knowledge manager. You could load a core with tg-put-kg-core and then
run this utility.
"""
import argparse
import os
import tabulate
from trustgraph.api import Api
import json
default_url = os.getenv("TRUSTGRAPH_URL", 'http://localhost:8088/')
default_flow = "default"
default_collection = "default"
def load_kg_core(url, user, id, flow, collection):
api = Api(url).knowledge()
class_names = api.load_kg_core(user = user, id = id, flow=flow,
collection=collection)
def main():
parser = argparse.ArgumentParser(
prog='tg-delete-flow-class',
description=__doc__,
)
parser.add_argument(
'-u', '--api-url',
default=default_url,
help=f'API URL (default: {default_url})',
)
parser.add_argument(
'-U', '--user',
default="trustgraph",
help='API URL (default: trustgraph)',
)
parser.add_argument(
'--id', '--identifier',
required=True,
help=f'Knowledge core ID',
)
parser.add_argument(
'-f', '--flow-id',
default=default_flow,
help=f'Flow ID (default: {default_flow}',
)
parser.add_argument(
'-c', '--collection',
default=default_collection,
help=f'Collection ID (default: {default_collection}',
)
args = parser.parse_args()
try:
load_kg_core(
url=args.api_url,
user=args.user,
id=args.id,
flow=args.flow_id,
collection=args.collection,
)
except Exception as e:
print("Exception:", e, flush=True)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,200 @@
"""
Loads a PDF document into TrustGraph processing by directing to
the pdf-decoder queue.
Consider using tg-add-library-document to load
a document, followed by tg-start-library-processing to initiate processing.
"""
import hashlib
import argparse
import os
import time
import uuid
from trustgraph.api import Api
from trustgraph.knowledge import hash, to_uri
from trustgraph.knowledge import PREF_PUBEV, PREF_DOC, PREF_ORG
from trustgraph.knowledge import Organization, PublicationEvent
from trustgraph.knowledge import DigitalDocument
default_url = os.getenv("TRUSTGRAPH_URL", 'http://localhost:8088/')
default_user = 'trustgraph'
default_collection = 'default'
class Loader:
def __init__(
self,
url,
flow_id,
user,
collection,
metadata,
):
self.api = Api(url).flow().id(flow_id)
self.user = user
self.collection = collection
self.metadata = metadata
def load(self, files):
for file in files:
self.load_file(file)
def load_file(self, file):
try:
path = file
data = open(path, "rb").read()
# Create a SHA256 hash from the data
id = hash(data)
id = to_uri(PREF_DOC, id)
self.metadata.id = id
self.api.load_document(
document=data, id=id, metadata=self.metadata,
user=self.user,
collection=self.collection,
)
print(f"{file}: Loaded successfully.")
except Exception as e:
print(f"{file}: Failed: {str(e)}", flush=True)
raise e
def main():
parser = argparse.ArgumentParser(
prog='tg-load-pdf',
description=__doc__,
)
parser.add_argument(
'-u', '--url',
default=default_url,
help=f'API URL (default: {default_url})',
)
parser.add_argument(
'-f', '--flow-id',
default="default",
help=f'Flow ID (default: default)'
)
parser.add_argument(
'-U', '--user',
default=default_user,
help=f'User ID (default: {default_user})'
)
parser.add_argument(
'-C', '--collection',
default=default_collection,
help=f'Collection ID (default: {default_collection})'
)
parser.add_argument(
'--name', help=f'Document name'
)
parser.add_argument(
'--description', help=f'Document description'
)
parser.add_argument(
'--copyright-notice', help=f'Copyright notice'
)
parser.add_argument(
'--copyright-holder', help=f'Copyright holder'
)
parser.add_argument(
'--copyright-year', help=f'Copyright year'
)
parser.add_argument(
'--license', help=f'Copyright license'
)
parser.add_argument(
'--publication-organization', help=f'Publication organization'
)
parser.add_argument(
'--publication-description', help=f'Publication description'
)
parser.add_argument(
'--publication-date', help=f'Publication date'
)
parser.add_argument(
'--document-url', help=f'Document URL'
)
parser.add_argument(
'--keyword', nargs='+', help=f'Keyword'
)
parser.add_argument(
'--identifier', '--id', help=f'Document ID'
)
parser.add_argument(
'files', nargs='+',
help=f'File to load'
)
args = parser.parse_args()
try:
document = DigitalDocument(
id,
name=args.name,
description=args.description,
copyright_notice=args.copyright_notice,
copyright_holder=args.copyright_holder,
copyright_year=args.copyright_year,
license=args.license,
url=args.document_url,
keywords=args.keyword,
)
if args.publication_organization:
org = Organization(
id=to_uri(PREF_ORG, hash(args.publication_organization)),
name=args.publication_organization,
)
document.publication = PublicationEvent(
id = to_uri(PREF_PUBEV, str(uuid.uuid4())),
organization=org,
description=args.publication_description,
start_date=args.publication_date,
end_date=args.publication_date,
)
p = Loader(
url=args.url,
flow_id = args.flow_id,
user=args.user,
collection=args.collection,
metadata=document,
)
p.load(args.files)
except Exception as e:
print("Exception:", e, flush=True)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,739 @@
"""
Loads a PDF document into the library
"""
import argparse
import os
import uuid
import datetime
import requests
from trustgraph.api import Api
from trustgraph.api.types import hash, Uri, Literal, Triple
default_url = os.getenv("TRUSTGRAPH_URL", 'http://localhost:8088/')
default_user = 'trustgraph'
from requests.adapters import HTTPAdapter
from urllib3.response import HTTPResponse
class FileAdapter(HTTPAdapter):
def send(self, request, *args, **kwargs):
resp = HTTPResponse(body=open(request.url[7:], 'rb'), status=200, preload_content=False)
return self.build_response(request, resp)
session = requests.session()
session.mount('file://', FileAdapter())
try:
os.mkdir("doc-cache")
except:
pass
documents = [
{
"id": "https://trustgraph.ai/doc/challenger-report-vol-1",
"title": "Report of the Presidential Commission on the Space Shuttle Challenger Accident, Volume 1",
"comments": "The findings of the Commission regarding the circumstances surrounding the Challenger accident are reported and recommendations for corrective action are outlined",
"url": "https://ntrs.nasa.gov/api/citations/19860015255/downloads/19860015255.pdf",
"kind": "application/pdf",
"date": datetime.datetime.now().date(),
"tags": ["nasa", "safety-engineering", "space-shuttle"],
"metadata": [
Triple(
s = Uri("https://trustgraph.ai/doc/challenger-report-vol-1"),
p = Uri("http://www.w3.org/1999/02/22-rdf-syntax-ns#type"),
o = Uri("https://schema.org/DigitalDocument")
),
Triple(
s = Uri("https://trustgraph.ai/doc/challenger-report-vol-1"),
p = Uri("http://www.w3.org/2000/01/rdf-schema#label"),
o = Literal("Report of the Presidential Commission on the Space Shuttle Challenger Accident, Volume 1")
),
Triple(
s = Uri("https://trustgraph.ai/doc/challenger-report-vol-1"),
p = Uri("https://schema.org/name"),
o = Literal("Report of the Presidential Commission on the Space Shuttle Challenger Accident, Volume 1")
),
Triple(
s = Uri("https://trustgraph.ai/doc/challenger-report-vol-1"),
p = Uri("https://schema.org/description"),
o = Literal("The findings of the Commission regarding the circumstances surrounding the Challenger accident are reported and recommendations for corrective action are outlined")
),
Triple(
s = Uri("https://trustgraph.ai/doc/challenger-report-vol-1"),
p = Uri("https://schema.org/copyrightNotice"),
o = Literal("Work of the US Gov. Public Use Permitted")
),
Triple(
s = Uri("https://trustgraph.ai/doc/challenger-report-vol-1"),
p = Uri("https://schema.org/copyrightHolder"),
o = Literal("US Gov.")
),
Triple(
s = Uri("https://trustgraph.ai/doc/challenger-report-vol-1"),
p = Uri("https://schema.org/copyrightYear"),
o = Literal("1986")
),
Triple(
s = Uri("https://trustgraph.ai/doc/challenger-report-vol-1"),
p = Uri("https://schema.org/keywords"),
o = Literal("nasa")
),
Triple(
s = Uri("https://trustgraph.ai/doc/challenger-report-vol-1"),
p = Uri("https://schema.org/keywords"),
o = Literal("space-shuttle")
),
Triple(
s = Uri("https://trustgraph.ai/doc/challenger-report-vol-1"),
p = Uri("https://schema.org/keywords"),
o = Literal("safety-engineering")
),
Triple(
s = Uri("https://trustgraph.ai/doc/challenger-report-vol-1"),
p = Uri("https://schema.org/keywords"),
o = Literal("challenger")
),
Triple(
s = Uri("https://trustgraph.ai/doc/challenger-report-vol-1"),
p = Uri("https://schema.org/keywords"),
o = Literal("space-transportation")
),
Triple(
s = Uri("https://trustgraph.ai/doc/challenger-report-vol-1"),
p = Uri("https://schema.org/publication"),
o = Uri("https://trustgraph.ai/pubev/d946c320-0432-48c8-a015-26b0af3cedae")
),
Triple(
s = Uri("https://trustgraph.ai/pubev/d946c320-0432-48c8-a015-26b0af3cedae"),
p = Uri("http://www.w3.org/1999/02/22-rdf-syntax-ns#type"),
o = Uri("https://schema.org/PublicationEvent")
),
Triple(
s = Uri("https://trustgraph.ai/pubev/d946c320-0432-48c8-a015-26b0af3cedae"),
p = Uri("https://schema.org/description"),
o = Literal("The findings of the Commission regarding the circumstances surrounding the Challenger accident are reported and recommendations for corrective action are outlined")
),
Triple(
s = Uri("https://trustgraph.ai/pubev/d946c320-0432-48c8-a015-26b0af3cedae"),
p = Uri("https://schema.org/publishedBy"),
o = Uri("https://trustgraph.ai/org/nasa")
),
Triple(
s = Uri("https://trustgraph.ai/org/nasa"),
p = Uri("http://www.w3.org/1999/02/22-rdf-syntax-ns#type"),
o = Uri("https://schema.org/Organization")
),
Triple(
s = Uri("https://trustgraph.ai/org/nasa"),
p = Uri("http://www.w3.org/2000/01/rdf-schema#label"),
o = Literal("NASA")
),
Triple(
s = Uri("https://trustgraph.ai/org/nasa"),
p = Uri("https://schema.org/name"),
o = Literal("NASA")
),
Triple(
s = Uri("https://trustgraph.ai/pubev/d946c320-0432-48c8-a015-26b0af3cedae"),
p = Uri("https://schema.org/startDate"),
o = Literal("1986-06-06")
),
Triple(
s = Uri("https://trustgraph.ai/pubev/d946c320-0432-48c8-a015-26b0af3cedae"),
p = Uri("https://schema.org/endDate"),
o = Literal("1986-06-06")
),
Triple(
s = Uri("https://trustgraph.ai/doc/challenger-report-vol-1"),
p = Uri("https://schema.org/url"),
o = Uri("https://ntrs.nasa.gov/api/citations/19860015255/downloads/19860015255.pdf")
)
]
},
{
"id": "https://trustgraph.ai/doc/icelandic-dictionary",
"title": "A Concise Dictionary of Old Icelandic",
"comments": "A Concise Dictionary of Old Icelandic, published in 1910, is a 551-page dictionary that offers a comprehensive overview of the Old Norse language, particularly Old Icelandic.",
"url": "https://css4.pub/2015/icelandic/dictionary.pdf",
"kind": "application/pdf",
"date": datetime.datetime.now().date(),
"tags": ["old-icelandic", "dictionary", "language", "grammar", "old-norse", "icelandic"],
"metadata": [
Triple(
s = Uri("https://trustgraph.ai/doc/icelandic-dictionary"),
p = Uri("http://www.w3.org/1999/02/22-rdf-syntax-ns#type"),
o = Uri("https://schema.org/DigitalDocument")
),
Triple(
s = Uri("https://trustgraph.ai/doc/icelandic-dictionary"),
p = Uri("http://www.w3.org/2000/01/rdf-schema#label"),
o = Literal("A Concise Dictionary of Old Icelandic"),
),
Triple(
s = Uri("https://trustgraph.ai/doc/icelandic-dictionary"),
p = Uri("https://schema.org/name"),
o = Literal("A Concise Dictionary of Old Icelandic"),
),
Triple(
s = Uri("https://trustgraph.ai/doc/icelandic-dictionary"),
p = Uri("https://schema.org/description"),
o = Literal("A Concise Dictionary of Old Icelandic, published in 1910, is a 551-page dictionary that offers a comprehensive overview of the Old Norse language, particularly Old Icelandic."),
),
Triple(
s = Uri("https://trustgraph.ai/doc/icelandic-dictionary"),
p = Uri("https://schema.org/copyrightNotice"),
o = Literal("Copyright expired, public domain")
),
Triple(
s = Uri("https://trustgraph.ai/doc/icelandic-dictionary"),
p = Uri("https://schema.org/copyrightHolder"),
o = Literal("Geir Zoëga, Clarendon Press")
),
Triple(
s = Uri("https://trustgraph.ai/doc/icelandic-dictionary"),
p = Uri("https://schema.org/copyrightYear"),
o = Literal("1910")
),
Triple(
s = Uri("https://trustgraph.ai/doc/icelandic-dictionary"),
p = Uri("https://schema.org/keywords"),
o = Literal("icelandic")
),
Triple(
s = Uri("https://trustgraph.ai/doc/icelandic-dictionary"),
p = Uri("https://schema.org/keywords"),
o = Literal("old-norse")
),
Triple(
s = Uri("https://trustgraph.ai/doc/icelandic-dictionary"),
p = Uri("https://schema.org/keywords"),
o = Literal("dictionary")
),
Triple(
s = Uri("https://trustgraph.ai/doc/icelandic-dictionary"),
p = Uri("https://schema.org/keywords"),
o = Literal("grammar")
),
Triple(
s = Uri("https://trustgraph.ai/doc/icelandic-dictionary"),
p = Uri("https://schema.org/keywords"),
o = Literal("old-icelandic")
),
Triple(
s = Uri("https://trustgraph.ai/doc/icelandic-dictionary"),
p = Uri("https://schema.org/publication"),
o = Uri("https://trustgraph.ai/pubev/11a78156-3aea-4263-9f1b-0c63cbde69d7")
),
Triple(
s = Uri("https://trustgraph.ai/pubev/11a78156-3aea-4263-9f1b-0c63cbde69d7"),
p = Uri("http://www.w3.org/1999/02/22-rdf-syntax-ns#type"),
o = Uri("https://schema.org/PublicationEvent")
),
Triple(
s = Uri("https://trustgraph.ai/pubev/11a78156-3aea-4263-9f1b-0c63cbde69d7"),
p = Uri("https://schema.org/description"),
o = Literal("Published by Clarendon Press in 1910"),
),
Triple(
s = Uri("https://trustgraph.ai/pubev/11a78156-3aea-4263-9f1b-0c63cbde69d7"),
p = Uri("https://schema.org/publishedBy"),
o = Uri("https://trustgraph.ai/org/clarendon-press")
),
Triple(
s = Uri("https://trustgraph.ai/org/clarendon-press"),
p = Uri("http://www.w3.org/1999/02/22-rdf-syntax-ns#type"),
o = Uri("https://schema.org/Organization")
),
Triple(
s = Uri("https://trustgraph.ai/org/clarendon-press"),
p = Uri("http://www.w3.org/2000/01/rdf-schema#label"),
o = Literal("NASA")
),
Triple(
s = Uri("https://trustgraph.ai/org/clarendon-press"),
p = Uri("https://schema.org/name"),
o = Literal("Clarendon Press")
),
Triple(
s = Uri("https://trustgraph.ai/pubev/11a78156-3aea-4263-9f1b-0c63cbde69d7"),
p = Uri("https://schema.org/startDate"),
o = Literal("1910-01-01")
),
Triple(
s = Uri("https://trustgraph.ai/pubev/11a78156-3aea-4263-9f1b-0c63cbde69d7"),
p = Uri("https://schema.org/endDate"),
o = Literal("1910-01-01")
),
Triple(
s = Uri("https://trustgraph.ai/doc/icelandic-dictionary"),
p = Uri("https://schema.org/url"),
o = Uri("https://digital-research-books-beta.nypl.org/edition/10476341")
)
]
},
{
"id": "https://trustgraph.ai/doc/annual-threat-assessment-us-dni-march-2025",
"title": "Annual threat assessment of the U.S. intelligence community - March 2025",
"comments": "The report reflects the collective insights of the Intelligence Community (IC), which is committed to providing the nuanced, independent, and unvarnished intelligence that policymakers, warfighters, and domestic law enforcement personnel need to protect American lives and Americas interests anywhere in the world.",
"url": "https://www.intelligence.senate.gov/sites/default/files/2025%20Annual%20Threat%20Assessment%20of%20the%20U.S.%20Intelligence%20Community.pdf",
"kind": "application/pdf",
"date": datetime.datetime.now().date(),
"tags": ["adversary-cooperation", "cyberthreats", "supply-chain-vulnerabilities", "economic-competition", "national-security", "data-privacy"],
"metadata": [
Triple(
s = Uri("https://trustgraph.ai/doc/annual-threat-assessment-us-dni-march-2025"),
p = Uri("http://www.w3.org/1999/02/22-rdf-syntax-ns#type"),
o = Uri("https://schema.org/DigitalDocument")
),
Triple(
s = Uri("https://trustgraph.ai/doc/annual-threat-assessment-us-dni-march-2025"),
p = Uri("http://www.w3.org/2000/01/rdf-schema#label"),
o = Literal("Annual threat assessment of the U.S. intelligence community - March 2025"),
),
Triple(
s = Uri("https://trustgraph.ai/doc/annual-threat-assessment-us-dni-march-2025"),
p = Uri("https://schema.org/name"),
o = Literal("Annual threat assessment of the U.S. intelligence community - March 2025"),
),
Triple(
s = Uri("https://trustgraph.ai/doc/annual-threat-assessment-us-dni-march-2025"),
p = Uri("https://schema.org/description"),
o = Literal("The report reflects the collective insights of the Intelligence Community (IC), which is committed to providing the nuanced, independent, and unvarnished intelligence that policymakers, warfighters, and domestic law enforcement personnel need to protect American lives and Americas interests anywhere in the world."),
),
Triple(
s = Uri("https://trustgraph.ai/doc/annual-threat-assessment-us-dni-march-2025"),
p = Uri("https://schema.org/copyrightNotice"),
o = Literal("Not copyright")
),
Triple(
s = Uri("https://trustgraph.ai/doc/annual-threat-assessment-us-dni-march-2025"),
p = Uri("https://schema.org/copyrightHolder"),
o = Literal("US Government")
),
Triple(
s = Uri("https://trustgraph.ai/doc/annual-threat-assessment-us-dni-march-2025"),
p = Uri("https://schema.org/copyrightYear"),
o = Literal("2025")
),
Triple(
s = Uri("https://trustgraph.ai/doc/annual-threat-assessment-us-dni-march-2025"),
p = Uri("https://schema.org/keywords"),
o = Literal("adversary-cooperation")
),
Triple(
s = Uri("https://trustgraph.ai/doc/annual-threat-assessment-us-dni-march-2025"),
p = Uri("https://schema.org/keywords"),
o = Literal("cyberthreats")
),
Triple(
s = Uri("https://trustgraph.ai/doc/annual-threat-assessment-us-dni-march-2025"),
p = Uri("https://schema.org/keywords"),
o = Literal("supply-chain-vulnerabilities")
),
Triple(
s = Uri("https://trustgraph.ai/doc/annual-threat-assessment-us-dni-march-2025"),
p = Uri("https://schema.org/keywords"),
o = Literal("economic-competition")
),
Triple(
s = Uri("https://trustgraph.ai/doc/annual-threat-assessment-us-dni-march-2025"),
p = Uri("https://schema.org/keywords"),
o = Literal("national-security")
),
Triple(
s = Uri("https://trustgraph.ai/doc/annual-threat-assessment-us-dni-march-2025"),
p = Uri("https://schema.org/publication"),
o = Uri("https://trustgraph.ai/pubev/0f1cfbe2-ce64-403b-8327-799aa8ba3cec")
),
Triple(
s = Uri("https://trustgraph.ai/pubev/0f1cfbe2-ce64-403b-8327-799aa8ba3cec"),
p = Uri("http://www.w3.org/1999/02/22-rdf-syntax-ns#type"),
o = Uri("https://schema.org/PublicationEvent")
),
Triple(
s = Uri("https://trustgraph.ai/pubev/0f1cfbe2-ce64-403b-8327-799aa8ba3cec"),
p = Uri("https://schema.org/description"),
o = Literal("Published by the Director of National Intelligence (DNI)"),
),
Triple(
s = Uri("https://trustgraph.ai/pubev/0f1cfbe2-ce64-403b-8327-799aa8ba3cec"),
p = Uri("https://schema.org/publishedBy"),
o = Uri("https://trustgraph.ai/org/us-gov-dni")
),
Triple(
s = Uri("https://trustgraph.ai/org/us-gov-dni"),
p = Uri("http://www.w3.org/1999/02/22-rdf-syntax-ns#type"),
o = Uri("https://schema.org/Organization")
),
Triple(
s = Uri("https://trustgraph.ai/org/us-gov-dni"),
p = Uri("http://www.w3.org/2000/01/rdf-schema#label"),
o = Literal("The Director of National Intelligence")
),
Triple(
s = Uri("https://trustgraph.ai/org/us-gov-dni"),
p = Uri("https://schema.org/name"),
o = Literal("The Director of National Intelligence")
),
Triple(
s = Uri("https://trustgraph.ai/pubev/0f1cfbe2-ce64-403b-8327-799aa8ba3cec"),
p = Uri("https://schema.org/startDate"),
o = Literal("2025-03-18")
),
Triple(
s = Uri("https://trustgraph.ai/pubev/0f1cfbe2-ce64-403b-8327-799aa8ba3cec"),
p = Uri("https://schema.org/endDate"),
o = Literal("2025-03-18")
),
Triple(
s = Uri("https://trustgraph.ai/doc/annual-threat-assessment-us-dni-march-2025"),
p = Uri("https://schema.org/url"),
o = Uri("https://www.dni.gov/index.php/newsroom/reports-publications/reports-publications-2025/4058-2025-annual-threat-assessment")
)
]
},
{
"id": "https://trustgraph.ai/doc/intelligence-and-state",
"title": "The Role of Intelligence and State Policies in International Security",
"comments": "A volume by Mehmet Emin Erendor, published by Cambridge Scholars Publishing (2021). It is well-known that the understanding of security has changed since the end of the Cold War. This, in turn, has impacted the characteristics of intelligence, as states have needed to improve their security policies with new intelligence tactics. This volume investigates this new state of play in the international arena.",
"url": "https://www.cambridgescholars.com/resources/pdfs/978-1-5275-7604-9-sample.pdf",
"kind": "application/pdf",
"date": "2025-05-06",
"tags": ["intelligence", "state-policy", "international-security", "national-security", "geopolitics", "foreign-policy", "security-studies", "military", "crime"],
"metadata": [
Triple(
s = Uri("https://trustgraph.ai/doc/intelligence-and-state"),
p = Uri("http://www.w3.org/1999/02/22-rdf-syntax-ns#type"),
o = Uri("https://schema.org/Book")
),
Triple(
s = Uri("https://trustgraph.ai/doc/intelligence-and-state"),
p = Uri("http://www.w3.org/2000/01/rdf-schema#label"),
o = Literal("The Role of Intelligence and State Policies in International Security")
),
Triple(
s = Uri("https://trustgraph.ai/doc/intelligence-and-state"),
p = Uri("https://schema.org/name"),
o = Literal("The Role of Intelligence and State Policies in International Security")
),
Triple(
s = Uri("https://trustgraph.ai/doc/intelligence-and-state"),
p = Uri("https://schema.org/description"),
o = Literal("A volume by Mehmet Emin Erendor. It is well-known that the understanding of security has changed since the end of the Cold War. This, in turn, has impacted the characteristics of intelligence, as states have needed to improve their security policies with new intelligence tactics. This volume investigates this new state of play in the international arena.")
),
Triple(
s = Uri("https://trustgraph.ai/doc/intelligence-and-state"),
p = Uri("https://schema.org/author"),
o = Literal("Mehmet Emin Erendor")
),
Triple(
s = Uri("https://trustgraph.ai/doc/intelligence-and-state"),
p = Uri("https://schema.org/isbn"),
o = Literal("9781527576049")
),
Triple(
s = Uri("https://trustgraph.ai/doc/intelligence-and-state"),
p = Uri("https://schema.org/numberOfPages"),
o = Literal("220")
),
Triple(
s = Uri("https://trustgraph.ai/doc/intelligence-and-state"),
p = Uri("https://schema.org/keywords"),
o = Literal("intelligence")
),
Triple(
s = Uri("https://trustgraph.ai/doc/intelligence-and-state"),
p = Uri("https://schema.org/keywords"),
o = Literal("state policy")
),
Triple(
s = Uri("https://trustgraph.ai/doc/intelligence-and-state"),
p = Uri("https://schema.org/keywords"),
o = Literal("international security")
),
Triple(
s = Uri("https://trustgraph.ai/doc/intelligence-and-state"),
p = Uri("https://schema.org/keywords"),
o = Literal("national security")
),
Triple(
s = Uri("https://trustgraph.ai/doc/intelligence-and-state"),
p = Uri("https://schema.org/keywords"),
o = Literal("geopolitics")
),
Triple(
s = Uri("https://trustgraph.ai/doc/intelligence-and-state"),
p = Uri("https://schema.org/publication"),
o = Uri("https://trustgraph.ai/pubev/b4352222-5da0-480d-a00f-f7342fe77862")
),
Triple(
s = Uri("https://trustgraph.ai/pubev/b4352222-5da0-480d-a00f-f7342fe77862"),
p = Uri("http://www.w3.org/1999/02/22-rdf-syntax-ns#type"),
o = Uri("https://schema.org/PublicationEvent")
),
Triple(
s = Uri("https://trustgraph.ai/pubev/b4352222-5da0-480d-a00f-f7342fe77862"),
p = Uri("https://schema.org/description"),
o = Literal("Published by Cambridge Scholars Publishing on October 28, 2021.")
),
Triple(
s = Uri("https://trustgraph.ai/pubev/b4352222-5da0-480d-a00f-f7342fe77862"),
p = Uri("https://schema.org/publishedBy"),
o = Uri("https://trustgraph.ai/org/cambridge-scholars-publishing")
),
Triple(
s = Uri("https://trustgraph.ai/org/cambridge-scholars-publishing"),
p = Uri("http://www.w3.org/1999/02/22-rdf-syntax-ns#type"),
o = Uri("https://schema.org/Organization")
),
Triple(
s = Uri("https://trustgraph.ai/org/cambridge-scholars-publishing"),
p = Uri("http://www.w3.org/2000/01/rdf-schema#label"),
o = Literal("Cambridge Scholars Publishing")
),
Triple(
s = Uri("https://trustgraph.ai/org/cambridge-scholars-publishing"),
p = Uri("https://schema.org/name"),
o = Literal("Cambridge Scholars Publishing")
),
Triple(
s = Uri("https://trustgraph.ai/pubev/b4352222-5da0-480d-a00f-f7342fe77862"),
p = Uri("https://schema.org/startDate"),
o = Literal("2021-10-28")
),
Triple(
s = Uri("https://trustgraph.ai/doc/intelligence-and-state"),
p = Uri("https://schema.org/url"),
o = Uri("https://www.cambridgescholars.com/resources/pdfs/978-1-5275-7604-9-sample.pdf")
)
]
},
{
"id": "https://trustgraph.ai/doc/beyond-vigilant-state",
"title": "Beyond the vigilant state: globalisation and intelligence",
"comments": "This academic paper by Richard J. Aldrich examines the relationship between globalization and intelligence agencies, discussing how intelligence services have adapted to global changes in the post-Cold War era.",
"url": "https://warwick.ac.uk/fac/soc/pais/people/aldrich/publications/beyond.pdf",
"kind": "application/pdf",
"date": datetime.datetime.now().date(),
"tags": ["intelligence", "globalization", "security-studies", "surveillance", "international-relations", "post-cold-war"],
"metadata": [
Triple(
s = Uri("https://trustgraph.ai/doc/beyond-vigilant-state"),
p = Uri("http://www.w3.org/1999/02/22-rdf-syntax-ns#type"),
o = Uri("https://schema.org/ScholarlyArticle")
),
Triple(
s = Uri("https://trustgraph.ai/doc/beyond-vigilant-state"),
p = Uri("http://www.w3.org/2000/01/rdf-schema#label"),
o = Literal("Beyond the vigilant state: globalisation and intelligence"),
),
Triple(
s = Uri("https://trustgraph.ai/doc/beyond-vigilant-state"),
p = Uri("https://schema.org/name"),
o = Literal("Beyond the vigilant state: globalisation and intelligence"),
),
Triple(
s = Uri("https://trustgraph.ai/doc/beyond-vigilant-state"),
p = Uri("https://schema.org/description"),
o = Literal("This academic paper by Richard J. Aldrich examines the relationship between globalization and intelligence agencies, discussing how intelligence services have adapted to global changes in the post-Cold War era."),
),
Triple(
s = Uri("https://trustgraph.ai/doc/beyond-vigilant-state"),
p = Uri("https://schema.org/copyrightNotice"),
o = Literal("(c) British International Studies Association")
),
Triple(
s = Uri("https://trustgraph.ai/doc/beyond-vigilant-state"),
p = Uri("https://schema.org/copyrightHolder"),
o = Literal("British International Studies Association")
),
Triple(
s = Uri("https://trustgraph.ai/doc/beyond-vigilant-state"),
p = Uri("https://schema.org/author"),
o = Uri("https://trustgraph.ai/person/3a45f8c9-b7d1-42e5-8631-d9f82c4a0e22")
),
Triple(
s = Uri("https://trustgraph.ai/person/3a45f8c9-b7d1-42e5-8631-d9f82c4a0e22"),
p = Uri("http://www.w3.org/1999/02/22-rdf-syntax-ns#type"),
o = Uri("https://schema.org/Person")
),
Triple(
s = Uri("https://trustgraph.ai/person/3a45f8c9-b7d1-42e5-8631-d9f82c4a0e22"),
p = Uri("http://www.w3.org/2000/01/rdf-schema#label"),
o = Literal("Richard J. Aldrich")
),
Triple(
s = Uri("https://trustgraph.ai/person/3a45f8c9-b7d1-42e5-8631-d9f82c4a0e22"),
p = Uri("https://schema.org/name"),
o = Literal("Richard J. Aldrich")
),
Triple(
s = Uri("https://trustgraph.ai/doc/beyond-vigilant-state"),
p = Uri("https://schema.org/keywords"),
o = Literal("intelligence")
),
Triple(
s = Uri("https://trustgraph.ai/doc/beyond-vigilant-state"),
p = Uri("https://schema.org/keywords"),
o = Literal("globalisation")
),
Triple(
s = Uri("https://trustgraph.ai/doc/beyond-vigilant-state"),
p = Uri("https://schema.org/keywords"),
o = Literal("security-studies")
),
Triple(
s = Uri("https://trustgraph.ai/doc/beyond-vigilant-state"),
p = Uri("https://schema.org/keywords"),
o = Literal("surveillance")
),
Triple(
s = Uri("https://trustgraph.ai/doc/beyond-vigilant-state"),
p = Uri("https://schema.org/keywords"),
o = Literal("international-relations")
),
Triple(
s = Uri("https://trustgraph.ai/doc/beyond-vigilant-state"),
p = Uri("https://schema.org/keywords"),
o = Literal("post-cold-war")
),
Triple(
s = Uri("https://trustgraph.ai/doc/beyond-vigilant-state"),
p = Uri("https://schema.org/publication"),
o = Uri("https://trustgraph.ai/pubev/75c83dfa-6b2e-4d89-bda1-c8e92f0e3410")
),
Triple(
s = Uri("https://trustgraph.ai/pubev/75c83dfa-6b2e-4d89-bda1-c8e92f0e3410"),
p = Uri("http://www.w3.org/1999/02/22-rdf-syntax-ns#type"),
o = Uri("https://schema.org/PublicationEvent")
),
Triple(
s = Uri("https://trustgraph.ai/pubev/75c83dfa-6b2e-4d89-bda1-c8e92f0e3410"),
p = Uri("https://schema.org/description"),
o = Literal("Published in Review of International Studies"),
),
Triple(
s = Uri("https://trustgraph.ai/pubev/75c83dfa-6b2e-4d89-bda1-c8e92f0e3410"),
p = Uri("https://schema.org/publishedBy"),
o = Uri("https://trustgraph.ai/org/british-international-studies-association")
),
Triple(
s = Uri("https://trustgraph.ai/org/british-international-studies-association"),
p = Uri("http://www.w3.org/1999/02/22-rdf-syntax-ns#type"),
o = Uri("https://schema.org/Organization")
),
Triple(
s = Uri("https://trustgraph.ai/org/british-international-studies-association"),
p = Uri("http://www.w3.org/2000/01/rdf-schema#label"),
o = Literal("British International Studies Association")
),
Triple(
s = Uri("https://trustgraph.ai/org/british-international-studies-association"),
p = Uri("https://schema.org/name"),
o = Literal("British International Studies Association")
),
Triple(
s = Uri("https://trustgraph.ai/doc/beyond-vigilant-state"),
p = Uri("https://schema.org/url"),
o = Uri("https://warwick.ac.uk/fac/soc/pais/people/aldrich/publications/beyond.pdf")
)
]
}
]
class Loader:
def __init__(
self, url, user
):
self.api = Api(url).library()
self.user = user
def load(self, documents):
for doc in documents:
self.load_doc(doc)
def load_doc(self, doc):
try:
print(doc["title"], ":")
hid = hash(doc["url"])
cache_file = f"doc-cache/{hid}"
if os.path.isfile(cache_file):
print(" (use cache file)")
content = open(cache_file, "rb").read()
else:
print(" downloading...")
resp = session.get(doc["url"])
content = resp.content
open(cache_file, "wb").write(content)
print(" done.")
print(" adding...")
self.api.add_document(
id = doc["id"], metadata = doc["metadata"],
user = self.user, kind = doc["kind"], title = doc["title"],
comments = doc["comments"], tags = doc["tags"],
document = content
)
print(" successful.")
except Exception as e:
print("Failed: {str(e)}", flush=True)
raise e
def main():
parser = argparse.ArgumentParser(
prog='tg-add-library-document',
description=__doc__,
)
parser.add_argument(
'-u', '--url',
default=default_url,
help=f'API URL (default: {default_url})',
)
parser.add_argument(
'-U', '--user',
default=default_user,
help=f'User ID (default: {default_user})'
)
args = parser.parse_args()
try:
p = Loader(
url=args.url,
user=args.user,
)
p.load(documents)
except Exception as e:
print("Exception:", e, flush=True)
raise e
if __name__ == "__main__":
main()

View file

@ -0,0 +1,205 @@
"""
Loads a text document into TrustGraph processing by directing to a text
loader queue.
Consider using tg-add-library-document to load
a document, followed by tg-start-library-processing to initiate processing.
"""
import pulsar
from pulsar.schema import JsonSchema
import hashlib
import argparse
import os
import time
import uuid
from trustgraph.api import Api
from trustgraph.knowledge import hash, to_uri
from trustgraph.knowledge import PREF_PUBEV, PREF_DOC, PREF_ORG
from trustgraph.knowledge import Organization, PublicationEvent
from trustgraph.knowledge import DigitalDocument
default_url = os.getenv("TRUSTGRAPH_URL", 'http://localhost:8088/')
default_user = 'trustgraph'
default_collection = 'default'
class Loader:
def __init__(
self,
url,
flow_id,
user,
collection,
metadata,
):
self.api = Api(url).flow().id(flow_id)
self.user = user
self.collection = collection
self.metadata = metadata
def load(self, files):
for file in files:
self.load_file(file)
def load_file(self, file):
try:
path = file
data = open(path, "rb").read()
# Create a SHA256 hash from the data
id = hash(data)
id = to_uri(PREF_DOC, id)
self.metadata.id = id
self.api.load_text(
text=data, id=id, metadata=self.metadata,
user=self.user,
collection=self.collection,
)
print(f"{file}: Loaded successfully.")
except Exception as e:
print(f"{file}: Failed: {str(e)}", flush=True)
raise e
def main():
parser = argparse.ArgumentParser(
prog='tg-load-text',
description=__doc__,
)
parser.add_argument(
'-u', '--url',
default=default_url,
help=f'API URL (default: {default_url})',
)
parser.add_argument(
'-f', '--flow-id',
default="default",
help=f'Flow ID (default: default)'
)
parser.add_argument(
'-U', '--user',
default=default_user,
help=f'User ID (default: {default_user})'
)
parser.add_argument(
'-C', '--collection',
default=default_collection,
help=f'Collection ID (default: {default_collection})'
)
parser.add_argument(
'--name', help=f'Document name'
)
parser.add_argument(
'--description', help=f'Document description'
)
parser.add_argument(
'--copyright-notice', help=f'Copyright notice'
)
parser.add_argument(
'--copyright-holder', help=f'Copyright holder'
)
parser.add_argument(
'--copyright-year', help=f'Copyright year'
)
parser.add_argument(
'--license', help=f'Copyright license'
)
parser.add_argument(
'--publication-organization', help=f'Publication organization'
)
parser.add_argument(
'--publication-description', help=f'Publication description'
)
parser.add_argument(
'--publication-date', help=f'Publication date'
)
parser.add_argument(
'--document-url', help=f'Document URL'
)
parser.add_argument(
'--keyword', nargs='+', help=f'Keyword'
)
parser.add_argument(
'--identifier', '--id', help=f'Document ID'
)
parser.add_argument(
'files', nargs='+',
help=f'File to load'
)
args = parser.parse_args()
try:
document = DigitalDocument(
id,
name=args.name,
description=args.description,
copyright_notice=args.copyright_notice,
copyright_holder=args.copyright_holder,
copyright_year=args.copyright_year,
license=args.license,
url=args.document_url,
keywords=args.keyword,
)
if args.publication_organization:
org = Organization(
id=to_uri(PREF_ORG, hash(args.publication_organization)),
name=args.publication_organization,
)
document.publication = PublicationEvent(
id = to_uri(PREF_PUBEV, str(uuid.uuid4())),
organization=org,
description=args.publication_description,
start_date=args.publication_date,
end_date=args.publication_date,
)
p = Loader(
url = args.url,
flow_id = args.flow_id,
user = args.user,
collection = args.collection,
metadata = document,
)
p.load(args.files)
print("All done.")
except Exception as e:
print("Exception:", e, flush=True)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,159 @@
"""
Loads triples into the knowledge graph.
"""
import asyncio
import argparse
import os
import time
import rdflib
import json
from websockets.asyncio.client import connect
from trustgraph.log_level import LogLevel
default_url = os.getenv("TRUSTGRAPH_URL", 'ws://localhost:8088/')
default_user = 'trustgraph'
default_collection = 'default'
class Loader:
def __init__(
self,
files,
flow,
user,
collection,
document_id,
url = default_url,
):
if not url.endswith("/"):
url += "/"
url = url + f"api/v1/flow/{flow}/import/triples"
self.url = url
self.files = files
self.user = user
self.collection = collection
self.document_id = document_id
async def run(self):
try:
async with connect(self.url) as ws:
for file in self.files:
await self.load_file(file, ws)
except Exception as e:
print(e, flush=True)
async def load_file(self, file, ws):
g = rdflib.Graph()
g.parse(file, format="turtle")
def Value(value, is_uri):
return { "v": value, "e": is_uri }
triples = []
for e in g:
s = Value(value=str(e[0]), is_uri=True)
p = Value(value=str(e[1]), is_uri=True)
if type(e[2]) == rdflib.term.URIRef:
o = Value(value=str(e[2]), is_uri=True)
else:
o = Value(value=str(e[2]), is_uri=False)
req = {
"metadata": {
"id": self.document_id,
"metadata": [],
"user": self.user,
"collection": self.collection
},
"triples": [
{
"s": s,
"p": p,
"o": o,
}
]
}
await ws.send(json.dumps(req))
def main():
parser = argparse.ArgumentParser(
prog='tg-load-turtle',
description=__doc__,
)
parser.add_argument(
'-u', '--api-url',
default=default_url,
help=f'API URL (default: {default_url})',
)
parser.add_argument(
'-i', '--document-id',
required=True,
help=f'Document ID)',
)
parser.add_argument(
'-f', '--flow-id',
default="default",
help=f'Flow ID (default: default)'
)
parser.add_argument(
'-U', '--user',
default=default_user,
help=f'User ID (default: {default_user})'
)
parser.add_argument(
'-C', '--collection',
default=default_collection,
help=f'Collection ID (default: {default_collection})'
)
parser.add_argument(
'files', nargs='+',
help=f'Turtle files to load'
)
args = parser.parse_args()
while True:
try:
p = Loader(
document_id = args.document_id,
url = args.api_url,
flow = args.flow_id,
files = args.files,
user = args.user,
collection = args.collection,
)
asyncio.run(p.run())
print("File loaded.")
break
except Exception as e:
print("Exception:", e, flush=True)
print("Will retry...", flush=True)
time.sleep(10)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,57 @@
"""
Uploads a flow class definition. You can take the output of
tg-get-flow-class and load it back in using this utility.
"""
import argparse
import os
from trustgraph.api import Api
import json
default_url = os.getenv("TRUSTGRAPH_URL", 'http://localhost:8088/')
def put_flow_class(url, class_name, config):
api = Api(url)
class_names = api.flow().put_class(class_name, config)
def main():
parser = argparse.ArgumentParser(
prog='tg-put-flow-class',
description=__doc__,
)
parser.add_argument(
'-u', '--api-url',
default=default_url,
help=f'API URL (default: {default_url})',
)
parser.add_argument(
'-n', '--class-name',
help=f'Flow class name',
)
parser.add_argument(
'-c', '--config',
help=f'Initial configuration to load, should be raw JSON',
)
args = parser.parse_args()
try:
put_flow_class(
url=args.api_url,
class_name=args.class_name,
config=json.loads(args.config),
)
except Exception as e:
print("Exception:", e, flush=True)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,181 @@
"""
Uses the agent service to answer a question
"""
import argparse
import os
import textwrap
import uuid
import asyncio
import json
from websockets.asyncio.client import connect
import msgpack
default_url = os.getenv("TRUSTGRAPH_URL", 'ws://localhost:8088/')
default_user = 'trustgraph'
def read_message(unpacked, id, user):
if unpacked[0] == "ge":
msg = unpacked[1]
return "ge", {
"metadata": {
"id": id,
"metadata": msg["m"]["m"],
"user": user,
"collection": "default", # Not used?
},
"entities": [
{
"entity": ent["e"],
"vectors": ent["v"],
}
for ent in msg["e"]
],
}
elif unpacked[0] == "t":
msg = unpacked[1]
return "t", {
"metadata": {
"id": id,
"metadata": msg["m"]["m"],
"user": user,
"collection": "default", # Not used by receiver?
},
"triples": msg["t"],
}
else:
raise RuntimeError("Unpacked unexpected messsage type", unpacked[0])
async def put(url, user, id, input):
if not url.endswith("/"):
url += "/"
url = url + "api/v1/socket"
async with connect(url) as ws:
ge = 0
t = 0
with open(input, "rb") as f:
unpacker = msgpack.Unpacker(f, raw=False)
while True:
try:
unpacked = unpacker.unpack()
except:
break
kind, msg = read_message(unpacked, id, user)
mid = str(uuid.uuid4())
if kind == "ge":
ge += 1
req = json.dumps({
"id": mid,
"service": "knowledge",
"request": {
"operation": "put-kg-core",
"user": user,
"id": id,
"graph-embeddings": msg
}
})
elif kind == "t":
t += 1
req = json.dumps({
"id": mid,
"service": "knowledge",
"request": {
"operation": "put-kg-core",
"user": user,
"id": id,
"triples": msg
}
})
else:
raise RuntimeError("Unexpected message kind", kind)
await ws.send(req)
# Retry loop, wait for right response to come back
while True:
msg = await ws.recv()
msg = json.loads(msg)
if msg["id"] != mid:
continue
if "response" in msg:
if "error" in msg["response"]:
raise RuntimeError(msg["response"]["error"])
break
print(f"Put: {t} triple, {ge} GE messages.")
await ws.close()
def main():
parser = argparse.ArgumentParser(
prog='tg-put-kg-core',
description=__doc__,
)
parser.add_argument(
'-u', '--url',
default=default_url,
help=f'API URL (default: {default_url})',
)
parser.add_argument(
'-U', '--user',
default=default_user,
help=f'User ID (default: {default_user})'
)
parser.add_argument(
'--id', '--identifier',
required=True,
help=f'Knowledge core ID',
)
parser.add_argument(
'-i', '--input',
required=True,
help=f'Input file'
)
args = parser.parse_args()
try:
asyncio.run(
put(
url = args.url,
user = args.user,
id = args.id,
input = args.input,
)
)
except Exception as e:
print("Exception:", e, flush=True)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,57 @@
"""
Remove a document from the library
"""
import argparse
import os
import uuid
from trustgraph.api import Api
default_url = os.getenv("TRUSTGRAPH_URL", 'http://localhost:8088/')
default_user = 'trustgraph'
def remove_doc(url, user, id):
api = Api(url).library()
api.remove_document(user=user, id=id)
def main():
parser = argparse.ArgumentParser(
prog='tg-remove-library-document',
description=__doc__,
)
parser.add_argument(
'-u', '--url',
default=default_url,
help=f'API URL (default: {default_url})',
)
parser.add_argument(
'-U', '--user',
default=default_user,
help=f'User ID (default: {default_user})'
)
parser.add_argument(
'--identifier', '--id',
required=True,
help=f'Document ID'
)
args = parser.parse_args()
try:
remove_doc(args.url, args.user, args.identifier)
except Exception as e:
print("Exception:", e, flush=True)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,203 @@
"""
This utility connects to a running TrustGraph through the API and creates
a document embeddings core from the data streaming through the processing
queues. For completeness of data, tg-save-doc-embeds should be initiated
before data loading takes place. The default output format, msgpack
should be used. JSON output format is also available - msgpack produces
a more compact representation, which is also more performant to load.
"""
import aiohttp
import asyncio
import msgpack
import json
import sys
import argparse
import os
import signal
class Running:
def __init__(self): self.running = True
def get(self): return self.running
def stop(self): self.running = False
async def fetch_de(running, queue, user, collection, url):
async with aiohttp.ClientSession() as session:
async with session.ws_connect(url) as ws:
while running.get():
try:
msg = await asyncio.wait_for(ws.receive(), 1)
except:
continue
if msg.type == aiohttp.WSMsgType.TEXT:
data = msg.json()
if user:
if data["metadata"]["user"] != user:
continue
if collection:
if data["metadata"]["collection"] != collection:
continue
await queue.put([
"de",
{
"m": {
"i": data["metadata"]["id"],
"m": data["metadata"]["metadata"],
"u": data["metadata"]["user"],
"c": data["metadata"]["collection"],
},
"c": [
{
"c": chunk["chunk"],
"v": chunk["vectors"],
}
for chunk in data["chunks"]
]
}
])
if msg.type == aiohttp.WSMsgType.ERROR:
print("Error")
break
de_counts = 0
async def stats(running):
global t_counts
global de_counts
while running.get():
await asyncio.sleep(2)
print(
f"Document embeddings: {de_counts:10d}"
)
async def output(running, queue, path, format):
global t_counts
global de_counts
with open(path, "wb") as f:
while running.get():
try:
msg = await asyncio.wait_for(queue.get(), 0.5)
except:
# Hopefully it's TimeoutError. Annoying to match since
# it changed in 3.11.
continue
if format == "msgpack":
f.write(msgpack.packb(msg, use_bin_type=True))
else:
f.write(json.dumps(msg).encode("utf-8"))
if msg[0] == "de":
de_counts += 1
print("Output file closed")
async def run(running, **args):
q = asyncio.Queue()
url = args["url"]
flow_id = args["flow_id"]
de_task = asyncio.create_task(
fetch_de(
running=running,
queue=q, user=args["user"], collection=args["collection"],
url = f"{url}api/v1/flow/{flow_id}/export/document-embeddings"
)
)
output_task = asyncio.create_task(
output(
running=running, queue=q,
path=args["output_file"], format=args["format"],
)
)
stats_task = asyncio.create_task(stats(running))
await output_task
await de_task
await stats_task
print("Exiting")
async def main(running):
parser = argparse.ArgumentParser(
prog='tg-save-kg-core',
description=__doc__,
)
default_url = os.getenv("TRUSTGRAPH_API", "http://localhost:8088/")
default_user = "trustgraph"
collection = "default"
parser.add_argument(
'-u', '--url',
default=default_url,
help=f'TrustGraph API URL (default: {default_url})',
)
parser.add_argument(
'-f', '--flow-id',
default="default",
help=f'Flow ID (default: default)'
)
parser.add_argument(
'-o', '--output-file',
# Make it mandatory, difficult to over-write an existing file
required=True,
help=f'Output file'
)
parser.add_argument(
'--format',
default="msgpack",
choices=["msgpack", "json"],
help=f'Output format (default: msgpack)',
)
parser.add_argument(
'--user',
help=f'User ID to filter on (default: no filter)'
)
parser.add_argument(
'--collection',
help=f'Collection ID to filter on (default: no filter)'
)
args = parser.parse_args()
await run(running, **vars(args))
running = Running()
def interrupt(sig, frame):
running.stop()
print('Interrupt')
signal.signal(signal.SIGINT, interrupt)
asyncio.run(main(running))

View file

@ -0,0 +1,109 @@
"""
Configures and registers MCP (Model Context Protocol) tools in the
TrustGraph system.
MCP tools are external services that follow the Model Context Protocol
specification. This script stores MCP tool configurations with:
- id: Unique identifier for the tool
- remote-name: Name used by the MCP server (defaults to id)
- url: MCP server endpoint URL
Configurations are stored in the 'mcp' configuration group and can be
referenced by agent tools using the 'mcp-tool' type.
"""
import argparse
import os
from trustgraph.api import Api, ConfigValue
import textwrap
import json
default_url = os.getenv("TRUSTGRAPH_URL", 'http://localhost:8088/')
def set_mcp_tool(
url : str,
id : str,
remote_name : str,
tool_url : str,
):
api = Api(url).config()
# Store the MCP tool configuration in the 'mcp' group
values = api.put([
ConfigValue(
type="mcp", key=id, value=json.dumps({
"remote-name": remote_name,
"url": tool_url,
})
)
])
def main():
parser = argparse.ArgumentParser(
prog='tg-set-mcp-tool',
description=__doc__,
epilog=textwrap.dedent('''
MCP tools are configured with just a name and URL. The URL should point
to the MCP server endpoint that provides the tool functionality.
Examples:
%(prog)s --id weather --tool-url "http://localhost:3000/weather"
%(prog)s --id calculator --tool-url "http://mcp-tools.example.com/calc"
''').strip(),
formatter_class=argparse.RawDescriptionHelpFormatter
)
parser.add_argument(
'-u', '--api-url',
default=default_url,
help=f'API URL (default: {default_url})',
)
parser.add_argument(
'-i', '--id',
required=True,
help='MCP tool identifier',
)
parser.add_argument(
'-r', '--remote-name',
required=False,
help='Remote MCP tool name (defaults to --id if not specified)',
)
parser.add_argument(
'--tool-url',
required=True,
help='MCP tool URL endpoint',
)
args = parser.parse_args()
try:
if not args.id:
raise RuntimeError("Must specify --id for MCP tool")
if not args.tool_url:
raise RuntimeError("Must specify --tool-url for MCP tool")
if args.remote_name:
remote_name = args.remote_name
else:
remote_name = args.id
set_mcp_tool(
url=args.api_url,
id=args.id,
remote_name=remote_name,
tool_url=args.tool_url
)
except Exception as e:
print("Exception:", e, flush=True)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,141 @@
"""
Sets a prompt template.
"""
import argparse
import os
from trustgraph.api import Api, ConfigKey, ConfigValue
import json
import tabulate
import textwrap
default_url = os.getenv("TRUSTGRAPH_URL", 'http://localhost:8088/')
def set_system(url, system):
api = Api(url).config()
api.put([
ConfigValue(type="prompt", key="system", value=json.dumps(system))
])
print("System prompt set.")
def set_prompt(url, id, prompt, response, schema):
api = Api(url).config()
values = api.get([
ConfigKey(type="prompt", key="template-index")
])
ix = json.loads(values[0].value)
object = {
"id": id,
"prompt": prompt,
}
if response:
object["response-type"] = response
else:
object["response-type"] = "text"
if schema:
object["schema"] = schema
if id not in ix:
ix.append(id)
values = api.put([
ConfigValue(
type="prompt", key="template-index", value=json.dumps(ix)
),
ConfigValue(
type="prompt", key=f"template.{id}", value=json.dumps(object)
)
])
print("Prompt set.")
def main():
parser = argparse.ArgumentParser(
prog='tg-set-prompt',
description=__doc__,
)
parser.add_argument(
'-u', '--api-url',
default=default_url,
help=f'API URL (default: {default_url})',
)
parser.add_argument(
'--id',
help=f'Prompt ID',
)
parser.add_argument(
'--response',
help=f'Response form, should be one of: text json',
)
parser.add_argument(
'--schema',
help=f'JSON schema, for JSON response form',
)
parser.add_argument(
'--prompt',
help=f'Prompt template',
)
parser.add_argument(
'--system',
help=f'System prompt',
)
args = parser.parse_args()
try:
if args.system:
if args.id or args.prompt or args.schema or args.response:
raise RuntimeError("Can't use --system with other args")
set_system(
url=args.api_url, system=args.system
)
else:
if args.id is None:
raise RuntimeError("Must specify --id for prompt")
if args.prompt is None:
raise RuntimeError("Must specify --prompt for prompt")
if args.response:
if args.response not in ["text", "json"]:
raise RuntimeError("Response must be one of: text json")
if args.schema:
try:
schobj = json.loads(args.schema)
except:
raise RuntimeError("JSON schema must be valid JSON")
else:
schobj = None
set_prompt(
url=args.api_url, id=args.id, prompt=args.prompt,
response=args.response, schema=schobj
)
except Exception as e:
print("Exception:", e, flush=True)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,109 @@
"""
Sets a model's token costs.
"""
import argparse
import os
from trustgraph.api import Api, ConfigKey, ConfigValue
import json
import tabulate
import textwrap
default_url = os.getenv("TRUSTGRAPH_URL", 'http://localhost:8088/')
def set_costs(api_url, model, input_costs, output_costs):
api = Api(api_url).config()
api.put([
ConfigValue(
type="token-costs", key=model,
value=json.dumps({
"input_price": input_costs / 1000000,
"output_price": output_costs / 1000000,
})
),
])
def set_prompt(url, id, prompt, response, schema):
api = Api(url)
values = api.config_get([
ConfigKey(type="prompt", key="template-index")
])
ix = json.loads(values[0].value)
object = {
"id": id,
"prompt": prompt,
}
if response:
object["response-type"] = response
else:
object["response-type"] = "text"
if schema:
object["schema"] = schema
if id not in ix:
ix.append(id)
values = api.config_put([
ConfigValue(
type="prompt", key="template-index", value=json.dumps(ix)
),
ConfigValue(
type="prompt", key=f"template.{id}", value=json.dumps(object)
)
])
print("Prompt set.")
def main():
parser = argparse.ArgumentParser(
prog='tg-set-token-costs',
description=__doc__,
)
parser.add_argument(
'-u', '--api-url',
default=default_url,
help=f'API URL (default: {default_url})',
)
parser.add_argument(
'--model',
required=True,
help=f'Model ID',
)
parser.add_argument(
'-i', '--input-costs',
required=True,
type=float,
help=f'Input costs in $ per 1M tokens',
)
parser.add_argument(
'-o', '--output-costs',
required=True,
type=float,
help=f'Input costs in $ per 1M tokens',
)
args = parser.parse_args()
try:
set_costs(**vars(args))
except Exception as e:
print("Exception:", e, flush=True)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,224 @@
"""
Configures and registers tools in the TrustGraph system.
This script allows you to define agent tools with various types including:
- knowledge-query: Query knowledge bases
- text-completion: Text generation
- mcp-tool: Reference to MCP (Model Context Protocol) tools
- prompt: Prompt template execution
Tools are stored in the 'tool' configuration group and can include
argument specifications for parameterized execution.
"""
from typing import List
import argparse
import os
from trustgraph.api import Api, ConfigKey, ConfigValue
import json
import tabulate
import textwrap
import dataclasses
default_url = os.getenv("TRUSTGRAPH_URL", 'http://localhost:8088/')
@dataclasses.dataclass
class Argument:
name : str
type : str
description : str
@staticmethod
def parse(s):
parts = s.split(":")
if len(parts) != 3:
raise RuntimeError(
"Arguments should be form name:type:description"
)
valid_types = [
"string", "number",
]
if parts[1] not in valid_types:
raise RuntimeError(
f"Type {parts[1]} invalid, use: " +
", ".join(valid_types)
)
return Argument(name=parts[0], type=parts[1], description=parts[2])
def set_tool(
url : str,
id : str,
name : str,
description : str,
type : str,
mcp_tool : str,
collection : str,
template : str,
arguments : List[Argument],
):
api = Api(url).config()
values = api.get([
ConfigKey(type="agent", key="tool-index")
])
object = {
"name": name,
"description": description,
"type": type,
}
if mcp_tool: object["mcp-tool"] = mcp_tool
if collection: object["collection"] = collection
if template: object["template"] = template
if arguments:
object["arguments"] = [
{
"name": a.name,
"type": a.type,
"description": a.description,
}
for a in arguments
]
values = api.put([
ConfigValue(
type="tool", key=f"{id}", value=json.dumps(object)
)
])
print("Tool set.")
def main():
parser = argparse.ArgumentParser(
prog='tg-set-tool',
description=__doc__,
epilog=textwrap.dedent('''
Valid tool types:
knowledge-query - Query knowledge bases
text-completion - Text completion/generation
mcp-tool - Model Control Protocol tool
prompt - Prompt template query
Valid argument types:
string - String/text parameter
number - Numeric parameter
Examples:
%(prog)s --id weather --name "Weather lookup" \\
--type knowledge-query \\
--description "Get weather information" \\
--argument location:string:"Location to query" \\
--argument units:string:"Temperature units (C/F)"
%(prog)s --id calculator --name "Calculator" --type mcp-tool \\
--description "Perform calculations" \\
--argument expression:string:"Mathematical expression"
''').strip(),
formatter_class=argparse.RawDescriptionHelpFormatter
)
parser.add_argument(
'-u', '--api-url',
default=default_url,
help=f'API URL (default: {default_url})',
)
parser.add_argument(
'--id',
help=f'Unique tool identifier',
)
parser.add_argument(
'--name',
help=f'Human-readable tool name',
)
parser.add_argument(
'--description',
help=f'Detailed description of what the tool does',
)
parser.add_argument(
'--type',
help=f'Tool type, one of: knowledge-query, text-completion, mcp-tool, prompt',
)
parser.add_argument(
'--mcp-tool',
help=f'For MCP type: ID of MCP tool configuration (as defined by tg-set-mcp-tool)',
)
parser.add_argument(
'--collection',
help=f'For knowledge-query type: collection to query',
)
parser.add_argument(
'--template',
help=f'For prompt type: template ID to use',
)
parser.add_argument(
'--argument',
nargs="*",
help=f'Tool arguments in the form: name:type:description (can specify multiple)',
)
args = parser.parse_args()
try:
valid_types = [
"knowledge-query", "text-completion", "mcp-tool", "prompt"
]
if args.id is None:
raise RuntimeError("Must specify --id for tool")
if args.name is None:
raise RuntimeError("Must specify --name for tool")
if args.type:
if args.type not in valid_types:
raise RuntimeError(
"Type must be one of: " + ", ".join(valid_types)
)
mcp_tool = args.mcp_tool
if args.argument:
arguments = [
Argument.parse(a)
for a in args.argument
]
else:
arguments = []
set_tool(
url=args.api_url,
id=args.id,
name=args.name,
description=args.description,
type=args.type,
mcp_tool=mcp_tool,
collection=args.collection,
template=args.template,
arguments=arguments,
)
except Exception as e:
print("Exception:", e, flush=True)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,47 @@
"""
Dumps out the current configuration
"""
import argparse
import os
from trustgraph.api import Api
import json
default_url = os.getenv("TRUSTGRAPH_URL", 'http://localhost:8088/')
def show_config(url):
api = Api(url).config()
config, version = api.all()
print("Version:", version)
print(json.dumps(config, indent=4))
def main():
parser = argparse.ArgumentParser(
prog='tg-show-config',
description=__doc__,
)
parser.add_argument(
'-u', '--api-url',
default=default_url,
help=f'API URL (default: {default_url})',
)
args = parser.parse_args()
try:
show_config(
url=args.api_url,
)
except Exception as e:
print("Exception:", e, flush=True)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,67 @@
"""
Shows all defined flow classes.
"""
import argparse
import os
import tabulate
from trustgraph.api import Api
import json
default_url = os.getenv("TRUSTGRAPH_URL", 'http://localhost:8088/')
def show_flow_classes(url):
api = Api(url).flow()
class_names = api.list_classes()
if len(class_names) == 0:
print("No flows.")
return
classes = []
for class_name in class_names:
cls = api.get_class(class_name)
classes.append((
class_name,
cls.get("description", ""),
", ".join(cls.get("tags", [])),
))
print(tabulate.tabulate(
classes,
tablefmt="pretty",
maxcolwidths=[None, 40, 20],
stralign="left",
headers = ["flow class", "description", "tags"],
))
def main():
parser = argparse.ArgumentParser(
prog='tg-show-flow-classes',
description=__doc__,
)
parser.add_argument(
'-u', '--api-url',
default=default_url,
help=f'API URL (default: {default_url})',
)
args = parser.parse_args()
try:
show_flow_classes(
url=args.api_url,
)
except Exception as e:
print("Exception:", e, flush=True)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,91 @@
"""
Dump out a flow's processor states
"""
import requests
import argparse
from trustgraph.api import Api
import os
default_metrics_url = "http://localhost:8088/api/metrics"
default_url = os.getenv("TRUSTGRAPH_URL", 'http://localhost:8088/')
def dump_status(metrics_url, api_url, flow_id):
api = Api(api_url).flow()
flow = api.get(flow_id)
class_name = flow["class-name"]
print()
print(f"Flow {flow_id}")
show_processors(metrics_url, flow_id)
print()
print(f"Class {class_name}")
show_processors(metrics_url, class_name)
print()
def show_processors(metrics_url, flow_label):
url = f"{metrics_url}/query"
expr = f"consumer_state=\"running\",flow=\"{flow_label}\""
params = {
"query": "consumer_state{" + expr + "}"
}
resp = requests.get(url, params=params)
obj = resp.json()
tbl = [
[
m["metric"]["job"],
"\U0001f49a" if int(m["value"][1]) > 0 else "\U0000274c"
]
for m in obj["data"]["result"]
]
for row in tbl:
print(f"- {row[0]:30} {row[1]}")
def main():
parser = argparse.ArgumentParser(
prog='tg-show-flow-state',
description=__doc__,
)
parser.add_argument(
'-f', '--flow-id',
default="default",
help=f'Flow ID (default: default)'
)
parser.add_argument(
'-u', '--api-url',
default=default_url,
help=f'API URL (default: {default_url})',
)
parser.add_argument(
'-m', '--metrics-url',
default=default_metrics_url,
help=f'Metrics URL (default: {default_metrics_url})',
)
args = parser.parse_args()
try:
dump_status(args.metrics_url, args.api_url, args.flow_id)
except Exception as e:
print("Exception:", e, flush=True)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,112 @@
"""
Shows configured flows.
"""
import argparse
import os
import tabulate
from trustgraph.api import Api, ConfigKey
import json
default_url = os.getenv("TRUSTGRAPH_URL", 'http://localhost:8088/')
def get_interface(config_api, i):
key = ConfigKey("interface-descriptions", i)
value = config_api.get([key])[0].value
return json.loads(value)
def describe_interfaces(intdefs, flow):
intfs = flow.get("interfaces", {})
lst = []
for k, v in intdefs.items():
if intdefs[k].get("visible", False):
label = intdefs[k].get("description", k)
kind = intdefs[k].get("kind", None)
if kind == "request-response":
req = intfs[k]["request"]
resp = intfs[k]["request"]
lst.append(f"{k} request: {req}")
lst.append(f"{k} response: {resp}")
if kind == "send":
q = intfs[k]
lst.append(f"{k}: {q}")
return "\n".join(lst)
def show_flows(url):
api = Api(url)
config_api = api.config()
flow_api = api.flow()
interface_names = config_api.list("interface-descriptions")
interface_defs = {
i: get_interface(config_api, i)
for i in interface_names
}
flow_ids = flow_api.list()
if len(flow_ids) == 0:
print("No flows.")
return
flows = []
for id in flow_ids:
flow = flow_api.get(id)
table = []
table.append(("id", id))
table.append(("class", flow.get("class-name", "")))
table.append(("desc", flow.get("description", "")))
table.append(("queue", describe_interfaces(interface_defs, flow)))
print(tabulate.tabulate(
table,
tablefmt="pretty",
stralign="left",
))
print()
def main():
parser = argparse.ArgumentParser(
prog='tg-show-flows',
description=__doc__,
)
parser.add_argument(
'-u', '--api-url',
default=default_url,
help=f'API URL (default: {default_url})',
)
args = parser.parse_args()
try:
show_flows(
url=args.api_url,
)
except Exception as e:
print("Exception:", e, flush=True)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,72 @@
"""
Connects to the graph query service and dumps all graph edges.
"""
import argparse
import os
from trustgraph.api import Api
default_url = os.getenv("TRUSTGRAPH_URL", 'http://localhost:8088/')
default_user = 'trustgraph'
default_collection = 'default'
def show_graph(url, flow_id, user, collection):
api = Api(url).flow().id(flow_id)
rows = api.triples_query(
user=user, collection=collection,
s=None, p=None, o=None, limit=10_000,
)
for row in rows:
print(row.s, row.p, row.o)
def main():
parser = argparse.ArgumentParser(
prog='tg-show-graph',
description=__doc__,
)
parser.add_argument(
'-u', '--api-url',
default=default_url,
help=f'API URL (default: {default_url})',
)
parser.add_argument(
'-f', '--flow-id',
default="default",
help=f'Flow ID (default: default)'
)
parser.add_argument(
'-U', '--user',
default=default_user,
help=f'User ID (default: {default_user})'
)
parser.add_argument(
'-C', '--collection',
default=default_collection,
help=f'Collection ID (default: {default_collection})'
)
args = parser.parse_args()
try:
show_graph(
url = args.api_url,
flow_id = args.flow_id,
user = args.user,
collection = args.collection,
)
except Exception as e:
print("Exception:", e, flush=True)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,57 @@
"""
Shows knowledge cores
"""
import argparse
import os
import tabulate
from trustgraph.api import Api, ConfigKey
import json
default_url = os.getenv("TRUSTGRAPH_URL", 'http://localhost:8088/')
def show_cores(url, user):
api = Api(url).knowledge()
ids = api.list_kg_cores()
if len(ids) == 0:
print("No knowledge cores.")
for id in ids:
print(id)
def main():
parser = argparse.ArgumentParser(
prog='tg-show-flows',
description=__doc__,
)
parser.add_argument(
'-u', '--api-url',
default=default_url,
help=f'API URL (default: {default_url})',
)
parser.add_argument(
'-U', '--user',
default="trustgraph",
help='API URL (default: trustgraph)',
)
args = parser.parse_args()
try:
show_cores(
url=args.api_url, user=args.user
)
except Exception as e:
print("Exception:", e, flush=True)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,74 @@
"""
Shows all loaded library documents
"""
import argparse
import os
import tabulate
from trustgraph.api import Api, ConfigKey
import json
default_url = os.getenv("TRUSTGRAPH_URL", 'http://localhost:8088/')
default_user = "trustgraph"
def show_docs(url, user):
api = Api(url).library()
docs = api.get_documents(user=user)
if len(docs) == 0:
print("No documents.")
return
for doc in docs:
table = []
table.append(("id", doc.id))
table.append(("time", doc.time))
table.append(("title", doc.title))
table.append(("kind", doc.kind))
table.append(("note", doc.comments))
table.append(("tags", ", ".join(doc.tags)))
print(tabulate.tabulate(
table,
tablefmt="pretty",
stralign="left",
maxcolwidths=[None, 67],
))
print()
def main():
parser = argparse.ArgumentParser(
prog='tg-show-library-documents',
description=__doc__,
)
parser.add_argument(
'-u', '--api-url',
default=default_url,
help=f'API URL (default: {default_url})',
)
parser.add_argument(
'-U', '--user',
default=default_user,
help=f'User ID (default: {default_user})'
)
args = parser.parse_args()
try:
show_docs(
url = args.api_url, user = args.user
)
except Exception as e:
print("Exception:", e, flush=True)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,73 @@
"""
"""
import argparse
import os
import tabulate
from trustgraph.api import Api, ConfigKey
import json
default_url = os.getenv("TRUSTGRAPH_URL", 'http://localhost:8088/')
default_user = "trustgraph"
def show_procs(url, user):
api = Api(url).library()
procs = api.get_processings(user = user)
if len(procs) == 0:
print("No processing objects.")
return
for proc in procs:
table = []
table.append(("id", proc.id))
table.append(("document-id", proc.document_id))
table.append(("time", proc.time))
table.append(("flow", proc.flow))
table.append(("collection", proc.collection))
table.append(("tags", ", ".join(proc.tags)))
print(tabulate.tabulate(
table,
tablefmt="pretty",
stralign="left",
maxcolwidths=[None, 50],
))
print()
def main():
parser = argparse.ArgumentParser(
prog='tg-show-library-processing',
description=__doc__,
)
parser.add_argument(
'-u', '--api-url',
default=default_url,
help=f'API URL (default: {default_url})',
)
parser.add_argument(
'-U', '--user',
default=default_user,
help=f'User ID (default: {default_user})'
)
args = parser.parse_args()
try:
show_procs(
url = args.api_url, user = args.user
)
except Exception as e:
print("Exception:", e, flush=True)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,67 @@
"""
Displays the current MCP (Model Context Protocol) tool configuration
"""
import argparse
import os
from trustgraph.api import Api, ConfigKey
import json
import tabulate
import textwrap
default_url = os.getenv("TRUSTGRAPH_URL", 'http://localhost:8088/')
def show_config(url):
api = Api(url).config()
values = api.get_values(type="mcp")
for n, value in enumerate(values):
data = json.loads(value.value)
table = []
table.append(("id", value.key))
table.append(("remote-name", data["remote-name"]))
table.append(("url", data["url"]))
print()
print(tabulate.tabulate(
table,
tablefmt="pretty",
maxcolwidths=[None, 70],
stralign="left"
))
print()
def main():
parser = argparse.ArgumentParser(
prog='tg-show-mcp-tools',
description=__doc__,
)
parser.add_argument(
'-u', '--api-url',
default=default_url,
help=f'API URL (default: {default_url})',
)
args = parser.parse_args()
try:
show_config(
url=args.api_url,
)
except Exception as e:
print("Exception:", e, flush=True)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,53 @@
"""
Dump out TrustGraph processor states.
"""
import requests
import argparse
default_metrics_url = "http://localhost:8088/api/metrics"
def dump_status(url):
url = f"{url}/query?query=processor_info"
resp = requests.get(url)
obj = resp.json()
tbl = [
[
m["metric"]["job"],
"\U0001f49a"
]
for m in obj["data"]["result"]
]
for row in tbl:
print(f" {row[0]:30} {row[1]}")
def main():
parser = argparse.ArgumentParser(
prog='tg-show-processor-state',
description=__doc__,
)
parser.add_argument(
'-m', '--metrics-url',
default=default_metrics_url,
help=f'Metrics URL (default: {default_metrics_url})',
)
args = parser.parse_args()
try:
dump_status(args.metrics_url)
except Exception as e:
print("Exception:", e, flush=True)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,94 @@
"""
Dumps out the current prompts
"""
import argparse
import os
from trustgraph.api import Api, ConfigKey
import json
import tabulate
import textwrap
default_url = os.getenv("TRUSTGRAPH_URL", 'http://localhost:8088/')
def show_config(url):
api = Api(url).config()
values = api.get([
ConfigKey(type="prompt", key="system"),
ConfigKey(type="prompt", key="template-index")
])
system = json.loads(values[0].value)
ix = json.loads(values[1].value)
values = api.get([
ConfigKey(type="prompt", key=f"template.{v}")
for v in ix
])
print()
print("System prompt:")
print(tabulate.tabulate(
[["prompt", system]],
tablefmt="pretty",
maxcolwidths=[None, 70],
stralign="left"
))
for n, key in enumerate(ix):
data = json.loads(values[n].value)
table = []
table.append(("prompt", data["prompt"]))
if "response-type" in data:
table.append(("response", data["response-type"]))
if "schema" in data:
table.append(("schema", data["schema"]))
print()
print(key + ":")
print(tabulate.tabulate(
table,
tablefmt="pretty",
maxcolwidths=[None, 70],
stralign="left"
))
print()
def main():
parser = argparse.ArgumentParser(
prog='tg-show-prompts',
description=__doc__,
)
parser.add_argument(
'-u', '--api-url',
default=default_url,
help=f'API URL (default: {default_url})',
)
args = parser.parse_args()
try:
show_config(
url=args.api_url,
)
except Exception as e:
print("Exception:", e, flush=True)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,77 @@
"""
Dumps out token cost configuration
"""
import argparse
import os
from trustgraph.api import Api, ConfigKey
import json
import tabulate
import textwrap
tabulate.PRESERVE_WHITESPACE = True
default_url = os.getenv("TRUSTGRAPH_URL", 'http://localhost:8088/')
def show_config(url):
api = Api(url).config()
models = api.list("token-costs")
costs = []
def fmt(x):
return "{price:.3f}".format(price = 1000000 * x)
for model in models:
try:
values = json.loads(api.get([
ConfigKey(type="token-costs", key=model),
])[0].value)
costs.append((
model,
fmt(values.get("input_price")),
fmt(values.get("output_price")),
))
except:
costs.append((
model, "-", "-"
))
print(tabulate.tabulate(
costs,
tablefmt = "pretty",
headers = ["model", "input, $/Mt", "output, $/Mt"],
colalign = ["left", "right", "right"],
# stralign = ["left", "decimal", "decimal"]
))
def main():
parser = argparse.ArgumentParser(
prog='tg-show-token-costs',
description=__doc__,
)
parser.add_argument(
'-u', '--api-url',
default=default_url,
help=f'API URL (default: {default_url})',
)
args = parser.parse_args()
try:
show_config(
url=args.api_url,
)
except Exception as e:
print("Exception:", e, flush=True)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,107 @@
"""
Dump out a stream of token rates, input, output and total. This is averaged
across the time since tg-show-token-rate is started.
"""
import requests
import argparse
import json
import time
default_metrics_url = "http://localhost:8088/api/metrics"
class Collate:
def look(self, data):
return sum(
[
float(x["value"][1])
for x in data["data"]["result"]
]
)
def __init__(self, data):
self.last = self.look(data)
self.total = 0
self.time = 0
def record(self, data, time):
value = self.look(data)
delta = value - self.last
self.last = value
self.total += delta
self.time += time
return delta/time, self.total/self.time
def dump_status(metrics_url, number_samples, period):
input_url = f"{metrics_url}/query?query=input_tokens_total"
output_url = f"{metrics_url}/query?query=output_tokens_total"
resp = requests.get(input_url)
obj = resp.json()
input = Collate(obj)
resp = requests.get(output_url)
obj = resp.json()
output = Collate(obj)
print(f"{'Input':>10s} {'Output':>10s} {'Total':>10s}")
print(f"{'-----':>10s} {'------':>10s} {'-----':>10s}")
for i in range(number_samples):
time.sleep(period)
resp = requests.get(input_url)
obj = resp.json()
inr, inl = input.record(obj, period)
resp = requests.get(output_url)
obj = resp.json()
outr, outl = output.record(obj, period)
print(f"{inl:10.1f} {outl:10.1f} {inl+outl:10.1f}")
def main():
parser = argparse.ArgumentParser(
prog='tg-show-processor-state',
description=__doc__,
)
parser.add_argument(
'-m', '--metrics-url',
default=default_metrics_url,
help=f'Metrics URL (default: {default_metrics_url})',
)
parser.add_argument(
'-p', '--period',
type=int,
default=1,
help=f'Metrics period (default: 1)',
)
parser.add_argument(
'-n', '--number-samples',
type=int,
default=100,
help=f'Metrics period (default: 100)',
)
args = parser.parse_args()
try:
dump_status(**vars(args))
except Exception as e:
print("Exception:", e, flush=True)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,91 @@
"""
Displays the current agent tool configurations
Shows all configured tools including their types:
- knowledge-query: Tools that query knowledge bases
- text-completion: Tools for text generation
- mcp-tool: References to MCP (Model Context Protocol) tools
- prompt: Tools that execute prompt templates
"""
import argparse
import os
from trustgraph.api import Api, ConfigKey
import json
import tabulate
import textwrap
default_url = os.getenv("TRUSTGRAPH_URL", 'http://localhost:8088/')
def show_config(url):
api = Api(url).config()
values = api.get_values(type="tool")
for item in values:
id = item.key
data = json.loads(item.value)
tp = data["type"]
table = []
table.append(("id", id))
table.append(("name", data["name"]))
table.append(("description", data["description"]))
table.append(("type", tp))
if tp == "mcp-tool":
table.append(("mcp-tool", data["mcp-tool"]))
if tp == "knowledge-query":
table.append(("collection", data["collection"]))
if tp == "prompt":
table.append(("template", data["template"]))
for n, arg in enumerate(data["arguments"]):
table.append((
f"arg {n}",
f"{arg['name']}: {arg['type']}\n{arg['description']}"
))
print()
print(tabulate.tabulate(
table,
tablefmt="pretty",
maxcolwidths=[None, 70],
stralign="left"
))
print()
def main():
parser = argparse.ArgumentParser(
prog='tg-show-tools',
description=__doc__,
)
parser.add_argument(
'-u', '--api-url',
default=default_url,
help=f'API URL (default: {default_url})',
)
args = parser.parse_args()
try:
show_config(
url=args.api_url,
)
except Exception as e:
print("Exception:", e, flush=True)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,70 @@
"""
Starts a processing flow using a defined flow class
"""
import argparse
import os
import tabulate
from trustgraph.api import Api
import json
default_url = os.getenv("TRUSTGRAPH_URL", 'http://localhost:8088/')
def start_flow(url, class_name, flow_id, description):
api = Api(url).flow()
api.start(
class_name = class_name,
id = flow_id,
description = description,
)
def main():
parser = argparse.ArgumentParser(
prog='tg-start-flow',
description=__doc__,
)
parser.add_argument(
'-u', '--api-url',
default=default_url,
help=f'API URL (default: {default_url})',
)
parser.add_argument(
'-n', '--class-name',
required=True,
help=f'Flow class name',
)
parser.add_argument(
'-i', '--flow-id',
required=True,
help=f'Flow ID',
)
parser.add_argument(
'-d', '--description',
required=True,
help=f'Flow description',
)
args = parser.parse_args()
try:
start_flow(
url = args.api_url,
class_name = args.class_name,
flow_id = args.flow_id,
description = args.description,
)
except Exception as e:
print("Exception:", e, flush=True)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,101 @@
"""
Submits a library document for processing
"""
import argparse
import os
import tabulate
from trustgraph.api import Api, ConfigKey
import json
default_url = os.getenv("TRUSTGRAPH_URL", 'http://localhost:8088/')
default_user = "trustgraph"
def start_processing(
url, user, document_id, id, flow, collection, tags
):
api = Api(url).library()
if tags:
tags = tags.split(",")
else:
tags = []
api.start_processing(
id = id,
document_id = document_id,
flow = flow,
user = user,
collection = collection,
tags = tags
)
def main():
parser = argparse.ArgumentParser(
prog='tg-start-library-processing',
description=__doc__,
)
parser.add_argument(
'-u', '--api-url',
default=default_url,
help=f'API URL (default: {default_url})',
)
parser.add_argument(
'-U', '--user',
default=default_user,
help=f'User ID (default: {default_user})'
)
parser.add_argument(
'-i', '--flow-id',
default="default",
help=f'Flow ID (default: default)',
)
parser.add_argument(
'-d', '--document-id',
required=True,
help=f'Document ID',
)
parser.add_argument(
'--id', '--processing-id',
required=True,
help=f'Processing ID',
)
parser.add_argument(
'--collection',
default='default',
help=f'Collection (default: default)'
)
parser.add_argument(
'--tags',
help=f'Tags, command separated'
)
args = parser.parse_args()
try:
start_processing(
url = args.api_url,
user = args.user,
document_id = args.document_id,
id = args.id,
flow = args.flow_id,
collection = args.collection,
tags = args.tags
)
except Exception as e:
print("Exception:", e, flush=True)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,52 @@
"""
Stops a processing flow.
"""
import argparse
import os
import tabulate
from trustgraph.api import Api
import json
default_url = os.getenv("TRUSTGRAPH_URL", 'http://localhost:8088/')
def stop_flow(url, flow_id):
api = Api(url).flow()
api.stop(id = flow_id)
def main():
parser = argparse.ArgumentParser(
prog='tg-stop-flow',
description=__doc__,
)
parser.add_argument(
'-u', '--api-url',
default=default_url,
help=f'API URL (default: {default_url})',
)
parser.add_argument(
'-i', '--flow-id',
required=True,
help=f'Flow ID',
)
args = parser.parse_args()
try:
stop_flow(
url=args.api_url,
flow_id=args.flow_id,
)
except Exception as e:
print("Exception:", e, flush=True)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,63 @@
"""
Removes a library document processing record. This is just a record of
procesing, it doesn't stop in-flight processing at the moment.
"""
import argparse
import os
import tabulate
from trustgraph.api import Api, ConfigKey
import json
default_url = os.getenv("TRUSTGRAPH_URL", 'http://localhost:8088/')
default_user = "trustgraph"
def stop_processing(
url, user, id
):
api = Api(url).library()
api.stop_processing(user = user, id = id)
def main():
parser = argparse.ArgumentParser(
prog='tg-stop-library-processing',
description=__doc__,
)
parser.add_argument(
'-u', '--api-url',
default=default_url,
help=f'API URL (default: {default_url})',
)
parser.add_argument(
'-U', '--user',
default=default_user,
help=f'User ID (default: {default_user})'
)
parser.add_argument(
'--id', '--processing-id',
required=True,
help=f'Processing ID',
)
args = parser.parse_args()
try:
stop_processing(
url = args.api_url,
user = args.user,
id = args.id,
)
except Exception as e:
print("Exception:", e, flush=True)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,70 @@
"""
Starts a load operation on a knowledge core which is already stored by
the knowledge manager. You could load a core with tg-put-kg-core and then
run this utility.
"""
import argparse
import os
import tabulate
from trustgraph.api import Api
import json
default_url = os.getenv("TRUSTGRAPH_URL", 'http://localhost:8088/')
default_flow = "default"
default_collection = "default"
def unload_kg_core(url, user, id, flow):
api = Api(url).knowledge()
class_names = api.unload_kg_core(user = user, id = id, flow=flow)
def main():
parser = argparse.ArgumentParser(
prog='tg-delete-flow-class',
description=__doc__,
)
parser.add_argument(
'-u', '--api-url',
default=default_url,
help=f'API URL (default: {default_url})',
)
parser.add_argument(
'-U', '--user',
default="trustgraph",
help='API URL (default: trustgraph)',
)
parser.add_argument(
'--id', '--identifier',
required=True,
help=f'Knowledge core ID',
)
parser.add_argument(
'-f', '--flow-id',
default=default_flow,
help=f'Flow ID (default: {default_flow}',
)
args = parser.parse_args()
try:
unload_kg_core(
url=args.api_url,
user=args.user,
id=args.id,
flow=args.flow_id,
)
except Exception as e:
print("Exception:", e, flush=True)
if __name__ == "__main__":
main()