Update docs for 2.2 release (#766)

- Update protocol specs
- Update protocol docs
- Update API specs
cybermaggedon 2026-04-07 22:24:59 +01:00 committed by GitHub
parent c20e6540ec
commit e899370d98
15 changed files with 488 additions and 867 deletions


@@ -911,7 +911,7 @@ results = flow.graph_embeddings_query(
```python
# results contains {"entities": [{"entity": {...}, "score": 0.95}, ...]}
```
### `graph_rag(self, query, user='trustgraph', collection='default', entity_limit=50, triple_limit=30, max_subgraph_size=150, max_path_length=2, edge_score_limit=30, edge_limit=25)`
Execute graph-based Retrieval-Augmented Generation (RAG) query.
@@ -927,6 +927,8 @@ traversing entity relationships, then generates a response using an LLM.
- `triple_limit`: Maximum triples per entity (default: 30)
- `max_subgraph_size`: Maximum total triples in subgraph (default: 150)
- `max_path_length`: Maximum traversal depth (default: 2)
- `edge_score_limit`: Max edges for semantic pre-filter (default: 30)
- `edge_limit`: Max edges after LLM scoring (default: 25)
**Returns:** str: Generated response incorporating graph context
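The `edge_score_limit` / `edge_limit` pair above suggests a two-stage selection: a cheap semantic pre-filter trims the candidate edges, and a more expensive LLM scoring pass ranks the survivors. A standalone sketch of that top-k-then-rerank pattern, with stand-in scoring functions (not TrustGraph internals):

```python
# Two-stage top-k selection, as suggested by edge_score_limit /
# edge_limit above. similarity() and llm_score() are stand-in
# functions for illustration, not TrustGraph APIs.
def select_edges(edges, similarity, llm_score,
                 edge_score_limit=30, edge_limit=25):
    # Stage 1: cheap semantic pre-filter keeps the best candidates.
    prefiltered = sorted(edges, key=similarity, reverse=True)[:edge_score_limit]
    # Stage 2: expensive scoring runs on the survivors only.
    return sorted(prefiltered, key=llm_score, reverse=True)[:edge_limit]

edges = [f"edge-{i}" for i in range(100)]
picked = select_edges(
    edges,
    similarity=lambda e: int(e.split("-")[1]) % 7,  # stand-in similarity
    llm_score=lambda e: -int(e.split("-")[1]),      # stand-in LLM score
    edge_score_limit=10,
    edge_limit=3,
)
print(len(picked))  # 3
```

The point of the split is cost: only `edge_score_limit` edges ever reach the expensive second stage.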
@@ -1216,6 +1218,23 @@ Select matching schemas for a data sample using prompt analysis.
**Returns:** dict with schema_matches array and metadata
### `sparql_query(self, query, user='trustgraph', collection='default', limit=10000)`
Execute a SPARQL query against the knowledge graph.
**Arguments:**
- `query`: SPARQL 1.1 query string
- `user`: User/keyspace identifier (default: "trustgraph")
- `collection`: Collection identifier (default: "default")
- `limit`: Safety limit on results (default: 10000)
**Returns:** dict with query results. Structure depends on query type:
- SELECT: {"query-type": "select", "variables": [...], "bindings": [...]}
- ASK: {"query-type": "ask", "ask-result": bool}
- CONSTRUCT/DESCRIBE: {"query-type": "construct", "triples": [...]}
**Raises:**
- `ProtocolException`: If an error occurs
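The `query-type` field lets callers dispatch on the three documented result shapes. A minimal sketch, using an illustrative response dict rather than a live query:

```python
# Dispatch on the documented "query-type" discriminator of a
# sparql_query() result. The sample dict below is illustrative,
# not a real service response.
def summarize_sparql_result(result: dict) -> str:
    qtype = result.get("query-type")
    if qtype == "select":
        return f"{len(result['bindings'])} rows over {result['variables']}"
    if qtype == "ask":
        return f"ask: {result['ask-result']}"
    if qtype == "construct":
        return f"{len(result['triples'])} triples"
    raise ValueError(f"unknown query type: {qtype}")

sample = {
    "query-type": "select",
    "variables": ["s", "p", "o"],
    "bindings": [{"s": "urn:a", "p": "urn:b", "o": "urn:c"}],
}
print(summarize_sparql_result(sample))  # 1 rows over ['s', 'p', 'o']
```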
### `structured_query(self, question, user='trustgraph', collection='default')`
Execute a natural language question against structured data.
@@ -1937,54 +1956,24 @@ for triple in results.get("triples", []):
```python
from trustgraph.api import SocketClient
```
Synchronous WebSocket client with persistent connection.
Maintains a single websocket connection and multiplexes requests
by ID via a background reader task. Provides synchronous generators
for streaming responses.
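The request-multiplexing scheme described above can be modelled with a map of per-request queues. This is an illustrative sketch of the pattern, not the actual `SocketClient` internals:

```python
import queue
import threading

# Illustrative sketch: a reader routes each incoming message to a
# per-request queue keyed by request ID, the way a multiplexing
# websocket client might. Not the real SocketClient implementation.
class Demux:
    def __init__(self):
        self.pending: dict[str, queue.Queue] = {}
        self.lock = threading.Lock()

    def register(self, request_id: str) -> queue.Queue:
        # Caller registers before sending, then reads from its queue.
        q = queue.Queue()
        with self.lock:
            self.pending[request_id] = q
        return q

    def route(self, message: dict) -> None:
        # The background reader calls this for every incoming frame.
        with self.lock:
            q = self.pending.get(message["id"])
        if q is not None:
            q.put(message["payload"])

demux = Demux()
q1 = demux.register("req-1")
demux.route({"id": "req-1", "payload": "hello"})
demux.route({"id": "req-2", "payload": "dropped"})  # no consumer registered
msg = q1.get_nowait()
print(msg)  # hello
```

Because responses carry their request ID, many in-flight requests can share one connection without interleaving confusion.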
### Methods
### `__init__(self, url: str, timeout: int, token: str | None) -> None`
Initialize synchronous WebSocket client.
**Arguments:**
- `url`: Base URL for TrustGraph API (HTTP/HTTPS will be converted to WS/WSS)
- `timeout`: WebSocket timeout in seconds
- `token`: Optional bearer token for authentication
### `close(self) -> None`
Close the persistent WebSocket connection.
### `flow(self, flow_id: str) -> 'SocketFlowInstance'`
Get a flow instance for WebSocket streaming operations.
**Arguments:**
- `flow_id`: Flow identifier
**Returns:** SocketFlowInstance: Flow instance with streaming methods
**Example:**
```python
socket = api.socket()
flow = socket.flow("default")
# Stream agent responses
for chunk in flow.agent(question="Hello", user="trustgraph", streaming=True):
    print(chunk.content, end='', flush=True)
```
---
@@ -1997,618 +1986,82 @@ from trustgraph.api import SocketFlowInstance
Synchronous WebSocket flow instance for streaming operations.
Provides the same interface as REST FlowInstance but with WebSocket-based
streaming support for real-time responses.
### Methods
### `__init__(self, client: trustgraph.api.socket_client.SocketClient, flow_id: str) -> None`
Initialize socket flow instance.
**Arguments:**
- `client`: Parent SocketClient
- `flow_id`: Flow identifier
### `agent(self, question: str, user: str, state: Dict[str, Any] | None = None, group: str | None = None, history: List[Dict[str, Any]] | None = None, streaming: bool = False, **kwargs: Any) -> Dict[str, Any] | Iterator[trustgraph.api.types.StreamingChunk]`
Execute an agent operation with streaming support.
Agents can perform multi-step reasoning with tool use. This method always
returns streaming chunks (thoughts, observations, answers) even when
streaming=False, to show the agent's reasoning process.
**Arguments:**
- `question`: User question or instruction
- `user`: User identifier
- `state`: Optional state dictionary for stateful conversations
- `group`: Optional group identifier for multi-user contexts
- `history`: Optional conversation history as list of message dicts
- `streaming`: Enable streaming mode (default: False)
- `**kwargs`: Additional parameters passed to the agent service
**Returns:** Iterator[StreamingChunk]: Stream of agent thoughts, observations, and answers
**Example:**
```python
socket = api.socket()
flow = socket.flow("default")
# Stream agent reasoning
for chunk in flow.agent(
    question="What is quantum computing?",
    user="trustgraph",
    streaming=True
):
    if isinstance(chunk, AgentThought):
        print(f"[Thinking] {chunk.content}")
    elif isinstance(chunk, AgentObservation):
        print(f"[Observation] {chunk.content}")
    elif isinstance(chunk, AgentAnswer):
        print(f"[Answer] {chunk.content}")
```
### `agent_explain(self, question: str, user: str, collection: str, state: Dict[str, Any] | None = None, group: str | None = None, history: List[Dict[str, Any]] | None = None, **kwargs: Any) -> Iterator[trustgraph.api.types.StreamingChunk | trustgraph.api.types.ProvenanceEvent]`
Execute an agent operation with explainability support.
Streams both content chunks (AgentThought, AgentObservation, AgentAnswer)
and provenance events (ProvenanceEvent). Provenance events contain URIs
that can be fetched using ExplainabilityClient to get detailed information
about the agent's reasoning process.
Agent trace consists of:
- Session: The initial question and session metadata
- Iterations: Each thought/action/observation cycle
- Conclusion: The final answer
**Arguments:**
- `question`: User question or instruction
- `user`: User identifier
- `collection`: Collection identifier for provenance storage
- `state`: Optional state dictionary for stateful conversations
- `group`: Optional group identifier for multi-user contexts
- `history`: Optional conversation history as list of message dicts
- `**kwargs`: Additional parameters passed to the agent service
**Yields:** Union[StreamingChunk, ProvenanceEvent]: Agent chunks and provenance events
**Example:**
```python
from trustgraph.api import Api, ExplainabilityClient, ProvenanceEvent
from trustgraph.api import AgentThought, AgentObservation, AgentAnswer
socket = api.socket()
flow = socket.flow("default")
explain_client = ExplainabilityClient(flow)
provenance_ids = []
for item in flow.agent_explain(
    question="What is the capital of France?",
    user="trustgraph",
    collection="default"
):
    if isinstance(item, AgentThought):
        print(f"[Thought] {item.content}")
    elif isinstance(item, AgentObservation):
        print(f"[Observation] {item.content}")
    elif isinstance(item, AgentAnswer):
        print(f"[Answer] {item.content}")
    elif isinstance(item, ProvenanceEvent):
        provenance_ids.append(item.explain_id)
# Fetch session trace after completion
if provenance_ids:
    trace = explain_client.fetch_agent_trace(
        provenance_ids[0],  # Session URI is first
        graph="urn:graph:retrieval",
        user="trustgraph",
        collection="default"
    )
```
### `document_embeddings_query(self, text: str, user: str, collection: str, limit: int = 10, **kwargs: Any) -> Dict[str, Any]`
Query document chunks using semantic similarity.
**Arguments:**
- `text`: Query text for semantic search
- `user`: User/keyspace identifier
- `collection`: Collection identifier
- `limit`: Maximum number of results (default: 10)
- `**kwargs`: Additional parameters passed to the service
**Returns:** dict: Query results with chunk_ids of matching document chunks
**Example:**
```python
socket = api.socket()
flow = socket.flow("default")
results = flow.document_embeddings_query(
    text="machine learning algorithms",
    user="trustgraph",
    collection="research-papers",
    limit=5
)
# results contains {"chunks": [{"chunk_id": "...", "score": 0.95}, ...]}
```
### `document_rag(self, query: str, user: str, collection: str, doc_limit: int = 10, streaming: bool = False, **kwargs: Any) -> str | Iterator[str]`
Execute document-based RAG query with optional streaming.
Uses vector embeddings to find relevant document chunks, then generates
a response using an LLM. Streaming mode delivers results incrementally.
**Arguments:**
- `query`: Natural language query
- `user`: User/keyspace identifier
- `collection`: Collection identifier
- `doc_limit`: Maximum document chunks to retrieve (default: 10)
- `streaming`: Enable streaming mode (default: False)
- `**kwargs`: Additional parameters passed to the service
**Returns:** Union[str, Iterator[str]]: Complete response or stream of text chunks
**Example:**
```python
socket = api.socket()
flow = socket.flow("default")
# Streaming document RAG
for chunk in flow.document_rag(
    query="Summarize the key findings",
    user="trustgraph",
    collection="research-papers",
    doc_limit=5,
    streaming=True
):
    print(chunk, end='', flush=True)
```
### `document_rag_explain(self, query: str, user: str, collection: str, doc_limit: int = 10, **kwargs: Any) -> Iterator[trustgraph.api.types.RAGChunk | trustgraph.api.types.ProvenanceEvent]`
Execute document-based RAG query with explainability support.
Streams both content chunks (RAGChunk) and provenance events (ProvenanceEvent).
Provenance events contain URIs that can be fetched using ExplainabilityClient
to get detailed information about how the response was generated.
Document RAG trace consists of:
- Question: The user's query
- Exploration: Chunks retrieved from document store (chunk_count)
- Synthesis: The generated answer
**Arguments:**
- `query`: Natural language query
- `user`: User/keyspace identifier
- `collection`: Collection identifier
- `doc_limit`: Maximum document chunks to retrieve (default: 10)
- `**kwargs`: Additional parameters passed to the service
**Yields:** Union[RAGChunk, ProvenanceEvent]: Content chunks and provenance events
**Example:**
```python
import sys
from trustgraph.api import Api, ExplainabilityClient, RAGChunk, ProvenanceEvent
socket = api.socket()
flow = socket.flow("default")
explain_client = ExplainabilityClient(flow)
for item in flow.document_rag_explain(
    query="Summarize the key findings",
    user="trustgraph",
    collection="research-papers",
    doc_limit=5
):
    if isinstance(item, RAGChunk):
        print(item.content, end='', flush=True)
    elif isinstance(item, ProvenanceEvent):
        # Fetch entity details
        entity = explain_client.fetch_entity(
            item.explain_id,
            graph=item.explain_graph,
            user="trustgraph",
            collection="research-papers"
        )
        print(f"Event: {entity}", file=sys.stderr)
```
### `embeddings(self, texts: list, **kwargs: Any) -> Dict[str, Any]`
Generate vector embeddings for one or more texts.
**Arguments:**
- `texts`: List of input texts to embed
- `**kwargs`: Additional parameters passed to the service
**Returns:** dict: Response containing vectors (one set per input text)
**Example:**
```python
socket = api.socket()
flow = socket.flow("default")
result = flow.embeddings(["quantum computing"])
vectors = result.get("vectors", [])
```
### `graph_embeddings_query(self, text: str, user: str, collection: str, limit: int = 10, **kwargs: Any) -> Dict[str, Any]`
Query knowledge graph entities using semantic similarity.
**Arguments:**
- `text`: Query text for semantic search
- `user`: User/keyspace identifier
- `collection`: Collection identifier
- `limit`: Maximum number of results (default: 10)
- `**kwargs`: Additional parameters passed to the service
**Returns:** dict: Query results with similar entities
**Example:**
```python
socket = api.socket()
flow = socket.flow("default")
results = flow.graph_embeddings_query(
    text="physicist who discovered radioactivity",
    user="trustgraph",
    collection="scientists",
    limit=5
)
```
### `graph_rag(self, query: str, user: str, collection: str, entity_limit: int = 50, triple_limit: int = 30, max_subgraph_size: int = 1000, max_path_length: int = 2, edge_score_limit: int = 30, edge_limit: int = 25, streaming: bool = False, **kwargs: Any) -> str | Iterator[str]`
Execute graph-based RAG query with optional streaming.
Uses knowledge graph structure to find relevant context, then generates
a response using an LLM. Streaming mode delivers results incrementally.
**Arguments:**
- `query`: Natural language query
- `user`: User/keyspace identifier
- `collection`: Collection identifier
- `entity_limit`: Maximum entities to retrieve (default: 50)
- `triple_limit`: Maximum triples per entity (default: 30)
- `max_subgraph_size`: Maximum total triples in subgraph (default: 1000)
- `max_path_length`: Maximum traversal depth (default: 2)
- `edge_score_limit`: Max edges for semantic pre-filter (default: 30)
- `edge_limit`: Max edges after LLM scoring (default: 25)
- `streaming`: Enable streaming mode (default: False)
- `**kwargs`: Additional parameters passed to the service
**Returns:** Union[str, Iterator[str]]: Complete response or stream of text chunks
**Example:**
```python
socket = api.socket()
flow = socket.flow("default")
# Streaming graph RAG
for chunk in flow.graph_rag(
    query="Tell me about Marie Curie",
    user="trustgraph",
    collection="scientists",
    streaming=True
):
    print(chunk, end='', flush=True)
```
### `graph_rag_explain(self, query: str, user: str, collection: str, entity_limit: int = 50, triple_limit: int = 30, max_subgraph_size: int = 1000, max_path_length: int = 2, edge_score_limit: int = 30, edge_limit: int = 25, **kwargs: Any) -> Iterator[trustgraph.api.types.RAGChunk | trustgraph.api.types.ProvenanceEvent]`
Execute graph-based RAG query with explainability support.
Streams both content chunks (RAGChunk) and provenance events (ProvenanceEvent).
Provenance events contain URIs that can be fetched using ExplainabilityClient
to get detailed information about how the response was generated.
**Arguments:**
- `query`: Natural language query
- `user`: User/keyspace identifier
- `collection`: Collection identifier
- `entity_limit`: Maximum entities to retrieve (default: 50)
- `triple_limit`: Maximum triples per entity (default: 30)
- `max_subgraph_size`: Maximum total triples in subgraph (default: 1000)
- `max_path_length`: Maximum traversal depth (default: 2)
- `edge_score_limit`: Max edges for semantic pre-filter (default: 30)
- `edge_limit`: Max edges after LLM scoring (default: 25)
- `**kwargs`: Additional parameters passed to the service
**Yields:** Union[RAGChunk, ProvenanceEvent]: Content chunks and provenance events
**Example:**
```python
from trustgraph.api import Api, ExplainabilityClient, RAGChunk, ProvenanceEvent
socket = api.socket()
flow = socket.flow("default")
explain_client = ExplainabilityClient(flow)
provenance_ids = []
response_text = ""
for item in flow.graph_rag_explain(
    query="Tell me about Marie Curie",
    user="trustgraph",
    collection="scientists"
):
    if isinstance(item, RAGChunk):
        response_text += item.content
        print(item.content, end='', flush=True)
    elif isinstance(item, ProvenanceEvent):
        provenance_ids.append(item.explain_id)
# Fetch explainability details
for prov_id in provenance_ids:
    entity = explain_client.fetch_entity(
        prov_id,
        graph="urn:graph:retrieval",
        user="trustgraph",
        collection="scientists"
    )
    print(f"Entity: {entity}")
```
### `mcp_tool(self, name: str, parameters: Dict[str, Any], **kwargs: Any) -> Dict[str, Any]`
Execute a Model Context Protocol (MCP) tool.
**Arguments:**
- `name`: Tool name/identifier
- `parameters`: Tool parameters dictionary
- `**kwargs`: Additional parameters passed to the service
**Returns:** dict: Tool execution result
**Example:**
```python
socket = api.socket()
flow = socket.flow("default")
result = flow.mcp_tool(
    name="search-web",
    parameters={"query": "latest AI news", "limit": 5}
)
```
### `prompt(self, id: str, variables: Dict[str, str], streaming: bool = False, **kwargs: Any) -> str | Iterator[str]`
Execute a prompt template with optional streaming.
**Arguments:**
- `id`: Prompt template identifier
- `variables`: Dictionary of variable name to value mappings
- `streaming`: Enable streaming mode (default: False)
- `**kwargs`: Additional parameters passed to the service
**Returns:** Union[str, Iterator[str]]: Complete response or stream of text chunks
**Example:**
```python
socket = api.socket()
flow = socket.flow("default")
# Streaming prompt execution
for chunk in flow.prompt(
    id="summarize-template",
    variables={"topic": "quantum computing", "length": "brief"},
    streaming=True
):
    print(chunk, end='', flush=True)
```
### `row_embeddings_query(self, text: str, schema_name: str, user: str = 'trustgraph', collection: str = 'default', index_name: str | None = None, limit: int = 10, **kwargs: Any) -> Dict[str, Any]`
Query row data using semantic similarity on indexed fields.
Finds rows whose indexed field values are semantically similar to the
input text, using vector embeddings. This enables fuzzy/semantic matching
on structured data.
**Arguments:**
- `text`: Query text for semantic search
- `schema_name`: Schema name to search within
- `user`: User/keyspace identifier (default: "trustgraph")
- `collection`: Collection identifier (default: "default")
- `index_name`: Optional index name to filter search to specific index
- `limit`: Maximum number of results (default: 10)
- `**kwargs`: Additional parameters passed to the service
**Returns:** dict: Query results with matches containing index_name, index_value, text, and score
**Example:**
```python
socket = api.socket()
flow = socket.flow("default")
# Search for customers by name similarity
results = flow.row_embeddings_query(
    text="John Smith",
    schema_name="customers",
    user="trustgraph",
    collection="sales",
    limit=5
)
# Filter to specific index
results = flow.row_embeddings_query(
    text="machine learning engineer",
    schema_name="employees",
    index_name="job_title",
    limit=10
)
```
### `rows_query(self, query: str, user: str, collection: str, variables: Dict[str, Any] | None = None, operation_name: str | None = None, **kwargs: Any) -> Dict[str, Any]`
Execute a GraphQL query against structured rows.
**Arguments:**
- `query`: GraphQL query string
- `user`: User/keyspace identifier
- `collection`: Collection identifier
- `variables`: Optional query variables dictionary
- `operation_name`: Optional operation name for multi-operation documents
- `**kwargs`: Additional parameters passed to the service
**Returns:** dict: GraphQL response with data, errors, and/or extensions
**Example:**
```python
socket = api.socket()
flow = socket.flow("default")
query = '''
{
    scientists(limit: 10) {
        name
        field
        discoveries
    }
}
'''
result = flow.rows_query(
    query=query,
    user="trustgraph",
    collection="scientists"
)
```
### `sparql_query_stream(self, query: str, user: str = 'trustgraph', collection: str = 'default', limit: int = 10000, batch_size: int = 20, **kwargs: Any) -> Iterator[Dict[str, Any]]`
Execute a SPARQL query with streaming batches.
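The effect of `batch_size` can be modelled with a plain generator. An illustrative sketch of grouping results into fixed-size batches (not the client's wire protocol):

```python
from itertools import islice

# Illustrative: group an iterable of result bindings into fixed-size
# batches, the way a streaming query client hands them to the caller.
def batched(iterable, batch_size):
    it = iter(iterable)
    while True:
        batch = list(islice(it, batch_size))
        if not batch:
            return
        yield batch

bindings = [{"s": f"urn:item-{i}"} for i in range(45)]
sizes = [len(b) for b in batched(bindings, 20)]
print(sizes)  # [20, 20, 5]
```

Smaller batches reduce time-to-first-result; larger ones reduce per-batch overhead.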
### `text_completion(self, system: str, prompt: str, streaming: bool = False, **kwargs) -> str | Iterator[str]`
Execute text completion with optional streaming.
**Arguments:**
- `system`: System prompt defining the assistant's behavior
- `prompt`: User prompt/question
- `streaming`: Enable streaming mode (default: False)
- `**kwargs`: Additional parameters passed to the service
**Returns:** Union[str, Iterator[str]]: Complete response or stream of text chunks
**Example:**
```python
socket = api.socket()
flow = socket.flow("default")
# Non-streaming
response = flow.text_completion(
    system="You are helpful",
    prompt="Explain quantum computing",
    streaming=False
)
print(response)
# Streaming
for chunk in flow.text_completion(
    system="You are helpful",
    prompt="Explain quantum computing",
    streaming=True
):
    print(chunk, end='', flush=True)
```
### `triples_query(self, s: str | Dict[str, Any] | None = None, p: str | Dict[str, Any] | None = None, o: str | Dict[str, Any] | None = None, g: str | None = None, user: str | None = None, collection: str | None = None, limit: int = 100, **kwargs: Any) -> List[Dict[str, Any]]`
Query knowledge graph triples using pattern matching.
**Arguments:**
- `s`: Subject filter - URI string, Term dict, or None for wildcard
- `p`: Predicate filter - URI string, Term dict, or None for wildcard
- `o`: Object filter - URI/literal string, Term dict, or None for wildcard
- `g`: Named graph filter - URI string or None for all graphs
- `user`: User/keyspace identifier (optional)
- `collection`: Collection identifier (optional)
- `limit`: Maximum results to return (default: 100)
- `**kwargs`: Additional parameters passed to the service
**Returns:** List[Dict]: List of matching triples in wire format
**Example:**
```python
socket = api.socket()
flow = socket.flow("default")
# Find all triples about a specific subject
triples = flow.triples_query(
    s="http://example.org/person/marie-curie",
    user="trustgraph",
    collection="scientists"
)
# Query with named graph filter
triples = flow.triples_query(
    s="urn:trustgraph:session:abc123",
    g="urn:graph:retrieval",
    user="trustgraph",
    collection="default"
)
```
### `triples_query_stream(self, s: str | Dict[str, Any] | None = None, p: str | Dict[str, Any] | None = None, o: str | Dict[str, Any] | None = None, g: str | None = None, user: str | None = None, collection: str | None = None, limit: int = 100, batch_size: int = 20, **kwargs: Any) -> Iterator[List[Dict[str, Any]]]`
Query knowledge graph triples with streaming batches.
Yields batches of triples as they arrive, reducing time-to-first-result
and memory overhead for large result sets.
**Arguments:**
- `s`: Subject filter - URI string, Term dict, or None for wildcard
- `p`: Predicate filter - URI string, Term dict, or None for wildcard
- `o`: Object filter - URI/literal string, Term dict, or None for wildcard
- `g`: Named graph filter - URI string or None for all graphs
- `user`: User/keyspace identifier (optional)
- `collection`: Collection identifier (optional)
- `limit`: Maximum results to return (default: 100)
- `batch_size`: Triples per batch (default: 20)
- `**kwargs`: Additional parameters passed to the service
**Yields:** List[Dict]: Batches of triples in wire format
**Example:**
```python
socket = api.socket()
flow = socket.flow("default")
for batch in flow.triples_query_stream(
    user="trustgraph",
    collection="default"
):
    for triple in batch:
        print(triple["s"], triple["p"], triple["o"])
```
---
@@ -2618,17 +2071,35 @@ for batch in flow.triples_query_stream(
```python
from trustgraph.api import AsyncSocketClient
```
Asynchronous WebSocket client with persistent connection.
Maintains a single websocket connection and multiplexes requests
by ID, routing responses via a background reader task.
Use as an async context manager for proper lifecycle management:

    async with AsyncSocketClient(url, timeout, token) as client:
        result = await client._send_request(...)

Or call connect()/aclose() manually.
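The connect()/aclose() lifecycle above follows the standard async context-manager protocol. A runnable model with a stub class standing in for AsyncSocketClient (not the real implementation):

```python
import asyncio

# Minimal model of a connect()/aclose() lifecycle driven by
# __aenter__/__aexit__. StubClient stands in for AsyncSocketClient.
class StubClient:
    def __init__(self):
        self.connected = False

    async def connect(self):
        self.connected = True

    async def aclose(self):
        self.connected = False

    async def __aenter__(self):
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.aclose()

async def main():
    async with StubClient() as client:
        states = [client.connected]   # True while inside the block
    states.append(client.connected)   # False after __aexit__
    return states

print(asyncio.run(main()))  # [True, False]
```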
### Methods
### `__aenter__(self)`
### `__aexit__(self, exc_type, exc_val, exc_tb)`
### `__init__(self, url: str, timeout: int, token: str | None)`
Initialize self. See help(type(self)) for accurate signature.
### `aclose(self)`
Close the persistent WebSocket connection cleanly.
### `connect(self)`
Establish the persistent websocket connection.
### `flow(self, flow_id: str)`
@@ -3151,7 +2622,10 @@ Detect whether a session is GraphRAG or Agent type.
Fetch the complete Agent trace starting from a session URI.
Follows the provenance chain for all patterns:
- ReAct: Question -> Analysis(s) -> Conclusion
- Supervisor: Question -> Decomposition -> Finding(s) -> Synthesis
- Plan-then-Execute: Question -> Plan -> StepResult(s) -> Synthesis
**Arguments:**
@@ -3162,7 +2636,7 @@ Follows the provenance chain: Question -> Analysis(s) -> Conclusion
- `api`: TrustGraph Api instance for librarian access (optional)
- `max_content`: Maximum content length for conclusion
**Returns:** Dict with question, steps (mixed entity list), conclusion/synthesis
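A trace returned by `fetch_agent_trace` can be walked generically across the three patterns. A hedged sketch; the sample dict below is illustrative and assumes only the keys named in the Returns description:

```python
# Walk a fetch_agent_trace()-style result. The sample below is
# illustrative; only the keys named in the Returns description
# (question, steps, conclusion/synthesis) are assumed.
def render_trace(trace: dict) -> list[str]:
    lines = [f"Q: {trace['question']}"]
    for step in trace.get("steps", []):
        lines.append(f"  step: {step.get('entity_type', '?')}")
    # Either key may hold the final answer, depending on the pattern.
    final = trace.get("conclusion") or trace.get("synthesis")
    if final:
        lines.append(f"A: {final}")
    return lines

sample = {
    "question": "What is the capital of France?",
    "steps": [{"entity_type": "Analysis"}, {"entity_type": "Observation"}],
    "conclusion": "Paris",
}
for line in render_trace(sample):
    print(line)
```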
### `fetch_docrag_trace(self, question_uri: str, graph: str | None = None, user: str | None = None, collection: str | None = None, api: Any = None, max_content: int = 10000) -> Dict[str, Any]`
@@ -3423,7 +2897,7 @@ Initialize self. See help(type(self)) for accurate signature.
```python
from trustgraph.api import Analysis
```
Analysis+ToolUse entity - decision + tool call (Agent only).
**Fields:**
@@ -3432,11 +2906,33 @@ Analysis entity - one think/act/observe cycle (Agent only).
- `action`: <class 'str'>
- `arguments`: <class 'str'>
- `thought`: <class 'str'>
### Methods
### `__init__(self, uri: str, entity_type: str = '', action: str = '', arguments: str = '', thought: str = '') -> None`
Initialize self. See help(type(self)) for accurate signature.
---
## `Observation`
```python
from trustgraph.api import Observation
```
Observation entity - standalone tool result (Agent only).
**Fields:**
- `uri`: <class 'str'>
- `entity_type`: <class 'str'>
- `document`: <class 'str'>
### Methods
### `__init__(self, uri: str, entity_type: str = '', document: str = '') -> None`
Initialize self. See help(type(self)) for accurate signature.
@@ -3761,10 +3257,11 @@ These chunks show how the agent is thinking about the problem.
- `content`: <class 'str'>
- `end_of_message`: <class 'bool'>
- `chunk_type`: <class 'str'>
- `message_id`: <class 'str'>
### Methods
### `__init__(self, content: str, end_of_message: bool = False, chunk_type: str = 'thought', message_id: str = '') -> None`
Initialize self. See help(type(self)) for accurate signature.
@@ -3787,10 +3284,11 @@ These chunks show what the agent learned from using tools.
- `content`: <class 'str'>
- `end_of_message`: <class 'bool'>
- `chunk_type`: <class 'str'>
- `message_id`: <class 'str'>
### Methods
### `__init__(self, content: str, end_of_message: bool = False, chunk_type: str = 'observation', message_id: str = '') -> None`
Initialize self. See help(type(self)) for accurate signature.
@@ -3818,10 +3316,11 @@ its reasoning and tool use.
- `end_of_message`: <class 'bool'>
- `chunk_type`: <class 'str'>
- `end_of_dialog`: <class 'bool'>
- `message_id`: <class 'str'>
### Methods
### `__init__(self, content: str, end_of_message: bool = False, chunk_type: str = 'final-answer', end_of_dialog: bool = False, message_id: str = '') -> None`
Initialize self. See help(type(self)) for accurate signature.
@@ -3864,7 +3363,7 @@ from trustgraph.api import ProvenanceEvent
Provenance event for explainability.
Emitted during retrieval queries when explainable mode is enabled.
Each event represents a provenance node created during query processing.
**Fields:**
@@ -3872,10 +3371,12 @@ Each event represents a provenance node created during query processing.
- `explain_id`: <class 'str'>
- `explain_graph`: <class 'str'>
- `event_type`: <class 'str'>
- `entity`: <class 'object'>
- `triples`: <class 'list'>
### Methods
### `__init__(self, explain_id: str, explain_graph: str = '', event_type: str = '', entity: object = None, triples: list = <factory>) -> None`
Initialize self. See help(type(self)) for accurate signature.