diff --git a/docs/tech-specs/agent-explainability.md b/docs/tech-specs/agent-explainability.md new file mode 100644 index 00000000..f02a95b1 --- /dev/null +++ b/docs/tech-specs/agent-explainability.md @@ -0,0 +1,272 @@ +# Agent Explainability: Provenance Recording + +## Overview + +Add provenance recording to the React agent loop so agent sessions can be traced and debugged using the same explainability infrastructure as GraphRAG. + +**Design Decisions:** +- Write to `urn:graph:retrieval` (generic explainability graph) +- Linear dependency chain for now (analysis N → wasDerivedFrom → analysis N-1) +- Tools are opaque black boxes (record input/output only) +- DAG support deferred to future iteration + +## Entity Types + +Both GraphRAG and Agent use PROV-O as the base ontology with TrustGraph-specific subtypes: + +### GraphRAG Types +| Entity | PROV-O Type | TG Types | Description | +|--------|-------------|----------|-------------| +| Question | `prov:Activity` | `tg:Question`, `tg:GraphRagQuestion` | The user's query | +| Exploration | `prov:Entity` | `tg:Exploration` | Edges retrieved from knowledge graph | +| Focus | `prov:Entity` | `tg:Focus` | Selected edges with reasoning | +| Synthesis | `prov:Entity` | `tg:Synthesis` | Final answer | + +### Agent Types +| Entity | PROV-O Type | TG Types | Description | +|--------|-------------|----------|-------------| +| Question | `prov:Activity` | `tg:Question`, `tg:AgentQuestion` | The user's query | +| Analysis | `prov:Entity` | `tg:Analysis` | Each think/act/observe cycle | +| Conclusion | `prov:Entity` | `tg:Conclusion` | Final answer | + +### Document RAG Types +| Entity | PROV-O Type | TG Types | Description | +|--------|-------------|----------|-------------| +| Question | `prov:Activity` | `tg:Question`, `tg:DocRagQuestion` | The user's query | +| Exploration | `prov:Entity` | `tg:Exploration` | Chunks retrieved from document store | +| Synthesis | `prov:Entity` | `tg:Synthesis` | Final answer | + +**Note:** Document RAG uses a subset of GraphRAG's types (no Focus step since there's no edge selection/reasoning phase). + +### Question Subtypes + +All Question entities share `tg:Question` as a base type but have a specific subtype to identify the retrieval mechanism: + +| Subtype | URI Pattern | Mechanism | +|---------|-------------|-----------| +| `tg:GraphRagQuestion` | `urn:trustgraph:question:{uuid}` | Knowledge graph RAG | +| `tg:DocRagQuestion` | `urn:trustgraph:docrag:{uuid}` | Document/chunk RAG | +| `tg:AgentQuestion` | `urn:trustgraph:agent:{uuid}` | ReAct agent | + +This allows querying all questions via `tg:Question` while filtering by specific mechanism via the subtype. + +## Provenance Model + +``` +Question (urn:trustgraph:agent:{uuid}) + │ + │ tg:query = "User's question" + │ prov:startedAtTime = timestamp + │ rdf:type = prov:Activity, tg:Question + │ + ↓ prov:wasDerivedFrom + │ +Analysis1 (urn:trustgraph:agent:{uuid}/i1) + │ + │ tg:thought = "I need to query the knowledge base..." + │ tg:action = "knowledge-query" + │ tg:arguments = {"question": "..."} + │ tg:observation = "Result from tool..." + │ rdf:type = prov:Entity, tg:Analysis + │ + ↓ prov:wasDerivedFrom + │ +Analysis2 (urn:trustgraph:agent:{uuid}/i2) + │ ... + ↓ prov:wasDerivedFrom + │ +Conclusion (urn:trustgraph:agent:{uuid}/final) + │ + │ tg:answer = "The final response..." + │ rdf:type = prov:Entity, tg:Conclusion +``` + +### Document RAG Provenance Model + +``` +Question (urn:trustgraph:docrag:{uuid}) + │ + │ tg:query = "User's question" + │ prov:startedAtTime = timestamp + │ rdf:type = prov:Activity, tg:Question + │ + ↓ prov:wasGeneratedBy + │ +Exploration (urn:trustgraph:docrag:{uuid}/exploration) + │ + │ tg:chunkCount = 5 + │ tg:selectedChunk = "chunk-id-1" + │ tg:selectedChunk = "chunk-id-2" + │ ... + │ rdf:type = prov:Entity, tg:Exploration + │ + ↓ prov:wasDerivedFrom + │ +Synthesis (urn:trustgraph:docrag:{uuid}/synthesis) + │ + │ tg:content = "The synthesized answer..." + │ rdf:type = prov:Entity, tg:Synthesis +``` + +## Changes Required + +### 1. Schema Changes + +**File:** `trustgraph-base/trustgraph/schema/services/agent.py` + +Add `session_id` and `collection` fields to `AgentRequest`: +```python +@dataclass +class AgentRequest: + question: str = "" + state: str = "" + group: list[str] | None = None + history: list[AgentStep] = field(default_factory=list) + user: str = "" + collection: str = "default" # NEW: Collection for provenance traces + streaming: bool = False + session_id: str = "" # NEW: For provenance tracking across iterations +``` + +**File:** `trustgraph-base/trustgraph/messaging/translators/agent.py` + +Update translator to handle `session_id` and `collection` in both `to_pulsar()` and `from_pulsar()`. + +### 2. Add Explainability Producer to Agent Service + +**File:** `trustgraph-flow/trustgraph/agent/react/service.py` + +Register an "explainability" producer (same pattern as GraphRAG): +```python +from ... base import ProducerSpec +from ... schema import Triples + +# In __init__: +self.register_specification( + ProducerSpec( + name = "explainability", + schema = Triples, + ) +) +``` + +### 3. Provenance Triple Generation + +**File:** `trustgraph-base/trustgraph/provenance/agent.py` + +Create helper functions (similar to GraphRAG's `question_triples`, `exploration_triples`, etc.): +```python +def agent_session_triples(session_uri, query, timestamp): + """Generate triples for agent Question.""" + return [ + Triple(s=session_uri, p=RDF_TYPE, o=PROV_ACTIVITY), + Triple(s=session_uri, p=RDF_TYPE, o=TG_QUESTION), + Triple(s=session_uri, p=TG_QUERY, o=query), + Triple(s=session_uri, p=PROV_STARTED_AT_TIME, o=timestamp), + ] + +def agent_iteration_triples(iteration_uri, parent_uri, thought, action, arguments, observation): + """Generate triples for one Analysis step.""" + return [ + Triple(s=iteration_uri, p=RDF_TYPE, o=PROV_ENTITY), + Triple(s=iteration_uri, p=RDF_TYPE, o=TG_ANALYSIS), + Triple(s=iteration_uri, p=TG_THOUGHT, o=thought), + Triple(s=iteration_uri, p=TG_ACTION, o=action), + Triple(s=iteration_uri, p=TG_ARGUMENTS, o=json.dumps(arguments)), + Triple(s=iteration_uri, p=TG_OBSERVATION, o=observation), + Triple(s=iteration_uri, p=PROV_WAS_DERIVED_FROM, o=parent_uri), + ] + +def agent_final_triples(final_uri, parent_uri, answer): + """Generate triples for Conclusion.""" + return [ + Triple(s=final_uri, p=RDF_TYPE, o=PROV_ENTITY), + Triple(s=final_uri, p=RDF_TYPE, o=TG_CONCLUSION), + Triple(s=final_uri, p=TG_ANSWER, o=answer), + Triple(s=final_uri, p=PROV_WAS_DERIVED_FROM, o=parent_uri), + ] +``` + +### 4. Type Definitions + +**File:** `trustgraph-base/trustgraph/provenance/namespaces.py` + +Add explainability entity types and agent predicates: +```python +# Explainability entity types (used by both GraphRAG and Agent) +TG_QUESTION = TG + "Question" +TG_EXPLORATION = TG + "Exploration" +TG_FOCUS = TG + "Focus" +TG_SYNTHESIS = TG + "Synthesis" +TG_ANALYSIS = TG + "Analysis" +TG_CONCLUSION = TG + "Conclusion" + +# Agent predicates +TG_THOUGHT = TG + "thought" +TG_ACTION = TG + "action" +TG_ARGUMENTS = TG + "arguments" +TG_OBSERVATION = TG + "observation" +TG_ANSWER = TG + "answer" +``` + +## Files Modified + +| File | Change | +|------|--------| +| `trustgraph-base/trustgraph/schema/services/agent.py` | Add session_id and collection to AgentRequest | +| `trustgraph-base/trustgraph/messaging/translators/agent.py` | Update translator for new fields | +| `trustgraph-base/trustgraph/provenance/namespaces.py` | Add entity types, agent predicates, and Document RAG predicates | +| `trustgraph-base/trustgraph/provenance/triples.py` | Add TG types to GraphRAG triple builders, add Document RAG triple builders | +| `trustgraph-base/trustgraph/provenance/uris.py` | Add Document RAG URI generators | +| `trustgraph-base/trustgraph/provenance/__init__.py` | Export new types, predicates, and Document RAG functions | +| `trustgraph-base/trustgraph/schema/services/retrieval.py` | Add explain_id and explain_graph to DocumentRagResponse | +| `trustgraph-base/trustgraph/messaging/translators/retrieval.py` | Update DocumentRagResponseTranslator for explainability fields | +| `trustgraph-flow/trustgraph/agent/react/service.py` | Add explainability producer + recording logic | +| `trustgraph-flow/trustgraph/retrieval/document_rag/document_rag.py` | Add explainability callback and emit provenance triples | +| `trustgraph-flow/trustgraph/retrieval/document_rag/rag.py` | Add explainability producer and wire up callback | +| `trustgraph-cli/trustgraph/cli/show_explain_trace.py` | Handle agent trace types | +| `trustgraph-cli/trustgraph/cli/list_explain_traces.py` | List agent sessions alongside GraphRAG | + +## Files Created + +| File | Purpose | +|------|---------| +| `trustgraph-base/trustgraph/provenance/agent.py` | Agent-specific triple generators | + +## CLI Updates + +**Detection:** Both GraphRAG and Agent Questions have `tg:Question` type. Distinguished by: +1. URI pattern: `urn:trustgraph:agent:` vs `urn:trustgraph:question:` +2. Derived entities: `tg:Analysis` (agent) vs `tg:Exploration` (GraphRAG) + +**`list_explain_traces.py`:** +- Shows Type column (Agent vs GraphRAG) + +**`show_explain_trace.py`:** +- Auto-detects trace type +- Agent rendering shows: Question → Analysis step(s) → Conclusion + +## Backwards Compatibility + +- `session_id` defaults to `""` - old requests work, just won't have provenance +- `collection` defaults to `"default"` - reasonable fallback +- CLI gracefully handles both trace types + +## Verification + +```bash +# Run an agent query +tg-invoke-agent -q "What is the capital of France?" + +# List traces (should show agent sessions with Type column) +tg-list-explain-traces -U trustgraph -C default + +# Show agent trace +tg-show-explain-trace "urn:trustgraph:agent:xxx" +``` + +## Future Work (Not This PR) + +- DAG dependencies (when analysis N uses results from multiple prior analyses) +- Tool-specific provenance linking (KnowledgeQuery → its GraphRAG trace) +- Streaming provenance emission (emit as we go, not batch at end) diff --git a/tests/unit/test_retrieval/test_document_rag.py b/tests/unit/test_retrieval/test_document_rag.py index 92a62222..1f4c5f12 100644 --- a/tests/unit/test_retrieval/test_document_rag.py +++ b/tests/unit/test_retrieval/test_document_rag.py @@ -208,9 +208,12 @@ class TestQuery: collection="test_collection" ) - # Verify result is list of fetched document content - assert "Document 1 content" in result - assert "Document 2 content" in result + # Verify result is tuple of (docs, chunk_ids) + docs, chunk_ids = result + assert "Document 1 content" in docs + assert "Document 2 content" in docs + assert "doc/c1" in chunk_ids + assert "doc/c2" in chunk_ids @pytest.mark.asyncio async def test_document_rag_query_method(self, mock_fetch_chunk): @@ -350,8 +353,10 @@ class TestQuery: mock_embeddings_client.embed.assert_called_once_with(["verbose test"]) mock_doc_embeddings_client.query.assert_called_once() - # Verify result contains fetched content - assert "Verbose test doc" in result + # Verify result is tuple of (docs, chunk_ids) with fetched content + docs, chunk_ids = result + assert "Verbose test doc" in docs + assert "doc/c6" in chunk_ids @pytest.mark.asyncio async def test_document_rag_query_with_verbose(self, mock_fetch_chunk): @@ -426,8 +431,8 @@ class TestQuery: mock_embeddings_client.embed.assert_called_once_with(["query with no results"]) mock_doc_embeddings_client.query.assert_called_once() - # Verify empty result is returned - assert result == [] + # Verify empty result is returned (tuple of empty lists) + assert result == ([], []) @pytest.mark.asyncio async def test_document_rag_query_with_empty_documents(self, mock_fetch_chunk): diff --git a/tests/unit/test_retrieval/test_document_rag_service.py b/tests/unit/test_retrieval/test_document_rag_service.py index 041d29df..d6b5031a 100644 --- a/tests/unit/test_retrieval/test_document_rag_service.py +++ b/tests/unit/test_retrieval/test_document_rag_service.py @@ -5,7 +5,7 @@ passed to the DocumentRag.query() method. """ import pytest -from unittest.mock import MagicMock, AsyncMock, patch +from unittest.mock import MagicMock, AsyncMock, patch, ANY from trustgraph.retrieval.document_rag.rag import Processor from trustgraph.schema import DocumentRagQuery, DocumentRagResponse @@ -65,8 +65,9 @@ class TestDocumentRagService: mock_rag_instance.query.assert_called_once_with( "test query", user="my_user", # Must be from message, not hardcoded default - collection="test_coll_1", # Must be from message, not hardcoded default - doc_limit=5 + collection="test_coll_1", # Must be from message, not hardcoded default + doc_limit=5, + explain_callback=ANY, # Explainability callback is always passed ) # Verify response was sent diff --git a/trustgraph-base/trustgraph/messaging/translators/agent.py b/trustgraph-base/trustgraph/messaging/translators/agent.py index 4289df0a..4da0aec6 100644 --- a/trustgraph-base/trustgraph/messaging/translators/agent.py +++ b/trustgraph-base/trustgraph/messaging/translators/agent.py @@ -13,7 +13,9 @@ class AgentRequestTranslator(MessageTranslator): group=data.get("group", None), history=data.get("history", []), user=data.get("user", "trustgraph"), - streaming=data.get("streaming", False) + collection=data.get("collection", "default"), + streaming=data.get("streaming", False), + session_id=data.get("session_id", ""), ) def from_pulsar(self, obj: AgentRequest) -> Dict[str, Any]: @@ -23,7 +25,9 @@ class AgentRequestTranslator(MessageTranslator): "group": obj.group, "history": obj.history, "user": obj.user, - "streaming": getattr(obj, "streaming", False) + "collection": getattr(obj, "collection", "default"), + "streaming": getattr(obj, "streaming", False), + "session_id": getattr(obj, "session_id", ""), } diff --git a/trustgraph-base/trustgraph/messaging/translators/retrieval.py b/trustgraph-base/trustgraph/messaging/translators/retrieval.py index f84ce103..9f102f9a 100644 --- a/trustgraph-base/trustgraph/messaging/translators/retrieval.py +++ b/trustgraph-base/trustgraph/messaging/translators/retrieval.py @@ -38,6 +38,16 @@ class DocumentRagResponseTranslator(MessageTranslator): if obj.response is not None: result["response"] = obj.response + # Include explain_id for explain messages + explain_id = getattr(obj, "explain_id", None) + if explain_id: + result["explain_id"] = explain_id + + # Include explain_graph for explain messages (named graph filter) + explain_graph = getattr(obj, "explain_graph", None) + if explain_graph is not None: + result["explain_graph"] = explain_graph + # Include end_of_stream flag result["end_of_stream"] = getattr(obj, "end_of_stream", False) diff --git a/trustgraph-base/trustgraph/provenance/__init__.py b/trustgraph-base/trustgraph/provenance/__init__.py index 5aa6d447..c1cb522a 100644 --- a/trustgraph-base/trustgraph/provenance/__init__.py +++ b/trustgraph-base/trustgraph/provenance/__init__.py @@ -40,11 +40,19 @@ from . uris import ( activity_uri, statement_uri, agent_uri, - # Query-time provenance URIs + # Query-time provenance URIs (GraphRAG) question_uri, exploration_uri, focus_uri, synthesis_uri, + # Agent provenance URIs + agent_session_uri, + agent_iteration_uri, + agent_final_uri, + # Document RAG provenance URIs + docrag_question_uri, + docrag_exploration_uri, + docrag_synthesis_uri, ) # Namespace constants @@ -63,8 +71,17 @@ from . namespaces import ( TG_CHUNK_SIZE, TG_CHUNK_OVERLAP, TG_COMPONENT_VERSION, TG_LLM_MODEL, TG_ONTOLOGY, TG_EMBEDDING_MODEL, TG_SOURCE_TEXT, TG_SOURCE_CHAR_OFFSET, TG_SOURCE_CHAR_LENGTH, - # Query-time provenance predicates + # Query-time provenance predicates (GraphRAG) TG_QUERY, TG_EDGE_COUNT, TG_SELECTED_EDGE, TG_REASONING, TG_CONTENT, + # Query-time provenance predicates (DocumentRAG) + TG_CHUNK_COUNT, TG_SELECTED_CHUNK, + # Explainability entity types + TG_QUESTION, TG_EXPLORATION, TG_FOCUS, TG_SYNTHESIS, + TG_ANALYSIS, TG_CONCLUSION, + # Question subtypes (to distinguish retrieval mechanism) + TG_GRAPH_RAG_QUESTION, TG_DOC_RAG_QUESTION, TG_AGENT_QUESTION, + # Agent provenance predicates + TG_THOUGHT, TG_ACTION, TG_ARGUMENTS, TG_OBSERVATION, TG_ANSWER, # Named graphs GRAPH_DEFAULT, GRAPH_SOURCE, GRAPH_RETRIEVAL, ) @@ -74,15 +91,26 @@ from . triples import ( document_triples, derived_entity_triples, triple_provenance_triples, - # Query-time provenance triple builders + # Query-time provenance triple builders (GraphRAG) question_triples, exploration_triples, focus_triples, synthesis_triples, + # Query-time provenance triple builders (DocumentRAG) + docrag_question_triples, + docrag_exploration_triples, + docrag_synthesis_triples, # Utility set_graph, ) +# Agent provenance triple builders +from . agent import ( + agent_session_triples, + agent_iteration_triples, + agent_final_triples, +) + # Vocabulary bootstrap from . vocabulary import ( get_vocabulary_triples, @@ -107,6 +135,14 @@ __all__ = [ "exploration_uri", "focus_uri", "synthesis_uri", + # Agent provenance URIs + "agent_session_uri", + "agent_iteration_uri", + "agent_final_uri", + # Document RAG provenance URIs + "docrag_question_uri", + "docrag_exploration_uri", + "docrag_synthesis_uri", # Namespaces "PROV", "PROV_ENTITY", "PROV_ACTIVITY", "PROV_AGENT", "PROV_WAS_DERIVED_FROM", "PROV_WAS_GENERATED_BY", @@ -118,19 +154,36 @@ __all__ = [ "TG_CHUNK_SIZE", "TG_CHUNK_OVERLAP", "TG_COMPONENT_VERSION", "TG_LLM_MODEL", "TG_ONTOLOGY", "TG_EMBEDDING_MODEL", "TG_SOURCE_TEXT", "TG_SOURCE_CHAR_OFFSET", "TG_SOURCE_CHAR_LENGTH", - # Query-time provenance predicates + # Query-time provenance predicates (GraphRAG) "TG_QUERY", "TG_EDGE_COUNT", "TG_SELECTED_EDGE", "TG_REASONING", "TG_CONTENT", + # Query-time provenance predicates (DocumentRAG) + "TG_CHUNK_COUNT", "TG_SELECTED_CHUNK", + # Explainability entity types + "TG_QUESTION", "TG_EXPLORATION", "TG_FOCUS", "TG_SYNTHESIS", + "TG_ANALYSIS", "TG_CONCLUSION", + # Question subtypes + "TG_GRAPH_RAG_QUESTION", "TG_DOC_RAG_QUESTION", "TG_AGENT_QUESTION", + # Agent provenance predicates + "TG_THOUGHT", "TG_ACTION", "TG_ARGUMENTS", "TG_OBSERVATION", "TG_ANSWER", # Named graphs "GRAPH_DEFAULT", "GRAPH_SOURCE", "GRAPH_RETRIEVAL", # Triple builders "document_triples", "derived_entity_triples", "triple_provenance_triples", - # Query-time provenance triple builders + # Query-time provenance triple builders (GraphRAG) "question_triples", "exploration_triples", "focus_triples", "synthesis_triples", + # Query-time provenance triple builders (DocumentRAG) + "docrag_question_triples", + "docrag_exploration_triples", + "docrag_synthesis_triples", + # Agent provenance triple builders + "agent_session_triples", + "agent_iteration_triples", + "agent_final_triples", # Utility "set_graph", # Vocabulary diff --git a/trustgraph-base/trustgraph/provenance/agent.py b/trustgraph-base/trustgraph/provenance/agent.py new file mode 100644 index 00000000..1f108795 --- /dev/null +++ b/trustgraph-base/trustgraph/provenance/agent.py @@ -0,0 +1,141 @@ +""" +Helper functions to build PROV-O triples for agent provenance. + +Agent provenance tracks the reasoning trace of ReAct agent sessions: +- Question: The root activity with query and timestamp +- Analysis: Each think/act/observe cycle +- Conclusion: The final answer +""" + +import json +from datetime import datetime +from typing import List, Optional, Dict, Any + +from .. schema import Triple, Term, IRI, LITERAL + +from . namespaces import ( + RDF_TYPE, RDFS_LABEL, + PROV_ACTIVITY, PROV_ENTITY, PROV_WAS_DERIVED_FROM, PROV_STARTED_AT_TIME, + TG_QUERY, TG_THOUGHT, TG_ACTION, TG_ARGUMENTS, TG_OBSERVATION, TG_ANSWER, + TG_QUESTION, TG_ANALYSIS, TG_CONCLUSION, + TG_AGENT_QUESTION, +) + + +def _iri(uri: str) -> Term: + """Create an IRI term.""" + return Term(type=IRI, iri=uri) + + +def _literal(value) -> Term: + """Create a literal term.""" + return Term(type=LITERAL, value=str(value)) + + +def _triple(s: str, p: str, o_term: Term) -> Triple: + """Create a triple with IRI subject and predicate.""" + return Triple(s=_iri(s), p=_iri(p), o=o_term) + + +def agent_session_triples( + session_uri: str, + query: str, + timestamp: Optional[str] = None, +) -> List[Triple]: + """ + Build triples for an agent session start (Question). + + Creates: + - Activity declaration with tg:Question type + - Query text and timestamp + + Args: + session_uri: URI of the session (from agent_session_uri) + query: The user's query text + timestamp: ISO timestamp (defaults to now) + + Returns: + List of Triple objects + """ + if timestamp is None: + timestamp = datetime.utcnow().isoformat() + "Z" + + return [ + _triple(session_uri, RDF_TYPE, _iri(PROV_ACTIVITY)), + _triple(session_uri, RDF_TYPE, _iri(TG_QUESTION)), + _triple(session_uri, RDF_TYPE, _iri(TG_AGENT_QUESTION)), + _triple(session_uri, RDFS_LABEL, _literal("Agent Question")), + _triple(session_uri, PROV_STARTED_AT_TIME, _literal(timestamp)), + _triple(session_uri, TG_QUERY, _literal(query)), + ] + + +def agent_iteration_triples( + iteration_uri: str, + parent_uri: str, + thought: str, + action: str, + arguments: Dict[str, Any], + observation: str, +) -> List[Triple]: + """ + Build triples for one agent iteration (Analysis - think/act/observe cycle). + + Creates: + - Entity declaration with tg:Analysis type + - wasDerivedFrom link to parent (previous iteration or session) + - Thought, action, arguments, and observation data + + Args: + iteration_uri: URI of this iteration (from agent_iteration_uri) + parent_uri: URI of the parent (previous iteration or session) + thought: The agent's reasoning/thought + action: The tool/action name + arguments: Arguments passed to the tool (will be JSON-encoded) + observation: The result/observation from the tool + + Returns: + List of Triple objects + """ + triples = [ + _triple(iteration_uri, RDF_TYPE, _iri(PROV_ENTITY)), + _triple(iteration_uri, RDF_TYPE, _iri(TG_ANALYSIS)), + _triple(iteration_uri, RDFS_LABEL, _literal(f"Analysis: {action}")), + _triple(iteration_uri, PROV_WAS_DERIVED_FROM, _iri(parent_uri)), + _triple(iteration_uri, TG_THOUGHT, _literal(thought)), + _triple(iteration_uri, TG_ACTION, _literal(action)), + _triple(iteration_uri, TG_ARGUMENTS, _literal(json.dumps(arguments))), + _triple(iteration_uri, TG_OBSERVATION, _literal(observation)), + ] + + return triples + + +def agent_final_triples( + final_uri: str, + parent_uri: str, + answer: str, +) -> List[Triple]: + """ + Build triples for an agent final answer (Conclusion). + + Creates: + - Entity declaration with tg:Conclusion type + - wasDerivedFrom link to parent (last iteration or session) + - The answer text + + Args: + final_uri: URI of the final answer (from agent_final_uri) + parent_uri: URI of the parent (last iteration or session if no iterations) + answer: The final answer text + + Returns: + List of Triple objects + """ + return [ + _triple(final_uri, RDF_TYPE, _iri(PROV_ENTITY)), + _triple(final_uri, RDF_TYPE, _iri(TG_CONCLUSION)), + _triple(final_uri, RDFS_LABEL, _literal("Conclusion")), + _triple(final_uri, PROV_WAS_DERIVED_FROM, _iri(parent_uri)), + _triple(final_uri, TG_ANSWER, _literal(answer)), + ] diff --git a/trustgraph-base/trustgraph/provenance/namespaces.py b/trustgraph-base/trustgraph/provenance/namespaces.py index aaca70db..15f1b7d3 100644 --- a/trustgraph-base/trustgraph/provenance/namespaces.py +++ b/trustgraph-base/trustgraph/provenance/namespaces.py @@ -59,7 +59,7 @@ TG_SOURCE_TEXT = TG + "sourceText" TG_SOURCE_CHAR_OFFSET = TG + "sourceCharOffset" TG_SOURCE_CHAR_LENGTH = TG + "sourceCharLength" -# Query-time provenance predicates +# Query-time provenance predicates (GraphRAG) TG_QUERY = TG + "query" TG_EDGE_COUNT = TG + "edgeCount" TG_SELECTED_EDGE = TG + "selectedEdge" @@ -68,6 +68,30 @@ TG_REASONING = TG + "reasoning" TG_CONTENT = TG + "content" TG_DOCUMENT = TG + "document" # Reference to document in librarian +# Query-time provenance predicates (DocumentRAG) +TG_CHUNK_COUNT = TG + "chunkCount" +TG_SELECTED_CHUNK = TG + "selectedChunk" + +# Explainability entity types (shared) +TG_QUESTION = TG + "Question" +TG_EXPLORATION = TG + "Exploration" +TG_FOCUS = TG + "Focus" +TG_SYNTHESIS = TG + "Synthesis" +TG_ANALYSIS = TG + "Analysis" +TG_CONCLUSION = TG + "Conclusion" + +# Question subtypes (to distinguish retrieval mechanism) +TG_GRAPH_RAG_QUESTION = TG + "GraphRagQuestion" +TG_DOC_RAG_QUESTION = TG + "DocRagQuestion" +TG_AGENT_QUESTION = TG + "AgentQuestion" + +# Agent provenance predicates +TG_THOUGHT = TG + "thought" +TG_ACTION = TG + "action" +TG_ARGUMENTS = TG + "arguments" +TG_OBSERVATION = TG + "observation" +TG_ANSWER = TG + "answer" + # Named graph URIs for RDF datasets # These separate different types of data while keeping them in the same collection GRAPH_DEFAULT = "" # Core knowledge facts (triples extracted from documents) diff --git a/trustgraph-base/trustgraph/provenance/triples.py b/trustgraph-base/trustgraph/provenance/triples.py index a1b04596..14581c6f 100644 --- a/trustgraph-base/trustgraph/provenance/triples.py +++ b/trustgraph-base/trustgraph/provenance/triples.py @@ -17,9 +17,15 @@ from . namespaces import ( TG_CHUNK_INDEX, TG_CHAR_OFFSET, TG_CHAR_LENGTH, TG_CHUNK_SIZE, TG_CHUNK_OVERLAP, TG_COMPONENT_VERSION, TG_LLM_MODEL, TG_ONTOLOGY, TG_REIFIES, - # Query-time provenance predicates + # Query-time provenance predicates (GraphRAG) TG_QUERY, TG_EDGE_COUNT, TG_SELECTED_EDGE, TG_EDGE, TG_REASONING, TG_CONTENT, TG_DOCUMENT, + # Query-time provenance predicates (DocumentRAG) + TG_CHUNK_COUNT, TG_SELECTED_CHUNK, + # Explainability entity types + TG_QUESTION, TG_EXPLORATION, TG_FOCUS, TG_SYNTHESIS, + # Question subtypes + TG_GRAPH_RAG_QUESTION, TG_DOC_RAG_QUESTION, ) from . uris import activity_uri, agent_uri, edge_selection_uri @@ -310,7 +316,9 @@ def question_triples( return [ _triple(question_uri, RDF_TYPE, _iri(PROV_ACTIVITY)), - _triple(question_uri, RDFS_LABEL, _literal("GraphRAG question")), + _triple(question_uri, RDF_TYPE, _iri(TG_QUESTION)), + _triple(question_uri, RDF_TYPE, _iri(TG_GRAPH_RAG_QUESTION)), + _triple(question_uri, RDFS_LABEL, _literal("GraphRAG Question")), _triple(question_uri, PROV_STARTED_AT_TIME, _literal(timestamp)), _triple(question_uri, TG_QUERY, _literal(query)), ] @@ -339,6 +347,7 @@ def exploration_triples( """ return [ _triple(exploration_uri, RDF_TYPE, _iri(PROV_ENTITY)), + _triple(exploration_uri, RDF_TYPE, _iri(TG_EXPLORATION)), _triple(exploration_uri, RDFS_LABEL, _literal("Exploration")), _triple(exploration_uri, PROV_WAS_GENERATED_BY, _iri(question_uri)), _triple(exploration_uri, TG_EDGE_COUNT, _literal(edge_count)), @@ -383,6 +392,7 @@ def focus_triples( """ triples = [ _triple(focus_uri, RDF_TYPE, _iri(PROV_ENTITY)), + _triple(focus_uri, RDF_TYPE, _iri(TG_FOCUS)), _triple(focus_uri, RDFS_LABEL, _literal("Focus")), _triple(focus_uri, PROV_WAS_DERIVED_FROM, _iri(exploration_uri)), ] @@ -443,6 +453,7 @@ def synthesis_triples( """ triples = [ _triple(synthesis_uri, RDF_TYPE, _iri(PROV_ENTITY)), + _triple(synthesis_uri, RDF_TYPE, _iri(TG_SYNTHESIS)), _triple(synthesis_uri, RDFS_LABEL, _literal("Synthesis")), _triple(synthesis_uri, PROV_WAS_DERIVED_FROM, _iri(focus_uri)), ] @@ -455,3 +466,120 @@ def synthesis_triples( triples.append(_triple(synthesis_uri, TG_CONTENT, _literal(answer_text))) return triples + + +# Document RAG provenance triple builders +# +# Document RAG uses a subset of GraphRAG's model: +# Question - What was asked +# Exploration - Chunks retrieved from document store +# Synthesis - The final answer (no Focus step) + +def docrag_question_triples( + question_uri: str, + query: str, + timestamp: Optional[str] = None, +) -> List[Triple]: + """ + Build triples for a document RAG question activity. + + Creates: + - Activity declaration with tg:Question type + - Query text and timestamp + + Args: + question_uri: URI of the question (from docrag_question_uri) + query: The user's query text + timestamp: ISO timestamp (defaults to now) + + Returns: + List of Triple objects + """ + if timestamp is None: + timestamp = datetime.utcnow().isoformat() + "Z" + + return [ + _triple(question_uri, RDF_TYPE, _iri(PROV_ACTIVITY)), + _triple(question_uri, RDF_TYPE, _iri(TG_QUESTION)), + _triple(question_uri, RDF_TYPE, _iri(TG_DOC_RAG_QUESTION)), + _triple(question_uri, RDFS_LABEL, _literal("DocumentRAG Question")), + _triple(question_uri, PROV_STARTED_AT_TIME, _literal(timestamp)), + _triple(question_uri, TG_QUERY, _literal(query)), + ] + + +def docrag_exploration_triples( + exploration_uri: str, + question_uri: str, + chunk_count: int, + chunk_ids: Optional[List[str]] = None, +) -> List[Triple]: + """ + Build triples for a document RAG exploration entity (chunks retrieved). + + Creates: + - Entity declaration with tg:Exploration type + - wasGeneratedBy link to question + - Chunk count and optional chunk references + + Args: + exploration_uri: URI of the exploration entity + question_uri: URI of the parent question + chunk_count: Number of chunks retrieved + chunk_ids: Optional list of chunk URIs/IDs + + Returns: + List of Triple objects + """ + triples = [ + _triple(exploration_uri, RDF_TYPE, _iri(PROV_ENTITY)), + _triple(exploration_uri, RDF_TYPE, _iri(TG_EXPLORATION)), + _triple(exploration_uri, RDFS_LABEL, _literal("Exploration")), + _triple(exploration_uri, PROV_WAS_GENERATED_BY, _iri(question_uri)), + _triple(exploration_uri, TG_CHUNK_COUNT, _literal(chunk_count)), + ] + + # Add references to selected chunks + if chunk_ids: + for chunk_id in chunk_ids: + triples.append(_triple(exploration_uri, TG_SELECTED_CHUNK, _iri(chunk_id))) + + return triples + + +def docrag_synthesis_triples( + synthesis_uri: str, + exploration_uri: str, + answer_text: str = "", + document_id: Optional[str] = None, +) -> List[Triple]: + """ + Build triples for a document RAG synthesis entity (final answer). + + Creates: + - Entity declaration with tg:Synthesis type + - wasDerivedFrom link to exploration (skips focus step) + - Either document reference or inline content + + Args: + synthesis_uri: URI of the synthesis entity + exploration_uri: URI of the parent exploration entity + answer_text: The synthesized answer text (used if no document_id) + document_id: Optional librarian document ID (preferred over inline content) + + Returns: + List of Triple objects + """ + triples = [ + _triple(synthesis_uri, RDF_TYPE, _iri(PROV_ENTITY)), + _triple(synthesis_uri, RDF_TYPE, _iri(TG_SYNTHESIS)), + _triple(synthesis_uri, RDFS_LABEL, _literal("Synthesis")), + _triple(synthesis_uri, PROV_WAS_DERIVED_FROM, _iri(exploration_uri)), + ] + + if document_id: + triples.append(_triple(synthesis_uri, TG_DOCUMENT, _iri(document_id))) + elif answer_text: + triples.append(_triple(synthesis_uri, TG_CONTENT, _literal(answer_text))) + + return triples diff --git a/trustgraph-base/trustgraph/provenance/uris.py b/trustgraph-base/trustgraph/provenance/uris.py index 6c7d0a7f..b14abe76 100644 --- a/trustgraph-base/trustgraph/provenance/uris.py +++ b/trustgraph-base/trustgraph/provenance/uris.py @@ -138,3 +138,94 @@ def edge_selection_uri(session_id: str, edge_index: int) -> str: URN in format: urn:trustgraph:prov:edge:{uuid}:{index} """ return f"urn:trustgraph:prov:edge:{session_id}:{edge_index}" + + +# Agent provenance URIs +# These URIs use the urn:trustgraph:agent: namespace to distinguish agent +# provenance from GraphRAG question provenance + +def agent_session_uri(session_id: str = None) -> str: + """ + Generate URI for an agent session. + + Args: + session_id: Optional UUID string. Auto-generates if not provided. + + Returns: + URN in format: urn:trustgraph:agent:{uuid} + """ + if session_id is None: + session_id = str(uuid.uuid4()) + return f"urn:trustgraph:agent:{session_id}" + + +def agent_iteration_uri(session_id: str, iteration_num: int) -> str: + """ + Generate URI for an agent iteration. + + Args: + session_id: The session UUID. + iteration_num: 1-based iteration number. + + Returns: + URN in format: urn:trustgraph:agent:{uuid}/i{num} + """ + return f"urn:trustgraph:agent:{session_id}/i{iteration_num}" + + +def agent_final_uri(session_id: str) -> str: + """ + Generate URI for an agent final answer. + + Args: + session_id: The session UUID. + + Returns: + URN in format: urn:trustgraph:agent:{uuid}/final + """ + return f"urn:trustgraph:agent:{session_id}/final" + + +# Document RAG provenance URIs +# These URIs use the urn:trustgraph:docrag: namespace to distinguish +# document RAG provenance from graph RAG provenance + +def docrag_question_uri(session_id: str = None) -> str: + """ + Generate URI for a document RAG question activity. + + Args: + session_id: Optional UUID string. Auto-generates if not provided. + + Returns: + URN in format: urn:trustgraph:docrag:{uuid} + """ + if session_id is None: + session_id = str(uuid.uuid4()) + return f"urn:trustgraph:docrag:{session_id}" + + +def docrag_exploration_uri(session_id: str) -> str: + """ + Generate URI for a document RAG exploration entity (chunks retrieved). + + Args: + session_id: The session UUID. + + Returns: + URN in format: urn:trustgraph:docrag:{uuid}/exploration + """ + return f"urn:trustgraph:docrag:{session_id}/exploration" + + +def docrag_synthesis_uri(session_id: str) -> str: + """ + Generate URI for a document RAG synthesis entity (final answer). + + Args: + session_id: The session UUID. + + Returns: + URN in format: urn:trustgraph:docrag:{uuid}/synthesis + """ + return f"urn:trustgraph:docrag:{session_id}/synthesis" diff --git a/trustgraph-base/trustgraph/schema/services/agent.py b/trustgraph-base/trustgraph/schema/services/agent.py index 9f883ff2..35a387fc 100644 --- a/trustgraph-base/trustgraph/schema/services/agent.py +++ b/trustgraph-base/trustgraph/schema/services/agent.py @@ -23,7 +23,9 @@ class AgentRequest: group: list[str] | None = None history: list[AgentStep] = field(default_factory=list) user: str = "" # User context for multi-tenancy - streaming: bool = False # NEW: Enable streaming response delivery (default false) + collection: str = "default" # Collection for provenance traces + streaming: bool = False # Enable streaming response delivery (default false) + session_id: str = "" # For provenance tracking across iterations @dataclass class AgentResponse: diff --git a/trustgraph-base/trustgraph/schema/services/retrieval.py b/trustgraph-base/trustgraph/schema/services/retrieval.py index dd31444e..5b09b11e 100644 --- a/trustgraph-base/trustgraph/schema/services/retrieval.py +++ b/trustgraph-base/trustgraph/schema/services/retrieval.py @@ -42,5 +42,7 @@ class DocumentRagQuery: @dataclass class DocumentRagResponse: error: Error | None = None - response: str = "" + response: str | None = "" end_of_stream: bool = False + explain_id: str | None = None # Single explain URI (announced as created) + explain_graph: str | None = None # Named graph where explain was stored (e.g., urn:graph:retrieval) diff --git a/trustgraph-cli/trustgraph/cli/list_explain_traces.py b/trustgraph-cli/trustgraph/cli/list_explain_traces.py index 2690159c..d2bb28ea 100644 --- a/trustgraph-cli/trustgraph/cli/list_explain_traces.py +++ b/trustgraph-cli/trustgraph/cli/list_explain_traces.py @@ -1,8 +1,8 @@ """ -List all GraphRAG sessions (questions) in a collection. +List all explainability sessions (GraphRAG and Agent) in a collection. Queries for all questions stored in the retrieval graph and displays them -with their session IDs and timestamps. +with their session IDs, type (GraphRAG or Agent), and timestamps. Examples: tg-list-explain-traces -U trustgraph -C default @@ -24,8 +24,14 @@ default_collection = 'default' # Predicates TG = "https://trustgraph.ai/ns/" TG_QUERY = TG + "query" +TG_QUESTION = TG + "Question" +TG_ANALYSIS = TG + "Analysis" +TG_EXPLORATION = TG + "Exploration" PROV = "http://www.w3.org/ns/prov#" PROV_STARTED_AT_TIME = PROV + "startedAtTime" +PROV_WAS_DERIVED_FROM = PROV + "wasDerivedFrom" +PROV_WAS_GENERATED_BY = PROV + "wasGeneratedBy" +RDF_TYPE = "http://www.w3.org/1999/02/22-rdf-syntax-ns#type" # Retrieval graph RETRIEVAL_GRAPH = "urn:graph:retrieval" @@ -117,8 +123,45 @@ def get_timestamp(socket, flow_id, user, collection, question_id): return "" +def get_session_type(socket, flow_id, user, collection, session_id): + """ + Get the type of session (Agent or GraphRAG). + + Both have tg:Question type, so we distinguish by URI pattern + or by checking what's derived from it. + """ + # Fast path: check URI pattern + if session_id.startswith("urn:trustgraph:agent:"): + return "Agent" + if session_id.startswith("urn:trustgraph:question:"): + return "GraphRAG" + + # Check what's derived from this entity + derived = query_triples( + socket, flow_id, user, collection, + p=PROV_WAS_DERIVED_FROM, o=session_id, g=RETRIEVAL_GRAPH + ) + generated = query_triples( + socket, flow_id, user, collection, + p=PROV_WAS_GENERATED_BY, o=session_id, g=RETRIEVAL_GRAPH + ) + + for s, p, o in derived + generated: + child_types = query_triples( + socket, flow_id, user, collection, + s=s, p=RDF_TYPE, g=RETRIEVAL_GRAPH + ) + for _, _, child_type in child_types: + if child_type == TG_ANALYSIS: + return "Agent" + if child_type == TG_EXPLORATION: + return "GraphRAG" + + return "GraphRAG" + + def list_sessions(socket, flow_id, user, collection, limit): - """List all GraphRAG sessions by finding questions.""" + """List all explainability sessions (GraphRAG and Agent) by finding questions.""" # Query for all triples with predicate = tg:query triples = query_triples( socket, flow_id, user, collection, @@ -129,9 +172,12 @@ def list_sessions(socket, flow_id, user, collection, limit): for question_id, _, query_text in triples: # Get timestamp if available timestamp = get_timestamp(socket, flow_id, user, collection, question_id) + # Get session type (Agent or GraphRAG) + session_type = get_session_type(socket, flow_id, user, collection, question_id) sessions.append({ "id": question_id, + "type": session_type, "question": query_text, "time": timestamp, }) @@ -154,18 +200,19 @@ def truncate_text(text, max_len=60): def print_table(sessions): """Print sessions as a table.""" if not sessions: - print("No GraphRAG sessions found.") + print("No explainability sessions found.") return rows = [] for session in sessions: rows.append([ session["id"], - truncate_text(session["question"], 50), + session.get("type", "Unknown"), + truncate_text(session["question"], 45), session.get("time", "") ]) - headers = ["Session ID", "Question", "Time"] + headers = ["Session ID", "Type", "Question", "Time"] print(tabulate(rows, headers=headers, tablefmt="simple")) diff --git a/trustgraph-cli/trustgraph/cli/show_explain_trace.py b/trustgraph-cli/trustgraph/cli/show_explain_trace.py index a678a180..d09b220c 100644 --- a/trustgraph-cli/trustgraph/cli/show_explain_trace.py +++ b/trustgraph-cli/trustgraph/cli/show_explain_trace.py @@ -1,11 +1,15 @@ """ -Show full explainability trace for a GraphRAG session. +Show full explainability trace for a GraphRAG or Agent session. -Given a question/session URI, displays the complete cascade: -Question -> Exploration -> Focus (edge selection) -> Synthesis (answer). +Given a question/session URI, displays the complete trace: +- GraphRAG: Question -> Exploration -> Focus (edge selection) -> Synthesis (answer) +- Agent: Session -> Iteration(s) (thought/action/observation) -> Final Answer + +The tool auto-detects the trace type based on rdf:type. Examples: tg-show-explain-trace -U trustgraph -C default "urn:trustgraph:question:abc123" + tg-show-explain-trace -U trustgraph -C default "urn:trustgraph:agent:abc123" tg-show-explain-trace --max-answer 1000 "urn:trustgraph:question:abc123" tg-show-explain-trace --show-provenance "urn:trustgraph:question:abc123" """ @@ -31,10 +35,25 @@ TG_REASONING = TG + "reasoning" TG_CONTENT = TG + "content" TG_DOCUMENT = TG + "document" TG_REIFIES = TG + "reifies" +# Explainability entity types +TG_QUESTION = TG + "Question" +TG_EXPLORATION = TG + "Exploration" +TG_FOCUS = TG + "Focus" +TG_SYNTHESIS = TG + "Synthesis" +TG_ANALYSIS = TG + "Analysis" +TG_CONCLUSION = TG + "Conclusion" + +# Agent predicates +TG_THOUGHT = TG + "thought" +TG_ACTION = TG + "action" +TG_ARGUMENTS = TG + "arguments" +TG_OBSERVATION = TG + "observation" +TG_ANSWER = TG + "answer" PROV = "http://www.w3.org/ns/prov#" PROV_STARTED_AT_TIME = PROV + "startedAtTime" PROV_WAS_DERIVED_FROM = PROV + "wasDerivedFrom" PROV_WAS_GENERATED_BY = PROV + "wasGeneratedBy" +RDF_TYPE = "http://www.w3.org/1999/02/22-rdf-syntax-ns#type" RDFS_LABEL = "http://www.w3.org/2000/01/rdf-schema#label" # Graphs @@ -280,6 +299,186 @@ def format_edge(edge, label_cache=None, socket=None, flow_id=None, user=None, co return f"({s_label}, {p_label}, {o_label})" +def detect_trace_type(socket, flow_id, user, collection, entity_id): + """ + Detect whether an entity is an agent Question or GraphRAG Question. + + Both have rdf:type = tg:Question, so we distinguish by checking + what's derived from it: + - Agent: has tg:Analysis or tg:Conclusion derived + - GraphRAG: has tg:Exploration derived + + Also checks URI pattern as fallback: + - urn:trustgraph:agent: -> agent + - urn:trustgraph:question: -> graphrag + + Returns: + "agent" or "graphrag" + """ + # Check URI pattern first (fast path) + if entity_id.startswith("urn:trustgraph:agent:"): + return "agent" + if entity_id.startswith("urn:trustgraph:question:"): + return "graphrag" + + # Check what's derived from this entity + derived = find_by_predicate_object( + socket, flow_id, user, collection, + PROV_WAS_DERIVED_FROM, entity_id + ) + + # Also check wasGeneratedBy (GraphRAG exploration uses this) + generated = find_by_predicate_object( + socket, flow_id, user, collection, + PROV_WAS_GENERATED_BY, entity_id + ) + + all_children = derived + generated + + for child_id in all_children: + child_types = query_triples( + socket, flow_id, user, collection, + s=child_id, p=RDF_TYPE, g=RETRIEVAL_GRAPH + ) + for s, p, o in child_types: + if o == TG_ANALYSIS or o == TG_CONCLUSION: + return "agent" + if o == TG_EXPLORATION: + return "graphrag" + + # Default to graphrag + return "graphrag" + + +def build_agent_trace(socket, flow_id, user, collection, session_id, api=None, max_answer=500): + """Build the full explainability trace for an agent session.""" + trace = { + "session_id": session_id, + "type": "agent", + "question": None, + "time": None, + "iterations": [], + "final_answer": None, + } + + # Get session metadata + props = get_node_properties(socket, flow_id, user, collection, session_id) + trace["question"] = props.get(TG_QUERY, [None])[0] + trace["time"] = props.get(PROV_STARTED_AT_TIME, [None])[0] + + # Find all entities derived from this session (iterations and final) + # Start by looking for entities where prov:wasDerivedFrom = session_id + current_uri = session_id + iteration_num = 1 + + while True: + # Find entities derived from current + derived_ids = find_by_predicate_object( + socket, flow_id, user, collection, + PROV_WAS_DERIVED_FROM, current_uri + ) + + if not derived_ids: + break + + derived_id = derived_ids[0] + derived_props = get_node_properties(socket, flow_id, user, collection, derived_id) + + # Check type + types = derived_props.get(RDF_TYPE, []) + + if TG_ANALYSIS in types: + iteration = { + "id": derived_id, + "iteration_num": iteration_num, + "thought": derived_props.get(TG_THOUGHT, [None])[0], + "action": derived_props.get(TG_ACTION, [None])[0], + "arguments": derived_props.get(TG_ARGUMENTS, [None])[0], + "observation": derived_props.get(TG_OBSERVATION, [None])[0], + } + trace["iterations"].append(iteration) + current_uri = derived_id + iteration_num += 1 + + elif TG_CONCLUSION in types: + answer = derived_props.get(TG_ANSWER, [None])[0] + if answer and len(answer) > max_answer: + answer = answer[:max_answer] + "... [truncated]" + trace["final_answer"] = { + "id": derived_id, + "answer": answer, + } + break + + else: + # Unknown type, stop traversal + break + + return trace + + +def print_agent_text(trace): + """Print agent trace in text format.""" + print(f"=== Agent Session: {trace['session_id']} ===") + print() + + if trace["question"]: + print(f"Question: {trace['question']}") + if trace["time"]: + print(f"Time: {trace['time']}") + print() + + # Analysis steps + print("--- Analysis ---") + iterations = trace.get("iterations", []) + if iterations: + for iteration in iterations: + print(f"Analysis {iteration['iteration_num']}:") + print(f" Thought: {iteration.get('thought', 'N/A')}") + print(f" Action: {iteration.get('action', 'N/A')}") + + args = iteration.get('arguments') + if args: + # Try to pretty-print JSON arguments + try: + import json + args_obj = json.loads(args) + args_str = json.dumps(args_obj, indent=4) + # Indent each line + args_lines = args_str.split('\n') + print(f" Arguments:") + for line in args_lines: + print(f" {line}") + except: + print(f" Arguments: {args}") + else: + print(f" Arguments: N/A") + + obs = iteration.get('observation', 'N/A') + if obs and len(obs) > 200: + obs = obs[:200] + "... [truncated]" + print(f" Observation: {obs}") + print() + else: + print("No analysis steps recorded") + print() + + # Conclusion + print("--- Conclusion ---") + final = trace.get("final_answer") + if final and final.get("answer"): + print("Answer:") + for line in final["answer"].split("\n"): + print(f" {line}") + else: + print("No conclusion recorded") + + +def print_agent_json(trace): + """Print agent trace as JSON.""" + print(json.dumps(trace, indent=2)) + + def build_trace(socket, flow_id, user, collection, question_id, api=None, show_provenance=False, max_answer=500): """Build the full explainability trace for a question.""" label_cache = {} @@ -530,21 +729,48 @@ def main(): socket = api.socket() try: - trace = build_trace( + # Detect trace type (agent vs graphrag) + trace_type = detect_trace_type( socket=socket, flow_id=args.flow_id, user=args.user, collection=args.collection, - question_id=args.question_id, - api=api, - show_provenance=args.show_provenance, - max_answer=args.max_answer, + entity_id=args.question_id, ) - if args.format == 'json': - print_json(trace) + if trace_type == "agent": + # Build and print agent trace + trace = build_agent_trace( + socket=socket, + flow_id=args.flow_id, + user=args.user, + collection=args.collection, + session_id=args.question_id, + api=api, + max_answer=args.max_answer, + ) + + if args.format == 'json': + print_agent_json(trace) + else: + print_agent_text(trace) else: - print_text(trace, show_provenance=args.show_provenance) + # Build and print GraphRAG trace (existing behavior) + trace = build_trace( + socket=socket, + flow_id=args.flow_id, + user=args.user, + collection=args.collection, + question_id=args.question_id, + api=api, + show_provenance=args.show_provenance, + max_answer=args.max_answer, + ) + + if args.format == 'json': + print_json(trace) + else: + print_text(trace, show_provenance=args.show_provenance) finally: socket.close() diff --git a/trustgraph-flow/trustgraph/agent/react/service.py b/trustgraph-flow/trustgraph/agent/react/service.py index 1c96adef..2ae93b05 100755 --- a/trustgraph-flow/trustgraph/agent/react/service.py +++ b/trustgraph-flow/trustgraph/agent/react/service.py @@ -7,6 +7,8 @@ import re import sys import functools import logging +import uuid +from datetime import datetime # Module logger logger = logging.getLogger(__name__) @@ -14,8 +16,22 @@ logger = logging.getLogger(__name__) from ... base import AgentService, TextCompletionClientSpec, PromptClientSpec from ... base import GraphRagClientSpec, ToolClientSpec, StructuredQueryClientSpec from ... base import RowEmbeddingsQueryClientSpec, EmbeddingsClientSpec +from ... base import ProducerSpec from ... schema import AgentRequest, AgentResponse, AgentStep, Error +from ... schema import Triples, Metadata + +# Provenance imports for agent explainability +from trustgraph.provenance import ( + agent_session_uri, + agent_iteration_uri, + agent_final_uri, + agent_session_triples, + agent_iteration_triples, + agent_final_triples, + set_graph, + GRAPH_RETRIEVAL, +) from . tools import KnowledgeQueryImpl, TextCompletionImpl, McpToolImpl, PromptImpl, StructuredQueryImpl, RowEmbeddingsQueryImpl, ToolServiceImpl from . agent_manager import AgentManager @@ -105,6 +121,14 @@ class Processor(AgentService): ) ) + # Explainability producer for agent provenance triples + self.register_specification( + ProducerSpec( + name = "explainability", + schema = Triples, + ) + ) + async def on_tools_config(self, config, version): logger.info(f"Loading configuration version {version}") @@ -285,6 +309,10 @@ class Processor(AgentService): # Check if streaming is enabled streaming = getattr(request, 'streaming', False) + # Generate or retrieve session ID for provenance tracking + session_id = getattr(request, 'session_id', '') or str(uuid.uuid4()) + collection = getattr(request, 'collection', 'default') + if request.history: history = [ Action( @@ -298,6 +326,27 @@ class Processor(AgentService): else: history = [] + # Calculate iteration number (1-based) + iteration_num = len(history) + 1 + session_uri = agent_session_uri(session_id) + + # On first iteration, emit session triples + if iteration_num == 1: + timestamp = datetime.utcnow().isoformat() + "Z" + triples = set_graph( + agent_session_triples(session_uri, request.question, timestamp), + GRAPH_RETRIEVAL + ) + await flow("explainability").send(Triples( + metadata=Metadata( + id=session_uri, + user=request.user, + collection=collection, + ), + triples=triples, + )) + logger.debug(f"Emitted session triples for {session_uri}") + logger.info(f"Question: {request.question}") if len(history) >= self.max_iterations: @@ -447,6 +496,28 @@ class Processor(AgentService): else: f = json.dumps(act.final) + # Emit final answer provenance triples + final_uri = agent_final_uri(session_id) + # Parent is last iteration, or session if no iterations + if iteration_num > 1: + parent_uri = agent_iteration_uri(session_id, iteration_num - 1) + else: + parent_uri = session_uri + + final_triples = set_graph( + agent_final_triples(final_uri, parent_uri, f), + GRAPH_RETRIEVAL + ) + await flow("explainability").send(Triples( + metadata=Metadata( + id=final_uri, + user=request.user, + collection=collection, + ), + triples=final_triples, + )) + logger.debug(f"Emitted final triples for {final_uri}") + if streaming: # Streaming format - send end-of-dialog marker # Answer chunks were already sent via answer() callback during parsing @@ -479,8 +550,37 @@ class Processor(AgentService): logger.debug("Send next...") + # Emit iteration provenance triples + iteration_uri = agent_iteration_uri(session_id, iteration_num) + # Parent is previous iteration, or session if this is first iteration + if iteration_num > 1: + parent_uri = agent_iteration_uri(session_id, iteration_num - 1) + else: + parent_uri = session_uri + + iter_triples = set_graph( + agent_iteration_triples( + iteration_uri, + parent_uri, + act.thought, + act.name, + act.arguments, + act.observation, + ), + GRAPH_RETRIEVAL + ) + await flow("explainability").send(Triples( + metadata=Metadata( + id=iteration_uri, + user=request.user, + collection=collection, + ), + triples=iter_triples, + )) + logger.debug(f"Emitted iteration triples for {iteration_uri}") + history.append(act) - + # Handle state transitions if tool execution was successful next_state = request.state if act.name in filtered_tools: @@ -501,7 +601,9 @@ class Processor(AgentService): for h in history ], user=request.user, + collection=collection, streaming=streaming, + session_id=session_id, # Pass session_id for provenance continuity ) await next(r) diff --git a/trustgraph-flow/trustgraph/retrieval/document_rag/document_rag.py b/trustgraph-flow/trustgraph/retrieval/document_rag/document_rag.py index 5e77f733..7730ceac 100644 --- a/trustgraph-flow/trustgraph/retrieval/document_rag/document_rag.py +++ b/trustgraph-flow/trustgraph/retrieval/document_rag/document_rag.py @@ -1,6 +1,20 @@ import asyncio import logging +import uuid +from datetime import datetime + +# Provenance imports +from trustgraph.provenance import ( + docrag_question_uri, + docrag_exploration_uri, + docrag_synthesis_uri, + docrag_question_triples, + docrag_exploration_triples, + docrag_synthesis_triples, + set_graph, + GRAPH_RETRIEVAL, +) # Module logger logger = logging.getLogger(__name__) @@ -33,7 +47,14 @@ class Query: return qembeds[0] if qembeds else [] async def get_docs(self, query): + """ + Get documents (chunks) matching the query. + Returns: + tuple: (docs, chunk_ids) where: + - docs: list of document content strings + - chunk_ids: list of chunk IDs that were successfully fetched + """ vectors = await self.get_vector(query) if self.verbose: @@ -50,11 +71,13 @@ class Query: # Fetch chunk content from Garage docs = [] + chunk_ids = [] for match in chunk_matches: if match.chunk_id: try: content = await self.rag.fetch_chunk(match.chunk_id, self.user) docs.append(content) + chunk_ids.append(match.chunk_id) except Exception as e: logger.warning(f"Failed to fetch chunk {match.chunk_id}: {e}") @@ -63,7 +86,7 @@ class Query: for doc in docs: logger.debug(f" {doc[:100]}...") - return docs + return docs, chunk_ids class DocumentRag: @@ -86,17 +109,56 @@ class DocumentRag: async def query( self, query, user="trustgraph", collection="default", doc_limit=20, streaming=False, chunk_callback=None, + explain_callback=None, ): + """ + Execute a Document RAG query with optional explainability tracking. + Args: + query: The query string + user: User identifier + collection: Collection identifier + doc_limit: Max chunks to retrieve + streaming: Enable streaming LLM response + chunk_callback: async def callback(chunk, end_of_stream) for streaming + explain_callback: async def callback(triples, explain_id) for explainability + + Returns: + str: The synthesized answer text + """ if self.verbose: logger.debug("Constructing prompt...") + # Generate explainability URIs upfront + session_id = str(uuid.uuid4()) + q_uri = docrag_question_uri(session_id) + exp_uri = docrag_exploration_uri(session_id) + syn_uri = docrag_synthesis_uri(session_id) + + timestamp = datetime.utcnow().isoformat() + "Z" + + # Emit question explainability immediately + if explain_callback: + q_triples = set_graph( + docrag_question_triples(q_uri, query, timestamp), + GRAPH_RETRIEVAL + ) + await explain_callback(q_triples, q_uri) + q = Query( rag=self, user=user, collection=collection, verbose=self.verbose, doc_limit=doc_limit ) - docs = await q.get_docs(query) + docs, chunk_ids = await q.get_docs(query) + + # Emit exploration explainability after chunks retrieved + if explain_callback: + exp_triples = set_graph( + docrag_exploration_triples(exp_uri, q_uri, len(chunk_ids), chunk_ids), + GRAPH_RETRIEVAL + ) + await explain_callback(exp_triples, exp_uri) if self.verbose: logger.debug("Invoking LLM...") @@ -104,12 +166,21 @@ class DocumentRag: logger.debug(f"Query: {query}") if streaming and chunk_callback: + # Accumulate chunks for answer storage while forwarding to callback + accumulated_chunks = [] + + async def accumulating_callback(chunk, end_of_stream): + accumulated_chunks.append(chunk) + await chunk_callback(chunk, end_of_stream) + resp = await self.prompt_client.document_prompt( query=query, documents=docs, streaming=True, - chunk_callback=chunk_callback + chunk_callback=accumulating_callback ) + # Combine all chunks into full response + resp = "".join(accumulated_chunks) else: resp = await self.prompt_client.document_prompt( query=query, @@ -119,5 +190,17 @@ class DocumentRag: if self.verbose: logger.debug("Query processing complete") + # Emit synthesis explainability after answer generated + if explain_callback: + answer_text = resp if resp else "" + syn_triples = set_graph( + docrag_synthesis_triples(syn_uri, exp_uri, answer_text), + GRAPH_RETRIEVAL + ) + await explain_callback(syn_triples, syn_uri) + + if self.verbose: + logger.debug(f"Emitted explain for session {session_id}") + return resp diff --git a/trustgraph-flow/trustgraph/retrieval/document_rag/rag.py b/trustgraph-flow/trustgraph/retrieval/document_rag/rag.py index 3bc7113a..c1d96260 100755 --- a/trustgraph-flow/trustgraph/retrieval/document_rag/rag.py +++ b/trustgraph-flow/trustgraph/retrieval/document_rag/rag.py @@ -11,6 +11,8 @@ import logging from ... schema import DocumentRagQuery, DocumentRagResponse, Error from ... schema import LibrarianRequest, LibrarianResponse from ... schema import librarian_request_queue, librarian_response_queue +from ... schema import Triples, Metadata +from ... provenance import GRAPH_RETRIEVAL from . document_rag import DocumentRag from ... base import FlowProcessor, ConsumerSpec, ProducerSpec from ... base import PromptClientSpec, EmbeddingsClientSpec @@ -78,6 +80,13 @@ class Processor(FlowProcessor): ) ) + self.register_specification( + ProducerSpec( + name = "explainability", + schema = Triples, + ) + ) + # Librarian client for fetching chunk content from Garage librarian_request_q = params.get( "librarian_request_queue", default_librarian_request_queue @@ -194,6 +203,29 @@ class Processor(FlowProcessor): else: doc_limit = self.doc_limit + # Real-time explainability callback - emits triples and IDs as they're generated + # Triples are stored in the user's collection with a named graph (urn:graph:retrieval) + async def send_explainability(triples, explain_id): + # Send triples to explainability queue - stores in same collection with named graph + await flow("explainability").send(Triples( + metadata=Metadata( + id=explain_id, + user=v.user, + collection=v.collection, # Store in user's collection + ), + triples=triples, + )) + + # Send explain ID and graph to response queue + await flow("response").send( + DocumentRagResponse( + response=None, + explain_id=explain_id, + explain_graph=GRAPH_RETRIEVAL, + ), + properties={"id": id} + ) + # Check if streaming is requested if v.streaming: # Define async callback for streaming chunks @@ -217,6 +249,7 @@ class Processor(FlowProcessor): doc_limit=doc_limit, streaming=True, chunk_callback=send_chunk, + explain_callback=send_explainability, ) else: # Non-streaming path (existing behavior) @@ -224,7 +257,8 @@ class Processor(FlowProcessor): v.query, user=v.user, collection=v.collection, - doc_limit=doc_limit + doc_limit=doc_limit, + explain_callback=send_explainability, ) await flow("response").send(