trustgraph/trustgraph-base/trustgraph/api/async_flow.py
cybermaggedon d35473f7f7
feat: workspace-based multi-tenancy, replacing user as tenancy axis (#840)
Introduces `workspace` as the isolation boundary for config, flows,
library, and knowledge data. Removes `user` as a schema-level field
throughout the code, API specs, and tests; workspace provides the
same separation more cleanly at the trusted flow.workspace layer
rather than through client-supplied message fields.

Design
------
- IAM tech spec (docs/tech-specs/iam.md) documents current state,
  proposed auth/access model, and migration direction.
- Data ownership model (docs/tech-specs/data-ownership-model.md)
  captures the workspace/collection/flow hierarchy.

Schema + messaging
------------------
- Drop `user` field from AgentRequest/Step, GraphRagQuery,
  DocumentRagQuery, Triples/Graph/Document/Row EmbeddingsRequest,
  Sparql/Rows/Structured QueryRequest, ToolServiceRequest.
- Keep collection/workspace routing via flow.workspace at the
  service layer.
- Translators updated to not serialise/deserialise user.

API specs
---------
- OpenAPI schemas and path examples cleaned of user fields.
- Websocket async-api messages updated.
- Removed the unused parameters/User.yaml.

Services + base
---------------
- Librarian, collection manager, knowledge, config: all operations
  scoped by workspace. Config client API takes workspace as first
  positional arg.
- `flow.workspace` set at flow start time by the infrastructure;
  no longer pass-through from clients.
- Tool service drops user-personalisation passthrough.
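
As a rough illustration of the scoping described above, the sketch below
derives a storage scope from the flow's workspace rather than from any
field on the client message. `FlowContext`, `QueryRequest`, and
`storage_scope` are hypothetical names invented for this example, not
TrustGraph classes; only the idea that `flow.workspace` is set by the
infrastructure comes from this commit.

```python
from dataclasses import dataclass
from typing import Tuple


@dataclass(frozen=True)
class FlowContext:
    """Hypothetical stand-in for the flow object a service receives."""
    id: str
    workspace: str  # assumed to be set by the infrastructure at flow start


@dataclass(frozen=True)
class QueryRequest:
    """Hypothetical request message; note that it has no `user` field."""
    query: str
    collection: str


def storage_scope(flow: FlowContext, request: QueryRequest) -> Tuple[str, str]:
    """Return the (workspace, collection) pair that scopes a request.

    The workspace comes from the trusted flow context and the collection
    from the client message, so the client never names the tenant.
    """
    return (flow.workspace, request.collection)


flow = FlowContext(id="default", workspace="acme")
request = QueryRequest(query="What is TrustGraph?", collection="docs")
scope = storage_scope(flow, request)
print(scope)
```

Because the request type carries no tenancy field at all, a client has
nothing to spoof; isolation depends only on which flow handled the message.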

CLI + SDK
---------
- tg-init-workspace and workspace-aware import/export.
- All tg-* commands drop user args; accept --workspace.
- Python API/SDK (flow, socket_client, async_*, explainability,
  library) drop user kwargs from every method signature.

MCP server
----------
- All tool endpoints drop user parameters; socket_manager no longer
  keyed per user.

Flow service
------------
- Closure-based topic cleanup on flow stop: only delete topics
  whose blueprint template was parameterised AND no remaining
  live flow (across all workspaces) still resolves to that topic.
  Three scopes fall out naturally from template analysis:
    * {id} -> per-flow, deleted on stop
    * {blueprint} -> per-blueprint, kept while any flow of the
      same blueprint exists
    * {workspace} -> per-workspace, kept while any flow in the
      workspace exists
    * literal -> global, never deleted (e.g. tg.request.librarian)
  Fixes a bug where stopping a flow silently destroyed the global
  librarian exchange, wedging all library operations until manual
  restart.
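
The cleanup rule above can be sketched as follows. This is a minimal
model under stated assumptions, not the flow service's actual code: the
`Flow` dataclass, `topics_to_delete`, and every topic name except
`tg.request.librarian` are hypothetical. It deletes a topic only when
its template is parameterised and no remaining flow resolves to the same
topic name.

```python
import re
from dataclasses import dataclass
from typing import Dict, Iterable, Set, Tuple

# Placeholders named in the commit message.
PLACEHOLDER = re.compile(r"\{(id|blueprint|workspace)\}")


@dataclass(frozen=True)
class Flow:
    """Hypothetical live flow with the topic templates of its blueprint."""
    id: str
    blueprint: str
    workspace: str
    templates: Tuple[str, ...]

    def params(self) -> Dict[str, str]:
        return {"id": self.id, "blueprint": self.blueprint,
                "workspace": self.workspace}

    def topics(self) -> Set[str]:
        """Resolve every template to a concrete topic name."""
        return {t.format(**self.params()) for t in self.templates}


def topics_to_delete(stopped: Flow, remaining: Iterable[Flow]) -> Set[str]:
    """Topics of `stopped` that are safe to delete."""
    still_used: Set[str] = set()
    for flow in remaining:
        still_used |= flow.topics()

    doomed: Set[str] = set()
    for template in stopped.templates:
        if not PLACEHOLDER.search(template):
            continue  # literal template: global topic, never deleted
        topic = template.format(**stopped.params())
        if topic not in still_used:
            doomed.add(topic)
    return doomed


TEMPLATES = (
    "tg.flow.{id}.agent",           # per-flow (hypothetical name)
    "tg.blueprint.{blueprint}.in",  # per-blueprint (hypothetical name)
    "tg.ws.{workspace}.config",     # per-workspace (hypothetical name)
    "tg.request.librarian",         # literal, global
)
flow_a = Flow("a", "rag", "acme", TEMPLATES)
flow_b = Flow("b", "rag", "acme", TEMPLATES)

first_stop = topics_to_delete(flow_a, [flow_b])
last_stop = topics_to_delete(flow_b, [])
print(sorted(first_stop))
print(sorted(last_stop))
```

Stopping `flow_a` while `flow_b` is alive removes only the per-flow
topic; stopping the last flow also removes the per-blueprint and
per-workspace topics. The literal librarian topic survives both stops.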

RabbitMQ backend
----------------
- heartbeat=60, blocked_connection_timeout=300. Catches silently
  dead connections (broker restart, orphaned channels, network
  partitions) within ~2 heartbeat windows, so the consumer
  reconnects and re-binds its queue rather than sitting forever
  on a zombie connection.
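
For orientation, the two settings could be expressed as below. The
commit does not name the client library; the commented-out lines assume
the `pika` client, whose `ConnectionParameters` accepts `heartbeat` and
`blocked_connection_timeout` keyword arguments. The host value is a
placeholder.

```python
# Values taken from the commit message.
HEARTBEAT_SECONDS = 60
BLOCKED_CONNECTION_TIMEOUT_SECONDS = 300

connection_settings = {
    "host": "localhost",  # assumption: placeholder broker host
    "heartbeat": HEARTBEAT_SECONDS,
    "blocked_connection_timeout": BLOCKED_CONNECTION_TIMEOUT_SECONDS,
}

# "~2 heartbeat windows" from the commit message, expressed in seconds.
approx_detection_seconds = 2 * HEARTBEAT_SECONDS
print(f"Dead connection noticed after roughly {approx_detection_seconds}s")

# Assuming pika is the client in use, the settings would be applied as:
#   import pika
#   params = pika.ConnectionParameters(**connection_settings)
#   connection = pika.BlockingConnection(params)
```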

Tests
-----
- Full test refresh: unit, integration, contract, provenance.
- Dropped user-field assertions and constructor kwargs across
  ~100 test files.
- Renamed user-collection isolation tests to workspace-collection.
2026-04-21 23:23:01 +01:00


"""
TrustGraph Asynchronous Flow Management

This module provides async/await based interfaces for managing and interacting
with TrustGraph flows using REST API calls. Unlike async_socket_client which
provides streaming support, this module is focused on non-streaming operations
that return complete responses.

For streaming support (e.g., real-time agent responses, streaming RAG), use
AsyncSocketClient instead.
"""

import aiohttp
import json
from typing import Optional, Dict, Any, List

from .types import TextCompletionResult
from .exceptions import ProtocolException, ApplicationException


def check_error(response):
    """Raise ApplicationException if the response carries an "error" entry."""
    if "error" in response:
        try:
            msg = response["error"]["message"]
            tp = response["error"]["type"]
        except (KeyError, TypeError):
            # Error payload is not a {"type", "message"} mapping; raise as-is
            raise ApplicationException(response["error"])
        raise ApplicationException(f"{tp}: {msg}")


class AsyncFlow:
    """
    Asynchronous flow management client using REST API.

    Provides async/await based flow management operations including listing,
    starting, stopping flows, and managing flow class definitions. Also provides
    access to flow-scoped services like agents, RAG, and queries via non-streaming
    REST endpoints.

    Note: For streaming support, use AsyncSocketClient instead.
    """

    def __init__(self, url: str, timeout: int, token: Optional[str]) -> None:
        """
        Initialize async flow client.

        Args:
            url: Base URL for TrustGraph API
            timeout: Request timeout in seconds
            token: Optional bearer token for authentication
        """
        self.url: str = url
        self.timeout: int = timeout
        self.token: Optional[str] = token

    async def request(self, path: str, request_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Make async HTTP POST request to Gateway API.

        Internal method for making authenticated requests to the TrustGraph API.

        Args:
            path: API endpoint path (relative to base URL)
            request_data: Request payload dictionary

        Returns:
            dict: Response object from API

        Raises:
            ProtocolException: If HTTP status is not 200 or response is not valid JSON
            ApplicationException: If API returns an error response
        """
        url = f"{self.url}{path}"

        headers = {"Content-Type": "application/json"}
        if self.token:
            headers["Authorization"] = f"Bearer {self.token}"

        timeout = aiohttp.ClientTimeout(total=self.timeout)

        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.post(url, json=request_data, headers=headers) as resp:
                if resp.status != 200:
                    raise ProtocolException(f"Status code {resp.status}")

                try:
                    obj = await resp.json()
                except (aiohttp.ContentTypeError, ValueError) as e:
                    # Wrong content type, or a body that does not parse as JSON
                    raise ProtocolException("Expected JSON response") from e

                check_error(obj)
                return obj

    async def list(self) -> List[str]:
        """
        List all flow identifiers.

        Retrieves IDs of all flows currently deployed in the system.

        Returns:
            list[str]: List of flow identifiers

        Example:
            ```python
            async_flow = await api.async_flow()

            # List all flows
            flows = await async_flow.list()
            print(f"Available flows: {flows}")
            ```
        """
        result = await self.request("flow", {"operation": "list-flows"})
        return result.get("flow-ids", [])

    async def get(self, id: str) -> Dict[str, Any]:
        """
        Get flow definition.

        Retrieves the complete flow configuration including its class name,
        description, and parameters.

        Args:
            id: Flow identifier

        Returns:
            dict: Flow definition object

        Example:
            ```python
            async_flow = await api.async_flow()

            # Get flow definition
            flow_def = await async_flow.get("default")
            print(f"Flow class: {flow_def.get('class-name')}")
            print(f"Description: {flow_def.get('description')}")
            ```
        """
        result = await self.request("flow", {
            "operation": "get-flow",
            "flow-id": id
        })
        return json.loads(result.get("flow", "{}"))

    async def start(self, class_name: str, id: str, description: str,
                    parameters: Optional[Dict] = None):
        """
        Start a new flow instance.

        Creates and starts a flow from a flow class definition with the specified
        parameters.

        Args:
            class_name: Flow class name to instantiate
            id: Identifier for the new flow instance
            description: Human-readable description of the flow
            parameters: Optional configuration parameters for the flow

        Example:
            ```python
            async_flow = await api.async_flow()

            # Start a flow from a class
            await async_flow.start(
                class_name="default",
                id="my-flow",
                description="Custom flow instance",
                parameters={"model": "claude-3-opus"}
            )
            ```
        """
        request_data = {
            "operation": "start-flow",
            "flow-id": id,
            "class-name": class_name,
            "description": description
        }
        if parameters:
            request_data["parameters"] = json.dumps(parameters)

        await self.request("flow", request_data)

    async def stop(self, id: str):
        """
        Stop a running flow.

        Stops and removes a flow instance, freeing its resources.

        Args:
            id: Flow identifier to stop

        Example:
            ```python
            async_flow = await api.async_flow()

            # Stop a flow
            await async_flow.stop("my-flow")
            ```
        """
        await self.request("flow", {
            "operation": "stop-flow",
            "flow-id": id
        })

    async def list_classes(self) -> List[str]:
        """
        List all flow class names.

        Retrieves names of all flow classes (blueprints) available in the system.

        Returns:
            list[str]: List of flow class names

        Example:
            ```python
            async_flow = await api.async_flow()

            # List available flow classes
            classes = await async_flow.list_classes()
            print(f"Available flow classes: {classes}")
            ```
        """
        result = await self.request("flow", {"operation": "list-classes"})
        return result.get("class-names", [])

    async def get_class(self, class_name: str) -> Dict[str, Any]:
        """
        Get flow class definition.

        Retrieves the blueprint definition for a flow class, including its
        configuration schema and service bindings.

        Args:
            class_name: Flow class name

        Returns:
            dict: Flow class definition object

        Example:
            ```python
            async_flow = await api.async_flow()

            # Get flow class definition
            class_def = await async_flow.get_class("default")
            print(f"Services: {class_def.get('services')}")
            ```
        """
        result = await self.request("flow", {
            "operation": "get-class",
            "class-name": class_name
        })
        return json.loads(result.get("class-definition", "{}"))

    async def put_class(self, class_name: str, definition: Dict[str, Any]):
        """
        Create or update a flow class definition.

        Stores a flow class blueprint that can be used to instantiate flows.

        Args:
            class_name: Flow class name
            definition: Flow class definition object

        Example:
            ```python
            async_flow = await api.async_flow()

            # Create a custom flow class
            class_def = {
                "services": {
                    "agent": {"module": "agent", "config": {...}},
                    "graph-rag": {"module": "graph-rag", "config": {...}}
                }
            }
            await async_flow.put_class("custom-flow", class_def)
            ```
        """
        await self.request("flow", {
            "operation": "put-class",
            "class-name": class_name,
            "class-definition": json.dumps(definition)
        })

    async def delete_class(self, class_name: str):
        """
        Delete a flow class definition.

        Removes a flow class blueprint from the system. Does not affect
        running flow instances.

        Args:
            class_name: Flow class name to delete

        Example:
            ```python
            async_flow = await api.async_flow()

            # Delete a flow class
            await async_flow.delete_class("old-flow-class")
            ```
        """
        await self.request("flow", {
            "operation": "delete-class",
            "class-name": class_name
        })

    def id(self, flow_id: str):
        """
        Get an async flow instance client.

        Returns a client for interacting with a specific flow's services
        (agent, RAG, queries, embeddings, etc.).

        Args:
            flow_id: Flow identifier

        Returns:
            AsyncFlowInstance: Client for flow-specific operations

        Example:
            ```python
            async_flow = await api.async_flow()

            # Get flow instance
            flow = async_flow.id("default")

            # Use flow services
            result = await flow.graph_rag(
                query="What is TrustGraph?",
                collection="default"
            )
            ```
        """
        return AsyncFlowInstance(self, flow_id)

    async def aclose(self) -> None:
        """
        Close async client and cleanup resources.

        Note: Cleanup is handled automatically by aiohttp session context managers.
        This method is provided for consistency with other async clients.
        """
        pass


class AsyncFlowInstance:
    """
    Asynchronous flow instance client.

    Provides async/await access to flow-scoped services including agents,
    RAG queries, embeddings, and graph queries. All operations return complete
    responses (non-streaming).

    Note: For streaming support, use AsyncSocketFlowInstance instead.
    """

    def __init__(self, flow: AsyncFlow, flow_id: str):
        """
        Initialize async flow instance.

        Args:
            flow: Parent AsyncFlow client
            flow_id: Flow identifier
        """
        self.flow = flow
        self.flow_id = flow_id

    async def request(self, service: str, request_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Make request to a flow-scoped service.

        Internal method for calling services within this flow instance.

        Args:
            service: Service name (e.g., "agent", "graph-rag", "triples")
            request_data: Service request payload

        Returns:
            dict: Service response object

        Raises:
            ProtocolException: If request fails or response is invalid
            ApplicationException: If service returns an error
        """
        return await self.flow.request(
            f"flow/{self.flow_id}/service/{service}", request_data
        )

    async def agent(self, question: str, state: Optional[Dict] = None,
                    group: Optional[str] = None, history: Optional[List] = None,
                    **kwargs: Any) -> Dict[str, Any]:
        """
        Execute an agent operation (non-streaming).

        Runs an agent to answer a question, with optional conversation state and
        history. Returns the complete response after the agent has finished
        processing.

        Note: This method does not support streaming. For real-time agent thoughts
        and observations, use AsyncSocketFlowInstance.agent() instead.

        Args:
            question: User question or instruction
            state: Optional state dictionary for conversation context
            group: Optional group identifier for session management
            history: Optional conversation history list
            **kwargs: Additional service-specific parameters

        Returns:
            dict: Complete agent response including answer and metadata

        Example:
            ```python
            async_flow = await api.async_flow()
            flow = async_flow.id("default")

            # Execute agent
            result = await flow.agent(
                question="What is the capital of France?",
            )
            print(f"Answer: {result.get('response')}")
            ```
        """
        request_data = {
            "question": question,
            "streaming": False  # REST doesn't support streaming
        }
        if state is not None:
            request_data["state"] = state
        if group is not None:
            request_data["group"] = group
        if history is not None:
            request_data["history"] = history
        request_data.update(kwargs)

        return await self.request("agent", request_data)

    async def text_completion(self, system: str, prompt: str, **kwargs: Any) -> TextCompletionResult:
        """
        Generate text completion (non-streaming).

        Generates a text response from an LLM given a system prompt and user prompt.

        Note: This method does not support streaming. For streaming text generation,
        use AsyncSocketFlowInstance.text_completion() instead.

        Args:
            system: System prompt defining the LLM's behavior
            prompt: User prompt or question
            **kwargs: Additional service-specific parameters

        Returns:
            TextCompletionResult: Result with text, in_token, out_token, model

        Example:
            ```python
            async_flow = await api.async_flow()
            flow = async_flow.id("default")

            result = await flow.text_completion(
                system="You are a helpful assistant.",
                prompt="Explain quantum computing in simple terms."
            )
            print(result.text)
            print(f"Tokens: {result.in_token} in, {result.out_token} out")
            ```
        """
        request_data = {
            "system": system,
            "prompt": prompt,
            "streaming": False
        }
        request_data.update(kwargs)

        result = await self.request("text-completion", request_data)
        return TextCompletionResult(
            text=result.get("response", ""),
            in_token=result.get("in_token"),
            out_token=result.get("out_token"),
            model=result.get("model"),
        )

    async def graph_rag(self, query: str, collection: str,
                        max_subgraph_size: int = 1000, max_subgraph_count: int = 5,
                        max_entity_distance: int = 3, **kwargs: Any) -> str:
        """
        Execute graph-based RAG query (non-streaming).

        Performs Retrieval-Augmented Generation using knowledge graph data.
        Identifies relevant entities and their relationships, then generates a
        response grounded in the graph structure. Returns complete response.

        Note: This method does not support streaming. For streaming RAG responses,
        use AsyncSocketFlowInstance.graph_rag() instead.

        Args:
            query: User query text
            collection: Collection identifier containing the knowledge graph
            max_subgraph_size: Maximum number of triples per subgraph (default: 1000)
            max_subgraph_count: Maximum number of subgraphs to retrieve (default: 5)
            max_entity_distance: Maximum graph distance for entity expansion (default: 3)
            **kwargs: Additional service-specific parameters

        Returns:
            str: Complete generated response grounded in graph data

        Example:
            ```python
            async_flow = await api.async_flow()
            flow = async_flow.id("default")

            # Query knowledge graph
            response = await flow.graph_rag(
                query="What are the relationships between these entities?",
                collection="medical-kb",
                max_subgraph_count=3
            )
            print(response)
            ```
        """
        request_data = {
            "query": query,
            "collection": collection,
            "max-subgraph-size": max_subgraph_size,
            "max-subgraph-count": max_subgraph_count,
            "max-entity-distance": max_entity_distance,
            "streaming": False
        }
        request_data.update(kwargs)

        result = await self.request("graph-rag", request_data)
        return result.get("response", "")

    async def document_rag(self, query: str, collection: str,
                           doc_limit: int = 10, **kwargs: Any) -> str:
        """
        Execute document-based RAG query (non-streaming).

        Performs Retrieval-Augmented Generation using document embeddings.
        Retrieves relevant document chunks via semantic search, then generates
        a response grounded in the retrieved documents. Returns complete response.

        Note: This method does not support streaming. For streaming RAG responses,
        use AsyncSocketFlowInstance.document_rag() instead.

        Args:
            query: User query text
            collection: Collection identifier containing documents
            doc_limit: Maximum number of document chunks to retrieve (default: 10)
            **kwargs: Additional service-specific parameters

        Returns:
            str: Complete generated response grounded in document data

        Example:
            ```python
            async_flow = await api.async_flow()
            flow = async_flow.id("default")

            # Query documents
            response = await flow.document_rag(
                query="What does the documentation say about authentication?",
                collection="docs",
                doc_limit=5
            )
            print(response)
            ```
        """
        request_data = {
            "query": query,
            "collection": collection,
            "doc-limit": doc_limit,
            "streaming": False
        }
        request_data.update(kwargs)

        result = await self.request("document-rag", request_data)
        return result.get("response", "")

    async def graph_embeddings_query(self, text: str, collection: str,
                                     limit: int = 10, **kwargs: Any):
        """
        Query graph embeddings for semantic entity search.

        Performs semantic search over graph entity embeddings to find entities
        most relevant to the input text. Returns entities ranked by similarity.

        Args:
            text: Query text for semantic search
            collection: Collection identifier containing graph embeddings
            limit: Maximum number of results to return (default: 10)
            **kwargs: Additional service-specific parameters

        Returns:
            dict: Response containing ranked entity matches with similarity scores

        Example:
            ```python
            async_flow = await api.async_flow()
            flow = async_flow.id("default")

            # Find related entities
            results = await flow.graph_embeddings_query(
                text="machine learning algorithms",
                collection="tech-kb",
                limit=5
            )

            for entity in results.get("entities", []):
                print(f"{entity['name']}: {entity['score']}")
            ```
        """
        # First convert text to embedding vector
        emb_result = await self.embeddings(texts=[text])
        vector = emb_result.get("vectors", [[]])[0]

        request_data = {
            "vector": vector,
            "collection": collection,
            "limit": limit
        }
        request_data.update(kwargs)

        return await self.request("graph-embeddings", request_data)

    async def embeddings(self, texts: list, **kwargs: Any):
        """
        Generate embeddings for input texts.

        Converts texts into numerical vector representations using the flow's
        configured embedding model. Useful for semantic search and similarity
        comparisons.

        Args:
            texts: List of input texts to embed
            **kwargs: Additional service-specific parameters

        Returns:
            dict: Response containing embedding vectors

        Example:
            ```python
            async_flow = await api.async_flow()
            flow = async_flow.id("default")

            # Generate embeddings
            result = await flow.embeddings(texts=["Sample text to embed"])
            vectors = result.get("vectors")
            print(f"Embedding dimension: {len(vectors[0][0])}")
            ```
        """
        request_data = {"texts": texts}
        request_data.update(kwargs)

        return await self.request("embeddings", request_data)

    async def triples_query(self, s=None, p=None, o=None, collection=None,
                            limit=100, **kwargs: Any):
        """
        Query RDF triples using pattern matching.

        Searches for triples matching the specified subject, predicate, and/or
        object patterns. Patterns use None as a wildcard to match any value.

        Args:
            s: Subject pattern (None for wildcard)
            p: Predicate pattern (None for wildcard)
            o: Object pattern (None for wildcard)
            collection: Collection identifier (None for all collections)
            limit: Maximum number of triples to return (default: 100)
            **kwargs: Additional service-specific parameters

        Returns:
            dict: Response containing matching triples

        Example:
            ```python
            async_flow = await api.async_flow()
            flow = async_flow.id("default")

            # Find all triples with a specific predicate
            results = await flow.triples_query(
                p="knows",
                collection="social",
                limit=50
            )

            for triple in results.get("triples", []):
                print(f"{triple['s']} knows {triple['o']}")
            ```
        """
        request_data = {"limit": limit}
        if s is not None:
            request_data["s"] = str(s)
        if p is not None:
            request_data["p"] = str(p)
        if o is not None:
            request_data["o"] = str(o)
        if collection is not None:
            request_data["collection"] = collection
        request_data.update(kwargs)

        return await self.request("triples", request_data)

    async def rows_query(self, query: str, collection: str,
                         variables: Optional[Dict] = None,
                         operation_name: Optional[str] = None, **kwargs: Any):
        """
        Execute a GraphQL query on stored rows.

        Queries structured data rows using GraphQL syntax. Supports complex
        queries with variables and named operations.

        Args:
            query: GraphQL query string
            collection: Collection identifier containing rows
            variables: Optional GraphQL query variables
            operation_name: Optional operation name for multi-operation queries
            **kwargs: Additional service-specific parameters

        Returns:
            dict: GraphQL response with data and/or errors

        Example:
            ```python
            async_flow = await api.async_flow()
            flow = async_flow.id("default")

            # Execute GraphQL query
            query = '''
                query GetUsers($status: String!) {
                    users(status: $status) {
                        id
                        name
                        email
                    }
                }
            '''

            result = await flow.rows_query(
                query=query,
                collection="users",
                variables={"status": "active"}
            )

            for user in result.get("data", {}).get("users", []):
                print(f"{user['name']}: {user['email']}")
            ```
        """
        request_data = {
            "query": query,
            "collection": collection
        }
        if variables:
            request_data["variables"] = variables
        if operation_name:
            request_data["operationName"] = operation_name
        request_data.update(kwargs)

        return await self.request("rows", request_data)

    async def row_embeddings_query(
        self, text: str, schema_name: str,
        collection: str = "default", index_name: Optional[str] = None,
        limit: int = 10, **kwargs: Any
    ):
        """
        Query row embeddings for semantic search on structured data.

        Performs semantic search over row index embeddings to find rows whose
        indexed field values are most similar to the input text. Enables
        fuzzy/semantic matching on structured data.

        Args:
            text: Query text for semantic search
            schema_name: Schema name to search within
            collection: Collection identifier (default: "default")
            index_name: Optional index name to filter search to specific index
            limit: Maximum number of results to return (default: 10)
            **kwargs: Additional service-specific parameters

        Returns:
            dict: Response containing matches with index_name, index_value,
                text, and score

        Example:
            ```python
            async_flow = await api.async_flow()
            flow = async_flow.id("default")

            # Search for customers by name similarity
            results = await flow.row_embeddings_query(
                text="John Smith",
                schema_name="customers",
                collection="sales",
                limit=5
            )

            for match in results.get("matches", []):
                print(f"{match['index_name']}: {match['index_value']} (score: {match['score']})")
            ```
        """
        # First convert text to embedding vector
        emb_result = await self.embeddings(texts=[text])
        vector = emb_result.get("vectors", [[]])[0]

        request_data = {
            "vector": vector,
            "schema_name": schema_name,
            "collection": collection,
            "limit": limit
        }
        if index_name:
            request_data["index_name"] = index_name
        request_data.update(kwargs)

        return await self.request("row-embeddings", request_data)