Adding explainability to the ReAct agent (#689)

* Added tech spec

* Add provenance recording to the ReAct agent loop

Enables agent sessions to be traced and debugged using the same
explainability infrastructure as GraphRAG. Agent traces record:
- Session start with query and timestamp
- Each iteration's thought, action, arguments, and observation
- Final answer with derivation chain

Changes:
- Add session_id and collection fields to AgentRequest schema
- Add agent predicates (TG_THOUGHT, TG_ACTION, etc.) to namespaces
- Create agent provenance triple generators in provenance/agent.py
- Register explainability producer in agent service
- Emit provenance triples during agent execution
- Update CLI tools to detect and render agent traces alongside GraphRAG

* Updated explainability taxonomy:

GraphRAG: tg:Question → tg:Exploration → tg:Focus → tg:Synthesis

Agent: tg:Question → one or more tg:Analysis → tg:Conclusion

All entities also have their PROV-O type (prov:Activity or prov:Entity).

Updated commit message:

Add provenance recording to the ReAct agent loop

Enables agent sessions to be traced and debugged using the same
explainability infrastructure as GraphRAG.

Entity types follow human reasoning patterns:
- tg:Question - the user's query (shared with GraphRAG)
- tg:Analysis - each think/act/observe cycle
- tg:Conclusion - the final answer

Also adds explicit TG types to GraphRAG entities:
- tg:Question, tg:Exploration, tg:Focus, tg:Synthesis

All types retain their PROV-O base types (prov:Activity, prov:Entity).

Changes:
- Add session_id and collection fields to AgentRequest schema
- Add explainability entity types to namespaces.py
- Create agent provenance triple generators
- Register explainability producer in agent service
- Emit provenance triples during agent execution
- Update CLI tools to detect and render both trace types

* Document RAG explainability is now complete. Here's a summary of the
changes made:

Schema Changes:
- trustgraph-base/trustgraph/schema/services/retrieval.py: Added
  explain_id and explain_graph fields to DocumentRagResponse
- trustgraph-base/trustgraph/messaging/translators/retrieval.py:
  Updated translator to handle explainability fields

Provenance Changes:
- trustgraph-base/trustgraph/provenance/namespaces.py: Added
  TG_CHUNK_COUNT and TG_SELECTED_CHUNK predicates
- trustgraph-base/trustgraph/provenance/uris.py: Added
  docrag_question_uri, docrag_exploration_uri, docrag_synthesis_uri
  generators
- trustgraph-base/trustgraph/provenance/triples.py: Added
  docrag_question_triples, docrag_exploration_triples,
  docrag_synthesis_triples builders
- trustgraph-base/trustgraph/provenance/__init__.py: Exported all
  new Document RAG functions and predicates

Service Changes:
- trustgraph-flow/trustgraph/retrieval/document_rag/document_rag.py:
  Added explainability callback support and triple emission at each
  phase (Question → Exploration → Synthesis)
- trustgraph-flow/trustgraph/retrieval/document_rag/rag.py:
  Registered explainability producer and wired up the callback

Documentation:
- docs/tech-specs/agent-explainability.md: Added Document RAG entity
  types and provenance model documentation

Document RAG Provenance Model:
Question (urn:trustgraph:docrag:{uuid})
    │
    │  tg:query, prov:startedAtTime
    │  rdf:type = prov:Activity, tg:Question
    │
    ↓ prov:wasGeneratedBy
    │
Exploration (urn:trustgraph:docrag:{uuid}/exploration)
    │
    │  tg:chunkCount, tg:selectedChunk (multiple)
    │  rdf:type = prov:Entity, tg:Exploration
    │
    ↓ prov:wasDerivedFrom
    │
Synthesis (urn:trustgraph:docrag:{uuid}/synthesis)
    │
    │  tg:content = "The answer..."
    │  rdf:type = prov:Entity, tg:Synthesis

* Specific subtype that makes the retrieval mechanism immediately
obvious:

| System       | TG Types on Question             | URI Pattern                    |
|--------------|----------------------------------|--------------------------------|
| GraphRAG     | tg:Question, tg:GraphRagQuestion | urn:trustgraph:question:{uuid} |
| Document RAG | tg:Question, tg:DocRagQuestion   | urn:trustgraph:docrag:{uuid}   |
| Agent        | tg:Question, tg:AgentQuestion    | urn:trustgraph:agent:{uuid}    |

Files modified:
- trustgraph-base/trustgraph/provenance/namespaces.py - Added
TG_GRAPH_RAG_QUESTION, TG_DOC_RAG_QUESTION, TG_AGENT_QUESTION
- trustgraph-base/trustgraph/provenance/triples.py - Added subtype to
question_triples and docrag_question_triples
- trustgraph-base/trustgraph/provenance/agent.py - Added subtype to
agent_session_triples
- trustgraph-base/trustgraph/provenance/__init__.py - Exported new types
- docs/tech-specs/agent-explainability.md - Documented the subtypes

This allows:
- Query all questions: ?q rdf:type tg:Question
- Query only GraphRAG: ?q rdf:type tg:GraphRagQuestion
- Query only Document RAG: ?q rdf:type tg:DocRagQuestion
- Query only Agent: ?q rdf:type tg:AgentQuestion

* Fixed tests
Commit 312174eb88 (parent a53ed41da2), authored by cybermaggedon,
2026-03-11 15:28:15 +00:00, committed via GitHub.
17 changed files with 1269 additions and 44 deletions


@@ -0,0 +1,272 @@
# Agent Explainability: Provenance Recording
## Overview
Add provenance recording to the ReAct agent loop so agent sessions can be traced and debugged using the same explainability infrastructure as GraphRAG.
**Design Decisions:**
- Write to `urn:graph:retrieval` (generic explainability graph)
- Linear dependency chain for now (analysis N → wasDerivedFrom → analysis N-1)
- Tools are opaque black boxes (record input/output only)
- DAG support deferred to future iteration
## Entity Types
Both GraphRAG and Agent use PROV-O as the base ontology with TrustGraph-specific subtypes:
### GraphRAG Types
| Entity | PROV-O Type | TG Types | Description |
|--------|-------------|----------|-------------|
| Question | `prov:Activity` | `tg:Question`, `tg:GraphRagQuestion` | The user's query |
| Exploration | `prov:Entity` | `tg:Exploration` | Edges retrieved from knowledge graph |
| Focus | `prov:Entity` | `tg:Focus` | Selected edges with reasoning |
| Synthesis | `prov:Entity` | `tg:Synthesis` | Final answer |
### Agent Types
| Entity | PROV-O Type | TG Types | Description |
|--------|-------------|----------|-------------|
| Question | `prov:Activity` | `tg:Question`, `tg:AgentQuestion` | The user's query |
| Analysis | `prov:Entity` | `tg:Analysis` | Each think/act/observe cycle |
| Conclusion | `prov:Entity` | `tg:Conclusion` | Final answer |
### Document RAG Types
| Entity | PROV-O Type | TG Types | Description |
|--------|-------------|----------|-------------|
| Question | `prov:Activity` | `tg:Question`, `tg:DocRagQuestion` | The user's query |
| Exploration | `prov:Entity` | `tg:Exploration` | Chunks retrieved from document store |
| Synthesis | `prov:Entity` | `tg:Synthesis` | Final answer |
**Note:** Document RAG uses a subset of GraphRAG's types (no Focus step since there's no edge selection/reasoning phase).
### Question Subtypes
All Question entities share `tg:Question` as a base type but have a specific subtype to identify the retrieval mechanism:
| Subtype | URI Pattern | Mechanism |
|---------|-------------|-----------|
| `tg:GraphRagQuestion` | `urn:trustgraph:question:{uuid}` | Knowledge graph RAG |
| `tg:DocRagQuestion` | `urn:trustgraph:docrag:{uuid}` | Document/chunk RAG |
| `tg:AgentQuestion` | `urn:trustgraph:agent:{uuid}` | ReAct agent |
This allows querying all questions via `tg:Question` while filtering by specific mechanism via the subtype.
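A minimal sketch of this filtering, using plain tuples in place of the project's Triple schema; the `TG` namespace expansion below is an assumption for illustration, not taken from the codebase:

```python
# Plain-tuple sketch of subtype filtering. TG is an assumed namespace
# expansion; tuples stand in for the project's real Triple schema.
RDF_TYPE = "http://www.w3.org/1999/02/22-rdf-syntax-ns#type"
TG = "http://trustgraph.ai/ns/"  # assumed prefix

triples = [
    ("urn:trustgraph:question:123", RDF_TYPE, TG + "Question"),
    ("urn:trustgraph:question:123", RDF_TYPE, TG + "GraphRagQuestion"),
    ("urn:trustgraph:agent:456", RDF_TYPE, TG + "Question"),
    ("urn:trustgraph:agent:456", RDF_TYPE, TG + "AgentQuestion"),
]

def subjects_of_type(triples, type_uri):
    """Subjects that carry rdf:type type_uri."""
    return {s for s, p, o in triples if p == RDF_TYPE and o == type_uri}

all_questions = subjects_of_type(triples, TG + "Question")    # both URIs
agent_only = subjects_of_type(triples, TG + "AgentQuestion")  # agent URI only
```

Because every Question carries both the base type and one subtype, a consumer can query broadly or narrowly against the same graph.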
## Provenance Model
```
Question (urn:trustgraph:agent:{uuid})
    │  tg:query = "User's question"
    │  prov:startedAtTime = timestamp
    │  rdf:type = prov:Activity, tg:Question, tg:AgentQuestion
    ↓  prov:wasDerivedFrom
Analysis1 (urn:trustgraph:agent:{uuid}/i1)
    │  tg:thought = "I need to query the knowledge base..."
    │  tg:action = "knowledge-query"
    │  tg:arguments = {"question": "..."}
    │  tg:observation = "Result from tool..."
    │  rdf:type = prov:Entity, tg:Analysis
    ↓  prov:wasDerivedFrom
Analysis2 (urn:trustgraph:agent:{uuid}/i2)
    │  ...
    ↓  prov:wasDerivedFrom
Conclusion (urn:trustgraph:agent:{uuid}/final)
    │  tg:answer = "The final response..."
    │  rdf:type = prov:Entity, tg:Conclusion
```
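The linear chain can be sketched as loop logic: each Analysis derives from the previous node, and the Conclusion derives from the last Analysis. The `*_uri` helpers here are simplified stand-ins for the real generators in `uris.py`:

```python
# Sketch of threading the wasDerivedFrom chain through the ReAct loop.
# The *_uri helpers are simplified stand-ins, not the real generators.
import uuid

def agent_session_uri(sid):
    return f"urn:trustgraph:agent:{sid}"

def agent_iteration_uri(sid, n):
    return f"urn:trustgraph:agent:{sid}/i{n}"   # 1-based iteration number

def agent_final_uri(sid):
    return f"urn:trustgraph:agent:{sid}/final"

sid = str(uuid.uuid4())
parent = agent_session_uri(sid)    # Question is the root of the chain

derived_from = {}                  # node URI -> parent URI
for n in range(1, 3):              # two think/act/observe cycles
    node = agent_iteration_uri(sid, n)
    derived_from[node] = parent    # Analysis N derives from N-1 (or Question)
    parent = node

conclusion = agent_final_uri(sid)
derived_from[conclusion] = parent  # Conclusion derives from the last Analysis
```

In the real service, each `derived_from[node] = parent` step would instead emit `agent_iteration_triples(node, parent, ...)` to the explainability producer.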
### Document RAG Provenance Model
```
Question (urn:trustgraph:docrag:{uuid})
    │  tg:query = "User's question"
    │  prov:startedAtTime = timestamp
    │  rdf:type = prov:Activity, tg:Question, tg:DocRagQuestion
    ↓  prov:wasGeneratedBy
Exploration (urn:trustgraph:docrag:{uuid}/exploration)
    │  tg:chunkCount = 5
    │  tg:selectedChunk = "chunk-id-1"
    │  tg:selectedChunk = "chunk-id-2"
    │  ...
    │  rdf:type = prov:Entity, tg:Exploration
    ↓  prov:wasDerivedFrom
Synthesis (urn:trustgraph:docrag:{uuid}/synthesis)
    │  tg:content = "The synthesized answer..."
    │  rdf:type = prov:Entity, tg:Synthesis
```
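One detail worth noting from the diagram: `tg:selectedChunk` repeats once per retrieved chunk, while `tg:chunkCount` appears exactly once. A plain-tuple sketch (not the real builder, and `TG` is an assumed namespace expansion):

```python
# Plain-tuple sketch: one tg:selectedChunk triple per chunk, plus a
# single tg:chunkCount. TG is an assumed namespace expansion.
TG = "http://trustgraph.ai/ns/"  # assumed prefix

def exploration_chunk_triples(exploration_uri, chunk_ids):
    """Emit chunkCount once and selectedChunk once per retrieved chunk."""
    triples = [(exploration_uri, TG + "chunkCount", str(len(chunk_ids)))]
    for cid in chunk_ids:
        triples.append((exploration_uri, TG + "selectedChunk", cid))
    return triples

t = exploration_chunk_triples(
    "urn:trustgraph:docrag:xyz/exploration", ["doc/c1", "doc/c2"])
```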
## Changes Required
### 1. Schema Changes
**File:** `trustgraph-base/trustgraph/schema/services/agent.py`
Add `session_id` and `collection` fields to `AgentRequest`:
```python
@dataclass
class AgentRequest:
    question: str = ""
    state: str = ""
    group: list[str] | None = None
    history: list[AgentStep] = field(default_factory=list)
    user: str = ""
    collection: str = "default"   # NEW: Collection for provenance traces
    streaming: bool = False
    session_id: str = ""          # NEW: For provenance tracking across iterations
```
**File:** `trustgraph-base/trustgraph/messaging/translators/agent.py`
Update translator to handle `session_id` and `collection` in both `to_pulsar()` and `from_pulsar()`.
### 2. Add Explainability Producer to Agent Service
**File:** `trustgraph-flow/trustgraph/agent/react/service.py`
Register an "explainability" producer (same pattern as GraphRAG):
```python
from ... base import ProducerSpec
from ... schema import Triples

# In __init__:
self.register_specification(
    ProducerSpec(
        name = "explainability",
        schema = Triples,
    )
)
```
### 3. Provenance Triple Generation
**File:** `trustgraph-base/trustgraph/provenance/agent.py`
Create helper functions (similar to GraphRAG's `question_triples`, `exploration_triples`, etc.):
```python
import json

def agent_session_triples(session_uri, query, timestamp):
    """Generate triples for agent Question."""
    return [
        Triple(s=session_uri, p=RDF_TYPE, o=PROV_ACTIVITY),
        Triple(s=session_uri, p=RDF_TYPE, o=TG_QUESTION),
        Triple(s=session_uri, p=TG_QUERY, o=query),
        Triple(s=session_uri, p=PROV_STARTED_AT_TIME, o=timestamp),
    ]

def agent_iteration_triples(iteration_uri, parent_uri, thought,
                            action, arguments, observation):
    """Generate triples for one Analysis step."""
    return [
        Triple(s=iteration_uri, p=RDF_TYPE, o=PROV_ENTITY),
        Triple(s=iteration_uri, p=RDF_TYPE, o=TG_ANALYSIS),
        Triple(s=iteration_uri, p=TG_THOUGHT, o=thought),
        Triple(s=iteration_uri, p=TG_ACTION, o=action),
        Triple(s=iteration_uri, p=TG_ARGUMENTS, o=json.dumps(arguments)),
        Triple(s=iteration_uri, p=TG_OBSERVATION, o=observation),
        Triple(s=iteration_uri, p=PROV_WAS_DERIVED_FROM, o=parent_uri),
    ]

def agent_final_triples(final_uri, parent_uri, answer):
    """Generate triples for Conclusion."""
    return [
        Triple(s=final_uri, p=RDF_TYPE, o=PROV_ENTITY),
        Triple(s=final_uri, p=RDF_TYPE, o=TG_CONCLUSION),
        Triple(s=final_uri, p=TG_ANSWER, o=answer),
        Triple(s=final_uri, p=PROV_WAS_DERIVED_FROM, o=parent_uri),
    ]
```
### 4. Type Definitions
**File:** `trustgraph-base/trustgraph/provenance/namespaces.py`
Add explainability entity types and agent predicates:
```python
# Explainability entity types (used by both GraphRAG and Agent)
TG_QUESTION = TG + "Question"
TG_EXPLORATION = TG + "Exploration"
TG_FOCUS = TG + "Focus"
TG_SYNTHESIS = TG + "Synthesis"
TG_ANALYSIS = TG + "Analysis"
TG_CONCLUSION = TG + "Conclusion"
# Agent predicates
TG_THOUGHT = TG + "thought"
TG_ACTION = TG + "action"
TG_ARGUMENTS = TG + "arguments"
TG_OBSERVATION = TG + "observation"
TG_ANSWER = TG + "answer"
```
## Files Modified
| File | Change |
|------|--------|
| `trustgraph-base/trustgraph/schema/services/agent.py` | Add session_id and collection to AgentRequest |
| `trustgraph-base/trustgraph/messaging/translators/agent.py` | Update translator for new fields |
| `trustgraph-base/trustgraph/provenance/namespaces.py` | Add entity types, agent predicates, and Document RAG predicates |
| `trustgraph-base/trustgraph/provenance/triples.py` | Add TG types to GraphRAG triple builders, add Document RAG triple builders |
| `trustgraph-base/trustgraph/provenance/uris.py` | Add Document RAG URI generators |
| `trustgraph-base/trustgraph/provenance/__init__.py` | Export new types, predicates, and Document RAG functions |
| `trustgraph-base/trustgraph/schema/services/retrieval.py` | Add explain_id and explain_graph to DocumentRagResponse |
| `trustgraph-base/trustgraph/messaging/translators/retrieval.py` | Update DocumentRagResponseTranslator for explainability fields |
| `trustgraph-flow/trustgraph/agent/react/service.py` | Add explainability producer + recording logic |
| `trustgraph-flow/trustgraph/retrieval/document_rag/document_rag.py` | Add explainability callback and emit provenance triples |
| `trustgraph-flow/trustgraph/retrieval/document_rag/rag.py` | Add explainability producer and wire up callback |
| `trustgraph-cli/trustgraph/cli/show_explain_trace.py` | Handle agent trace types |
| `trustgraph-cli/trustgraph/cli/list_explain_traces.py` | List agent sessions alongside GraphRAG |
## Files Created
| File | Purpose |
|------|---------|
| `trustgraph-base/trustgraph/provenance/agent.py` | Agent-specific triple generators |
## CLI Updates
**Detection:** Both GraphRAG and Agent Questions have `tg:Question` type. Distinguished by:
1. URI pattern: `urn:trustgraph:agent:` vs `urn:trustgraph:question:`
2. Derived entities: `tg:Analysis` (agent) vs `tg:Exploration` (GraphRAG)
**`list_explain_traces.py`:**
- Shows Type column (Agent vs GraphRAG)
**`show_explain_trace.py`:**
- Auto-detects trace type
- Agent rendering shows: Question → Analysis step(s) → Conclusion
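The auto-detection can be sketched as a URI-prefix check; `detect_trace_type` is a hypothetical name for illustration, not the CLI's actual function:

```python
# Hypothetical sketch of CLI trace-type detection by URI prefix.
def detect_trace_type(question_uri: str) -> str:
    if question_uri.startswith("urn:trustgraph:agent:"):
        return "Agent"
    if question_uri.startswith("urn:trustgraph:docrag:"):
        return "Document RAG"
    if question_uri.startswith("urn:trustgraph:question:"):
        return "GraphRAG"
    return "Unknown"
```

The derived-entity check (`tg:Analysis` vs `tg:Exploration`) can serve as a fallback when a URI does not match any known prefix.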
## Backwards Compatibility
- `session_id` defaults to `""` - old requests work, just won't have provenance
- `collection` defaults to `"default"` - reasonable fallback
- CLI gracefully handles both trace types
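The compatibility story rests on tolerant defaults; a sketch of the `getattr` pattern the translators use (the class here is illustrative, standing in for an old-style request object):

```python
# Illustrative: an old-style request object lacking the new fields
# still serializes, picking up the documented defaults.
class OldAgentRequest:
    question = "What is the capital of France?"  # no session_id/collection

obj = OldAgentRequest()
payload = {
    "question": obj.question,
    "collection": getattr(obj, "collection", "default"),
    "session_id": getattr(obj, "session_id", ""),
}
```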
## Verification
```bash
# Run an agent query
tg-invoke-agent -q "What is the capital of France?"

# List traces (should show agent sessions with Type column)
tg-list-explain-traces -U trustgraph -C default

# Show agent trace
tg-show-explain-trace "urn:trustgraph:agent:xxx"
```
## Future Work (Not This PR)
- DAG dependencies (when analysis N uses results from multiple prior analyses)
- Tool-specific provenance linking (KnowledgeQuery → its GraphRAG trace)
- Streaming provenance emission (emit as we go, not batch at end)


@@ -208,9 +208,12 @@ class TestQuery:
collection="test_collection"
)
# Verify result is list of fetched document content
assert "Document 1 content" in result
assert "Document 2 content" in result
# Verify result is tuple of (docs, chunk_ids)
docs, chunk_ids = result
assert "Document 1 content" in docs
assert "Document 2 content" in docs
assert "doc/c1" in chunk_ids
assert "doc/c2" in chunk_ids
@pytest.mark.asyncio
async def test_document_rag_query_method(self, mock_fetch_chunk):
@@ -350,8 +353,10 @@ class TestQuery:
mock_embeddings_client.embed.assert_called_once_with(["verbose test"])
mock_doc_embeddings_client.query.assert_called_once()
# Verify result contains fetched content
assert "Verbose test doc" in result
# Verify result is tuple of (docs, chunk_ids) with fetched content
docs, chunk_ids = result
assert "Verbose test doc" in docs
assert "doc/c6" in chunk_ids
@pytest.mark.asyncio
async def test_document_rag_query_with_verbose(self, mock_fetch_chunk):
@@ -426,8 +431,8 @@ class TestQuery:
mock_embeddings_client.embed.assert_called_once_with(["query with no results"])
mock_doc_embeddings_client.query.assert_called_once()
# Verify empty result is returned
assert result == []
# Verify empty result is returned (tuple of empty lists)
assert result == ([], [])
@pytest.mark.asyncio
async def test_document_rag_query_with_empty_documents(self, mock_fetch_chunk):


@@ -5,7 +5,7 @@ passed to the DocumentRag.query() method.
"""
import pytest
from unittest.mock import MagicMock, AsyncMock, patch
from unittest.mock import MagicMock, AsyncMock, patch, ANY
from trustgraph.retrieval.document_rag.rag import Processor
from trustgraph.schema import DocumentRagQuery, DocumentRagResponse
@@ -65,8 +65,9 @@ class TestDocumentRagService:
mock_rag_instance.query.assert_called_once_with(
"test query",
user="my_user", # Must be from message, not hardcoded default
collection="test_coll_1", # Must be from message, not hardcoded default
doc_limit=5
collection="test_coll_1", # Must be from message, not hardcoded default
doc_limit=5,
explain_callback=ANY, # Explainability callback is always passed
)
# Verify response was sent


@@ -13,7 +13,9 @@ class AgentRequestTranslator(MessageTranslator):
group=data.get("group", None),
history=data.get("history", []),
user=data.get("user", "trustgraph"),
streaming=data.get("streaming", False)
collection=data.get("collection", "default"),
streaming=data.get("streaming", False),
session_id=data.get("session_id", ""),
)
def from_pulsar(self, obj: AgentRequest) -> Dict[str, Any]:
@@ -23,7 +25,9 @@
"group": obj.group,
"history": obj.history,
"user": obj.user,
"streaming": getattr(obj, "streaming", False)
"collection": getattr(obj, "collection", "default"),
"streaming": getattr(obj, "streaming", False),
"session_id": getattr(obj, "session_id", ""),
}


@@ -38,6 +38,16 @@ class DocumentRagResponseTranslator(MessageTranslator):
if obj.response is not None:
result["response"] = obj.response
# Include explain_id for explain messages
explain_id = getattr(obj, "explain_id", None)
if explain_id:
result["explain_id"] = explain_id
# Include explain_graph for explain messages (named graph filter)
explain_graph = getattr(obj, "explain_graph", None)
if explain_graph is not None:
result["explain_graph"] = explain_graph
# Include end_of_stream flag
result["end_of_stream"] = getattr(obj, "end_of_stream", False)


@@ -40,11 +40,19 @@ from . uris import (
activity_uri,
statement_uri,
agent_uri,
# Query-time provenance URIs
# Query-time provenance URIs (GraphRAG)
question_uri,
exploration_uri,
focus_uri,
synthesis_uri,
# Agent provenance URIs
agent_session_uri,
agent_iteration_uri,
agent_final_uri,
# Document RAG provenance URIs
docrag_question_uri,
docrag_exploration_uri,
docrag_synthesis_uri,
)
# Namespace constants
@@ -63,8 +71,17 @@ from . namespaces import (
TG_CHUNK_SIZE, TG_CHUNK_OVERLAP, TG_COMPONENT_VERSION,
TG_LLM_MODEL, TG_ONTOLOGY, TG_EMBEDDING_MODEL,
TG_SOURCE_TEXT, TG_SOURCE_CHAR_OFFSET, TG_SOURCE_CHAR_LENGTH,
# Query-time provenance predicates
# Query-time provenance predicates (GraphRAG)
TG_QUERY, TG_EDGE_COUNT, TG_SELECTED_EDGE, TG_REASONING, TG_CONTENT,
# Query-time provenance predicates (DocumentRAG)
TG_CHUNK_COUNT, TG_SELECTED_CHUNK,
# Explainability entity types
TG_QUESTION, TG_EXPLORATION, TG_FOCUS, TG_SYNTHESIS,
TG_ANALYSIS, TG_CONCLUSION,
# Question subtypes (to distinguish retrieval mechanism)
TG_GRAPH_RAG_QUESTION, TG_DOC_RAG_QUESTION, TG_AGENT_QUESTION,
# Agent provenance predicates
TG_THOUGHT, TG_ACTION, TG_ARGUMENTS, TG_OBSERVATION, TG_ANSWER,
# Named graphs
GRAPH_DEFAULT, GRAPH_SOURCE, GRAPH_RETRIEVAL,
)
@@ -74,15 +91,26 @@ from . triples import (
document_triples,
derived_entity_triples,
triple_provenance_triples,
# Query-time provenance triple builders
# Query-time provenance triple builders (GraphRAG)
question_triples,
exploration_triples,
focus_triples,
synthesis_triples,
# Query-time provenance triple builders (DocumentRAG)
docrag_question_triples,
docrag_exploration_triples,
docrag_synthesis_triples,
# Utility
set_graph,
)
# Agent provenance triple builders
from . agent import (
agent_session_triples,
agent_iteration_triples,
agent_final_triples,
)
# Vocabulary bootstrap
from . vocabulary import (
get_vocabulary_triples,
@@ -107,6 +135,14 @@ __all__ = [
"exploration_uri",
"focus_uri",
"synthesis_uri",
# Agent provenance URIs
"agent_session_uri",
"agent_iteration_uri",
"agent_final_uri",
# Document RAG provenance URIs
"docrag_question_uri",
"docrag_exploration_uri",
"docrag_synthesis_uri",
# Namespaces
"PROV", "PROV_ENTITY", "PROV_ACTIVITY", "PROV_AGENT",
"PROV_WAS_DERIVED_FROM", "PROV_WAS_GENERATED_BY",
@@ -118,19 +154,36 @@ __all__ = [
"TG_CHUNK_SIZE", "TG_CHUNK_OVERLAP", "TG_COMPONENT_VERSION",
"TG_LLM_MODEL", "TG_ONTOLOGY", "TG_EMBEDDING_MODEL",
"TG_SOURCE_TEXT", "TG_SOURCE_CHAR_OFFSET", "TG_SOURCE_CHAR_LENGTH",
# Query-time provenance predicates
# Query-time provenance predicates (GraphRAG)
"TG_QUERY", "TG_EDGE_COUNT", "TG_SELECTED_EDGE", "TG_REASONING", "TG_CONTENT",
# Query-time provenance predicates (DocumentRAG)
"TG_CHUNK_COUNT", "TG_SELECTED_CHUNK",
# Explainability entity types
"TG_QUESTION", "TG_EXPLORATION", "TG_FOCUS", "TG_SYNTHESIS",
"TG_ANALYSIS", "TG_CONCLUSION",
# Question subtypes
"TG_GRAPH_RAG_QUESTION", "TG_DOC_RAG_QUESTION", "TG_AGENT_QUESTION",
# Agent provenance predicates
"TG_THOUGHT", "TG_ACTION", "TG_ARGUMENTS", "TG_OBSERVATION", "TG_ANSWER",
# Named graphs
"GRAPH_DEFAULT", "GRAPH_SOURCE", "GRAPH_RETRIEVAL",
# Triple builders
"document_triples",
"derived_entity_triples",
"triple_provenance_triples",
# Query-time provenance triple builders
# Query-time provenance triple builders (GraphRAG)
"question_triples",
"exploration_triples",
"focus_triples",
"synthesis_triples",
# Query-time provenance triple builders (DocumentRAG)
"docrag_question_triples",
"docrag_exploration_triples",
"docrag_synthesis_triples",
# Agent provenance triple builders
"agent_session_triples",
"agent_iteration_triples",
"agent_final_triples",
# Utility
"set_graph",
# Vocabulary


@@ -0,0 +1,141 @@
"""
Helper functions to build PROV-O triples for agent provenance.
Agent provenance tracks the reasoning trace of ReAct agent sessions:
- Question: The root activity with query and timestamp
- Analysis: Each think/act/observe cycle
- Conclusion: The final answer
"""
import json
from datetime import datetime
from typing import List, Optional, Dict, Any
from .. schema import Triple, Term, IRI, LITERAL
from . namespaces import (
RDF_TYPE, RDFS_LABEL,
PROV_ACTIVITY, PROV_ENTITY, PROV_WAS_DERIVED_FROM, PROV_STARTED_AT_TIME,
TG_QUERY, TG_THOUGHT, TG_ACTION, TG_ARGUMENTS, TG_OBSERVATION, TG_ANSWER,
TG_QUESTION, TG_ANALYSIS, TG_CONCLUSION,
TG_AGENT_QUESTION,
)
def _iri(uri: str) -> Term:
"""Create an IRI term."""
return Term(type=IRI, iri=uri)
def _literal(value) -> Term:
"""Create a literal term."""
return Term(type=LITERAL, value=str(value))
def _triple(s: str, p: str, o_term: Term) -> Triple:
"""Create a triple with IRI subject and predicate."""
return Triple(s=_iri(s), p=_iri(p), o=o_term)
def agent_session_triples(
session_uri: str,
query: str,
timestamp: Optional[str] = None,
) -> List[Triple]:
"""
Build triples for an agent session start (Question).
Creates:
- Activity declaration with tg:Question type
- Query text and timestamp
Args:
session_uri: URI of the session (from agent_session_uri)
query: The user's query text
timestamp: ISO timestamp (defaults to now)
Returns:
List of Triple objects
"""
if timestamp is None:
timestamp = datetime.utcnow().isoformat() + "Z"
return [
_triple(session_uri, RDF_TYPE, _iri(PROV_ACTIVITY)),
_triple(session_uri, RDF_TYPE, _iri(TG_QUESTION)),
_triple(session_uri, RDF_TYPE, _iri(TG_AGENT_QUESTION)),
_triple(session_uri, RDFS_LABEL, _literal("Agent Question")),
_triple(session_uri, PROV_STARTED_AT_TIME, _literal(timestamp)),
_triple(session_uri, TG_QUERY, _literal(query)),
]
def agent_iteration_triples(
iteration_uri: str,
parent_uri: str,
thought: str,
action: str,
arguments: Dict[str, Any],
observation: str,
) -> List[Triple]:
"""
Build triples for one agent iteration (Analysis - think/act/observe cycle).
Creates:
- Entity declaration with tg:Analysis type
- wasDerivedFrom link to parent (previous iteration or session)
- Thought, action, arguments, and observation data
Args:
iteration_uri: URI of this iteration (from agent_iteration_uri)
parent_uri: URI of the parent (previous iteration or session)
thought: The agent's reasoning/thought
action: The tool/action name
arguments: Arguments passed to the tool (will be JSON-encoded)
observation: The result/observation from the tool
Returns:
List of Triple objects
"""
triples = [
_triple(iteration_uri, RDF_TYPE, _iri(PROV_ENTITY)),
_triple(iteration_uri, RDF_TYPE, _iri(TG_ANALYSIS)),
_triple(iteration_uri, RDFS_LABEL, _literal(f"Analysis: {action}")),
_triple(iteration_uri, PROV_WAS_DERIVED_FROM, _iri(parent_uri)),
_triple(iteration_uri, TG_THOUGHT, _literal(thought)),
_triple(iteration_uri, TG_ACTION, _literal(action)),
_triple(iteration_uri, TG_ARGUMENTS, _literal(json.dumps(arguments))),
_triple(iteration_uri, TG_OBSERVATION, _literal(observation)),
]
return triples
def agent_final_triples(
final_uri: str,
parent_uri: str,
answer: str,
) -> List[Triple]:
"""
Build triples for an agent final answer (Conclusion).
Creates:
- Entity declaration with tg:Conclusion type
- wasDerivedFrom link to parent (last iteration or session)
- The answer text
Args:
final_uri: URI of the final answer (from agent_final_uri)
parent_uri: URI of the parent (last iteration or session if no iterations)
answer: The final answer text
Returns:
List of Triple objects
"""
return [
_triple(final_uri, RDF_TYPE, _iri(PROV_ENTITY)),
_triple(final_uri, RDF_TYPE, _iri(TG_CONCLUSION)),
_triple(final_uri, RDFS_LABEL, _literal("Conclusion")),
_triple(final_uri, PROV_WAS_DERIVED_FROM, _iri(parent_uri)),
_triple(final_uri, TG_ANSWER, _literal(answer)),
]


@@ -59,7 +59,7 @@ TG_SOURCE_TEXT = TG + "sourceText"
TG_SOURCE_CHAR_OFFSET = TG + "sourceCharOffset"
TG_SOURCE_CHAR_LENGTH = TG + "sourceCharLength"
# Query-time provenance predicates
# Query-time provenance predicates (GraphRAG)
TG_QUERY = TG + "query"
TG_EDGE_COUNT = TG + "edgeCount"
TG_SELECTED_EDGE = TG + "selectedEdge"
@@ -68,6 +68,30 @@ TG_REASONING = TG + "reasoning"
TG_CONTENT = TG + "content"
TG_DOCUMENT = TG + "document" # Reference to document in librarian
# Query-time provenance predicates (DocumentRAG)
TG_CHUNK_COUNT = TG + "chunkCount"
TG_SELECTED_CHUNK = TG + "selectedChunk"
# Explainability entity types (shared)
TG_QUESTION = TG + "Question"
TG_EXPLORATION = TG + "Exploration"
TG_FOCUS = TG + "Focus"
TG_SYNTHESIS = TG + "Synthesis"
TG_ANALYSIS = TG + "Analysis"
TG_CONCLUSION = TG + "Conclusion"
# Question subtypes (to distinguish retrieval mechanism)
TG_GRAPH_RAG_QUESTION = TG + "GraphRagQuestion"
TG_DOC_RAG_QUESTION = TG + "DocRagQuestion"
TG_AGENT_QUESTION = TG + "AgentQuestion"
# Agent provenance predicates
TG_THOUGHT = TG + "thought"
TG_ACTION = TG + "action"
TG_ARGUMENTS = TG + "arguments"
TG_OBSERVATION = TG + "observation"
TG_ANSWER = TG + "answer"
# Named graph URIs for RDF datasets
# These separate different types of data while keeping them in the same collection
GRAPH_DEFAULT = "" # Core knowledge facts (triples extracted from documents)


@@ -17,9 +17,15 @@ from . namespaces import (
TG_CHUNK_INDEX, TG_CHAR_OFFSET, TG_CHAR_LENGTH,
TG_CHUNK_SIZE, TG_CHUNK_OVERLAP, TG_COMPONENT_VERSION,
TG_LLM_MODEL, TG_ONTOLOGY, TG_REIFIES,
# Query-time provenance predicates
# Query-time provenance predicates (GraphRAG)
TG_QUERY, TG_EDGE_COUNT, TG_SELECTED_EDGE, TG_EDGE, TG_REASONING, TG_CONTENT,
TG_DOCUMENT,
# Query-time provenance predicates (DocumentRAG)
TG_CHUNK_COUNT, TG_SELECTED_CHUNK,
# Explainability entity types
TG_QUESTION, TG_EXPLORATION, TG_FOCUS, TG_SYNTHESIS,
# Question subtypes
TG_GRAPH_RAG_QUESTION, TG_DOC_RAG_QUESTION,
)
from . uris import activity_uri, agent_uri, edge_selection_uri
@@ -310,7 +316,9 @@ def question_triples(
return [
_triple(question_uri, RDF_TYPE, _iri(PROV_ACTIVITY)),
_triple(question_uri, RDFS_LABEL, _literal("GraphRAG question")),
_triple(question_uri, RDF_TYPE, _iri(TG_QUESTION)),
_triple(question_uri, RDF_TYPE, _iri(TG_GRAPH_RAG_QUESTION)),
_triple(question_uri, RDFS_LABEL, _literal("GraphRAG Question")),
_triple(question_uri, PROV_STARTED_AT_TIME, _literal(timestamp)),
_triple(question_uri, TG_QUERY, _literal(query)),
]
@@ -339,6 +347,7 @@ def exploration_triples(
"""
return [
_triple(exploration_uri, RDF_TYPE, _iri(PROV_ENTITY)),
_triple(exploration_uri, RDF_TYPE, _iri(TG_EXPLORATION)),
_triple(exploration_uri, RDFS_LABEL, _literal("Exploration")),
_triple(exploration_uri, PROV_WAS_GENERATED_BY, _iri(question_uri)),
_triple(exploration_uri, TG_EDGE_COUNT, _literal(edge_count)),
@@ -383,6 +392,7 @@ def focus_triples(
"""
triples = [
_triple(focus_uri, RDF_TYPE, _iri(PROV_ENTITY)),
_triple(focus_uri, RDF_TYPE, _iri(TG_FOCUS)),
_triple(focus_uri, RDFS_LABEL, _literal("Focus")),
_triple(focus_uri, PROV_WAS_DERIVED_FROM, _iri(exploration_uri)),
]
@@ -443,6 +453,7 @@ def synthesis_triples(
"""
triples = [
_triple(synthesis_uri, RDF_TYPE, _iri(PROV_ENTITY)),
_triple(synthesis_uri, RDF_TYPE, _iri(TG_SYNTHESIS)),
_triple(synthesis_uri, RDFS_LABEL, _literal("Synthesis")),
_triple(synthesis_uri, PROV_WAS_DERIVED_FROM, _iri(focus_uri)),
]
@@ -455,3 +466,120 @@ def synthesis_triples(
triples.append(_triple(synthesis_uri, TG_CONTENT, _literal(answer_text)))
return triples
# Document RAG provenance triple builders
#
# Document RAG uses a subset of GraphRAG's model:
# Question - What was asked
# Exploration - Chunks retrieved from document store
# Synthesis - The final answer (no Focus step)
def docrag_question_triples(
question_uri: str,
query: str,
timestamp: Optional[str] = None,
) -> List[Triple]:
"""
Build triples for a document RAG question activity.
Creates:
- Activity declaration with tg:Question type
- Query text and timestamp
Args:
question_uri: URI of the question (from docrag_question_uri)
query: The user's query text
timestamp: ISO timestamp (defaults to now)
Returns:
List of Triple objects
"""
if timestamp is None:
timestamp = datetime.utcnow().isoformat() + "Z"
return [
_triple(question_uri, RDF_TYPE, _iri(PROV_ACTIVITY)),
_triple(question_uri, RDF_TYPE, _iri(TG_QUESTION)),
_triple(question_uri, RDF_TYPE, _iri(TG_DOC_RAG_QUESTION)),
_triple(question_uri, RDFS_LABEL, _literal("DocumentRAG Question")),
_triple(question_uri, PROV_STARTED_AT_TIME, _literal(timestamp)),
_triple(question_uri, TG_QUERY, _literal(query)),
]
def docrag_exploration_triples(
exploration_uri: str,
question_uri: str,
chunk_count: int,
chunk_ids: Optional[List[str]] = None,
) -> List[Triple]:
"""
Build triples for a document RAG exploration entity (chunks retrieved).
Creates:
- Entity declaration with tg:Exploration type
- wasGeneratedBy link to question
- Chunk count and optional chunk references
Args:
exploration_uri: URI of the exploration entity
question_uri: URI of the parent question
chunk_count: Number of chunks retrieved
chunk_ids: Optional list of chunk URIs/IDs
Returns:
List of Triple objects
"""
triples = [
_triple(exploration_uri, RDF_TYPE, _iri(PROV_ENTITY)),
_triple(exploration_uri, RDF_TYPE, _iri(TG_EXPLORATION)),
_triple(exploration_uri, RDFS_LABEL, _literal("Exploration")),
_triple(exploration_uri, PROV_WAS_GENERATED_BY, _iri(question_uri)),
_triple(exploration_uri, TG_CHUNK_COUNT, _literal(chunk_count)),
]
# Add references to selected chunks
if chunk_ids:
for chunk_id in chunk_ids:
triples.append(_triple(exploration_uri, TG_SELECTED_CHUNK, _iri(chunk_id)))
return triples
def docrag_synthesis_triples(
synthesis_uri: str,
exploration_uri: str,
answer_text: str = "",
document_id: Optional[str] = None,
) -> List[Triple]:
"""
Build triples for a document RAG synthesis entity (final answer).
Creates:
- Entity declaration with tg:Synthesis type
- wasDerivedFrom link to exploration (skips focus step)
- Either document reference or inline content
Args:
synthesis_uri: URI of the synthesis entity
exploration_uri: URI of the parent exploration entity
answer_text: The synthesized answer text (used if no document_id)
document_id: Optional librarian document ID (preferred over inline content)
Returns:
List of Triple objects
"""
triples = [
_triple(synthesis_uri, RDF_TYPE, _iri(PROV_ENTITY)),
_triple(synthesis_uri, RDF_TYPE, _iri(TG_SYNTHESIS)),
_triple(synthesis_uri, RDFS_LABEL, _literal("Synthesis")),
_triple(synthesis_uri, PROV_WAS_DERIVED_FROM, _iri(exploration_uri)),
]
if document_id:
triples.append(_triple(synthesis_uri, TG_DOCUMENT, _iri(document_id)))
elif answer_text:
triples.append(_triple(synthesis_uri, TG_CONTENT, _literal(answer_text)))
return triples
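The builders above all share one shape: declare the entity's types, link it to its parent with a PROV predicate, then attach data properties. A self-contained sketch of the exploration builder, with a toy `Triple` tuple standing in for the module's `_triple`/`_iri`/`_literal` helpers (the concrete predicate IRIs `chunkCount`/`selectedChunk` are illustrative guesses at the `TG_*` constants, and the plain-string literal is a simplification):

```python
from typing import List, NamedTuple

TG = "https://trustgraph.ai/ns/"
PROV = "http://www.w3.org/ns/prov#"

class Triple(NamedTuple):
    s: str
    p: str
    o: str

def exploration_triples(exploration_uri: str, question_uri: str,
                        chunk_ids: List[str]) -> List[Triple]:
    # Provenance link back to the question, plus the chunk count
    triples = [
        Triple(exploration_uri, PROV + "wasGeneratedBy", question_uri),
        Triple(exploration_uri, TG + "chunkCount", str(len(chunk_ids))),
    ]
    # One reference per retrieved chunk
    for chunk_id in chunk_ids:
        triples.append(Triple(exploration_uri, TG + "selectedChunk", chunk_id))
    return triples

exp = exploration_triples(
    "urn:trustgraph:docrag:abc/exploration",
    "urn:trustgraph:docrag:abc",
    ["urn:chunk:1", "urn:chunk:2"],
)
print(len(exp))  # 4
```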



@@ -138,3 +138,94 @@ def edge_selection_uri(session_id: str, edge_index: int) -> str:
URN in format: urn:trustgraph:prov:edge:{uuid}:{index}
"""
return f"urn:trustgraph:prov:edge:{session_id}:{edge_index}"
# Agent provenance URIs
# These URIs use the urn:trustgraph:agent: namespace to distinguish agent
# provenance from GraphRAG question provenance
def agent_session_uri(session_id: str | None = None) -> str:
"""
Generate URI for an agent session.
Args:
session_id: Optional UUID string. Auto-generates if not provided.
Returns:
URN in format: urn:trustgraph:agent:{uuid}
"""
if session_id is None:
session_id = str(uuid.uuid4())
return f"urn:trustgraph:agent:{session_id}"
def agent_iteration_uri(session_id: str, iteration_num: int) -> str:
"""
Generate URI for an agent iteration.
Args:
session_id: The session UUID.
iteration_num: 1-based iteration number.
Returns:
URN in format: urn:trustgraph:agent:{uuid}/i{num}
"""
return f"urn:trustgraph:agent:{session_id}/i{iteration_num}"
def agent_final_uri(session_id: str) -> str:
"""
Generate URI for an agent final answer.
Args:
session_id: The session UUID.
Returns:
URN in format: urn:trustgraph:agent:{uuid}/final
"""
return f"urn:trustgraph:agent:{session_id}/final"
# Document RAG provenance URIs
# These URIs use the urn:trustgraph:docrag: namespace to distinguish
# document RAG provenance from graph RAG provenance
def docrag_question_uri(session_id: str | None = None) -> str:
"""
Generate URI for a document RAG question activity.
Args:
session_id: Optional UUID string. Auto-generates if not provided.
Returns:
URN in format: urn:trustgraph:docrag:{uuid}
"""
if session_id is None:
session_id = str(uuid.uuid4())
return f"urn:trustgraph:docrag:{session_id}"
def docrag_exploration_uri(session_id: str) -> str:
"""
Generate URI for a document RAG exploration entity (chunks retrieved).
Args:
session_id: The session UUID.
Returns:
URN in format: urn:trustgraph:docrag:{uuid}/exploration
"""
return f"urn:trustgraph:docrag:{session_id}/exploration"
def docrag_synthesis_uri(session_id: str) -> str:
"""
Generate URI for a document RAG synthesis entity (final answer).
Args:
session_id: The session UUID.
Returns:
URN in format: urn:trustgraph:docrag:{uuid}/synthesis
"""
return f"urn:trustgraph:docrag:{session_id}/synthesis"


@@ -23,7 +23,9 @@ class AgentRequest:
group: list[str] | None = None
history: list[AgentStep] = field(default_factory=list)
user: str = "" # User context for multi-tenancy
streaming: bool = False # NEW: Enable streaming response delivery (default false)
collection: str = "default" # Collection for provenance traces
streaming: bool = False # Enable streaming response delivery (default false)
session_id: str = "" # For provenance tracking across iterations
@dataclass
class AgentResponse:


@@ -42,5 +42,7 @@ class DocumentRagQuery:
@dataclass
class DocumentRagResponse:
error: Error | None = None
response: str = ""
response: str | None = ""
end_of_stream: bool = False
explain_id: str | None = None # Single explain URI (announced as created)
explain_graph: str | None = None # Named graph where explain was stored (e.g., urn:graph:retrieval)
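With `response` now nullable, a consumer sees three message shapes on the response queue: explain announcements (`explain_id` set, `response` None), answer chunks, and the end-of-stream marker. A hypothetical client-side dispatcher, sketched under those assumptions (the dataclass mirrors the schema fields above; `classify` itself is not part of the change):

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class DocumentRagResponse:
    response: Optional[str] = ""
    end_of_stream: bool = False
    explain_id: Optional[str] = None
    explain_graph: Optional[str] = None

def classify(msg: DocumentRagResponse) -> str:
    # Explain announcements carry no answer text
    if msg.explain_id is not None:
        return "explain"
    if msg.end_of_stream:
        return "eos"
    return "chunk"

print(classify(DocumentRagResponse(response=None, explain_id="urn:x")))  # explain
print(classify(DocumentRagResponse(response="partial ")))                # chunk
print(classify(DocumentRagResponse(end_of_stream=True)))                 # eos
```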


@@ -1,8 +1,8 @@
"""
List all GraphRAG sessions (questions) in a collection.
List all explainability sessions (GraphRAG and Agent) in a collection.
Queries for all questions stored in the retrieval graph and displays them
with their session IDs and timestamps.
with their session IDs, type (GraphRAG or Agent), and timestamps.
Examples:
tg-list-explain-traces -U trustgraph -C default
@@ -24,8 +24,14 @@ default_collection = 'default'
# Predicates
TG = "https://trustgraph.ai/ns/"
TG_QUERY = TG + "query"
TG_QUESTION = TG + "Question"
TG_ANALYSIS = TG + "Analysis"
TG_EXPLORATION = TG + "Exploration"
PROV = "http://www.w3.org/ns/prov#"
PROV_STARTED_AT_TIME = PROV + "startedAtTime"
PROV_WAS_DERIVED_FROM = PROV + "wasDerivedFrom"
PROV_WAS_GENERATED_BY = PROV + "wasGeneratedBy"
RDF_TYPE = "http://www.w3.org/1999/02/22-rdf-syntax-ns#type"
# Retrieval graph
RETRIEVAL_GRAPH = "urn:graph:retrieval"
@@ -117,8 +123,45 @@ def get_timestamp(socket, flow_id, user, collection, question_id):
return ""
def get_session_type(socket, flow_id, user, collection, session_id):
"""
Get the type of session (Agent or GraphRAG).
Both have tg:Question type, so we distinguish by URI pattern
or by checking what's derived from it.
"""
# Fast path: check URI pattern
if session_id.startswith("urn:trustgraph:agent:"):
return "Agent"
if session_id.startswith("urn:trustgraph:question:"):
return "GraphRAG"
# Check what's derived from this entity
derived = query_triples(
socket, flow_id, user, collection,
p=PROV_WAS_DERIVED_FROM, o=session_id, g=RETRIEVAL_GRAPH
)
generated = query_triples(
socket, flow_id, user, collection,
p=PROV_WAS_GENERATED_BY, o=session_id, g=RETRIEVAL_GRAPH
)
for s, p, o in derived + generated:
child_types = query_triples(
socket, flow_id, user, collection,
s=s, p=RDF_TYPE, g=RETRIEVAL_GRAPH
)
for _, _, child_type in child_types:
if child_type == TG_ANALYSIS:
return "Agent"
if child_type == TG_EXPLORATION:
return "GraphRAG"
return "GraphRAG"
def list_sessions(socket, flow_id, user, collection, limit):
"""List all GraphRAG sessions by finding questions."""
"""List all explainability sessions (GraphRAG and Agent) by finding questions."""
# Query for all triples with predicate = tg:query
triples = query_triples(
socket, flow_id, user, collection,
@@ -129,9 +172,12 @@ def list_sessions(socket, flow_id, user, collection, limit):
for question_id, _, query_text in triples:
# Get timestamp if available
timestamp = get_timestamp(socket, flow_id, user, collection, question_id)
# Get session type (Agent or GraphRAG)
session_type = get_session_type(socket, flow_id, user, collection, question_id)
sessions.append({
"id": question_id,
"type": session_type,
"question": query_text,
"time": timestamp,
})
@@ -154,18 +200,19 @@ def truncate_text(text, max_len=60):
def print_table(sessions):
"""Print sessions as a table."""
if not sessions:
print("No GraphRAG sessions found.")
print("No explainability sessions found.")
return
rows = []
for session in sessions:
rows.append([
session["id"],
truncate_text(session["question"], 50),
session.get("type", "Unknown"),
truncate_text(session["question"], 45),
session.get("time", "")
])
headers = ["Session ID", "Question", "Time"]
headers = ["Session ID", "Type", "Question", "Time"]
print(tabulate(rows, headers=headers, tablefmt="simple"))
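The `get_session_type` logic above can be exercised without a live graph store. A toy version over an in-memory list of (s, p, o) tuples (the list-based store is a stand-in for `query_triples`; predicates match the constants defined in this tool):

```python
PROV = "http://www.w3.org/ns/prov#"
TG = "https://trustgraph.ai/ns/"
RDF_TYPE = "http://www.w3.org/1999/02/22-rdf-syntax-ns#type"

def get_session_type(triples, session_id):
    # Fast path: the URI namespace identifies the producer
    if session_id.startswith("urn:trustgraph:agent:"):
        return "Agent"
    if session_id.startswith("urn:trustgraph:question:"):
        return "GraphRAG"
    # Otherwise inspect the types of children derived from / generated by it
    children = [s for s, p, o in triples
                if o == session_id and p in (PROV + "wasDerivedFrom",
                                             PROV + "wasGeneratedBy")]
    for child in children:
        for s, p, o in triples:
            if s == child and p == RDF_TYPE:
                if o == TG + "Analysis":
                    return "Agent"
                if o == TG + "Exploration":
                    return "GraphRAG"
    return "GraphRAG"

store = [
    ("urn:x:child", PROV + "wasDerivedFrom", "urn:x:session"),
    ("urn:x:child", RDF_TYPE, TG + "Analysis"),
]
print(get_session_type(store, "urn:x:session"))  # Agent
```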


@@ -1,11 +1,15 @@
"""
Show full explainability trace for a GraphRAG session.
Show full explainability trace for a GraphRAG or Agent session.
Given a question/session URI, displays the complete cascade:
Question -> Exploration -> Focus (edge selection) -> Synthesis (answer).
Given a question/session URI, displays the complete trace:
- GraphRAG: Question -> Exploration -> Focus (edge selection) -> Synthesis (answer)
- Agent: Session -> Iteration(s) (thought/action/observation) -> Final Answer
The tool auto-detects the trace type based on rdf:type.
Examples:
tg-show-explain-trace -U trustgraph -C default "urn:trustgraph:question:abc123"
tg-show-explain-trace -U trustgraph -C default "urn:trustgraph:agent:abc123"
tg-show-explain-trace --max-answer 1000 "urn:trustgraph:question:abc123"
tg-show-explain-trace --show-provenance "urn:trustgraph:question:abc123"
"""
@@ -31,10 +35,25 @@ TG_REASONING = TG + "reasoning"
TG_CONTENT = TG + "content"
TG_DOCUMENT = TG + "document"
TG_REIFIES = TG + "reifies"
# Explainability entity types
TG_QUESTION = TG + "Question"
TG_EXPLORATION = TG + "Exploration"
TG_FOCUS = TG + "Focus"
TG_SYNTHESIS = TG + "Synthesis"
TG_ANALYSIS = TG + "Analysis"
TG_CONCLUSION = TG + "Conclusion"
# Agent predicates
TG_THOUGHT = TG + "thought"
TG_ACTION = TG + "action"
TG_ARGUMENTS = TG + "arguments"
TG_OBSERVATION = TG + "observation"
TG_ANSWER = TG + "answer"
PROV = "http://www.w3.org/ns/prov#"
PROV_STARTED_AT_TIME = PROV + "startedAtTime"
PROV_WAS_DERIVED_FROM = PROV + "wasDerivedFrom"
PROV_WAS_GENERATED_BY = PROV + "wasGeneratedBy"
RDF_TYPE = "http://www.w3.org/1999/02/22-rdf-syntax-ns#type"
RDFS_LABEL = "http://www.w3.org/2000/01/rdf-schema#label"
# Graphs
@@ -280,6 +299,186 @@ def format_edge(edge, label_cache=None, socket=None, flow_id=None, user=None, co
return f"({s_label}, {p_label}, {o_label})"
def detect_trace_type(socket, flow_id, user, collection, entity_id):
"""
Detect whether an entity is an agent Question or GraphRAG Question.
Both have rdf:type = tg:Question, so we distinguish by checking
what's derived from it:
- Agent: has tg:Analysis or tg:Conclusion derived
- GraphRAG: has tg:Exploration derived
Also checks URI pattern as fallback:
- urn:trustgraph:agent: -> agent
- urn:trustgraph:question: -> graphrag
Returns:
"agent" or "graphrag"
"""
# Check URI pattern first (fast path)
if entity_id.startswith("urn:trustgraph:agent:"):
return "agent"
if entity_id.startswith("urn:trustgraph:question:"):
return "graphrag"
# Check what's derived from this entity
derived = find_by_predicate_object(
socket, flow_id, user, collection,
PROV_WAS_DERIVED_FROM, entity_id
)
# Also check wasGeneratedBy (GraphRAG exploration uses this)
generated = find_by_predicate_object(
socket, flow_id, user, collection,
PROV_WAS_GENERATED_BY, entity_id
)
all_children = derived + generated
for child_id in all_children:
child_types = query_triples(
socket, flow_id, user, collection,
s=child_id, p=RDF_TYPE, g=RETRIEVAL_GRAPH
)
for s, p, o in child_types:
if o == TG_ANALYSIS or o == TG_CONCLUSION:
return "agent"
if o == TG_EXPLORATION:
return "graphrag"
# Default to graphrag
return "graphrag"
def build_agent_trace(socket, flow_id, user, collection, session_id, api=None, max_answer=500):
"""Build the full explainability trace for an agent session."""
trace = {
"session_id": session_id,
"type": "agent",
"question": None,
"time": None,
"iterations": [],
"final_answer": None,
}
# Get session metadata
props = get_node_properties(socket, flow_id, user, collection, session_id)
trace["question"] = props.get(TG_QUERY, [None])[0]
trace["time"] = props.get(PROV_STARTED_AT_TIME, [None])[0]
# Find all entities derived from this session (iterations and final)
# Start by looking for entities where prov:wasDerivedFrom = session_id
current_uri = session_id
iteration_num = 1
while True:
# Find entities derived from current
derived_ids = find_by_predicate_object(
socket, flow_id, user, collection,
PROV_WAS_DERIVED_FROM, current_uri
)
if not derived_ids:
break
derived_id = derived_ids[0]
derived_props = get_node_properties(socket, flow_id, user, collection, derived_id)
# Check type
types = derived_props.get(RDF_TYPE, [])
if TG_ANALYSIS in types:
iteration = {
"id": derived_id,
"iteration_num": iteration_num,
"thought": derived_props.get(TG_THOUGHT, [None])[0],
"action": derived_props.get(TG_ACTION, [None])[0],
"arguments": derived_props.get(TG_ARGUMENTS, [None])[0],
"observation": derived_props.get(TG_OBSERVATION, [None])[0],
}
trace["iterations"].append(iteration)
current_uri = derived_id
iteration_num += 1
elif TG_CONCLUSION in types:
answer = derived_props.get(TG_ANSWER, [None])[0]
if answer and len(answer) > max_answer:
answer = answer[:max_answer] + "... [truncated]"
trace["final_answer"] = {
"id": derived_id,
"answer": answer,
}
break
else:
# Unknown type, stop traversal
break
return trace
def print_agent_text(trace):
"""Print agent trace in text format."""
print(f"=== Agent Session: {trace['session_id']} ===")
print()
if trace["question"]:
print(f"Question: {trace['question']}")
if trace["time"]:
print(f"Time: {trace['time']}")
print()
# Analysis steps
print("--- Analysis ---")
iterations = trace.get("iterations", [])
if iterations:
for iteration in iterations:
print(f"Analysis {iteration['iteration_num']}:")
print(f" Thought: {iteration.get('thought', 'N/A')}")
print(f" Action: {iteration.get('action', 'N/A')}")
args = iteration.get('arguments')
if args:
# Try to pretty-print JSON arguments
try:
import json
args_obj = json.loads(args)
args_str = json.dumps(args_obj, indent=4)
# Indent each line
args_lines = args_str.split('\n')
print(f" Arguments:")
for line in args_lines:
print(f" {line}")
except (json.JSONDecodeError, TypeError):
print(f" Arguments: {args}")
else:
print(f" Arguments: N/A")
obs = iteration.get('observation', 'N/A')
if obs and len(obs) > 200:
obs = obs[:200] + "... [truncated]"
print(f" Observation: {obs}")
print()
else:
print("No analysis steps recorded")
print()
# Conclusion
print("--- Conclusion ---")
final = trace.get("final_answer")
if final and final.get("answer"):
print("Answer:")
for line in final["answer"].split("\n"):
print(f" {line}")
else:
print("No conclusion recorded")
def print_agent_json(trace):
"""Print agent trace as JSON."""
print(json.dumps(trace, indent=2))
def build_trace(socket, flow_id, user, collection, question_id, api=None, show_provenance=False, max_answer=500):
"""Build the full explainability trace for a question."""
label_cache = {}
@@ -530,21 +729,48 @@ def main():
socket = api.socket()
try:
trace = build_trace(
# Detect trace type (agent vs graphrag)
trace_type = detect_trace_type(
socket=socket,
flow_id=args.flow_id,
user=args.user,
collection=args.collection,
question_id=args.question_id,
api=api,
show_provenance=args.show_provenance,
max_answer=args.max_answer,
entity_id=args.question_id,
)
if args.format == 'json':
print_json(trace)
if trace_type == "agent":
# Build and print agent trace
trace = build_agent_trace(
socket=socket,
flow_id=args.flow_id,
user=args.user,
collection=args.collection,
session_id=args.question_id,
api=api,
max_answer=args.max_answer,
)
if args.format == 'json':
print_agent_json(trace)
else:
print_agent_text(trace)
else:
print_text(trace, show_provenance=args.show_provenance)
# Build and print GraphRAG trace (existing behavior)
trace = build_trace(
socket=socket,
flow_id=args.flow_id,
user=args.user,
collection=args.collection,
question_id=args.question_id,
api=api,
show_provenance=args.show_provenance,
max_answer=args.max_answer,
)
if args.format == 'json':
print_json(trace)
else:
print_text(trace, show_provenance=args.show_provenance)
finally:
socket.close()
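`build_agent_trace` above follows the `prov:wasDerivedFrom` chain forward one hop at a time, collecting `tg:Analysis` nodes until it reaches the `tg:Conclusion`. The traversal can be sketched against an in-memory triple list (the list-based store and the return shape are assumptions for illustration):

```python
PROV_DERIVED = "http://www.w3.org/ns/prov#wasDerivedFrom"
RDF_TYPE = "http://www.w3.org/1999/02/22-rdf-syntax-ns#type"
TG = "https://trustgraph.ai/ns/"

def walk_chain(triples, session_uri):
    # Collect Analysis nodes in order, stopping at the Conclusion
    chain, current = [], session_uri
    while True:
        derived = [s for s, p, o in triples
                   if p == PROV_DERIVED and o == current]
        if not derived:
            return chain, None
        node = derived[0]
        types = {o for s, p, o in triples if s == node and p == RDF_TYPE}
        if TG + "Conclusion" in types:
            return chain, node
        if TG + "Analysis" not in types:
            # Unknown type: stop traversal, as the real builder does
            return chain, None
        chain.append(node)
        current = node

store = [
    ("urn:a:s/i1", PROV_DERIVED, "urn:a:s"),
    ("urn:a:s/i1", RDF_TYPE, TG + "Analysis"),
    ("urn:a:s/final", PROV_DERIVED, "urn:a:s/i1"),
    ("urn:a:s/final", RDF_TYPE, TG + "Conclusion"),
]
print(walk_chain(store, "urn:a:s"))  # (['urn:a:s/i1'], 'urn:a:s/final')
```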


@@ -7,6 +7,8 @@ import re
import sys
import functools
import logging
import uuid
from datetime import datetime
# Module logger
logger = logging.getLogger(__name__)
@@ -14,8 +16,22 @@ logger = logging.getLogger(__name__)
from ... base import AgentService, TextCompletionClientSpec, PromptClientSpec
from ... base import GraphRagClientSpec, ToolClientSpec, StructuredQueryClientSpec
from ... base import RowEmbeddingsQueryClientSpec, EmbeddingsClientSpec
from ... base import ProducerSpec
from ... schema import AgentRequest, AgentResponse, AgentStep, Error
from ... schema import Triples, Metadata
# Provenance imports for agent explainability
from trustgraph.provenance import (
agent_session_uri,
agent_iteration_uri,
agent_final_uri,
agent_session_triples,
agent_iteration_triples,
agent_final_triples,
set_graph,
GRAPH_RETRIEVAL,
)
from . tools import KnowledgeQueryImpl, TextCompletionImpl, McpToolImpl, PromptImpl, StructuredQueryImpl, RowEmbeddingsQueryImpl, ToolServiceImpl
from . agent_manager import AgentManager
@@ -105,6 +121,14 @@ class Processor(AgentService):
)
)
# Explainability producer for agent provenance triples
self.register_specification(
ProducerSpec(
name = "explainability",
schema = Triples,
)
)
async def on_tools_config(self, config, version):
logger.info(f"Loading configuration version {version}")
@@ -285,6 +309,10 @@ class Processor(AgentService):
# Check if streaming is enabled
streaming = getattr(request, 'streaming', False)
# Generate or retrieve session ID for provenance tracking
session_id = getattr(request, 'session_id', '') or str(uuid.uuid4())
collection = getattr(request, 'collection', 'default')
if request.history:
history = [
Action(
@@ -298,6 +326,27 @@ class Processor(AgentService):
else:
history = []
# Calculate iteration number (1-based)
iteration_num = len(history) + 1
session_uri = agent_session_uri(session_id)
# On first iteration, emit session triples
if iteration_num == 1:
timestamp = datetime.utcnow().isoformat() + "Z"
triples = set_graph(
agent_session_triples(session_uri, request.question, timestamp),
GRAPH_RETRIEVAL
)
await flow("explainability").send(Triples(
metadata=Metadata(
id=session_uri,
user=request.user,
collection=collection,
),
triples=triples,
))
logger.debug(f"Emitted session triples for {session_uri}")
logger.info(f"Question: {request.question}")
if len(history) >= self.max_iterations:
@@ -447,6 +496,28 @@ class Processor(AgentService):
else:
f = json.dumps(act.final)
# Emit final answer provenance triples
final_uri = agent_final_uri(session_id)
# Parent is last iteration, or session if no iterations
if iteration_num > 1:
parent_uri = agent_iteration_uri(session_id, iteration_num - 1)
else:
parent_uri = session_uri
final_triples = set_graph(
agent_final_triples(final_uri, parent_uri, f),
GRAPH_RETRIEVAL
)
await flow("explainability").send(Triples(
metadata=Metadata(
id=final_uri,
user=request.user,
collection=collection,
),
triples=final_triples,
))
logger.debug(f"Emitted final triples for {final_uri}")
if streaming:
# Streaming format - send end-of-dialog marker
# Answer chunks were already sent via answer() callback during parsing
@@ -479,8 +550,37 @@ class Processor(AgentService):
logger.debug("Send next...")
# Emit iteration provenance triples
iteration_uri = agent_iteration_uri(session_id, iteration_num)
# Parent is previous iteration, or session if this is first iteration
if iteration_num > 1:
parent_uri = agent_iteration_uri(session_id, iteration_num - 1)
else:
parent_uri = session_uri
iter_triples = set_graph(
agent_iteration_triples(
iteration_uri,
parent_uri,
act.thought,
act.name,
act.arguments,
act.observation,
),
GRAPH_RETRIEVAL
)
await flow("explainability").send(Triples(
metadata=Metadata(
id=iteration_uri,
user=request.user,
collection=collection,
),
triples=iter_triples,
))
logger.debug(f"Emitted iteration triples for {iteration_uri}")
history.append(act)
# Handle state transitions if tool execution was successful
next_state = request.state
if act.name in filtered_tools:
@@ -501,7 +601,9 @@ class Processor(AgentService):
for h in history
],
user=request.user,
collection=collection,
streaming=streaming,
session_id=session_id, # Pass session_id for provenance continuity
)
await next(r)
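Both the iteration and final triples above use the same parent-selection rule: the previous iteration if one exists, otherwise the session. That rule is easy to isolate and check (URI helpers inlined here for a self-contained sketch):

```python
def agent_session_uri(session_id):
    return f"urn:trustgraph:agent:{session_id}"

def agent_iteration_uri(session_id, n):
    return f"urn:trustgraph:agent:{session_id}/i{n}"

def parent_uri(session_id, iteration_num):
    # First iteration derives from the session; later ones chain linearly
    if iteration_num > 1:
        return agent_iteration_uri(session_id, iteration_num - 1)
    return agent_session_uri(session_id)

print(parent_uri("abc", 1))  # urn:trustgraph:agent:abc
print(parent_uri("abc", 3))  # urn:trustgraph:agent:abc/i2
```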


@@ -1,6 +1,20 @@
import asyncio
import logging
import uuid
from datetime import datetime
# Provenance imports
from trustgraph.provenance import (
docrag_question_uri,
docrag_exploration_uri,
docrag_synthesis_uri,
docrag_question_triples,
docrag_exploration_triples,
docrag_synthesis_triples,
set_graph,
GRAPH_RETRIEVAL,
)
# Module logger
logger = logging.getLogger(__name__)
@@ -33,7 +47,14 @@ class Query:
return qembeds[0] if qembeds else []
async def get_docs(self, query):
"""
Get documents (chunks) matching the query.
Returns:
tuple: (docs, chunk_ids) where:
- docs: list of document content strings
- chunk_ids: list of chunk IDs that were successfully fetched
"""
vectors = await self.get_vector(query)
if self.verbose:
@@ -50,11 +71,13 @@
# Fetch chunk content from Garage
docs = []
chunk_ids = []
for match in chunk_matches:
if match.chunk_id:
try:
content = await self.rag.fetch_chunk(match.chunk_id, self.user)
docs.append(content)
chunk_ids.append(match.chunk_id)
except Exception as e:
logger.warning(f"Failed to fetch chunk {match.chunk_id}: {e}")
@@ -63,7 +86,7 @@
for doc in docs:
logger.debug(f" {doc[:100]}...")
return docs
return docs, chunk_ids
class DocumentRag:
@@ -86,17 +109,56 @@
async def query(
self, query, user="trustgraph", collection="default",
doc_limit=20, streaming=False, chunk_callback=None,
explain_callback=None,
):
"""
Execute a Document RAG query with optional explainability tracking.
Args:
query: The query string
user: User identifier
collection: Collection identifier
doc_limit: Max chunks to retrieve
streaming: Enable streaming LLM response
chunk_callback: async def callback(chunk, end_of_stream) for streaming
explain_callback: async def callback(triples, explain_id) for explainability
Returns:
str: The synthesized answer text
"""
if self.verbose:
logger.debug("Constructing prompt...")
# Generate explainability URIs upfront
session_id = str(uuid.uuid4())
q_uri = docrag_question_uri(session_id)
exp_uri = docrag_exploration_uri(session_id)
syn_uri = docrag_synthesis_uri(session_id)
timestamp = datetime.utcnow().isoformat() + "Z"
# Emit question explainability immediately
if explain_callback:
q_triples = set_graph(
docrag_question_triples(q_uri, query, timestamp),
GRAPH_RETRIEVAL
)
await explain_callback(q_triples, q_uri)
q = Query(
rag=self, user=user, collection=collection, verbose=self.verbose,
doc_limit=doc_limit
)
docs = await q.get_docs(query)
docs, chunk_ids = await q.get_docs(query)
# Emit exploration explainability after chunks retrieved
if explain_callback:
exp_triples = set_graph(
docrag_exploration_triples(exp_uri, q_uri, len(chunk_ids), chunk_ids),
GRAPH_RETRIEVAL
)
await explain_callback(exp_triples, exp_uri)
if self.verbose:
logger.debug("Invoking LLM...")
@@ -104,12 +166,21 @@
logger.debug(f"Query: {query}")
if streaming and chunk_callback:
# Accumulate chunks for answer storage while forwarding to callback
accumulated_chunks = []
async def accumulating_callback(chunk, end_of_stream):
accumulated_chunks.append(chunk)
await chunk_callback(chunk, end_of_stream)
resp = await self.prompt_client.document_prompt(
query=query,
documents=docs,
streaming=True,
chunk_callback=chunk_callback
chunk_callback=accumulating_callback
)
# Combine all chunks into full response
resp = "".join(accumulated_chunks)
else:
resp = await self.prompt_client.document_prompt(
query=query,
@@ -119,5 +190,17 @@
if self.verbose:
logger.debug("Query processing complete")
# Emit synthesis explainability after answer generated
if explain_callback:
answer_text = resp if resp else ""
syn_triples = set_graph(
docrag_synthesis_triples(syn_uri, exp_uri, answer_text),
GRAPH_RETRIEVAL
)
await explain_callback(syn_triples, syn_uri)
if self.verbose:
logger.debug(f"Emitted explain for session {session_id}")
return resp
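The accumulating-callback pattern above (forward each chunk to the caller while also collecting the full text so the synthesis triples can record the answer) generalises beyond this service. A minimal asyncio sketch, with the prompt client replaced by a stub stream (`fake_llm_stream` is a stand-in, not the real `prompt_client.document_prompt`):

```python
import asyncio

async def fake_llm_stream(chunk_callback):
    # Stand-in for the streaming LLM call
    for chunk in ["The ", "answer ", "is 42."]:
        await chunk_callback(chunk, end_of_stream=False)
    await chunk_callback("", end_of_stream=True)

async def run_streaming(chunk_callback):
    accumulated = []

    async def accumulating_callback(chunk, end_of_stream):
        # Keep a copy for provenance, forward to the real consumer
        accumulated.append(chunk)
        await chunk_callback(chunk, end_of_stream)

    await fake_llm_stream(accumulating_callback)
    return "".join(accumulated)

async def main():
    received = []
    async def consumer(chunk, end_of_stream):
        received.append(chunk)
    full = await run_streaming(consumer)
    print(full)           # The answer is 42.
    print(len(received))  # 4 (three chunks plus the end-of-stream marker)

asyncio.run(main())
```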


@@ -11,6 +11,8 @@ import logging
from ... schema import DocumentRagQuery, DocumentRagResponse, Error
from ... schema import LibrarianRequest, LibrarianResponse
from ... schema import librarian_request_queue, librarian_response_queue
from ... schema import Triples, Metadata
from ... provenance import GRAPH_RETRIEVAL
from . document_rag import DocumentRag
from ... base import FlowProcessor, ConsumerSpec, ProducerSpec
from ... base import PromptClientSpec, EmbeddingsClientSpec
@@ -78,6 +80,13 @@ class Processor(FlowProcessor):
)
)
self.register_specification(
ProducerSpec(
name = "explainability",
schema = Triples,
)
)
# Librarian client for fetching chunk content from Garage
librarian_request_q = params.get(
"librarian_request_queue", default_librarian_request_queue
@@ -194,6 +203,29 @@ class Processor(FlowProcessor):
else:
doc_limit = self.doc_limit
# Real-time explainability callback - emits triples and IDs as they're generated
# Triples are stored in the user's collection with a named graph (urn:graph:retrieval)
async def send_explainability(triples, explain_id):
# Send triples to explainability queue - stores in same collection with named graph
await flow("explainability").send(Triples(
metadata=Metadata(
id=explain_id,
user=v.user,
collection=v.collection, # Store in user's collection
),
triples=triples,
))
# Send explain ID and graph to response queue
await flow("response").send(
DocumentRagResponse(
response=None,
explain_id=explain_id,
explain_graph=GRAPH_RETRIEVAL,
),
properties={"id": id}
)
# Check if streaming is requested
if v.streaming:
# Define async callback for streaming chunks
@@ -217,6 +249,7 @@ class Processor(FlowProcessor):
doc_limit=doc_limit,
streaming=True,
chunk_callback=send_chunk,
explain_callback=send_explainability,
)
else:
# Non-streaming path (existing behavior)
@@ -224,7 +257,8 @@ class Processor(FlowProcessor):
v.query,
user=v.user,
collection=v.collection,
doc_limit=doc_limit
doc_limit=doc_limit,
explain_callback=send_explainability,
)
await flow("response").send(