diff --git a/tests/integration/test_agent_manager_integration.py b/tests/integration/test_agent_manager_integration.py index 5db95638..652894a2 100644 --- a/tests/integration/test_agent_manager_integration.py +++ b/tests/integration/test_agent_manager_integration.py @@ -9,7 +9,7 @@ Following the TEST_STRATEGY.md approach for integration testing. import pytest import json -from unittest.mock import AsyncMock, MagicMock, patch +from unittest.mock import AsyncMock, MagicMock, ANY, patch from trustgraph.agent.react.agent_manager import AgentManager from trustgraph.agent.react.tools import KnowledgeQueryImpl, TextCompletionImpl, McpToolImpl @@ -187,7 +187,7 @@ Final Answer: Machine learning is a field of AI that enables computers to learn # Verify tool was executed graph_rag_client = mock_flow_context("graph-rag-request") - graph_rag_client.rag.assert_called_once_with("What is machine learning?", collection="default") + graph_rag_client.rag.assert_called_once_with("What is machine learning?", collection="default", explain_callback=ANY, parent_uri=ANY) @pytest.mark.asyncio async def test_agent_manager_react_with_final_answer(self, agent_manager, mock_flow_context): @@ -272,7 +272,7 @@ Args: {{ # Verify correct service was called if tool_name == "knowledge_query": - mock_flow_context("graph-rag-request").rag.assert_called_with("test question", collection="default") + mock_flow_context("graph-rag-request").rag.assert_called_with("test question", collection="default", explain_callback=ANY, parent_uri=ANY) elif tool_name == "text_completion": mock_flow_context("prompt-request").question.assert_called() @@ -726,7 +726,7 @@ Final Answer: { # Assert graph_rag_client = mock_flow_context("graph-rag-request") - graph_rag_client.rag.assert_called_once_with("What is AI?", collection="default") + graph_rag_client.rag.assert_called_once_with("What is AI?", collection="default", explain_callback=ANY, parent_uri=ANY) @pytest.mark.asyncio async def test_knowledge_query_with_custom_collection(self, mock_flow_context): @@ -739,7 +739,7 @@ Final Answer: { # Assert graph_rag_client = mock_flow_context("graph-rag-request") - graph_rag_client.rag.assert_called_once_with("What is machine learning?", collection="custom_collection") + graph_rag_client.rag.assert_called_once_with("What is machine learning?", collection="custom_collection", explain_callback=ANY, parent_uri=ANY) @pytest.mark.asyncio async def test_knowledge_query_with_none_collection(self, mock_flow_context): @@ -752,7 +752,7 @@ Final Answer: { # Assert graph_rag_client = mock_flow_context("graph-rag-request") - graph_rag_client.rag.assert_called_once_with("Explain neural networks", collection="default") + graph_rag_client.rag.assert_called_once_with("Explain neural networks", collection="default", explain_callback=ANY, parent_uri=ANY) @pytest.mark.asyncio async def test_agent_manager_knowledge_query_collection_integration(self, mock_flow_context): @@ -810,7 +810,7 @@ Args: { # Verify the custom collection was used graph_rag_client = mock_flow_context("graph-rag-request") - graph_rag_client.rag.assert_called_once_with("Latest AI research?", collection="research_papers") + graph_rag_client.rag.assert_called_once_with("Latest AI research?", collection="research_papers", explain_callback=ANY, parent_uri=ANY) @pytest.mark.asyncio async def test_knowledge_query_multiple_collections(self, mock_flow_context): @@ -840,4 +840,4 @@ Args: { # Verify correct collection was used graph_rag_client = mock_flow_context("graph-rag-request") - graph_rag_client.rag.assert_called_once_with(question, collection=expected_collection) + graph_rag_client.rag.assert_called_once_with(question, collection=expected_collection, explain_callback=ANY, parent_uri=ANY) diff --git a/tests/unit/test_agent/test_agent_service_non_streaming.py b/tests/unit/test_agent/test_agent_service_non_streaming.py index ff630325..0b9b283a 100644 --- a/tests/unit/test_agent/test_agent_service_non_streaming.py +++ b/tests/unit/test_agent/test_agent_service_non_streaming.py @@ -39,7 +39,7 @@ class TestAgentServiceNonStreaming: mock_agent_manager_class.return_value = mock_agent_instance # Mock react to call think and observe callbacks - async def mock_react(question, history, think, observe, answer, context, streaming): + async def mock_react(question, history, think, observe, answer, context, streaming, on_action=None): await think("I need to solve this.", is_final=True) await observe("The answer is 4.", is_final=True) return Final(thought="Final answer", final="4") @@ -76,11 +76,22 @@ class TestAgentServiceNonStreaming: # Execute await processor.on_request(msg, consumer, flow) - # Verify: should have 3 responses (thought, observation, answer) - assert len(sent_responses) == 3, f"Expected 3 responses, got {len(sent_responses)}" + # Filter out explain events — those are always sent now + content_responses = [ + r for r in sent_responses if r.chunk_type != "explain" + ] + explain_responses = [ + r for r in sent_responses if r.chunk_type == "explain" + ] + + # Should have explain events for session, iteration, observation, and final + assert len(explain_responses) >= 1, "Expected at least 1 explain event" + + # Should have 3 content responses (thought, observation, answer) + assert len(content_responses) == 3, f"Expected 3 content responses, got {len(content_responses)}" # Check thought message - thought_response = sent_responses[0] + thought_response = content_responses[0] assert isinstance(thought_response, AgentResponse) assert thought_response.chunk_type == "thought" assert thought_response.content == "I need to solve this." @@ -88,7 +99,7 @@ class TestAgentServiceNonStreaming: assert thought_response.end_of_dialog is False, "Thought message must have end_of_dialog=False" # Check observation message - observation_response = sent_responses[1] + observation_response = content_responses[1] assert isinstance(observation_response, AgentResponse) assert observation_response.chunk_type == "observation" assert observation_response.content == "The answer is 4." @@ -120,7 +131,7 @@ class TestAgentServiceNonStreaming: mock_agent_manager_class.return_value = mock_agent_instance # Mock react to return Final directly - async def mock_react(question, history, think, observe, answer, context, streaming): + async def mock_react(question, history, think, observe, answer, context, streaming, on_action=None): return Final(thought="Final answer", final="4") mock_agent_instance.react = mock_react @@ -155,11 +166,22 @@ class TestAgentServiceNonStreaming: # Execute await processor.on_request(msg, consumer, flow) - # Verify: should have 1 response (final answer) - assert len(sent_responses) == 1, f"Expected 1 response, got {len(sent_responses)}" + # Filter out explain events — those are always sent now + content_responses = [ + r for r in sent_responses if r.chunk_type != "explain" + ] + explain_responses = [ + r for r in sent_responses if r.chunk_type == "explain" + ] + + # Should have explain events for session and final + assert len(explain_responses) >= 1, "Expected at least 1 explain event" + + # Should have 1 content response (final answer) + assert len(content_responses) == 1, f"Expected 1 content response, got {len(content_responses)}" # Check final answer message - answer_response = sent_responses[0] + answer_response = content_responses[0] assert isinstance(answer_response, AgentResponse) assert answer_response.chunk_type == "answer" assert answer_response.content == "4" diff --git a/tests/unit/test_agent/test_explainability_parsing.py b/tests/unit/test_agent/test_explainability_parsing.py index e09a7f1f..7035318d 100644 --- a/tests/unit/test_agent/test_explainability_parsing.py +++ b/tests/unit/test_agent/test_explainability_parsing.py @@ -13,6 +13,7 @@ from trustgraph.api.explainability import ( StepResult, Synthesis, Analysis, + Observation, Conclusion, TG_DECOMPOSITION, TG_FINDING, @@ -20,6 +21,7 @@ from trustgraph.api.explainability import ( TG_STEP_RESULT, TG_SYNTHESIS, TG_ANSWER_TYPE, + TG_OBSERVATION_TYPE, TG_ANALYSIS, TG_CONCLUSION, TG_DOCUMENT, @@ -74,6 +76,11 @@ class TestFromTriplesDispatch: entity = ExplainEntity.from_triples("urn:a", triples) assert isinstance(entity, Analysis) + def test_dispatches_observation(self): + triples = _make_triples("urn:o", [PROV_ENTITY, TG_OBSERVATION_TYPE]) + entity = ExplainEntity.from_triples("urn:o", triples) + assert isinstance(entity, Observation) + def test_dispatches_conclusion_unchanged(self): triples = _make_triples("urn:c", [PROV_ENTITY, TG_CONCLUSION, TG_ANSWER_TYPE]) diff --git a/tests/unit/test_agent/test_provenance_triples.py b/tests/unit/test_agent/test_provenance_triples.py index ed14d6ae..c83f4b08 100644 --- a/tests/unit/test_agent/test_provenance_triples.py +++ b/tests/unit/test_agent/test_provenance_triples.py @@ -14,7 +14,7 @@ from trustgraph.provenance import ( from trustgraph.provenance.namespaces import ( RDF_TYPE, RDFS_LABEL, - PROV_ENTITY, PROV_WAS_DERIVED_FROM, PROV_WAS_GENERATED_BY, + PROV_ENTITY, PROV_WAS_DERIVED_FROM, TG_DECOMPOSITION, TG_FINDING, TG_PLAN_TYPE, TG_STEP_RESULT, TG_SYNTHESIS, TG_ANSWER_TYPE, TG_DOCUMENT, TG_SUBAGENT_GOAL, TG_PLAN_STEP, @@ -63,7 +63,7 @@ class TestDecompositionTriples: "urn:decompose", "urn:session", ["goal-a"], ) ts = _triple_set(triples) - assert ("urn:decompose", PROV_WAS_GENERATED_BY, "urn:session") in ts + assert ("urn:decompose", PROV_WAS_DERIVED_FROM, "urn:session") in ts def test_includes_goals(self): goals = ["What is X?", "What is Y?", "What is Z?"] @@ -141,7 +141,7 @@ class TestPlanTriples: "urn:plan", "urn:session", ["step-a"], ) ts = _triple_set(triples) - assert ("urn:plan", PROV_WAS_GENERATED_BY, "urn:session") in ts + assert ("urn:plan", PROV_WAS_DERIVED_FROM, "urn:session") in ts def test_includes_steps(self): steps = ["Define X", "Research Y", "Analyse Z"] diff --git a/tests/unit/test_provenance/test_agent_provenance.py b/tests/unit/test_provenance/test_agent_provenance.py index 4efe24c7..d3f0ef8c 100644 --- a/tests/unit/test_provenance/test_agent_provenance.py +++ b/tests/unit/test_provenance/test_agent_provenance.py @@ -10,16 +10,18 @@ from trustgraph.schema import Triple, Term, IRI, LITERAL from trustgraph.provenance.agent import ( agent_session_triples, agent_iteration_triples, + agent_observation_triples, agent_final_triples, ) from trustgraph.provenance.namespaces import ( RDF_TYPE, RDFS_LABEL, - PROV_ACTIVITY, PROV_ENTITY, PROV_WAS_DERIVED_FROM, - PROV_WAS_GENERATED_BY, PROV_STARTED_AT_TIME, - TG_QUERY, TG_THOUGHT, TG_ACTION, TG_ARGUMENTS, TG_OBSERVATION, + PROV_ENTITY, PROV_WAS_DERIVED_FROM, + PROV_STARTED_AT_TIME, + TG_QUERY, TG_THOUGHT, TG_ACTION, TG_ARGUMENTS, TG_QUESTION, TG_ANALYSIS, TG_CONCLUSION, TG_DOCUMENT, TG_ANSWER_TYPE, TG_REFLECTION_TYPE, TG_THOUGHT_TYPE, TG_OBSERVATION_TYPE, + TG_TOOL_USE, TG_AGENT_QUESTION, ) @@ -63,7 +65,7 @@ class TestAgentSessionTriples: triples = agent_session_triples( self.SESSION_URI, "What is X?", "2024-01-01T00:00:00Z" ) - assert has_type(triples, self.SESSION_URI, PROV_ACTIVITY) + assert has_type(triples, self.SESSION_URI, PROV_ENTITY) assert has_type(triples, self.SESSION_URI, TG_QUESTION) assert has_type(triples, self.SESSION_URI, TG_AGENT_QUESTION) @@ -121,19 +123,17 @@ class TestAgentIterationTriples: ) assert has_type(triples, self.ITER_URI, PROV_ENTITY) assert has_type(triples, self.ITER_URI, TG_ANALYSIS) + assert has_type(triples, self.ITER_URI, TG_TOOL_USE) - def test_first_iteration_generated_by_question(self): - """First iteration uses wasGeneratedBy to link to question activity.""" + def test_first_iteration_derived_from_question(self): + """First iteration uses wasDerivedFrom to link to question entity.""" triples = agent_iteration_triples( self.ITER_URI, question_uri=self.SESSION_URI, action="search", ) - gen = find_triple(triples, PROV_WAS_GENERATED_BY, self.ITER_URI) - assert gen is not None - assert gen.o.iri == self.SESSION_URI - # Should NOT have wasDerivedFrom derived = find_triple(triples, PROV_WAS_DERIVED_FROM, self.ITER_URI) - assert derived is None + assert derived is not None + assert derived.o.iri == self.SESSION_URI def test_subsequent_iteration_derived_from_previous(self): """Subsequent iterations use wasDerivedFrom to link to previous iteration.""" @@ -144,9 +144,6 @@ class TestAgentIterationTriples: derived = find_triple(triples, PROV_WAS_DERIVED_FROM, self.ITER_URI) assert derived is not None assert derived.o.iri == self.PREV_URI - # Should NOT have wasGeneratedBy - gen = find_triple(triples, PROV_WAS_GENERATED_BY, self.ITER_URI) - assert gen is None def test_iteration_label_includes_action(self): triples = agent_iteration_triples( @@ -174,40 +171,24 @@ class TestAgentIterationTriples: # Thought has correct types assert has_type(triples, thought_uri, TG_REFLECTION_TYPE) assert has_type(triples, thought_uri, TG_THOUGHT_TYPE) - # Thought was generated by iteration - gen = find_triple(triples, PROV_WAS_GENERATED_BY, thought_uri) - assert gen is not None - assert gen.o.iri == self.ITER_URI + # Thought was derived from iteration + derived = find_triple(triples, PROV_WAS_DERIVED_FROM, thought_uri) + assert derived is not None + assert derived.o.iri == self.ITER_URI # Thought has document reference doc = find_triple(triples, TG_DOCUMENT, thought_uri) assert doc is not None assert doc.o.iri == thought_doc - def test_iteration_observation_sub_entity(self): - """Observation is a sub-entity with Reflection and Observation types.""" - obs_uri = "urn:trustgraph:agent:test-session/i1/observation" - obs_doc = "urn:doc:obs-1" + def test_iteration_no_observation_sub_entity(self): + """Iteration no longer embeds observation — it's a separate entity.""" triples = agent_iteration_triples( self.ITER_URI, question_uri=self.SESSION_URI, action="search", - observation_uri=obs_uri, - observation_document_id=obs_doc, ) - # Iteration links to observation sub-entity - obs_link = find_triple(triples, TG_OBSERVATION, self.ITER_URI) - assert obs_link is not None - assert obs_link.o.iri == obs_uri - # Observation has correct types - assert has_type(triples, obs_uri, TG_REFLECTION_TYPE) - assert has_type(triples, obs_uri, TG_OBSERVATION_TYPE) - # Observation was generated by iteration - gen = find_triple(triples, PROV_WAS_GENERATED_BY, obs_uri) - assert gen is not None - assert gen.o.iri == self.ITER_URI - # Observation has document reference - doc = find_triple(triples, TG_DOCUMENT, obs_uri) - assert doc is not None - assert doc.o.iri == obs_doc + # No TG_OBSERVATION predicate on the iteration + for t in triples: + assert "observation" not in t.p.iri.lower() or "Observation" not in t.p.iri def test_iteration_action_recorded(self): triples = agent_iteration_triples( @@ -240,19 +221,17 @@ class TestAgentIterationTriples: parsed = json.loads(arguments.o.value) assert parsed == {} - def test_iteration_no_thought_or_observation(self): - """Minimal iteration with just action — no thought or observation triples.""" + def test_iteration_no_thought(self): + """Minimal iteration with just action — no thought triples.""" triples = agent_iteration_triples( self.ITER_URI, question_uri=self.SESSION_URI, action="noop", ) thought = find_triple(triples, TG_THOUGHT, self.ITER_URI) - obs = find_triple(triples, TG_OBSERVATION, self.ITER_URI) assert thought is None - assert obs is None def test_iteration_chaining(self): - """First iteration uses wasGeneratedBy, second uses wasDerivedFrom.""" + """Both first and second iterations use wasDerivedFrom.""" iter1_uri = "urn:trustgraph:agent:sess/i1" iter2_uri = "urn:trustgraph:agent:sess/i2" @@ -263,13 +242,62 @@ class TestAgentIterationTriples: iter2_uri, previous_uri=iter1_uri, action="step2", ) - gen1 = find_triple(triples1, PROV_WAS_GENERATED_BY, iter1_uri) - assert gen1.o.iri == self.SESSION_URI + derived1 = find_triple(triples1, PROV_WAS_DERIVED_FROM, iter1_uri) + assert derived1.o.iri == self.SESSION_URI derived2 = find_triple(triples2, PROV_WAS_DERIVED_FROM, iter2_uri) assert derived2.o.iri == iter1_uri +# --------------------------------------------------------------------------- +# agent_observation_triples +# --------------------------------------------------------------------------- + +class TestAgentObservationTriples: + + OBS_URI = "urn:trustgraph:agent:test-session/i1/observation" + ITER_URI = "urn:trustgraph:agent:test-session/i1" + + def test_observation_types(self): + triples = agent_observation_triples( + self.OBS_URI, self.ITER_URI, + ) + assert has_type(triples, self.OBS_URI, PROV_ENTITY) + assert has_type(triples, self.OBS_URI, TG_OBSERVATION_TYPE) + + def test_observation_derived_from_iteration(self): + triples = agent_observation_triples( + self.OBS_URI, self.ITER_URI, + ) + derived = find_triple(triples, PROV_WAS_DERIVED_FROM, self.OBS_URI) + assert derived is not None + assert derived.o.iri == self.ITER_URI + + def test_observation_label(self): + triples = agent_observation_triples( + self.OBS_URI, self.ITER_URI, + ) + label = find_triple(triples, RDFS_LABEL, self.OBS_URI) + assert label is not None + assert label.o.value == "Observation" + + def test_observation_document(self): + doc_id = "urn:doc:obs-1" + triples = agent_observation_triples( + self.OBS_URI, self.ITER_URI, document_id=doc_id, + ) + doc = find_triple(triples, TG_DOCUMENT, self.OBS_URI) + assert doc is not None + assert doc.o.iri == doc_id + + def test_observation_no_document(self): + triples = agent_observation_triples( + self.OBS_URI, self.ITER_URI, + ) + doc = find_triple(triples, TG_DOCUMENT, self.OBS_URI) + assert doc is None + + # --------------------------------------------------------------------------- # agent_final_triples # --------------------------------------------------------------------------- @@ -296,19 +324,15 @@ class TestAgentFinalTriples: derived = find_triple(triples, PROV_WAS_DERIVED_FROM, self.FINAL_URI) assert derived is not None assert derived.o.iri == self.PREV_URI - gen = find_triple(triples, PROV_WAS_GENERATED_BY, self.FINAL_URI) - assert gen is None - def test_final_generated_by_question_when_no_iterations(self): - """When agent answers immediately, final uses wasGeneratedBy.""" + def test_final_derived_from_question_when_no_iterations(self): + """When agent answers immediately, final uses wasDerivedFrom to question.""" triples = agent_final_triples( self.FINAL_URI, question_uri=self.SESSION_URI, ) - gen = find_triple(triples, PROV_WAS_GENERATED_BY, self.FINAL_URI) - assert gen is not None - assert gen.o.iri == self.SESSION_URI derived = find_triple(triples, PROV_WAS_DERIVED_FROM, self.FINAL_URI) - assert derived is None + assert derived is not None + assert derived.o.iri == self.SESSION_URI def test_final_label(self): triples = agent_final_triples( diff --git a/tests/unit/test_provenance/test_explainability.py b/tests/unit/test_provenance/test_explainability.py index 62498c61..e2c7fcd1 100644 --- a/tests/unit/test_provenance/test_explainability.py +++ b/tests/unit/test_provenance/test_explainability.py @@ -16,6 +16,7 @@ from trustgraph.api.explainability import ( Synthesis, Reflection, Analysis, + Observation, Conclusion, parse_edge_selection_triples, extract_term_value, @@ -23,12 +24,12 @@ from trustgraph.api.explainability import ( ExplainabilityClient, TG_QUERY, TG_EDGE_COUNT, TG_SELECTED_EDGE, TG_EDGE, TG_REASONING, TG_DOCUMENT, TG_CHUNK_COUNT, TG_CONCEPT, TG_ENTITY, - TG_THOUGHT, TG_ACTION, TG_ARGUMENTS, TG_OBSERVATION, + TG_THOUGHT, TG_ACTION, TG_ARGUMENTS, TG_QUESTION, TG_GROUNDING, TG_EXPLORATION, TG_FOCUS, TG_SYNTHESIS, TG_ANALYSIS, TG_CONCLUSION, TG_REFLECTION_TYPE, TG_THOUGHT_TYPE, TG_OBSERVATION_TYPE, TG_GRAPH_RAG_QUESTION, TG_DOC_RAG_QUESTION, TG_AGENT_QUESTION, - PROV_STARTED_AT_TIME, PROV_WAS_DERIVED_FROM, PROV_WAS_GENERATED_BY, + PROV_STARTED_AT_TIME, PROV_WAS_DERIVED_FROM, RDF_TYPE, RDFS_LABEL, ) @@ -180,14 +181,30 @@ class TestExplainEntityFromTriples: ("urn:ana:1", TG_ACTION, "graph-rag-query"), ("urn:ana:1", TG_ARGUMENTS, '{"query": "test"}'), ("urn:ana:1", TG_THOUGHT, "urn:ref:thought-1"), - ("urn:ana:1", TG_OBSERVATION, "urn:ref:obs-1"), ] entity = ExplainEntity.from_triples("urn:ana:1", triples) assert isinstance(entity, Analysis) assert entity.action == "graph-rag-query" assert entity.arguments == '{"query": "test"}' assert entity.thought == "urn:ref:thought-1" - assert entity.observation == "urn:ref:obs-1" + + def test_observation(self): + triples = [ + ("urn:obs:1", RDF_TYPE, TG_OBSERVATION_TYPE), + ("urn:obs:1", TG_DOCUMENT, "urn:doc:obs-content"), + ] + entity = ExplainEntity.from_triples("urn:obs:1", triples) + assert isinstance(entity, Observation) + assert entity.document == "urn:doc:obs-content" + assert entity.entity_type == "observation" + + def test_observation_no_document(self): + triples = [ + ("urn:obs:2", RDF_TYPE, TG_OBSERVATION_TYPE), + ] + entity = ExplainEntity.from_triples("urn:obs:2", triples) + assert isinstance(entity, Observation) + assert entity.document == "" def test_conclusion_with_document(self): triples = [ diff --git a/tests/unit/test_provenance/test_triples.py b/tests/unit/test_provenance/test_triples.py index 9aff7e4b..792db028 100644 --- a/tests/unit/test_provenance/test_triples.py +++ b/tests/unit/test_provenance/test_triples.py @@ -500,7 +500,7 @@ class TestQuestionTriples: def test_question_types(self): triples = question_triples(self.Q_URI, "What is AI?", "2024-01-01T00:00:00Z") - assert has_type(triples, self.Q_URI, PROV_ACTIVITY) + assert has_type(triples, self.Q_URI, PROV_ENTITY) assert has_type(triples, self.Q_URI, TG_QUESTION) assert has_type(triples, self.Q_URI, TG_GRAPH_RAG_QUESTION) @@ -543,11 +543,11 @@ class TestGroundingTriples: assert has_type(triples, self.GND_URI, PROV_ENTITY) assert has_type(triples, self.GND_URI, TG_GROUNDING) - def test_grounding_generated_by_question(self): + def test_grounding_derived_from_question(self): triples = grounding_triples(self.GND_URI, self.Q_URI, ["AI"]) - gen = find_triple(triples, PROV_WAS_GENERATED_BY, self.GND_URI) - assert gen is not None - assert gen.o.iri == self.Q_URI + derived = find_triple(triples, PROV_WAS_DERIVED_FROM, self.GND_URI) + assert derived is not None + assert derived.o.iri == self.Q_URI def test_grounding_concepts(self): triples = grounding_triples(self.GND_URI, self.Q_URI, ["AI", "ML", "robots"]) @@ -730,7 +730,7 @@ class TestDocRagQuestionTriples: def test_docrag_question_types(self): triples = docrag_question_triples(self.Q_URI, "Find info", "2024-01-01T00:00:00Z") - assert has_type(triples, self.Q_URI, PROV_ACTIVITY) + assert has_type(triples, self.Q_URI, PROV_ENTITY) assert has_type(triples, self.Q_URI, TG_QUESTION) assert has_type(triples, self.Q_URI, TG_DOC_RAG_QUESTION) diff --git a/trustgraph-base/trustgraph/api/__init__.py b/trustgraph-base/trustgraph/api/__init__.py index e956db65..8b703dc7 100644 --- a/trustgraph-base/trustgraph/api/__init__.py +++ b/trustgraph-base/trustgraph/api/__init__.py @@ -81,6 +81,7 @@ from .explainability import ( Synthesis, Reflection, Analysis, + Observation, Conclusion, Decomposition, Finding, @@ -164,6 +165,7 @@ __all__ = [ "Focus", "Synthesis", "Analysis", + "Observation", "Conclusion", "EdgeSelection", "wire_triples_to_tuples", diff --git a/trustgraph-base/trustgraph/api/explainability.py b/trustgraph-base/trustgraph/api/explainability.py index ee7fd05e..fa6c4a0c 100644 --- a/trustgraph-base/trustgraph/api/explainability.py +++ b/trustgraph-base/trustgraph/api/explainability.py @@ -40,6 +40,7 @@ TG_ANSWER_TYPE = TG + "Answer" TG_REFLECTION_TYPE = TG + "Reflection" TG_THOUGHT_TYPE = TG + "Thought" TG_OBSERVATION_TYPE = TG + "Observation" +TG_TOOL_USE = TG + "ToolUse" TG_GRAPH_RAG_QUESTION = TG + "GraphRagQuestion" TG_DOC_RAG_QUESTION = TG + "DocRagQuestion" TG_AGENT_QUESTION = TG + "AgentQuestion" @@ -58,7 +59,6 @@ TG_PLAN_STEP = TG + "planStep" PROV = "http://www.w3.org/ns/prov#" PROV_STARTED_AT_TIME = PROV + "startedAtTime" PROV_WAS_DERIVED_FROM = PROV + "wasDerivedFrom" -PROV_WAS_GENERATED_BY = PROV + "wasGeneratedBy" RDF_TYPE = "http://www.w3.org/1999/02/22-rdf-syntax-ns#type" RDFS_LABEL = "http://www.w3.org/2000/01/rdf-schema#label" @@ -102,6 +102,8 @@ class ExplainEntity: return StepResult.from_triples(uri, triples) elif TG_SYNTHESIS in types: return Synthesis.from_triples(uri, triples) + elif TG_OBSERVATION_TYPE in types and TG_REFLECTION_TYPE not in types: + return Observation.from_triples(uri, triples) elif TG_REFLECTION_TYPE in types: return Reflection.from_triples(uri, triples) elif TG_ANALYSIS in types: @@ -279,18 +281,16 @@ class Reflection(ExplainEntity): @dataclass class Analysis(ExplainEntity): - """Analysis entity - one think/act/observe cycle (Agent only).""" + """Analysis+ToolUse entity - decision + tool call (Agent only).""" action: str = "" arguments: str = "" # JSON string thought: str = "" - observation: str = "" @classmethod def from_triples(cls, uri: str, triples: List[Tuple[str, str, Any]]) -> "Analysis": action = "" arguments = "" thought = "" - observation = "" for s, p, o in triples: if p == TG_ACTION: @@ -299,8 +299,6 @@ class Analysis(ExplainEntity): arguments = o elif p == TG_THOUGHT: thought = o - elif p == TG_OBSERVATION: - observation = o return cls( uri=uri, @@ -308,7 +306,26 @@ class Analysis(ExplainEntity): action=action, arguments=arguments, thought=thought, - observation=observation + ) + + +@dataclass +class Observation(ExplainEntity): + """Observation entity - standalone tool result (Agent only).""" + document: str = "" + + @classmethod + def from_triples(cls, uri: str, triples: List[Tuple[str, str, Any]]) -> "Observation": + document = "" + + for s, p, o in triples: + if p == TG_DOCUMENT: + document = o + + return cls( + uri=uri, + entity_type="observation", + document=document, ) @@ -757,9 +774,9 @@ class ExplainabilityClient: return trace trace["question"] = question - # Find grounding: ?grounding prov:wasGeneratedBy question_uri + # Find grounding: ?grounding prov:wasDerivedFrom question_uri grounding_triples = self.flow.triples_query( - p=PROV_WAS_GENERATED_BY, + p=PROV_WAS_DERIVED_FROM, o=question_uri, g=graph, user=user, @@ -894,9 +911,9 @@ class ExplainabilityClient: return trace trace["question"] = question - # Find grounding: ?grounding prov:wasGeneratedBy question_uri + # Find grounding: ?grounding prov:wasDerivedFrom question_uri grounding_triples = self.flow.triples_query( - p=PROV_WAS_GENERATED_BY, + p=PROV_WAS_DERIVED_FROM, o=question_uri, g=graph, user=user, @@ -1010,41 +1027,26 @@ class ExplainabilityClient: # Follow the provenance chain from the question self._follow_provenance_chain( session_uri, trace, graph, user, collection, - is_first=True, max_depth=50, + max_depth=50, ) return trace def _follow_provenance_chain( self, current_uri, trace, graph, user, collection, - is_first=False, max_depth=50, + max_depth=50, ): """Recursively follow the provenance chain, handling branches.""" if max_depth <= 0: return # Find entities derived from current_uri - if is_first: - derived_triples = self.flow.triples_query( - p=PROV_WAS_GENERATED_BY, - o=current_uri, - g=graph, user=user, collection=collection, - limit=20 - ) - if not derived_triples: - derived_triples = self.flow.triples_query( - p=PROV_WAS_DERIVED_FROM, - o=current_uri, - g=graph, user=user, collection=collection, - limit=20 - ) - else: - derived_triples = self.flow.triples_query( - p=PROV_WAS_DERIVED_FROM, - o=current_uri, - g=graph, user=user, collection=collection, - limit=20 - ) + derived_triples = self.flow.triples_query( + p=PROV_WAS_DERIVED_FROM, + o=current_uri, + g=graph, user=user, collection=collection, + limit=20 + ) if not derived_triples: return @@ -1062,8 +1064,8 @@ class ExplainabilityClient: if entity is None: continue - if isinstance(entity, (Analysis, Decomposition, Finding, - Plan, StepResult)): + if isinstance(entity, (Analysis, Observation, Decomposition, + Finding, Plan, StepResult)): trace["steps"].append(entity) # Continue following from this entity @@ -1072,6 +1074,27 @@ class ExplainabilityClient: max_depth=max_depth - 1, ) + elif isinstance(entity, Question): + # Sub-trace: a RAG session linked to this agent step. + # Fetch the full sub-trace and embed it. + if entity.question_type == "graph-rag": + sub_trace = self.fetch_graphrag_trace( + derived_uri, graph, user, collection, + ) + elif entity.question_type == "document-rag": + sub_trace = self.fetch_docrag_trace( + derived_uri, graph, user, collection, + ) + else: + sub_trace = None + + if sub_trace: + trace["steps"].append({ + "type": "sub-trace", + "question": entity, + "trace": sub_trace, + }) + elif isinstance(entity, (Conclusion, Synthesis)): trace["steps"].append(entity) @@ -1114,10 +1137,25 @@ class ExplainabilityClient: if isinstance(entity, Question): questions.append(entity) - # Sort by timestamp (newest first) - questions.sort(key=lambda q: q.timestamp or "", reverse=True) + # Filter out sub-traces: sessions that have a wasDerivedFrom link + # (they are child sessions linked to a parent agent iteration) + top_level = [] + for q in questions: + parent_triples = self.flow.triples_query( + s=q.uri, + p=PROV_WAS_DERIVED_FROM, + g=graph, + user=user, + collection=collection, + limit=1 + ) + if not parent_triples: + top_level.append(q) - return questions + # Sort by timestamp (newest first) + top_level.sort(key=lambda q: q.timestamp or "", reverse=True) + + return top_level def detect_session_type( self, @@ -1159,18 +1197,9 @@ class ExplainabilityClient: limit=5 ) - generated_triples = self.flow.triples_query( - p=PROV_WAS_GENERATED_BY, - o=session_uri, - g=graph, - user=user, - collection=collection, - limit=5 - ) - all_child_uris = [ extract_term_value(t.get("s", {})) - for t in (derived_triples + generated_triples) + for t in derived_triples ] for child_uri in all_child_uris: diff --git a/trustgraph-base/trustgraph/api/socket_client.py b/trustgraph-base/trustgraph/api/socket_client.py index e5f63c79..3b463762 100644 --- a/trustgraph-base/trustgraph/api/socket_client.py +++ b/trustgraph-base/trustgraph/api/socket_client.py @@ -384,12 +384,14 @@ class SocketClient: if chunk_type == "thought": return AgentThought( content=resp.get("content", ""), - end_of_message=resp.get("end_of_message", False) + end_of_message=resp.get("end_of_message", False), + message_id=resp.get("message_id", ""), ) elif chunk_type == "observation": return AgentObservation( content=resp.get("content", ""), - end_of_message=resp.get("end_of_message", False) + end_of_message=resp.get("end_of_message", False), + message_id=resp.get("message_id", ""), ) elif chunk_type == "answer" or chunk_type == "final-answer": return AgentAnswer( diff --git a/trustgraph-base/trustgraph/api/types.py b/trustgraph-base/trustgraph/api/types.py index d39310f2..3e3f1520 100644 --- a/trustgraph-base/trustgraph/api/types.py +++ b/trustgraph-base/trustgraph/api/types.py @@ -150,8 +150,10 @@ class AgentThought(StreamingChunk): content: Agent's thought text end_of_message: True if this completes the current thought chunk_type: Always "thought" + message_id: Provenance URI of the entity being built """ chunk_type: str = "thought" + message_id: str = "" @dataclasses.dataclass class AgentObservation(StreamingChunk): @@ -165,8 +167,10 @@ class AgentObservation(StreamingChunk): content: Observation text describing tool results end_of_message: True if this completes the current observation chunk_type: Always "observation" + message_id: Provenance URI of the entity being built """ chunk_type: str = "observation" + message_id: str = "" @dataclasses.dataclass class AgentAnswer(StreamingChunk): diff --git a/trustgraph-base/trustgraph/base/graph_rag_client.py b/trustgraph-base/trustgraph/base/graph_rag_client.py index 66dbad1e..32007943 100644 --- a/trustgraph-base/trustgraph/base/graph_rag_client.py +++ b/trustgraph-base/trustgraph/base/graph_rag_client.py @@ -5,6 +5,7 @@ from .. schema import GraphRagQuery, GraphRagResponse class GraphRagClient(RequestResponse): async def rag(self, query, user="trustgraph", collection="default", chunk_callback=None, explain_callback=None, + parent_uri="", timeout=600): """ Execute a graph RAG query with optional streaming callbacks. @@ -50,6 +51,7 @@ class GraphRagClient(RequestResponse): query = query, user = user, collection = collection, + parent_uri = parent_uri, ), timeout=timeout, recipient=recipient, diff --git a/trustgraph-base/trustgraph/provenance/__init__.py b/trustgraph-base/trustgraph/provenance/__init__.py index 304f17a7..e6ce0a9e 100644 --- a/trustgraph-base/trustgraph/provenance/__init__.py +++ b/trustgraph-base/trustgraph/provenance/__init__.py @@ -96,6 +96,7 @@ from . namespaces import ( TG_ANALYSIS, TG_CONCLUSION, # Unifying types TG_ANSWER_TYPE, TG_REFLECTION_TYPE, TG_THOUGHT_TYPE, TG_OBSERVATION_TYPE, + TG_TOOL_USE, # Question subtypes (to distinguish retrieval mechanism) TG_GRAPH_RAG_QUESTION, TG_DOC_RAG_QUESTION, TG_AGENT_QUESTION, # Agent provenance predicates @@ -132,6 +133,7 @@ from . triples import ( from . agent import ( agent_session_triples, agent_iteration_triples, + agent_observation_triples, agent_final_triples, # Orchestrator provenance triple builders agent_decomposition_triples, @@ -210,6 +212,7 @@ __all__ = [ "TG_ANALYSIS", "TG_CONCLUSION", # Unifying types "TG_ANSWER_TYPE", "TG_REFLECTION_TYPE", "TG_THOUGHT_TYPE", "TG_OBSERVATION_TYPE", + "TG_TOOL_USE", # Question subtypes "TG_GRAPH_RAG_QUESTION", "TG_DOC_RAG_QUESTION", "TG_AGENT_QUESTION", # Agent provenance predicates @@ -238,6 +241,7 @@ __all__ = [ # Agent provenance triple builders "agent_session_triples", "agent_iteration_triples", + "agent_observation_triples", "agent_final_triples", # Orchestrator provenance triple builders "agent_decomposition_triples", diff --git a/trustgraph-base/trustgraph/provenance/agent.py b/trustgraph-base/trustgraph/provenance/agent.py index d25109a7..4fc1f2b5 100644 --- a/trustgraph-base/trustgraph/provenance/agent.py +++ b/trustgraph-base/trustgraph/provenance/agent.py @@ -20,11 +20,12 @@ from .. schema import Triple, Term, IRI, LITERAL from . namespaces import ( RDF_TYPE, RDFS_LABEL, - PROV_ACTIVITY, PROV_ENTITY, PROV_WAS_DERIVED_FROM, - PROV_WAS_GENERATED_BY, PROV_STARTED_AT_TIME, - TG_QUERY, TG_THOUGHT, TG_ACTION, TG_ARGUMENTS, TG_OBSERVATION, + PROV_ENTITY, PROV_WAS_DERIVED_FROM, + PROV_STARTED_AT_TIME, + TG_QUERY, TG_THOUGHT, TG_ACTION, TG_ARGUMENTS, TG_QUESTION, TG_ANALYSIS, TG_CONCLUSION, TG_DOCUMENT, TG_ANSWER_TYPE, TG_REFLECTION_TYPE, TG_THOUGHT_TYPE, TG_OBSERVATION_TYPE, + TG_TOOL_USE, TG_AGENT_QUESTION, TG_DECOMPOSITION, TG_FINDING, TG_PLAN_TYPE, TG_STEP_RESULT, TG_SYNTHESIS, TG_SUBAGENT_GOAL, TG_PLAN_STEP, @@ -70,7 +71,7 @@ def agent_session_triples( timestamp = datetime.utcnow().isoformat() + "Z" return [ - _triple(session_uri, RDF_TYPE, _iri(PROV_ACTIVITY)), + _triple(session_uri, RDF_TYPE, _iri(PROV_ENTITY)), _triple(session_uri, RDF_TYPE, _iri(TG_QUESTION)), _triple(session_uri, RDF_TYPE, _iri(TG_AGENT_QUESTION)), _triple(session_uri, RDFS_LABEL, _literal("Agent Question")), @@ -87,19 +88,15 @@ def agent_iteration_triples( arguments: Dict[str, Any] = None, thought_uri: Optional[str] = None, thought_document_id: Optional[str] = None, - observation_uri: Optional[str] = None, - observation_document_id: Optional[str] = None, ) -> List[Triple]: """ - Build triples for one agent iteration (Analysis - think/act/observe cycle). + Build triples for one agent iteration (Analysis+ToolUse). Creates: - - Entity declaration with tg:Analysis type - - wasGeneratedBy link to question (if first iteration) - - wasDerivedFrom link to previous iteration (if not first) + - Entity declaration with tg:Analysis and tg:ToolUse types + - wasDerivedFrom link to question (if first iteration) or previous - Action and arguments metadata - Thought sub-entity (tg:Reflection, tg:Thought) with librarian document - - Observation sub-entity (tg:Reflection, tg:Observation) with librarian document Args: iteration_uri: URI of this iteration (from agent_iteration_uri) @@ -109,8 +106,6 @@ def agent_iteration_triples( arguments: Arguments passed to the tool (will be JSON-encoded) thought_uri: URI for the thought sub-entity thought_document_id: Document URI for thought in librarian - observation_uri: URI for the observation sub-entity - observation_document_id: Document URI for observation in librarian Returns: List of Triple objects @@ -121,6 +116,7 @@ def agent_iteration_triples( triples = [ _triple(iteration_uri, RDF_TYPE, _iri(PROV_ENTITY)), _triple(iteration_uri, RDF_TYPE, _iri(TG_ANALYSIS)), + _triple(iteration_uri, RDF_TYPE, _iri(TG_TOOL_USE)), _triple(iteration_uri, RDFS_LABEL, _literal(f"Analysis: {action}")), _triple(iteration_uri, TG_ACTION, _literal(action)), _triple(iteration_uri, TG_ARGUMENTS, _literal(json.dumps(arguments))), @@ -128,7 +124,7 @@ def agent_iteration_triples( if question_uri: triples.append( - _triple(iteration_uri, PROV_WAS_GENERATED_BY, _iri(question_uri)) + _triple(iteration_uri, PROV_WAS_DERIVED_FROM, _iri(question_uri)) ) elif previous_uri: triples.append( @@ -142,26 +138,48 @@ def agent_iteration_triples( _triple(thought_uri, RDF_TYPE, _iri(TG_REFLECTION_TYPE)), _triple(thought_uri, RDF_TYPE, _iri(TG_THOUGHT_TYPE)), _triple(thought_uri, RDFS_LABEL, _literal("Thought")), - _triple(thought_uri, PROV_WAS_GENERATED_BY, _iri(iteration_uri)), + _triple(thought_uri, PROV_WAS_DERIVED_FROM, _iri(iteration_uri)), ]) if thought_document_id: triples.append( _triple(thought_uri, TG_DOCUMENT, _iri(thought_document_id)) ) - # Observation sub-entity - if observation_uri: - triples.extend([ - _triple(iteration_uri, TG_OBSERVATION, _iri(observation_uri)), - _triple(observation_uri, RDF_TYPE, _iri(TG_REFLECTION_TYPE)), - _triple(observation_uri, RDF_TYPE, _iri(TG_OBSERVATION_TYPE)), - _triple(observation_uri, RDFS_LABEL, _literal("Observation")), - _triple(observation_uri, PROV_WAS_GENERATED_BY, _iri(iteration_uri)), - ]) - if observation_document_id: - triples.append( - _triple(observation_uri, TG_DOCUMENT, _iri(observation_document_id)) - ) + return triples + + +def agent_observation_triples( + observation_uri: str, + iteration_uri: str, + document_id: Optional[str] = None, +) -> List[Triple]: + """ + Build triples for an agent observation (standalone entity). + + Creates: + - Entity declaration with prov:Entity and tg:Observation types + - wasDerivedFrom link to the iteration (Analysis+ToolUse) + - Document reference to librarian (if provided) + + Args: + observation_uri: URI of the observation entity + iteration_uri: URI of the iteration this observation derives from + document_id: Librarian document ID for the observation content + + Returns: + List of Triple objects + """ + triples = [ + _triple(observation_uri, RDF_TYPE, _iri(PROV_ENTITY)), + _triple(observation_uri, RDF_TYPE, _iri(TG_OBSERVATION_TYPE)), + _triple(observation_uri, RDFS_LABEL, _literal("Observation")), + _triple(observation_uri, PROV_WAS_DERIVED_FROM, _iri(iteration_uri)), + ] + + if document_id: + triples.append( + _triple(observation_uri, TG_DOCUMENT, _iri(document_id)) + ) return triples @@ -199,7 +217,7 @@ def agent_final_triples( if question_uri: triples.append( - _triple(final_uri, PROV_WAS_GENERATED_BY, _iri(question_uri)) + _triple(final_uri, PROV_WAS_DERIVED_FROM, _iri(question_uri)) ) elif previous_uri: triples.append( @@ -223,7 +241,7 @@ def agent_decomposition_triples( _triple(uri, RDF_TYPE, _iri(TG_DECOMPOSITION)), _triple(uri, RDFS_LABEL, _literal(f"Decomposed into {len(goals)} research threads")), - _triple(uri, PROV_WAS_GENERATED_BY, _iri(session_uri)), + _triple(uri, PROV_WAS_DERIVED_FROM, _iri(session_uri)), ] for goal in goals: triples.append(_triple(uri, TG_SUBAGENT_GOAL, _literal(goal))) @@ -261,7 +279,7 @@ def agent_plan_triples( _triple(uri, RDF_TYPE, _iri(TG_PLAN_TYPE)), _triple(uri, RDFS_LABEL, _literal(f"Plan with {len(steps)} steps")), - _triple(uri, PROV_WAS_GENERATED_BY, _iri(session_uri)), + _triple(uri, PROV_WAS_DERIVED_FROM, _iri(session_uri)), ] for step in steps: triples.append(_triple(uri, TG_PLAN_STEP, _literal(step))) diff --git a/trustgraph-base/trustgraph/provenance/namespaces.py b/trustgraph-base/trustgraph/provenance/namespaces.py index 69134dfb..9e7fbb2d 100644 --- a/trustgraph-base/trustgraph/provenance/namespaces.py +++ b/trustgraph-base/trustgraph/provenance/namespaces.py @@ -105,6 +105,7 @@ TG_ANSWER_TYPE = TG + "Answer" # Final answer (Synthesis, Conclusion, F TG_REFLECTION_TYPE = TG + "Reflection" # Intermediate commentary (Thought, Observation) TG_THOUGHT_TYPE = TG + "Thought" # Agent reasoning TG_OBSERVATION_TYPE = TG + "Observation" # Agent tool result +TG_TOOL_USE = TG + "ToolUse" # Analysis+ToolUse mixin # Question subtypes (to distinguish retrieval mechanism) TG_GRAPH_RAG_QUESTION = TG + "GraphRagQuestion" diff --git a/trustgraph-base/trustgraph/provenance/triples.py b/trustgraph-base/trustgraph/provenance/triples.py index 407cab31..f2e85eff 100644 --- a/trustgraph-base/trustgraph/provenance/triples.py +++ b/trustgraph-base/trustgraph/provenance/triples.py @@ -353,18 +353,21 @@ def question_triples( question_uri: str, query: str, timestamp: Optional[str] = None, + parent_uri: Optional[str] = None, ) -> List[Triple]: """ - Build triples for a question activity. + Build triples for a question entity. Creates: - - Activity declaration for the question + - Entity declaration for the question - Query text and timestamp + - Optional wasDerivedFrom link to parent (for sub-traces) Args: question_uri: URI of the question (from question_uri) query: The user's query text timestamp: ISO timestamp (defaults to now) + parent_uri: Optional parent URI to link as wasDerivedFrom (for sub-traces) Returns: List of Triple objects @@ -372,8 +375,8 @@ def question_triples( if timestamp is None: timestamp = datetime.utcnow().isoformat() + "Z" - return [ - _triple(question_uri, RDF_TYPE, _iri(PROV_ACTIVITY)), + triples = [ + _triple(question_uri, RDF_TYPE, _iri(PROV_ENTITY)), _triple(question_uri, RDF_TYPE, _iri(TG_QUESTION)), _triple(question_uri, RDF_TYPE, _iri(TG_GRAPH_RAG_QUESTION)), _triple(question_uri, RDFS_LABEL, _literal("GraphRAG Question")), @@ -381,6 +384,13 @@ def question_triples( _triple(question_uri, TG_QUERY, _literal(query)), ] + if parent_uri: + triples.append( + _triple(question_uri, PROV_WAS_DERIVED_FROM, _iri(parent_uri)) + ) + + return triples + def grounding_triples( grounding_uri: str, @@ -407,7 +417,7 @@ def grounding_triples( _triple(grounding_uri, RDF_TYPE, _iri(PROV_ENTITY)), _triple(grounding_uri, RDF_TYPE, _iri(TG_GROUNDING)), _triple(grounding_uri, RDFS_LABEL, _literal("Grounding")), - _triple(grounding_uri, PROV_WAS_GENERATED_BY, _iri(question_uri)), + _triple(grounding_uri, PROV_WAS_DERIVED_FROM, _iri(question_uri)), ] for concept in concepts: @@ -575,18 +585,21 @@ def docrag_question_triples( question_uri: str, query: str, timestamp: Optional[str] = None, + parent_uri: Optional[str] = None, ) -> List[Triple]: """ - Build triples for a document RAG question activity. + Build triples for a document RAG question entity. Creates: - - Activity declaration with tg:Question type + - Entity declaration with tg:Question type - Query text and timestamp + - Optional wasDerivedFrom link to parent (for sub-traces) Args: question_uri: URI of the question (from docrag_question_uri) query: The user's query text timestamp: ISO timestamp (defaults to now) + parent_uri: Optional parent URI to link as wasDerivedFrom (for sub-traces) Returns: List of Triple objects @@ -594,8 +607,8 @@ def docrag_question_triples( if timestamp is None: timestamp = datetime.utcnow().isoformat() + "Z" - return [ - _triple(question_uri, RDF_TYPE, _iri(PROV_ACTIVITY)), + triples = [ + _triple(question_uri, RDF_TYPE, _iri(PROV_ENTITY)), _triple(question_uri, RDF_TYPE, _iri(TG_QUESTION)), _triple(question_uri, RDF_TYPE, _iri(TG_DOC_RAG_QUESTION)), _triple(question_uri, RDFS_LABEL, _literal("DocumentRAG Question")), @@ -603,6 +616,13 @@ def docrag_question_triples( _triple(question_uri, TG_QUERY, _literal(query)), ] + if parent_uri: + triples.append( + _triple(question_uri, PROV_WAS_DERIVED_FROM, _iri(parent_uri)) + ) + + return triples + def docrag_exploration_triples( exploration_uri: str, diff --git a/trustgraph-base/trustgraph/schema/services/retrieval.py b/trustgraph-base/trustgraph/schema/services/retrieval.py index d4f76655..f5ac73d3 100644 --- a/trustgraph-base/trustgraph/schema/services/retrieval.py +++ b/trustgraph-base/trustgraph/schema/services/retrieval.py @@ -18,6 +18,7 @@ class GraphRagQuery: edge_score_limit: int = 0 edge_limit: int = 0 streaming: bool = False + parent_uri: str = "" @dataclass class GraphRagResponse: diff --git a/trustgraph-cli/trustgraph/cli/invoke_agent.py b/trustgraph-cli/trustgraph/cli/invoke_agent.py index c82c78f6..1c4b757b 100644 --- a/trustgraph-cli/trustgraph/cli/invoke_agent.py +++ b/trustgraph-cli/trustgraph/cli/invoke_agent.py @@ -12,6 +12,7 @@ from trustgraph.api import ( ProvenanceEvent, Question, Analysis, + Observation, Conclusion, Decomposition, Finding, @@ -206,13 +207,13 @@ def question_explainable( print(f" Time: {entity.timestamp}", file=sys.stderr) elif isinstance(entity, Analysis): - print(f"\n [iteration] {prov_id}", file=sys.stderr) - if entity.action: - print(f" Action: {entity.action}", file=sys.stderr) - if entity.thought: - print(f" Thought: {entity.thought}", file=sys.stderr) - if entity.observation: - print(f" Observation: {entity.observation}", file=sys.stderr) + action_label = f": {entity.action}" if entity.action else "" + print(f"\n [analysis{action_label}] {prov_id}", file=sys.stderr) + + elif isinstance(entity, Observation): + print(f"\n [observation] {prov_id}", file=sys.stderr) + if entity.document: + print(f" Document: {entity.document}", file=sys.stderr) elif isinstance(entity, Decomposition): print(f"\n [decompose] {prov_id}", file=sys.stderr) diff --git a/trustgraph-cli/trustgraph/cli/show_explain_trace.py b/trustgraph-cli/trustgraph/cli/show_explain_trace.py index c4da0d5a..90c0e452 100644 --- a/trustgraph-cli/trustgraph/cli/show_explain_trace.py +++ b/trustgraph-cli/trustgraph/cli/show_explain_trace.py @@ -26,6 +26,7 @@ from trustgraph.api import ( Focus, Synthesis, Analysis, + Observation, Conclusion, Decomposition, Finding, @@ -379,11 +380,13 @@ def print_agent_text(trace, explain_client, api, user): print(f" {line}") except Exception: print(f" Arguments: {step.arguments}") + print() - obs = step.observation or 'N/A' - if obs and len(obs) > 200: - obs = obs[:200] + "... [truncated]" - print(f" Observation: {obs}") + elif isinstance(step, Observation): + print("--- Observation ---") + _print_document_content( + explain_client, api, user, step.document, "Content", + ) print() elif isinstance(step, Synthesis): @@ -437,6 +440,12 @@ def trace_to_dict(trace, trace_type): "step": step.step, "document": step.document, } + elif isinstance(step, Observation): + return { + "type": "observation", + "id": step.uri, + "document": step.document, + } elif isinstance(step, Analysis): return { "type": "analysis", @@ -444,7 +453,6 @@ def trace_to_dict(trace, trace_type): "action": step.action, "arguments": step.arguments, "thought": step.thought, - "observation": step.observation, } elif isinstance(step, Synthesis): return { diff --git a/trustgraph-flow/trustgraph/agent/orchestrator/pattern_base.py b/trustgraph-flow/trustgraph/agent/orchestrator/pattern_base.py index 4faa7ce6..f999b132 100644 --- a/trustgraph-flow/trustgraph/agent/orchestrator/pattern_base.py +++ b/trustgraph-flow/trustgraph/agent/orchestrator/pattern_base.py @@ -27,6 +27,7 @@ from trustgraph.provenance import ( agent_synthesis_uri, agent_session_triples, agent_iteration_triples, + agent_observation_triples, agent_final_triples, agent_decomposition_triples, agent_finding_triples, @@ -46,9 +47,12 @@ logger = logging.getLogger(__name__) class UserAwareContext: """Wraps flow interface to inject user context for tools that need it.""" - def __init__(self, flow, user): + def __init__(self, flow, user, respond=None, streaming=False): self._flow = flow self._user = user + self.respond = respond + self.streaming = streaming + self.current_explain_uri = None def __call__(self, service_name): client = self._flow(service_name) @@ -120,9 +124,9 @@ class PatternBase: current_state=getattr(request, 'state', None), ) - def make_context(self, flow, user): + def make_context(self, flow, user, respond=None, streaming=False): """Create a user-aware context wrapper.""" - return UserAwareContext(flow, user) + return UserAwareContext(flow, user, respond=respond, streaming=streaming) def build_history(self, request): """Convert AgentStep history into Action objects.""" @@ -140,7 +144,7 @@ class PatternBase: # ---- Streaming callbacks ------------------------------------------------ - def make_think_callback(self, respond, streaming): + def make_think_callback(self, respond, streaming, message_id=""): """Create the think callback for streaming/non-streaming.""" async def think(x, is_final=False): logger.debug(f"Think: {x} (is_final={is_final})") @@ -150,6 +154,7 @@ class PatternBase: content=x, end_of_message=is_final, end_of_dialog=False, + message_id=message_id, ) else: r = AgentResponse( @@ -157,11 +162,12 @@ class PatternBase: content=x, end_of_message=True, end_of_dialog=False, + message_id=message_id, ) await respond(r) return think - def make_observe_callback(self, respond, streaming): + def make_observe_callback(self, respond, streaming, message_id=""): """Create the observe callback for streaming/non-streaming.""" async def observe(x, is_final=False): logger.debug(f"Observe: {x} (is_final={is_final})") @@ -171,6 +177,7 @@ class PatternBase: content=x, end_of_message=is_final, end_of_dialog=False, + message_id=message_id, ) else: r = AgentResponse( @@ -178,6 +185,7 @@ class PatternBase: content=x, end_of_message=True, end_of_dialog=False, + message_id=message_id, ) await respond(r) return observe @@ -223,23 +231,23 @@ class PatternBase: )) logger.debug(f"Emitted session triples for {session_uri}") - if streaming: - await respond(AgentResponse( - chunk_type="explain", - content="", - explain_id=session_uri, - explain_graph=GRAPH_RETRIEVAL, - )) + await respond(AgentResponse( + chunk_type="explain", + content="", + explain_id=session_uri, + explain_graph=GRAPH_RETRIEVAL, + )) async def emit_iteration_triples(self, flow, session_id, iteration_num, session_uri, act, request, respond, streaming): - """Emit provenance triples for an iteration and save to librarian.""" + """Emit provenance triples for an iteration (Analysis+ToolUse).""" iteration_uri = agent_iteration_uri(session_id, iteration_num) if iteration_num > 1: + # Chain through previous Observation (last entity in prior cycle) iter_question_uri = None - iter_previous_uri = agent_iteration_uri(session_id, iteration_num - 1) + iter_previous_uri = agent_observation_uri(session_id, iteration_num - 1) else: iter_question_uri = session_uri iter_previous_uri = None @@ -261,25 +269,7 @@ class PatternBase: logger.warning(f"Failed to save thought to librarian: {e}") thought_doc_id = None - # Save observation to librarian - observation_doc_id = None - if act.observation: - observation_doc_id = ( - f"urn:trustgraph:agent:{session_id}/i{iteration_num}/observation" - ) - try: - await self.processor.save_answer_content( - doc_id=observation_doc_id, - user=request.user, - content=act.observation, - title=f"Agent Observation: {act.name}", - ) - except Exception as e: - logger.warning(f"Failed to save observation to librarian: {e}") - observation_doc_id = None - thought_entity_uri = agent_thought_uri(session_id, iteration_num) - observation_entity_uri = agent_observation_uri(session_id, iteration_num) iter_triples = set_graph( agent_iteration_triples( @@ -290,8 +280,6 @@ class PatternBase: arguments=act.arguments, thought_uri=thought_entity_uri if thought_doc_id else None, thought_document_id=thought_doc_id, - observation_uri=observation_entity_uri if observation_doc_id else None, - observation_document_id=observation_doc_id, ), GRAPH_RETRIEVAL, ) @@ -305,13 +293,60 @@ class PatternBase: )) logger.debug(f"Emitted iteration triples for {iteration_uri}") - if streaming: - await respond(AgentResponse( - chunk_type="explain", - content="", - explain_id=iteration_uri, - explain_graph=GRAPH_RETRIEVAL, - )) + await respond(AgentResponse( + chunk_type="explain", + content="", + explain_id=iteration_uri, + explain_graph=GRAPH_RETRIEVAL, + )) + + async def emit_observation_triples(self, flow, session_id, iteration_num, + observation_text, request, respond): + """Emit provenance triples for a standalone Observation entity.""" + iteration_uri = agent_iteration_uri(session_id, iteration_num) + observation_entity_uri = agent_observation_uri(session_id, iteration_num) + + # Save observation to librarian + observation_doc_id = None + if observation_text: + observation_doc_id = ( + f"urn:trustgraph:agent:{session_id}/i{iteration_num}/observation" + ) + try: + await self.processor.save_answer_content( + doc_id=observation_doc_id, + user=request.user, + content=observation_text, + title=f"Agent Observation", + ) + except Exception as e: + logger.warning(f"Failed to save observation to librarian: {e}") + observation_doc_id = None + + obs_triples = set_graph( + agent_observation_triples( + observation_entity_uri, + iteration_uri, + document_id=observation_doc_id, + ), + GRAPH_RETRIEVAL, + ) + await flow("explainability").send(Triples( + metadata=Metadata( + id=observation_entity_uri, + user=request.user, + collection=getattr(request, 'collection', 'default'), + ), + triples=obs_triples, + )) + logger.debug(f"Emitted observation triples for {observation_entity_uri}") + + await respond(AgentResponse( + chunk_type="explain", + content="", + explain_id=observation_entity_uri, + explain_graph=GRAPH_RETRIEVAL, + )) async def emit_final_triples(self, flow, session_id, iteration_num, session_uri, answer_text, request, respond, @@ -320,8 +355,9 @@ class PatternBase: final_uri = agent_final_uri(session_id) if iteration_num > 1: + # Chain through last Observation (last entity in prior cycle) final_question_uri = None - final_previous_uri = agent_iteration_uri(session_id, iteration_num - 1) + final_previous_uri = agent_observation_uri(session_id, iteration_num - 1) else: final_question_uri = session_uri final_previous_uri = None @@ -361,13 +397,12 @@ class PatternBase: )) logger.debug(f"Emitted final triples for {final_uri}") - if streaming: - await respond(AgentResponse( - chunk_type="explain", - content="", - explain_id=final_uri, - explain_graph=GRAPH_RETRIEVAL, - )) + await respond(AgentResponse( + chunk_type="explain", + content="", + explain_id=final_uri, + explain_graph=GRAPH_RETRIEVAL, + )) # ---- Orchestrator provenance helpers ------------------------------------ @@ -385,11 +420,10 @@ class PatternBase: metadata=Metadata(id=uri, user=user, collection=collection), triples=triples, )) - if streaming: - await respond(AgentResponse( - chunk_type="explain", content="", - explain_id=uri, explain_graph=GRAPH_RETRIEVAL, - )) + await respond(AgentResponse( + chunk_type="explain", content="", + explain_id=uri, explain_graph=GRAPH_RETRIEVAL, + )) async def emit_finding_triples( self, flow, session_id, index, goal, answer_text, user, collection, @@ -418,11 +452,10 @@ class PatternBase: metadata=Metadata(id=uri, user=user, collection=collection), triples=triples, )) - if streaming: - await respond(AgentResponse( - chunk_type="explain", content="", - explain_id=uri, explain_graph=GRAPH_RETRIEVAL, - )) + await respond(AgentResponse( + chunk_type="explain", content="", + explain_id=uri, explain_graph=GRAPH_RETRIEVAL, + )) async def emit_plan_triples( self, flow, session_id, session_uri, steps, user, collection, @@ -438,11 +471,10 @@ class PatternBase: metadata=Metadata(id=uri, user=user, collection=collection), triples=triples, )) - if streaming: - await respond(AgentResponse( - chunk_type="explain", content="", - explain_id=uri, explain_graph=GRAPH_RETRIEVAL, - )) + await respond(AgentResponse( + chunk_type="explain", content="", + explain_id=uri, explain_graph=GRAPH_RETRIEVAL, + )) async def emit_step_result_triples( self, flow, session_id, index, goal, answer_text, user, collection, @@ -471,11 +503,10 @@ class PatternBase: metadata=Metadata(id=uri, user=user, collection=collection), triples=triples, )) - if streaming: - await respond(AgentResponse( - chunk_type="explain", content="", - explain_id=uri, explain_graph=GRAPH_RETRIEVAL, - )) + await respond(AgentResponse( + chunk_type="explain", content="", + explain_id=uri, explain_graph=GRAPH_RETRIEVAL, + )) async def emit_synthesis_triples( self, flow, session_id, previous_uri, answer_text, user, collection, @@ -503,11 +534,10 @@ class PatternBase: metadata=Metadata(id=uri, user=user, collection=collection), triples=triples, )) - if streaming: - await respond(AgentResponse( - chunk_type="explain", content="", - explain_id=uri, explain_graph=GRAPH_RETRIEVAL, - )) + await respond(AgentResponse( + chunk_type="explain", content="", + explain_id=uri, explain_graph=GRAPH_RETRIEVAL, + )) # ---- Response helpers --------------------------------------------------- diff --git a/trustgraph-flow/trustgraph/agent/orchestrator/plan_pattern.py b/trustgraph-flow/trustgraph/agent/orchestrator/plan_pattern.py index d6abb058..4775212e 100644 --- a/trustgraph-flow/trustgraph/agent/orchestrator/plan_pattern.py +++ b/trustgraph-flow/trustgraph/agent/orchestrator/plan_pattern.py @@ -11,7 +11,11 @@ import uuid from ... schema import AgentRequest, AgentResponse, AgentStep, PlanStep - +from trustgraph.provenance import ( + agent_step_result_uri as make_step_result_uri, + agent_thought_uri, + agent_observation_uri, +) from . pattern_base import PatternBase @@ -101,7 +105,10 @@ class PlanThenExecutePattern(PatternBase): tools = self.filter_tools(self.processor.agent.tools, request) framing = getattr(request, 'framing', '') - context = self.make_context(flow, request.user) + context = self.make_context( + flow, request.user, + respond=respond, streaming=streaming, + ) client = context("prompt-request") # Use the plan-create prompt template @@ -198,8 +205,11 @@ class PlanThenExecutePattern(PatternBase): logger.info(f"Executing plan step {pending_idx}: {goal}") - think = self.make_think_callback(respond, streaming) - observe = self.make_observe_callback(respond, streaming) + thought_msg_id = agent_thought_uri(session_id, iteration_num) + observation_msg_id = agent_observation_uri(session_id, iteration_num) + + think = self.make_think_callback(respond, streaming, message_id=thought_msg_id) + observe = self.make_observe_callback(respond, streaming, message_id=observation_msg_id) # Gather results from dependencies previous_results = [] @@ -216,7 +226,16 @@ class PlanThenExecutePattern(PatternBase): }) tools = self.filter_tools(self.processor.agent.tools, request) - context = self.make_context(flow, request.user) + context = self.make_context( + flow, request.user, + respond=respond, streaming=streaming, + ) + + # Set current explain URI so tools can link sub-traces + context.current_explain_uri = make_step_result_uri( + session_id, pending_idx, + ) + client = context("prompt-request") # Single-shot: ask LLM which tool + arguments to use for this goal @@ -316,7 +335,10 @@ class PlanThenExecutePattern(PatternBase): think = self.make_think_callback(respond, streaming) framing = getattr(request, 'framing', '') - context = self.make_context(flow, request.user) + context = self.make_context( + flow, request.user, + respond=respond, streaming=streaming, + ) client = context("prompt-request") # Use the plan-synthesise prompt template @@ -342,8 +364,7 @@ class PlanThenExecutePattern(PatternBase): ) # Emit synthesis provenance (links back to last step result) - from trustgraph.provenance import agent_step_result_uri - last_step_uri = agent_step_result_uri(session_id, len(plan) - 1) + last_step_uri = make_step_result_uri(session_id, len(plan) - 1) await self.emit_synthesis_triples( flow, session_id, last_step_uri, response_text, request.user, collection, respond, streaming, diff --git a/trustgraph-flow/trustgraph/agent/orchestrator/react_pattern.py b/trustgraph-flow/trustgraph/agent/orchestrator/react_pattern.py index 32261809..f6af65c2 100644 --- a/trustgraph-flow/trustgraph/agent/orchestrator/react_pattern.py +++ b/trustgraph-flow/trustgraph/agent/orchestrator/react_pattern.py @@ -11,6 +11,12 @@ import uuid from ... schema import AgentRequest, AgentResponse, AgentStep +from trustgraph.provenance import ( + agent_iteration_uri, + agent_thought_uri, + agent_observation_uri, +) + from ..react.agent_manager import AgentManager from ..react.types import Action, Final from ..tool_filter import get_next_state @@ -51,9 +57,13 @@ class ReactPattern(PatternBase): if len(history) >= self.processor.max_iterations: raise RuntimeError("Too many agent iterations") + # Compute URIs upfront for message_id + thought_msg_id = agent_thought_uri(session_id, iteration_num) + observation_msg_id = agent_observation_uri(session_id, iteration_num) + # Build callbacks - think = self.make_think_callback(respond, streaming) - observe = self.make_observe_callback(respond, streaming) + think = self.make_think_callback(respond, streaming, message_id=thought_msg_id) + observe = self.make_observe_callback(respond, streaming, message_id=observation_msg_id) answer_cb = self.make_answer_callback(respond, streaming) # Filter tools @@ -75,7 +85,22 @@ class ReactPattern(PatternBase): additional_context=additional_context, ) - context = self.make_context(flow, request.user) + context = self.make_context( + flow, request.user, + respond=respond, streaming=streaming, + ) + + # Set current explain URI so tools can link sub-traces + context.current_explain_uri = agent_iteration_uri( + session_id, iteration_num, + ) + + # Callback: emit Analysis+ToolUse triples before tool executes + async def on_action(act): + await self.emit_iteration_triples( + flow, session_id, iteration_num, session_uri, + act, request, respond, streaming, + ) act = await temp_agent.react( question=request.question, @@ -85,6 +110,7 @@ class ReactPattern(PatternBase): answer=answer_cb, context=context, streaming=streaming, + on_action=on_action, ) logger.debug(f"Action: {act}") @@ -110,10 +136,10 @@ class ReactPattern(PatternBase): ) return - # Not final — emit iteration provenance and send next request - await self.emit_iteration_triples( - flow, session_id, iteration_num, session_uri, - act, request, respond, streaming, + # Emit observation provenance after tool execution + await self.emit_observation_triples( + flow, session_id, iteration_num, + act.observation, request, respond, ) history.append(act) diff --git a/trustgraph-flow/trustgraph/agent/orchestrator/supervisor_pattern.py b/trustgraph-flow/trustgraph/agent/orchestrator/supervisor_pattern.py index 8588e400..951063cf 100644 --- a/trustgraph-flow/trustgraph/agent/orchestrator/supervisor_pattern.py +++ b/trustgraph-flow/trustgraph/agent/orchestrator/supervisor_pattern.py @@ -86,7 +86,10 @@ class SupervisorPattern(PatternBase): tools = self.filter_tools(self.processor.agent.tools, request) - context = self.make_context(flow, request.user) + context = self.make_context( + flow, request.user, + respond=respond, streaming=streaming, + ) client = context("prompt-request") # Use the supervisor-decompose prompt template @@ -182,7 +185,10 @@ class SupervisorPattern(PatternBase): logger.warning("Synthesis called with no subagent results") subagent_results = {"(no results)": "No subagent results available"} - context = self.make_context(flow, request.user) + context = self.make_context( + flow, request.user, + respond=respond, streaming=streaming, + ) client = context("prompt-request") await think("Synthesising final answer from sub-agent results", is_final=True) diff --git a/trustgraph-flow/trustgraph/agent/react/agent_manager.py b/trustgraph-flow/trustgraph/agent/react/agent_manager.py index 18598b38..e86a2d6c 100644 --- a/trustgraph-flow/trustgraph/agent/react/agent_manager.py +++ b/trustgraph-flow/trustgraph/agent/react/agent_manager.py @@ -291,7 +291,8 @@ class AgentManager: logger.error(f"Response was: {response_text}") raise RuntimeError(f"Failed to parse agent response: {e}") - async def react(self, question, history, think, observe, context, streaming=False, answer=None): + async def react(self, question, history, think, observe, context, + streaming=False, answer=None, on_action=None): act = await self.reason( question = question, @@ -325,6 +326,10 @@ class AgentManager: else: raise RuntimeError(f"No action for {act.name}!") + # Notify caller before tool execution (for provenance) + if on_action: + await on_action(act) + resp = await action.implementation(context).invoke( **act.arguments ) diff --git a/trustgraph-flow/trustgraph/agent/react/service.py b/trustgraph-flow/trustgraph/agent/react/service.py index 6c06f71a..af088ec9 100755 --- a/trustgraph-flow/trustgraph/agent/react/service.py +++ b/trustgraph-flow/trustgraph/agent/react/service.py @@ -36,6 +36,7 @@ from trustgraph.provenance import ( agent_final_uri, agent_session_triples, agent_iteration_triples, + agent_observation_triples, agent_final_triples, set_graph, GRAPH_RETRIEVAL, @@ -465,13 +466,12 @@ class Processor(AgentService): logger.debug(f"Emitted session triples for {session_uri}") # Send explain event for session - if streaming: - await respond(AgentResponse( - chunk_type="explain", - content="", - explain_id=session_uri, - explain_graph=GRAPH_RETRIEVAL, - )) + await respond(AgentResponse( + chunk_type="explain", + content="", + explain_id=session_uri, + explain_graph=GRAPH_RETRIEVAL, + )) logger.info(f"Question: {request.question}") @@ -480,6 +480,9 @@ class Processor(AgentService): logger.debug(f"History: {history}") + thought_msg_id = agent_thought_uri(session_id, iteration_num) + observation_msg_id = agent_observation_uri(session_id, iteration_num) + async def think(x, is_final=False): logger.debug(f"Think: {x} (is_final={is_final})") @@ -490,6 +493,7 @@ class Processor(AgentService): content=x, end_of_message=is_final, end_of_dialog=False, + message_id=thought_msg_id, ) else: r = AgentResponse( @@ -497,6 +501,7 @@ class Processor(AgentService): content=x, end_of_message=True, end_of_dialog=False, + message_id=thought_msg_id, ) await respond(r) @@ -511,6 +516,7 @@ class Processor(AgentService): content=x, end_of_message=is_final, end_of_dialog=False, + message_id=observation_msg_id, ) else: r = AgentResponse( @@ -518,6 +524,7 @@ class Processor(AgentService): content=x, end_of_message=True, end_of_dialog=False, + message_id=observation_msg_id, ) await respond(r) @@ -572,6 +579,62 @@ class Processor(AgentService): client._current_user = self._user return client + # Callback: emit Analysis+ToolUse triples before tool executes + async def on_action(act_decision): + iter_uri = agent_iteration_uri(session_id, iteration_num) + if iteration_num > 1: + iter_q_uri = None + iter_prev_uri = agent_observation_uri(session_id, iteration_num - 1) + else: + iter_q_uri = session_uri + iter_prev_uri = None + + # Save thought to librarian + t_doc_id = None + if act_decision.thought: + t_doc_id = f"urn:trustgraph:agent:{session_id}/i{iteration_num}/thought" + try: + await self.save_answer_content( + doc_id=t_doc_id, + user=request.user, + content=act_decision.thought, + title=f"Agent Thought: {act_decision.name}", + ) + except Exception as e: + logger.warning(f"Failed to save thought to librarian: {e}") + t_doc_id = None + + t_entity_uri = agent_thought_uri(session_id, iteration_num) + + iter_triples = set_graph( + agent_iteration_triples( + iter_uri, + question_uri=iter_q_uri, + previous_uri=iter_prev_uri, + action=act_decision.name, + arguments=act_decision.arguments, + thought_uri=t_entity_uri if t_doc_id else None, + thought_document_id=t_doc_id, + ), + GRAPH_RETRIEVAL + ) + await flow("explainability").send(Triples( + metadata=Metadata( + id=iter_uri, + user=request.user, + collection=collection, + ), + triples=iter_triples, + )) + logger.debug(f"Emitted iteration triples for {iter_uri}") + + await respond(AgentResponse( + chunk_type="explain", + content="", + explain_id=iter_uri, + explain_graph=GRAPH_RETRIEVAL, + )) + act = await temp_agent.react( question = request.question, history = history, @@ -580,6 +643,7 @@ class Processor(AgentService): answer = answer, context = UserAwareContext(flow, request.user), streaming = streaming, + on_action = on_action, ) logger.debug(f"Action: {act}") @@ -595,10 +659,10 @@ class Processor(AgentService): # Emit final answer provenance triples final_uri = agent_final_uri(session_id) - # No iterations: link to question; otherwise: link to last iteration + # No iterations: link to question; otherwise: link to last observation if iteration_num > 1: final_question_uri = None - final_previous_uri = agent_iteration_uri(session_id, iteration_num - 1) + final_previous_uri = agent_observation_uri(session_id, iteration_num - 1) else: final_question_uri = session_uri final_previous_uri = None @@ -639,13 +703,12 @@ class Processor(AgentService): logger.debug(f"Emitted final triples for {final_uri}") # Send explain event for conclusion - if streaming: - await respond(AgentResponse( - chunk_type="explain", - content="", - explain_id=final_uri, - explain_graph=GRAPH_RETRIEVAL, - )) + await respond(AgentResponse( + chunk_type="explain", + content="", + explain_id=final_uri, + explain_graph=GRAPH_RETRIEVAL, + )) if streaming: # End-of-dialog marker — answer chunks already sent via callback @@ -671,33 +734,9 @@ class Processor(AgentService): logger.debug("Send next...") - # Emit iteration provenance triples + # Emit standalone observation provenance (iteration was emitted in on_action) iteration_uri = agent_iteration_uri(session_id, iteration_num) - # First iteration links to question, subsequent to previous - if iteration_num > 1: - iter_question_uri = None - iter_previous_uri = agent_iteration_uri(session_id, iteration_num - 1) - else: - iter_question_uri = session_uri - iter_previous_uri = None - - # Save thought to librarian - thought_doc_id = None - if act.thought: - thought_doc_id = f"urn:trustgraph:agent:{session_id}/i{iteration_num}/thought" - try: - await self.save_answer_content( - doc_id=thought_doc_id, - user=request.user, - content=act.thought, - title=f"Agent Thought: {act.name}", - ) - logger.debug(f"Saved thought to librarian: {thought_doc_id}") - except Exception as e: - logger.warning(f"Failed to save thought to librarian: {e}") - thought_doc_id = None - - # Save observation to librarian + observation_entity_uri = agent_observation_uri(session_id, iteration_num) observation_doc_id = None if act.observation: observation_doc_id = f"urn:trustgraph:agent:{session_id}/i{iteration_num}/observation" @@ -706,48 +745,38 @@ class Processor(AgentService): doc_id=observation_doc_id, user=request.user, content=act.observation, - title=f"Agent Observation: {act.name}", + title=f"Agent Observation", ) logger.debug(f"Saved observation to librarian: {observation_doc_id}") except Exception as e: logger.warning(f"Failed to save observation to librarian: {e}") observation_doc_id = None - thought_entity_uri = agent_thought_uri(session_id, iteration_num) - observation_entity_uri = agent_observation_uri(session_id, iteration_num) - - iter_triples = set_graph( - agent_iteration_triples( + obs_triples = set_graph( + agent_observation_triples( + observation_entity_uri, iteration_uri, - question_uri=iter_question_uri, - previous_uri=iter_previous_uri, - action=act.name, - arguments=act.arguments, - thought_uri=thought_entity_uri if thought_doc_id else None, - thought_document_id=thought_doc_id, - observation_uri=observation_entity_uri if observation_doc_id else None, - observation_document_id=observation_doc_id, + document_id=observation_doc_id, ), GRAPH_RETRIEVAL ) await flow("explainability").send(Triples( metadata=Metadata( - id=iteration_uri, + id=observation_entity_uri, user=request.user, collection=collection, ), - triples=iter_triples, + triples=obs_triples, )) - logger.debug(f"Emitted iteration triples for {iteration_uri}") + logger.debug(f"Emitted observation triples for {observation_entity_uri}") - # Send explain event for iteration - if streaming: - await respond(AgentResponse( - chunk_type="explain", - content="", - explain_id=iteration_uri, - explain_graph=GRAPH_RETRIEVAL, - )) + # Send explain event for observation + await respond(AgentResponse( + chunk_type="explain", + content="", + explain_id=observation_entity_uri, + explain_graph=GRAPH_RETRIEVAL, + )) history.append(act) diff --git a/trustgraph-flow/trustgraph/agent/react/tools.py b/trustgraph-flow/trustgraph/agent/react/tools.py index 441c8f38..86b515e1 100644 --- a/trustgraph-flow/trustgraph/agent/react/tools.py +++ b/trustgraph-flow/trustgraph/agent/react/tools.py @@ -12,7 +12,7 @@ class KnowledgeQueryImpl: def __init__(self, context, collection=None): self.context = context self.collection = collection - + @staticmethod def get_arguments(): return [ @@ -22,13 +22,39 @@ class KnowledgeQueryImpl: description="The question to ask the knowledge base" ) ] - + async def invoke(self, **arguments): client = self.context("graph-rag-request") logger.debug("Graph RAG question...") + + # Build explain_callback to forward sub-trace explain events + # to the agent's response stream + explain_callback = None + parent_uri = "" + + respond = getattr(self.context, 'respond', None) + streaming = getattr(self.context, 'streaming', False) + current_uri = getattr(self.context, 'current_explain_uri', None) + + if respond: + from ... schema import AgentResponse + + async def explain_callback(explain_id, explain_graph): + await respond(AgentResponse( + chunk_type="explain", + content="", + explain_id=explain_id, + explain_graph=explain_graph, + )) + + if current_uri: + parent_uri = current_uri + return await client.rag( arguments.get("question"), - collection=self.collection if self.collection else "default" + collection=self.collection if self.collection else "default", + explain_callback=explain_callback, + parent_uri=parent_uri, ) # This tool implementation knows how to do text completion. This uses diff --git a/trustgraph-flow/trustgraph/retrieval/graph_rag/graph_rag.py b/trustgraph-flow/trustgraph/retrieval/graph_rag/graph_rag.py index ea9326a4..704613c6 100644 --- a/trustgraph-flow/trustgraph/retrieval/graph_rag/graph_rag.py +++ b/trustgraph-flow/trustgraph/retrieval/graph_rag/graph_rag.py @@ -555,6 +555,7 @@ class GraphRag: streaming = False, chunk_callback = None, explain_callback = None, save_answer_callback = None, + parent_uri = "", ): """ Execute a GraphRAG query with real-time explainability tracking. @@ -593,7 +594,10 @@ class GraphRag: # Emit question explainability immediately if explain_callback: q_triples = set_graph( - question_triples(q_uri, query, timestamp), + question_triples( + q_uri, query, timestamp, + parent_uri=parent_uri or None, + ), GRAPH_RETRIEVAL ) await explain_callback(q_triples, q_uri) diff --git a/trustgraph-flow/trustgraph/retrieval/graph_rag/rag.py b/trustgraph-flow/trustgraph/retrieval/graph_rag/rag.py index c3244b90..85a7491e 100755 --- a/trustgraph-flow/trustgraph/retrieval/graph_rag/rag.py +++ b/trustgraph-flow/trustgraph/retrieval/graph_rag/rag.py @@ -342,6 +342,7 @@ class Processor(FlowProcessor): chunk_callback = send_chunk, explain_callback = send_explainability, save_answer_callback = save_answer, + parent_uri = v.parent_uri, ) else: @@ -355,6 +356,7 @@ class Processor(FlowProcessor): edge_limit = edge_limit, explain_callback = send_explainability, save_answer_callback = save_answer, + parent_uri = v.parent_uri, ) # Send chunk with response