trustgraph/trustgraph-base/trustgraph/api/flow.py
cybermaggedon b08db761d7
Fix config inconsistency (#609)
* Plural/singular confusion in config key

* Flow class vs flow blueprint nomenclature change

* Update docs & CLI to reflect the above
2026-01-14 12:31:40 +00:00

657 lines
18 KiB
Python

import json
import base64
from .. knowledge import hash, Uri, Literal
from . types import Triple
from . exceptions import ProtocolException
def to_value(x):
    """Convert a wire-format term into a ``Uri`` or a ``Literal``.

    ``x`` is a mapping with two keys: ``"v"`` holds the value and ``"e"``
    is a flag.  A truthy ``"e"`` wraps the value in ``Uri``; anything else
    wraps it in ``Literal``.
    """
    wrapper = Uri if x["e"] else Literal
    return wrapper(x["v"])
class Flow:
    """Client for the flow-management operations of the API.

    Wraps an ``api`` object exposing ``request(path, request)`` and sends
    every operation to the ``flow`` endpoint (or a sub-path of it).
    """

    def __init__(self, api):
        # Underlying API client; only its request() method is used here.
        self.api = api

    def request(self, path=None, request=None):
        """Send ``request`` to ``flow`` or, if ``path`` is given, ``flow/<path>``.

        Raises:
            RuntimeError: if ``request`` is None.
        """
        if request is None:
            raise RuntimeError("request must be specified")

        endpoint = f"flow/{path}" if path else "flow"
        return self.api.request(endpoint, request)

    def id(self, id="default"):
        """Return a ``FlowInstance`` for flow ``id`` that routes via this object."""
        return FlowInstance(api=self, id=id)

    def list_blueprints(self):
        """Return the ``"blueprint-names"`` field of a list-blueprints reply."""
        reply = self.request(request={"operation": "list-blueprints"})
        return reply["blueprint-names"]

    def get_blueprint(self, blueprint_name):
        """Fetch a blueprint and return its JSON-decoded definition."""
        reply = self.request(
            request={
                "operation": "get-blueprint",
                "blueprint-name": blueprint_name,
            }
        )
        return json.loads(reply["blueprint-definition"])

    def put_blueprint(self, blueprint_name, definition):
        """Store ``definition`` (sent JSON-encoded) under ``blueprint_name``."""
        payload = {
            "operation": "put-blueprint",
            "blueprint-name": blueprint_name,
            "blueprint-definition": json.dumps(definition),
        }
        self.request(request=payload)

    def delete_blueprint(self, blueprint_name):
        """Request deletion of the blueprint called ``blueprint_name``."""
        payload = {
            "operation": "delete-blueprint",
            "blueprint-name": blueprint_name,
        }
        self.request(request=payload)

    def list(self):
        """Return the ``"flow-ids"`` field of a list-flows reply."""
        reply = self.request(request={"operation": "list-flows"})
        return reply["flow-ids"]

    def get(self, id):
        """Fetch flow ``id`` and return its JSON-decoded ``"flow"`` field."""
        reply = self.request(
            request={
                "operation": "get-flow",
                "flow-id": id,
            }
        )
        return json.loads(reply["flow"])

    def start(self, blueprint_name, id, description, parameters=None):
        """Start flow ``id`` from ``blueprint_name`` with a description.

        ``parameters`` is only added to the request when it is truthy.
        """
        payload = {
            "operation": "start-flow",
            "flow-id": id,
            "blueprint-name": blueprint_name,
            "description": description,
        }
        if parameters:
            payload.update(parameters=parameters)
        self.request(request=payload)

    def stop(self, id):
        """Request that flow ``id`` be stopped."""
        payload = {
            "operation": "stop-flow",
            "flow-id": id,
        }
        self.request(request=payload)
class FlowInstance:
    """Client for the services exposed by a single flow.

    Every call goes through ``api.request(path=..., request=...)`` with the
    path prefixed by this instance's flow id, i.e. ``<id>/<path>``.
    """

    def __init__(self, api, id):
        # Object exposing request(path=..., request=...).
        self.api = api
        # Flow identifier used as the path prefix for every request.
        self.id = id

    def request(self, path, request):
        """Send ``request`` to ``<flow id>/<path>`` and return the response."""
        return self.api.request(path=f"{self.id}/{path}", request=request)

    @staticmethod
    def _raise_on_error(response):
        """Raise ProtocolException if ``response`` has a truthy ``"error"``."""
        if "error" in response and response["error"]:
            error = response["error"]
            error_type = error.get("type", "unknown")
            error_message = error.get("message", "Unknown error")
            raise ProtocolException(f"{error_type}: {error_message}")

    @staticmethod
    def _term(value):
        """Encode a query term as ``{"v": str(value), "e": is-a-Uri}``."""
        return {"v": str(value), "e": isinstance(value, Uri)}

    @staticmethod
    def _metadata_triples(metadata):
        """Collect the triples emitted by ``metadata`` in wire format.

        Returns an empty list when ``metadata`` is falsy.  Otherwise
        ``metadata.emit`` is called with a callback receiving mappings that
        have ``"s"``, ``"p"`` and ``"o"`` entries.
        """
        triples = []
        if metadata:
            metadata.emit(
                lambda t: triples.append({
                    key: {"v": t[key], "e": isinstance(t[key], Uri)}
                    for key in ("s", "p", "o")
                })
            )
        return triples

    def text_completion(self, system, prompt):
        """Run a text completion and return the ``"response"`` field.

        Args:
            system: System string sent with the request.
            prompt: Prompt string sent with the request.
        """
        payload = {
            "system": system,
            "prompt": prompt,
        }
        return self.request("service/text-completion", payload)["response"]

    def agent(self, question, user="trustgraph", state=None, group=None,
              history=None):
        """Ask the agent service a question and return the ``"answer"`` field.

        ``state`` and ``group`` are only sent when they are not None.
        ``history`` is always sent, as an empty list when falsy.
        """
        payload = {
            "question": question,
            "user": user,
        }
        if state is not None:
            payload["state"] = state
        if group is not None:
            payload["group"] = group
        payload["history"] = history or []

        return self.request("service/agent", payload)["answer"]

    def graph_rag(
            self, query, user="trustgraph", collection="default",
            entity_limit=50, triple_limit=30, max_subgraph_size=150,
            max_path_length=2,
    ):
        """Run a graph-RAG query and return the ``"response"`` field."""
        payload = {
            "query": query,
            "user": user,
            "collection": collection,
            "entity-limit": entity_limit,
            "triple-limit": triple_limit,
            "max-subgraph-size": max_subgraph_size,
            "max-path-length": max_path_length,
        }
        return self.request("service/graph-rag", payload)["response"]

    def document_rag(
            self, query, user="trustgraph", collection="default",
            doc_limit=10,
    ):
        """Run a document-RAG query and return the ``"response"`` field."""
        payload = {
            "query": query,
            "user": user,
            "collection": collection,
            "doc-limit": doc_limit,
        }
        return self.request("service/document-rag", payload)["response"]

    def embeddings(self, text):
        """Request embeddings for ``text`` and return the ``"vectors"`` field."""
        payload = {
            "text": text,
        }
        return self.request("service/embeddings", payload)["vectors"]

    def graph_embeddings_query(self, text, user, collection, limit=10):
        """Query the graph-embeddings service; returns the whole response."""
        payload = {
            "text": text,
            "user": user,
            "collection": collection,
            "limit": limit,
        }
        return self.request("service/graph-embeddings", payload)

    def prompt(self, id, variables):
        """Invoke prompt template ``id`` with ``variables``.

        Returns:
            The ``"text"`` field if present; otherwise the JSON-decoded
            ``"object"`` field.

        Raises:
            ProtocolException: if ``"object"`` cannot be decoded as JSON, or
                the response has neither ``"text"`` nor ``"object"``.
        """
        payload = {
            "id": id,
            "variables": variables,
        }
        reply = self.request("service/prompt", payload)

        if "text" in reply:
            return reply["text"]

        if "object" in reply:
            try:
                return json.loads(reply["object"])
            except Exception as e:
                # Chain the cause so the decoding failure stays visible.
                raise ProtocolException(
                    "Returned object not well-formed JSON"
                ) from e

        raise ProtocolException("Response not formatted correctly")

    def mcp_tool(self, name, parameters=None):
        """Invoke MCP tool ``name`` with ``parameters``.

        Args:
            name: Tool name.
            parameters: Tool parameters; an empty dict is sent when None.

        Returns:
            The ``"text"`` field if present; otherwise the ``"object"``
            field, returned as received.

        Raises:
            ProtocolException: if the response has neither field.
        """
        # A fresh dict per call: a ``{}`` default would be shared by all calls.
        if parameters is None:
            parameters = {}

        payload = {
            "name": name,
            "parameters": parameters,
        }
        reply = self.request("service/mcp-tool", payload)

        if "text" in reply:
            return reply["text"]

        if "object" in reply:
            return reply["object"]

        raise ProtocolException("Response not formatted correctly")

    def triples_query(
            self, s=None, p=None, o=None,
            user=None, collection=None, limit=10000
    ):
        """Query triples matching the given subject/predicate/object.

        Falsy ``s``/``p``/``o``/``user``/``collection`` values are left out
        of the request.

        Returns:
            A list of ``Triple`` built from the ``"response"`` field.

        Raises:
            RuntimeError: if ``s`` or ``p`` is given and is not a ``Uri``, or
                ``o`` is given and is neither ``Uri`` nor ``Literal``.
        """
        payload = {
            "limit": limit,
        }

        if user:
            payload["user"] = user

        if collection:
            payload["collection"] = collection

        if s:
            if not isinstance(s, Uri):
                raise RuntimeError("s must be Uri")
            payload["s"] = self._term(s)

        if p:
            if not isinstance(p, Uri):
                raise RuntimeError("p must be Uri")
            payload["p"] = self._term(p)

        if o:
            if not isinstance(o, (Uri, Literal)):
                raise RuntimeError("o must be Uri or Literal")
            payload["o"] = self._term(o)

        reply = self.request("service/triples", payload)

        return [
            Triple(
                s=to_value(t["s"]),
                p=to_value(t["p"]),
                o=to_value(t["o"]),
            )
            for t in reply["response"]
        ]

    def load_document(
            self, document, id=None, metadata=None, user=None,
            collection=None,
    ):
        """Submit a document to the document-load service.

        Args:
            document: Bytes-like document content; sent base64-encoded.
            id: Document id.  When None it is derived with ``hash(document)``.
            metadata: Optional object whose ``emit(callback)`` yields triples.
            user: Optional user, only sent when truthy.
            collection: Optional collection, only sent when truthy.

        Returns:
            The service response.

        Raises:
            RuntimeError: if ``metadata`` is given without ``id``.
        """
        if id is None:
            if metadata is not None:
                # Metadata relates to the document by id, so it is
                # meaningless when the caller does not know the id.
                raise RuntimeError("Can't specify metadata without id")
            id = hash(document)

        payload = {
            "id": id,
            "metadata": self._metadata_triples(metadata),
            "data": base64.b64encode(document).decode("utf-8"),
        }

        if user:
            payload["user"] = user

        if collection:
            payload["collection"] = collection

        return self.request("service/document-load", payload)

    def load_text(
            self, text, id=None, metadata=None, charset="utf-8",
            user=None, collection=None,
    ):
        """Submit text to the text-load service.

        Args:
            text: Text content as bytes, or as ``str`` which is encoded
                with ``charset`` before sending.  Sent base64-encoded.
            id: Document id.  When None it is derived with ``hash`` from the
                text bytes.
            metadata: Optional object whose ``emit(callback)`` yields triples.
            charset: Character set name sent alongside the text.
            user: Optional user, only sent when truthy.
            collection: Optional collection, only sent when truthy.

        Returns:
            The service response.

        Raises:
            RuntimeError: if ``metadata`` is given without ``id``.
        """
        if id is None and metadata is not None:
            # Metadata relates to the document by id, so it is meaningless
            # when the caller does not know the id.
            raise RuntimeError("Can't specify metadata without id")

        # base64 needs bytes; encode str with the charset the request declares.
        if isinstance(text, str):
            text = text.encode(charset)

        if id is None:
            id = hash(text)

        payload = {
            "id": id,
            "metadata": self._metadata_triples(metadata),
            "charset": charset,
            "text": base64.b64encode(text).decode("utf-8"),
        }

        if user:
            payload["user"] = user

        if collection:
            payload["collection"] = collection

        return self.request("service/text-load", payload)

    def objects_query(
            self, query, user="trustgraph", collection="default",
            variables=None, operation_name=None
    ):
        """Run a GraphQL query against the objects service.

        ``variables`` and ``operation_name`` are only sent when truthy.

        Returns:
            dict with ``"data"`` if present in the response, plus
            ``"errors"`` and ``"extensions"`` when present and truthy.

        Raises:
            ProtocolException: if the response has a truthy ``"error"``.
        """
        payload = {
            "query": query,
            "user": user,
            "collection": collection,
        }

        if variables:
            payload["variables"] = variables

        if operation_name:
            payload["operation_name"] = operation_name

        response = self.request("service/objects", payload)

        self._raise_on_error(response)

        # Pass on only the GraphQL parts of the response.
        result = {}

        if "data" in response:
            result["data"] = response["data"]

        if "errors" in response and response["errors"]:
            result["errors"] = response["errors"]

        if "extensions" in response and response["extensions"]:
            result["extensions"] = response["extensions"]

        return result

    def nlp_query(self, question, max_results=100):
        """Convert a natural language question to a GraphQL query.

        Args:
            question: Natural language question.
            max_results: Maximum number of results to return (default: 100).

        Returns:
            The service response.

        Raises:
            ProtocolException: if the response has a truthy ``"error"``.
        """
        payload = {
            "question": question,
            "max_results": max_results,
        }

        response = self.request("service/nlp-query", payload)

        self._raise_on_error(response)

        return response

    def structured_query(self, question, user="trustgraph",
                         collection="default"):
        """Execute a natural language question against structured data.

        Args:
            question: Natural language question.
            user: User identifier sent with the request
                (default: "trustgraph").
            collection: Collection identifier sent with the request
                (default: "default").

        Returns:
            The service response.

        Raises:
            ProtocolException: if the response has a truthy ``"error"``.
        """
        payload = {
            "question": question,
            "user": user,
            "collection": collection,
        }

        response = self.request("service/structured-query", payload)

        self._raise_on_error(response)

        return response

    def detect_type(self, sample):
        """Detect the data type of a structured data sample.

        Args:
            sample: Data sample to analyze.

        Returns:
            The ``"detected-type"`` field of the response.

        Raises:
            ProtocolException: if the response has a truthy ``"error"``.
        """
        payload = {
            "operation": "detect-type",
            "sample": sample,
        }

        response = self.request("service/structured-diag", payload)

        self._raise_on_error(response)

        return response["detected-type"]

    def generate_descriptor(self, sample, data_type, schema_name,
                            options=None):
        """Generate a descriptor mapping structured data to a schema.

        Args:
            sample: Data sample to analyze.
            data_type: Data type, sent as the ``"type"`` field.
            schema_name: Target schema name.
            options: Optional parameters, only sent when truthy.

        Returns:
            The ``"descriptor"`` field of the response.

        Raises:
            ProtocolException: if the response has a truthy ``"error"``.
        """
        payload = {
            "operation": "generate-descriptor",
            "sample": sample,
            "type": data_type,
            "schema-name": schema_name,
        }

        if options:
            payload["options"] = options

        response = self.request("service/structured-diag", payload)

        self._raise_on_error(response)

        return response["descriptor"]

    def diagnose_data(self, sample, schema_name=None, options=None):
        """Run the combined "diagnose" operation on a data sample.

        Args:
            sample: Data sample to analyze.
            schema_name: Optional target schema name, only sent when truthy.
            options: Optional parameters, only sent when truthy.

        Returns:
            The service response.

        Raises:
            ProtocolException: if the response has a truthy ``"error"``.
        """
        payload = {
            "operation": "diagnose",
            "sample": sample,
        }

        if schema_name:
            payload["schema-name"] = schema_name

        if options:
            payload["options"] = options

        response = self.request("service/structured-diag", payload)

        self._raise_on_error(response)

        return response

    def schema_selection(self, sample, options=None):
        """Select matching schemas for a data sample.

        Args:
            sample: Data sample to analyze.
            options: Optional parameters, only sent when truthy.

        Returns:
            The ``"schema-matches"`` field of the response.

        Raises:
            ProtocolException: if the response has a truthy ``"error"``.
        """
        payload = {
            "operation": "schema-selection",
            "sample": sample,
        }

        if options:
            payload["options"] = options

        response = self.request("service/structured-diag", payload)

        self._raise_on_error(response)

        return response["schema-matches"]