Feature/library cli (#363)

* Major Python client API rework, break down API & colossal class

* Complete rest of library API

* Library CLI support
This commit is contained in:
cybermaggedon 2025-05-05 11:09:18 +01:00 committed by GitHub
parent 8146f0f2ff
commit 844547ab5f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
36 changed files with 1413 additions and 495 deletions

View file

@ -1,33 +1,14 @@
import requests
import json
import dataclasses
import base64
import time
from trustgraph.knowledge import hash, Uri, Literal
class ProtocolException(Exception):
pass
class ApplicationException(Exception):
pass
@dataclasses.dataclass
class Triple:
s : str
p : str
o : str
@dataclasses.dataclass
class ConfigKey:
type : str
key : str
@dataclasses.dataclass
class ConfigValue:
type : str
key : str
value : str
from . library import Library
from . flow import Flow
from . config import Config
from . exceptions import *
from . types import *
def check_error(response):
@ -52,13 +33,19 @@ class Api:
self.url += "api/v1/"
def flow(self, flow="0000"):
return Flow(api=self, flow=flow)
def flow(self):
return Flow(api=self)
def config(self):
return Config(api=self)
def request(self, path, request):
url = f"{self.url}{path}"
# print("uri:", url)
# print(json.dumps(request, indent=4))
# Invoke the API, input is passed as JSON
resp = requests.post(url, json=request)
@ -66,6 +53,8 @@ class Api:
if resp.status_code != 200:
raise ProtocolException(f"Status code {resp.status_code}")
# print(resp.text)
try:
# Parse the response as JSON
object = resp.json()
@ -76,418 +65,5 @@ class Api:
return object
def config_all(self):
# The input consists of system and prompt strings
input = {
"operation": "config"
}
object = self.request("config", input)
try:
return object["config"], object["version"]
except:
raise ProtocolException(f"Response not formatted correctly")
def config_get(self, keys):
# The input consists of system and prompt strings
input = {
"operation": "get",
"keys": [
{ "type": k.type, "key": k.key }
for k in keys
]
}
object = self.request("config", input)
try:
return [
ConfigValue(
type = v["type"],
key = v["key"],
value = v["value"]
)
for v in object["values"]
]
except:
raise ProtocolException(f"Response not formatted correctly")
def config_put(self, values):
# The input consists of system and prompt strings
input = {
"operation": "put",
"values": [
{ "type": v.type, "key": v.key, "value": v.value }
for v in values
]
}
self.request("config", input)
def config_list(self, type):
# The input consists of system and prompt strings
input = {
"operation": "list",
"type": type,
}
return self.request("config", input)["directory"]
def config_getvalues(self, type):
# The input consists of system and prompt strings
input = {
"operation": "getvalues",
"type": type,
}
object = self.request("config", input)["directory"]
try:
return [
ConfigValue(
type = v["type"],
key = v["key"],
value = v["value"]
)
for v in object["values"]
]
except:
raise ProtocolException(f"Response not formatted correctly")
def flow_list_classes(self):
# The input consists of system and prompt strings
input = {
"operation": "list-classes",
}
return self.request("flow", input)["class-names"]
def flow_get_class(self, class_name):
# The input consists of system and prompt strings
input = {
"operation": "get-class",
"class-name": class_name,
}
return json.loads(self.request("flow", input)["class-definition"])
def flow_put_class(self, class_name, definition):
# The input consists of system and prompt strings
input = {
"operation": "put-class",
"class-name": class_name,
"class-definition": json.dumps(definition),
}
self.request("flow", input)
def flow_delete_class(self, class_name):
# The input consists of system and prompt strings
input = {
"operation": "delete-class",
"class-name": class_name,
}
self.request("flow", input)
def flow_list(self):
# The input consists of system and prompt strings
input = {
"operation": "list-flows",
}
return self.request("flow", input)["flow-ids"]
def flow_get(self, id):
# The input consists of system and prompt strings
input = {
"operation": "get-flow",
"flow-id": id,
}
return json.loads(self.request("flow", input)["flow"])
def flow_start(self, class_name, id, description):
# The input consists of system and prompt strings
input = {
"operation": "start-flow",
"flow-id": id,
"class-name": class_name,
"description": description,
}
self.request("flow", input)
def flow_stop(self, id):
# The input consists of system and prompt strings
input = {
"operation": "stop-flow",
"flow-id": id,
}
self.request("flow", input)
class Flow:
def __init__(self, api, flow):
self.api = api
self.flow = flow
def text_completion(self, system, prompt):
# The input consists of system and prompt strings
input = {
"system": system,
"prompt": prompt
}
return self.api.request(
f"flow/{self.flow}/service/text-completion",
input
)["response"]
def agent(self, question):
# The input consists of a question
input = {
"question": question
}
return self.api.request(
f"flow/{self.flow}/service/agent",
input
)["answer"]
def graph_rag(
self, question, user="trustgraph", collection="default",
entity_limit=50, triple_limit=30, max_subgraph_size=150,
max_path_length=2,
):
# The input consists of a question
input = {
"query": question,
"user": user,
"collection": collection,
"entity-limit": entity_limit,
"triple-limit": triple_limit,
"max-subgraph-size": max_subgraph_size,
"max-path-length": max_path_length,
}
return self.api.request(
f"flow/{self.flow}/service/graph-rag",
input
)["response"]
def document_rag(
self, question, user="trustgraph", collection="default",
doc_limit=10,
):
# The input consists of a question
input = {
"query": question,
"user": user,
"collection": collection,
"doc-limit": doc_limit,
}
return self.api.request(
f"flow/{self.flow}/service/document-rag",
input
)["response"]
def embeddings(self, text):
# The input consists of a text block
input = {
"text": text
}
return self.api.request(
f"flow/{self.flow}/service/embeddings",
input
)["vectors"]
def prompt(self, id, variables):
# The input consists of system and prompt strings
input = {
"id": id,
"variables": variables
}
object = self.api.request(
f"flow/{self.flow}/service/prompt",
input
)
if "text" in object:
return object["text"]
if "object" in object:
try:
return json.loads(object["object"])
except Exception as e:
raise ProtocolException(
"Returned object not well-formed JSON"
)
raise ProtocolException("Response not formatted correctly")
def triples_query(
self, s=None, p=None, o=None,
user=None, collection=None, limit=10000
):
# The input consists of system and prompt strings
input = {
"limit": limit
}
if user:
input["user"] = user
if collection:
input["collection"] = collection
if s:
if not isinstance(s, Uri):
raise RuntimeError("s must be Uri")
input["s"] = { "v": str(s), "e": isinstance(s, Uri), }
if p:
if not isinstance(p, Uri):
raise RuntimeError("p must be Uri")
input["p"] = { "v": str(p), "e": isinstance(p, Uri), }
if o:
if not isinstance(o, Uri) and not isinstance(o, Literal):
raise RuntimeError("o must be Uri or Literal")
input["o"] = { "v": str(o), "e": isinstance(o, Uri), }
object = self.api.request(
f"flow/{self.flow}/service/triples",
input
)
def to_value(x):
if x["e"]: return Uri(x["v"])
return Literal(x["v"])
return [
Triple(
s=to_value(t["s"]),
p=to_value(t["p"]),
o=to_value(t["o"])
)
for t in object["response"]
]
def load_document(
self, document, id=None, metadata=None, user=None,
collection=None,
):
if id is None:
if metadata is not None:
# Situation makes no sense. What can the metadata possibly
# mean if the caller doesn't know the document ID.
# Metadata should relate to the document by ID
raise RuntimeError("Can't specify metadata without id")
id = hash(document)
triples = []
def emit(t):
triples.append(t)
if metadata:
metadata.emit(
lambda t: triples.append({
"s": { "v": t["s"], "e": isinstance(t["s"], Uri) },
"p": { "v": t["p"], "e": isinstance(t["p"], Uri) },
"o": { "v": t["o"], "e": isinstance(t["o"], Uri) }
})
)
input = {
"id": id,
"metadata": triples,
"data": base64.b64encode(document).decode("utf-8"),
}
if user:
input["user"] = user
if collection:
input["collection"] = collection
return self.api.request(
f"flow/{self.flow}/service/document-load",
input
)
def load_text(
self, text, id=None, metadata=None, charset="utf-8",
user=None, collection=None,
):
if id is None:
if metadata is not None:
# Situation makes no sense. What can the metadata possibly
# mean if the caller doesn't know the document ID.
# Metadata should relate to the document by ID
raise RuntimeError("Can't specify metadata without id")
id = hash(text)
triples = []
if metadata:
metadata.emit(
lambda t: triples.append({
"s": { "v": t["s"], "e": isinstance(t["s"], Uri) },
"p": { "v": t["p"], "e": isinstance(t["p"], Uri) },
"o": { "v": t["o"], "e": isinstance(t["o"], Uri) }
})
)
input = {
"id": id,
"metadata": triples,
"charset": charset,
"text": base64.b64encode(text).decode("utf-8"),
}
if user:
input["user"] = user
if collection:
input["collection"] = collection
return self.api.request(
f"flow/{self.flow}/service/text-load",
input
)
def library(self):
return Library(self)

View file

@ -0,0 +1,97 @@
from . exceptions import *
from . types import ConfigValue
class Config:
def __init__(self, api):
self.api = api
def request(self, request):
return self.api.request("config", request)
def get(self, keys):
# The input consists of system and prompt strings
input = {
"operation": "get",
"keys": [
{ "type": k.type, "key": k.key }
for k in keys
]
}
object = self.request(input)
try:
return [
ConfigValue(
type = v["type"],
key = v["key"],
value = v["value"]
)
for v in object["values"]
]
except Exception as e:
print(e)
raise ProtocolException("Response not formatted correctly")
def put(self, values):
# The input consists of system and prompt strings
input = {
"operation": "put",
"values": [
{ "type": v.type, "key": v.key, "value": v.value }
for v in values
]
}
self.request(input)
def list(self, type):
# The input consists of system and prompt strings
input = {
"operation": "list",
"type": type,
}
return self.request(input)["directory"]
def get_values(self, type):
# The input consists of system and prompt strings
input = {
"operation": "getvalues",
"type": type,
}
object = self.request(input)["directory"]
try:
return [
ConfigValue(
type = v["type"],
key = v["key"],
value = v["value"]
)
for v in object["values"]
]
except:
raise ProtocolException(f"Response not formatted correctly")
def all(self):
# The input consists of system and prompt strings
input = {
"operation": "config"
}
object = self.request(input)
try:
return object["config"], object["version"]
except:
raise ProtocolException(f"Response not formatted correctly")

View file

@ -0,0 +1,6 @@
class ProtocolException(Exception):
pass
class ApplicationException(Exception):
pass

View file

@ -0,0 +1,359 @@
import json
import base64
from .. knowledge import hash, Uri, Literal
def to_value(x):
if x["e"]: return Uri(x["v"])
return Literal(x["v"])
class Flow:
def __init__(self, api):
self.api = api
def request(self, path=None, request=None):
if request is None:
raise RuntimeError("request must be specified")
if path:
return self.api.request(f"flow/{path}", request)
else:
return self.api.request(f"flow", request)
def id(self, id="0000"):
return FlowInstance(api=self, id=id)
def list_classes(self):
# The input consists of system and prompt strings
input = {
"operation": "list-classes",
}
return self.request(request = input)["class-names"]
def get_class(self, class_name):
# The input consists of system and prompt strings
input = {
"operation": "get-class",
"class-name": class_name,
}
return json.loads(self.request(request = input)["class-definition"])
def put_class(self, class_name, definition):
# The input consists of system and prompt strings
input = {
"operation": "put-class",
"class-name": class_name,
"class-definition": json.dumps(definition),
}
self.request(request = input)
def delete_class(self, class_name):
# The input consists of system and prompt strings
input = {
"operation": "delete-class",
"class-name": class_name,
}
self.request(request = input)
def list(self):
# The input consists of system and prompt strings
input = {
"operation": "list-flows",
}
return self.request(request = input)["flow-ids"]
def get(self, id):
# The input consists of system and prompt strings
input = {
"operation": "get-flow",
"flow-id": id,
}
return json.loads(self.request(request = input)["flow"])
def start(self, class_name, id, description):
# The input consists of system and prompt strings
input = {
"operation": "start-flow",
"flow-id": id,
"class-name": class_name,
"description": description,
}
self.request(request = input)
def stop(self, id):
# The input consists of system and prompt strings
input = {
"operation": "stop-flow",
"flow-id": id,
}
self.request(request = input)
class FlowInstance:
def __init__(self, api, id):
self.api = api
self.id = id
def request(self, path, request):
return self.api.request(path = f"{self.id}/{path}", request = request)
def text_completion(self, system, prompt):
# The input consists of system and prompt strings
input = {
"system": system,
"prompt": prompt
}
return self.request(
"service/text-completion",
input
)["response"]
def agent(self, question):
# The input consists of a question
input = {
"question": question
}
return self.request(
"service/agent",
input
)["answer"]
def graph_rag(
self, question, user="trustgraph", collection="default",
entity_limit=50, triple_limit=30, max_subgraph_size=150,
max_path_length=2,
):
# The input consists of a question
input = {
"query": question,
"user": user,
"collection": collection,
"entity-limit": entity_limit,
"triple-limit": triple_limit,
"max-subgraph-size": max_subgraph_size,
"max-path-length": max_path_length,
}
return self.request(
"service/graph-rag",
input
)["response"]
def document_rag(
self, question, user="trustgraph", collection="default",
doc_limit=10,
):
# The input consists of a question
input = {
"query": question,
"user": user,
"collection": collection,
"doc-limit": doc_limit,
}
return self.request(
"service/document-rag",
input
)["response"]
def embeddings(self, text):
# The input consists of a text block
input = {
"text": text
}
return self.request(
"service/embeddings",
input
)["vectors"]
def prompt(self, id, variables):
# The input consists of system and prompt strings
input = {
"id": id,
"variables": variables
}
object = self.request(
"service/prompt",
input
)
if "text" in object:
return object["text"]
if "object" in object:
try:
return json.loads(object["object"])
except Exception as e:
raise ProtocolException(
"Returned object not well-formed JSON"
)
raise ProtocolException("Response not formatted correctly")
def triples_query(
self, s=None, p=None, o=None,
user=None, collection=None, limit=10000
):
# The input consists of system and prompt strings
input = {
"limit": limit
}
if user:
input["user"] = user
if collection:
input["collection"] = collection
if s:
if not isinstance(s, Uri):
raise RuntimeError("s must be Uri")
input["s"] = { "v": str(s), "e": isinstance(s, Uri), }
if p:
if not isinstance(p, Uri):
raise RuntimeError("p must be Uri")
input["p"] = { "v": str(p), "e": isinstance(p, Uri), }
if o:
if not isinstance(o, Uri) and not isinstance(o, Literal):
raise RuntimeError("o must be Uri or Literal")
input["o"] = { "v": str(o), "e": isinstance(o, Uri), }
object = self.request(
"service/triples",
input
)
return [
Triple(
s=to_value(t["s"]),
p=to_value(t["p"]),
o=to_value(t["o"])
)
for t in object["response"]
]
def load_document(
self, document, id=None, metadata=None, user=None,
collection=None,
):
if id is None:
if metadata is not None:
# Situation makes no sense. What can the metadata possibly
# mean if the caller doesn't know the document ID.
# Metadata should relate to the document by ID
raise RuntimeError("Can't specify metadata without id")
id = hash(document)
triples = []
def emit(t):
triples.append(t)
if metadata:
metadata.emit(
lambda t: triples.append({
"s": { "v": t["s"], "e": isinstance(t["s"], Uri) },
"p": { "v": t["p"], "e": isinstance(t["p"], Uri) },
"o": { "v": t["o"], "e": isinstance(t["o"], Uri) }
})
)
input = {
"id": id,
"metadata": triples,
"data": base64.b64encode(document).decode("utf-8"),
}
if user:
input["user"] = user
if collection:
input["collection"] = collection
return self.request(
"service/document-load",
input
)
def load_text(
self, text, id=None, metadata=None, charset="utf-8",
user=None, collection=None,
):
if id is None:
if metadata is not None:
# Situation makes no sense. What can the metadata possibly
# mean if the caller doesn't know the document ID.
# Metadata should relate to the document by ID
raise RuntimeError("Can't specify metadata without id")
id = hash(text)
triples = []
if metadata:
metadata.emit(
lambda t: triples.append({
"s": { "v": t["s"], "e": isinstance(t["s"], Uri) },
"p": { "v": t["p"], "e": isinstance(t["p"], Uri) },
"o": { "v": t["o"], "e": isinstance(t["o"], Uri) }
})
)
input = {
"id": id,
"metadata": triples,
"charset": charset,
"text": base64.b64encode(text).decode("utf-8"),
}
if user:
input["user"] = user
if collection:
input["collection"] = collection
return self.request(
"service/text-load",
input
)

View file

@ -0,0 +1,259 @@
import datetime
import time
import base64
from . types import DocumentMetadata, ProcessingMetadata, Triple
from .. knowledge import hash, Uri, Literal
from . exceptions import *
def to_value(x):
if x["e"]: return Uri(x["v"])
return Literal(x["v"])
class Library:
def __init__(self, api):
self.api = api
def request(self, request):
return self.api.request(f"librarian", request)
def add_document(
self, document, id, metadata, user, title, comments,
kind="text/plain", tags=[],
):
if id is None:
if metadata is not None:
# Situation makes no sense. What can the metadata possibly
# mean if the caller doesn't know the document ID.
# Metadata should relate to the document by ID
raise RuntimeError("Can't specify metadata without id")
id = hash(document)
if not title: title = ""
if not comments: comments = ""
triples = []
def emit(t):
triples.append(t)
if metadata:
metadata.emit(
lambda t: triples.append({
"s": { "v": t["s"], "e": isinstance(t["s"], Uri) },
"p": { "v": t["p"], "e": isinstance(t["p"], Uri) },
"o": { "v": t["o"], "e": isinstance(t["o"], Uri) }
})
)
input = {
"operation": "add-document",
"document-metadata": {
"id": id,
"time": int(time.time()),
"kind": kind,
"title": title,
"comments": comments,
"metadata": triples,
"user": user,
"tags": tags
},
"content": base64.b64encode(document).decode("utf-8"),
}
return self.request(input)
def get_documents(self, user):
input = {
"operation": "list-documents",
"user": user,
}
object = self.request(input)
try:
return [
DocumentMetadata(
id = v["id"],
time = datetime.datetime.fromtimestamp(v["time"]),
kind = v["kind"],
title = v["title"],
comments = v.get("comments", ""),
metadata = [
Triple(
s = to_value(w["s"]),
p = to_value(w["p"]),
o = to_value(w["o"])
)
for w in v["metadata"]
],
user = v["user"],
tags = v["tags"]
)
for v in object["document-metadatas"]
]
except Exception as e:
print(e)
raise ProtocolException(f"Response not formatted correctly")
def get_document(self, user, id):
input = {
"operation": "get-document",
"user": user,
"document-id": id,
}
object = self.request(input)
doc = object["document-metadata"]
try:
DocumentMetadata(
id = doc["id"],
time = datetime.datetime.fromtimestamp(doc["time"]),
kind = doc["kind"],
title = doc["title"],
comments = doc.get("comments", ""),
metadata = [
Triple(
s = to_value(w["s"]),
p = to_value(w["p"]),
o = to_value(w["o"])
)
for w in doc["metadata"]
],
user = doc["user"],
tags = doc["tags"]
)
except Exception as e:
print(e)
raise ProtocolException(f"Response not formatted correctly")
def update_document(self, user, id, metadata):
input = {
"operation": "update-document",
"document-metadata": {
"user": user,
"document-id": id,
"time": metadata.time,
"title": metadata.title,
"comments": metadata.comments,
"metadata": [
{
"s": { "v": t["s"], "e": isinstance(t["s"], Uri) },
"p": { "v": t["p"], "e": isinstance(t["p"], Uri) },
"o": { "v": t["o"], "e": isinstance(t["o"], Uri) }
}
for t in metadata.metadata
],
"tags": metadata.tags,
}
}
object = self.request(input)
doc = object["document-metadata"]
try:
DocumentMetadata(
id = doc["id"],
time = datetime.datetime.fromtimestamp(doc["time"]),
kind = doc["kind"],
title = doc["title"],
comments = doc.get("comments", ""),
metadata = [
Triple(
s = to_value(w["s"]),
p = to_value(w["p"]),
o = to_value(w["o"])
)
for w in doc["metadata"]
],
user = doc["user"],
tags = doc["tags"]
)
except Exception as e:
print(e)
raise ProtocolException(f"Response not formatted correctly")
def remove_document(self, user, id):
input = {
"operation": "remove-document",
"user": user,
"document-id": id,
}
object = self.request(input)
return {}
def start_processing(
self, id, document_id, flow="0000",
user="trustgraph", collection="default", tags=[],
):
input = {
"operation": "add-processing",
"processing-metadata": {
"id": id,
"document-id": document_id,
"time": int(time.time()),
"flow": flow,
"user": user,
"collection": collection,
"tags": tags,
}
}
object = self.request(input)
return {}
def stop_processing(
self, id, user="trustgraph",
):
input = {
"operation": "remove-processing",
"processing-id": id,
"user": user,
}
object = self.request(input)
return {}
def get_processings(self, user="trustgraph"):
input = {
"operation": "list-processing",
"user": user,
}
object = self.request(input)
try:
return [
ProcessingMetadata(
id = v["id"],
document_id = v["document-id"],
time = datetime.datetime.fromtimestamp(v["time"]),
flow = v["flow"],
user = v["user"],
collection = v["collection"],
tags = v["tags"],
)
for v in object["processing-metadatas"]
]
except Exception as e:
print(e)
raise ProtocolException(f"Response not formatted correctly")

View file

@ -0,0 +1,42 @@
import dataclasses
import datetime
from typing import List
@dataclasses.dataclass
class Triple:
s : str
p : str
o : str
@dataclasses.dataclass
class ConfigKey:
type : str
key : str
@dataclasses.dataclass
class ConfigValue:
type : str
key : str
value : str
@dataclasses.dataclass
class DocumentMetadata:
id : str
time : datetime.datetime
kind : str
title : str
comments : str
metadata : List[Triple]
user : str
tags : List[str]
@dataclasses.dataclass
class ProcessingMetadata:
id : str
document_id : str
time : datetime.datetime
flow : str
user : str
collection : str
tags : List[str]