diff --git a/trustgraph-base/trustgraph/api/__init__.py b/trustgraph-base/trustgraph/api/__init__.py index dc1405ac..e956db65 100644 --- a/trustgraph-base/trustgraph/api/__init__.py +++ b/trustgraph-base/trustgraph/api/__init__.py @@ -82,6 +82,10 @@ from .explainability import ( Reflection, Analysis, Conclusion, + Decomposition, + Finding, + Plan, + StepResult, EdgeSelection, wire_triples_to_tuples, extract_term_value, diff --git a/trustgraph-base/trustgraph/api/explainability.py b/trustgraph-base/trustgraph/api/explainability.py index 1c986efb..7b406a59 100644 --- a/trustgraph-base/trustgraph/api/explainability.py +++ b/trustgraph-base/trustgraph/api/explainability.py @@ -44,6 +44,16 @@ TG_GRAPH_RAG_QUESTION = TG + "GraphRagQuestion" TG_DOC_RAG_QUESTION = TG + "DocRagQuestion" TG_AGENT_QUESTION = TG + "AgentQuestion" +# Orchestrator entity types +TG_DECOMPOSITION = TG + "Decomposition" +TG_FINDING = TG + "Finding" +TG_PLAN_TYPE = TG + "Plan" +TG_STEP_RESULT = TG + "StepResult" + +# Orchestrator predicates +TG_SUBAGENT_GOAL = TG + "subagentGoal" +TG_PLAN_STEP = TG + "planStep" + # PROV-O predicates PROV = "http://www.w3.org/ns/prov#" PROV_STARTED_AT_TIME = PROV + "startedAtTime" @@ -82,6 +92,14 @@ class ExplainEntity: return Exploration.from_triples(uri, triples) elif TG_FOCUS in types: return Focus.from_triples(uri, triples) + elif TG_DECOMPOSITION in types: + return Decomposition.from_triples(uri, triples) + elif TG_FINDING in types: + return Finding.from_triples(uri, triples) + elif TG_PLAN_TYPE in types: + return Plan.from_triples(uri, triples) + elif TG_STEP_RESULT in types: + return StepResult.from_triples(uri, triples) elif TG_SYNTHESIS in types: return Synthesis.from_triples(uri, triples) elif TG_REFLECTION_TYPE in types: @@ -314,6 +332,70 @@ class Conclusion(ExplainEntity): ) +@dataclass +class Decomposition(ExplainEntity): + """Decomposition entity - supervisor broke question into sub-goals.""" + goals: List[str] = field(default_factory=list) + + @classmethod + def from_triples(cls, uri: str, triples: List[Tuple[str, str, Any]]) -> "Decomposition": + goals = [] + for s, p, o in triples: + if p == TG_SUBAGENT_GOAL: + goals.append(o) + return cls(uri=uri, entity_type="decomposition", goals=goals) + + +@dataclass +class Finding(ExplainEntity): + """Finding entity - a subagent's result.""" + goal: str = "" + document: str = "" + + @classmethod + def from_triples(cls, uri: str, triples: List[Tuple[str, str, Any]]) -> "Finding": + goal = "" + document = "" + for s, p, o in triples: + if p == TG_SUBAGENT_GOAL: + goal = o + elif p == TG_DOCUMENT: + document = o + return cls(uri=uri, entity_type="finding", goal=goal, document=document) + + +@dataclass +class Plan(ExplainEntity): + """Plan entity - a structured plan of steps.""" + steps: List[str] = field(default_factory=list) + + @classmethod + def from_triples(cls, uri: str, triples: List[Tuple[str, str, Any]]) -> "Plan": + steps = [] + for s, p, o in triples: + if p == TG_PLAN_STEP: + steps.append(o) + return cls(uri=uri, entity_type="plan", steps=steps) + + +@dataclass +class StepResult(ExplainEntity): + """StepResult entity - a plan step's result.""" + step: str = "" + document: str = "" + + @classmethod + def from_triples(cls, uri: str, triples: List[Tuple[str, str, Any]]) -> "StepResult": + step = "" + document = "" + for s, p, o in triples: + if p == TG_PLAN_STEP: + step = o + elif p == TG_DOCUMENT: + document = o + return cls(uri=uri, entity_type="step-result", step=step, document=document) + + def parse_edge_selection_triples(triples: List[Tuple[str, str, Any]]) -> EdgeSelection: """Parse triples for an edge selection entity.""" uri = triples[0][0] if triples else "" @@ -895,7 +977,10 @@ class ExplainabilityClient: """ Fetch the complete Agent trace starting from a session URI. - Follows the provenance chain: Question -> Analysis(s) -> Conclusion + Follows the provenance chain for all patterns: + - ReAct: Question -> Analysis(s) -> Conclusion + - Supervisor: Question -> Decomposition -> Finding(s) -> Synthesis + - Plan-then-Execute: Question -> Plan -> StepResult(s) -> Synthesis Args: session_uri: The agent session/question URI @@ -906,14 +991,15 @@ class ExplainabilityClient: max_content: Maximum content length for conclusion Returns: - Dict with question, iterations (Analysis list), conclusion entities + Dict with question, steps (mixed entity list), conclusion/synthesis """ if graph is None: graph = "urn:graph:retrieval" trace = { "question": None, - "iterations": [], + "steps": [], + "iterations": [], # Backwards compatibility for ReAct "conclusion": None, } @@ -923,64 +1009,79 @@ class ExplainabilityClient: return trace trace["question"] = question - # Follow the chain: wasGeneratedBy for first hop, wasDerivedFrom after - current_uri = session_uri - is_first = True - max_iterations = 50 # Safety limit + # Follow the provenance chain from the question + self._follow_provenance_chain( + session_uri, trace, graph, user, collection, + is_first=True, max_depth=50, + ) - for _ in range(max_iterations): - # First hop uses wasGeneratedBy (entity←activity), - # subsequent hops use wasDerivedFrom (entity←entity) - if is_first: - derived_triples = self.flow.triples_query( - p=PROV_WAS_GENERATED_BY, - o=current_uri, - g=graph, - user=user, - collection=collection, - limit=10 - ) - # Fall back to wasDerivedFrom for backwards compatibility - if not derived_triples: - derived_triples = self.flow.triples_query( - p=PROV_WAS_DERIVED_FROM, - o=current_uri, - g=graph, - user=user, - collection=collection, - limit=10 - ) - is_first = False - else: + # Backwards compat: populate iterations from steps + trace["iterations"] = [ + s for s in trace["steps"] if isinstance(s, Analysis) + ] + + return trace + + def _follow_provenance_chain( + self, current_uri, trace, graph, user, collection, + is_first=False, max_depth=50, + ): + """Recursively follow the provenance chain, handling branches.""" + if max_depth <= 0: + return + + # Find entities derived from current_uri + if is_first: + derived_triples = self.flow.triples_query( + p=PROV_WAS_GENERATED_BY, + o=current_uri, + g=graph, user=user, collection=collection, + limit=20 + ) + if not derived_triples: derived_triples = self.flow.triples_query( p=PROV_WAS_DERIVED_FROM, o=current_uri, - g=graph, - user=user, - collection=collection, - limit=10 + g=graph, user=user, collection=collection, + limit=20 ) + else: + derived_triples = self.flow.triples_query( + p=PROV_WAS_DERIVED_FROM, + o=current_uri, + g=graph, user=user, collection=collection, + limit=20 + ) - if not derived_triples: - break + if not derived_triples: + return - derived_uri = extract_term_value(derived_triples[0].get("s", {})) + derived_uris = [ + extract_term_value(t.get("s", {})) + for t in derived_triples + ] + + for derived_uri in derived_uris: if not derived_uri: - break + continue entity = self.fetch_entity(derived_uri, graph, user, collection) + if entity is None: + continue - if isinstance(entity, Analysis): - trace["iterations"].append(entity) - current_uri = derived_uri - elif isinstance(entity, Conclusion): + if isinstance(entity, (Analysis, Decomposition, Finding, + Plan, StepResult)): + trace["steps"].append(entity) + + # Continue following from this entity + self._follow_provenance_chain( + derived_uri, trace, graph, user, collection, + max_depth=max_depth - 1, + ) + + elif isinstance(entity, (Conclusion, Synthesis)): + trace["steps"].append(entity) trace["conclusion"] = entity - break - else: - # Unknown entity type, stop - break - - return trace def list_sessions( self, @@ -1082,7 +1183,7 @@ class ExplainabilityClient: for child_uri in all_child_uris: entity = self.fetch_entity(child_uri, graph, user, collection) - if isinstance(entity, Analysis): + if isinstance(entity, (Analysis, Decomposition, Plan)): return "agent" if isinstance(entity, Exploration): return "graphrag" diff --git a/trustgraph-base/trustgraph/provenance/__init__.py b/trustgraph-base/trustgraph/provenance/__init__.py index ac52c5e5..304f17a7 100644 --- a/trustgraph-base/trustgraph/provenance/__init__.py +++ b/trustgraph-base/trustgraph/provenance/__init__.py @@ -53,6 +53,12 @@ from . uris import ( agent_thought_uri, agent_observation_uri, agent_final_uri, + # Orchestrator provenance URIs + agent_decomposition_uri, + agent_finding_uri, + agent_plan_uri, + agent_step_result_uri, + agent_synthesis_uri, # Document RAG provenance URIs docrag_question_uri, docrag_grounding_uri, @@ -94,6 +100,9 @@ from . namespaces import ( TG_GRAPH_RAG_QUESTION, TG_DOC_RAG_QUESTION, TG_AGENT_QUESTION, # Agent provenance predicates TG_THOUGHT, TG_ACTION, TG_ARGUMENTS, TG_OBSERVATION, + TG_SUBAGENT_GOAL, TG_PLAN_STEP, + # Orchestrator entity types + TG_DECOMPOSITION, TG_FINDING, TG_PLAN_TYPE, TG_STEP_RESULT, # Document reference predicate TG_DOCUMENT, # Named graphs @@ -124,6 +133,12 @@ from . agent import ( agent_session_triples, agent_iteration_triples, agent_final_triples, + # Orchestrator provenance triple builders + agent_decomposition_triples, + agent_finding_triples, + agent_plan_triples, + agent_step_result_triples, + agent_synthesis_triples, ) # Vocabulary bootstrap @@ -159,6 +174,12 @@ __all__ = [ "agent_thought_uri", "agent_observation_uri", "agent_final_uri", + # Orchestrator provenance URIs + "agent_decomposition_uri", + "agent_finding_uri", + "agent_plan_uri", + "agent_step_result_uri", + "agent_synthesis_uri", # Document RAG provenance URIs "docrag_question_uri", "docrag_grounding_uri", @@ -193,6 +214,9 @@ __all__ = [ "TG_GRAPH_RAG_QUESTION", "TG_DOC_RAG_QUESTION", "TG_AGENT_QUESTION", # Agent provenance predicates "TG_THOUGHT", "TG_ACTION", "TG_ARGUMENTS", "TG_OBSERVATION", + "TG_SUBAGENT_GOAL", "TG_PLAN_STEP", + # Orchestrator entity types + "TG_DECOMPOSITION", "TG_FINDING", "TG_PLAN_TYPE", "TG_STEP_RESULT", # Document reference predicate "TG_DOCUMENT", # Named graphs @@ -215,6 +239,12 @@ __all__ = [ "agent_session_triples", "agent_iteration_triples", "agent_final_triples", + # Orchestrator provenance triple builders + "agent_decomposition_triples", + "agent_finding_triples", + "agent_plan_triples", + "agent_step_result_triples", + "agent_synthesis_triples", # Utility "set_graph", # Vocabulary diff --git a/trustgraph-base/trustgraph/provenance/agent.py b/trustgraph-base/trustgraph/provenance/agent.py index f1aeab0d..d25109a7 100644 --- a/trustgraph-base/trustgraph/provenance/agent.py +++ b/trustgraph-base/trustgraph/provenance/agent.py @@ -1,10 +1,15 @@ """ Helper functions to build PROV-O triples for agent provenance. -Agent provenance tracks the reasoning trace of ReAct agent sessions: +Agent provenance tracks the reasoning trace of agent sessions: - Question: The root activity with query and timestamp -- Analysis: Each think/act/observe cycle -- Conclusion: The final answer +- Analysis: Each think/act/observe cycle (ReAct) +- Conclusion: The final answer (ReAct) +- Decomposition: Supervisor broke question into sub-goals +- Finding: A subagent's result (Supervisor) +- Plan: Structured plan of steps (Plan-then-Execute) +- StepResult: A plan step's result (Plan-then-Execute) +- Synthesis: Final synthesised answer (Supervisor, Plan-then-Execute) """ import json @@ -21,6 +26,8 @@ from . namespaces import ( TG_QUESTION, TG_ANALYSIS, TG_CONCLUSION, TG_DOCUMENT, TG_ANSWER_TYPE, TG_REFLECTION_TYPE, TG_THOUGHT_TYPE, TG_OBSERVATION_TYPE, TG_AGENT_QUESTION, + TG_DECOMPOSITION, TG_FINDING, TG_PLAN_TYPE, TG_STEP_RESULT, + TG_SYNTHESIS, TG_SUBAGENT_GOAL, TG_PLAN_STEP, ) @@ -203,3 +210,97 @@ def agent_final_triples( triples.append(_triple(final_uri, TG_DOCUMENT, _iri(document_id))) return triples + + +def agent_decomposition_triples( + uri: str, + session_uri: str, + goals: List[str], +) -> List[Triple]: + """Build triples for a supervisor decomposition step.""" + triples = [ + _triple(uri, RDF_TYPE, _iri(PROV_ENTITY)), + _triple(uri, RDF_TYPE, _iri(TG_DECOMPOSITION)), + _triple(uri, RDFS_LABEL, + _literal(f"Decomposed into {len(goals)} research threads")), + _triple(uri, PROV_WAS_GENERATED_BY, _iri(session_uri)), + ] + for goal in goals: + triples.append(_triple(uri, TG_SUBAGENT_GOAL, _literal(goal))) + return triples + + +def agent_finding_triples( + uri: str, + decomposition_uri: str, + goal: str, + document_id: Optional[str] = None, +) -> List[Triple]: + """Build triples for a subagent finding.""" + triples = [ + _triple(uri, RDF_TYPE, _iri(PROV_ENTITY)), + _triple(uri, RDF_TYPE, _iri(TG_FINDING)), + _triple(uri, RDF_TYPE, _iri(TG_ANSWER_TYPE)), + _triple(uri, RDFS_LABEL, _literal(f"Finding: {goal[:60]}")), + _triple(uri, PROV_WAS_DERIVED_FROM, _iri(decomposition_uri)), + _triple(uri, TG_SUBAGENT_GOAL, _literal(goal)), + ] + if document_id: + triples.append(_triple(uri, TG_DOCUMENT, _iri(document_id))) + return triples + + +def agent_plan_triples( + uri: str, + session_uri: str, + steps: List[str], +) -> List[Triple]: + """Build triples for a plan-then-execute plan.""" + triples = [ + _triple(uri, RDF_TYPE, _iri(PROV_ENTITY)), + _triple(uri, RDF_TYPE, _iri(TG_PLAN_TYPE)), + _triple(uri, RDFS_LABEL, + _literal(f"Plan with {len(steps)} steps")), + _triple(uri, PROV_WAS_GENERATED_BY, _iri(session_uri)), + ] + for step in steps: + triples.append(_triple(uri, TG_PLAN_STEP, _literal(step))) + return triples + + +def agent_step_result_triples( + uri: str, + plan_uri: str, + goal: str, + document_id: Optional[str] = None, +) -> List[Triple]: + """Build triples for a plan step result.""" + triples = [ + _triple(uri, RDF_TYPE, _iri(PROV_ENTITY)), + _triple(uri, RDF_TYPE, _iri(TG_STEP_RESULT)), + _triple(uri, RDF_TYPE, _iri(TG_ANSWER_TYPE)), + _triple(uri, RDFS_LABEL, _literal(f"Step result: {goal[:60]}")), + _triple(uri, PROV_WAS_DERIVED_FROM, _iri(plan_uri)), + _triple(uri, TG_PLAN_STEP, _literal(goal)), + ] + if document_id: + triples.append(_triple(uri, TG_DOCUMENT, _iri(document_id))) + return triples + + +def agent_synthesis_triples( + uri: str, + previous_uri: str, + document_id: Optional[str] = None, +) -> List[Triple]: + """Build triples for a synthesis answer.""" + triples = [ + _triple(uri, RDF_TYPE, _iri(PROV_ENTITY)), + _triple(uri, RDF_TYPE, _iri(TG_SYNTHESIS)), + _triple(uri, RDF_TYPE, _iri(TG_ANSWER_TYPE)), + _triple(uri, RDFS_LABEL, _literal("Synthesis")), + _triple(uri, PROV_WAS_DERIVED_FROM, _iri(previous_uri)), + ] + if document_id: + triples.append(_triple(uri, TG_DOCUMENT, _iri(document_id))) + return triples diff --git a/trustgraph-base/trustgraph/provenance/namespaces.py b/trustgraph-base/trustgraph/provenance/namespaces.py index 3dc16fa2..69134dfb 100644 --- a/trustgraph-base/trustgraph/provenance/namespaces.py +++ b/trustgraph-base/trustgraph/provenance/namespaces.py @@ -94,8 +94,14 @@ TG_SYNTHESIS = TG + "Synthesis" TG_ANALYSIS = TG + "Analysis" TG_CONCLUSION = TG + "Conclusion" +# Orchestrator entity types +TG_DECOMPOSITION = TG + "Decomposition" # Supervisor decomposed into sub-goals +TG_FINDING = TG + "Finding" # Subagent result +TG_PLAN_TYPE = TG + "Plan" # Plan-then-execute plan +TG_STEP_RESULT = TG + "StepResult" # Plan step result + # Unifying types for answer and intermediate commentary -TG_ANSWER_TYPE = TG + "Answer" # Final answer (Synthesis, Conclusion) +TG_ANSWER_TYPE = TG + "Answer" # Final answer (Synthesis, Conclusion, Finding, StepResult) TG_REFLECTION_TYPE = TG + "Reflection" # Intermediate commentary (Thought, Observation) TG_THOUGHT_TYPE = TG + "Thought" # Agent reasoning TG_OBSERVATION_TYPE = TG + "Observation" # Agent tool result @@ -110,6 +116,8 @@ TG_THOUGHT = TG + "thought" # Links iteration to thought sub-entity TG_ACTION = TG + "action" TG_ARGUMENTS = TG + "arguments" TG_OBSERVATION = TG + "observation" # Links iteration to observation sub-entity +TG_SUBAGENT_GOAL = TG + "subagentGoal" # Goal string on Decomposition/Finding +TG_PLAN_STEP = TG + "planStep" # Step goal string on Plan/StepResult # Named graph URIs for RDF datasets # These separate different types of data while keeping them in the same collection diff --git a/trustgraph-base/trustgraph/provenance/uris.py b/trustgraph-base/trustgraph/provenance/uris.py index ac221515..a3aadef6 100644 --- a/trustgraph-base/trustgraph/provenance/uris.py +++ b/trustgraph-base/trustgraph/provenance/uris.py @@ -234,6 +234,31 @@ def agent_final_uri(session_id: str) -> str: return f"urn:trustgraph:agent:{session_id}/final" +def agent_decomposition_uri(session_id: str) -> str: + """Generate URI for a supervisor decomposition step.""" + return f"urn:trustgraph:agent:{session_id}/decompose" + + +def agent_finding_uri(session_id: str, index: int) -> str: + """Generate URI for a subagent finding.""" + return f"urn:trustgraph:agent:{session_id}/finding/{index}" + + +def agent_plan_uri(session_id: str) -> str: + """Generate URI for a plan-then-execute plan.""" + return f"urn:trustgraph:agent:{session_id}/plan" + + +def agent_step_result_uri(session_id: str, index: int) -> str: + """Generate URI for a plan step result.""" + return f"urn:trustgraph:agent:{session_id}/step/{index}" + + +def agent_synthesis_uri(session_id: str) -> str: + """Generate URI for a synthesis answer.""" + return f"urn:trustgraph:agent:{session_id}/synthesis" + + # Document RAG provenance URIs # These URIs use the urn:trustgraph:docrag: namespace to distinguish # document RAG provenance from graph RAG provenance diff --git a/trustgraph-base/trustgraph/provenance/vocabulary.py b/trustgraph-base/trustgraph/provenance/vocabulary.py index 018e2bfe..afb5c30f 100644 --- a/trustgraph-base/trustgraph/provenance/vocabulary.py +++ b/trustgraph-base/trustgraph/provenance/vocabulary.py @@ -27,6 +27,8 @@ from . namespaces import ( TG_DOCUMENT_TYPE, TG_PAGE_TYPE, TG_CHUNK_TYPE, TG_SUBGRAPH_TYPE, TG_CONCEPT, TG_ENTITY, TG_GROUNDING, TG_ANSWER_TYPE, TG_REFLECTION_TYPE, TG_THOUGHT_TYPE, TG_OBSERVATION_TYPE, + TG_DECOMPOSITION, TG_FINDING, TG_PLAN_TYPE, TG_STEP_RESULT, + TG_SUBAGENT_GOAL, TG_PLAN_STEP, ) @@ -87,6 +89,10 @@ TG_CLASS_LABELS = [ _label_triple(TG_REFLECTION_TYPE, "Reflection"), _label_triple(TG_THOUGHT_TYPE, "Thought"), _label_triple(TG_OBSERVATION_TYPE, "Observation"), + _label_triple(TG_DECOMPOSITION, "Decomposition"), + _label_triple(TG_FINDING, "Finding"), + _label_triple(TG_PLAN_TYPE, "Plan"), + _label_triple(TG_STEP_RESULT, "Step Result"), ] # TrustGraph predicate labels @@ -109,6 +115,8 @@ TG_PREDICATE_LABELS = [ _label_triple(TG_SOURCE_CHAR_LENGTH, "source character length"), _label_triple(TG_CONCEPT, "concept"), _label_triple(TG_ENTITY, "entity"), + _label_triple(TG_SUBAGENT_GOAL, "subagent goal"), + _label_triple(TG_PLAN_STEP, "plan step"), ] diff --git a/trustgraph-cli/trustgraph/cli/invoke_agent.py b/trustgraph-cli/trustgraph/cli/invoke_agent.py index 9879025f..2a1ba7c2 100644 --- a/trustgraph-cli/trustgraph/cli/invoke_agent.py +++ b/trustgraph-cli/trustgraph/cli/invoke_agent.py @@ -13,6 +13,11 @@ from trustgraph.api import ( Question, Analysis, Conclusion, + Decomposition, + Finding, + Plan, + StepResult, + Synthesis, AgentThought, AgentObservation, AgentAnswer, @@ -209,6 +214,35 @@ def question_explainable( if entity.observation: print(f" Observation: {entity.observation}", file=sys.stderr) + elif isinstance(entity, Decomposition): + print(f"\n [decompose] {prov_id}", file=sys.stderr) + for i, goal in enumerate(entity.goals): + print(f" Thread {i}: {goal}", file=sys.stderr) + + elif isinstance(entity, Finding): + print(f"\n [finding] {prov_id}", file=sys.stderr) + if entity.goal: + print(f" Goal: {entity.goal}", file=sys.stderr) + if entity.document: + print(f" Document: {entity.document}", file=sys.stderr) + + elif isinstance(entity, Plan): + print(f"\n [plan] {prov_id}", file=sys.stderr) + for i, step in enumerate(entity.steps): + print(f" Step {i}: {step}", file=sys.stderr) + + elif isinstance(entity, StepResult): + print(f"\n [step-result] {prov_id}", file=sys.stderr) + if entity.step: + print(f" Step: {entity.step}", file=sys.stderr) + if entity.document: + print(f" Document: {entity.document}", file=sys.stderr) + + elif isinstance(entity, Synthesis): + print(f"\n [synthesis] {prov_id}", file=sys.stderr) + if entity.document: + print(f" Document: {entity.document}", file=sys.stderr) + elif isinstance(entity, Conclusion): print(f"\n [conclusion] {prov_id}", file=sys.stderr) if entity.document: diff --git a/trustgraph-flow/trustgraph/agent/orchestrator/pattern_base.py b/trustgraph-flow/trustgraph/agent/orchestrator/pattern_base.py index b66bc4f5..ddc4aed9 100644 --- a/trustgraph-flow/trustgraph/agent/orchestrator/pattern_base.py +++ b/trustgraph-flow/trustgraph/agent/orchestrator/pattern_base.py @@ -20,9 +20,19 @@ from trustgraph.provenance import ( agent_thought_uri, agent_observation_uri, agent_final_uri, + agent_decomposition_uri, + agent_finding_uri, + agent_plan_uri, + agent_step_result_uri, + agent_synthesis_uri, agent_session_triples, agent_iteration_triples, agent_final_triples, + agent_decomposition_triples, + agent_finding_triples, + agent_plan_triples, + agent_step_result_triples, + agent_synthesis_triples, set_graph, GRAPH_RETRIEVAL, ) @@ -359,6 +369,146 @@ class PatternBase: explain_graph=GRAPH_RETRIEVAL, )) + # ---- Orchestrator provenance helpers ------------------------------------ + + async def emit_decomposition_triples( + self, flow, session_id, session_uri, goals, user, collection, + respond, streaming, + ): + """Emit provenance for a supervisor decomposition step.""" + uri = agent_decomposition_uri(session_id) + triples = set_graph( + agent_decomposition_triples(uri, session_uri, goals), + GRAPH_RETRIEVAL, + ) + await flow("explainability").send(Triples( + metadata=Metadata(id=uri, user=user, collection=collection), + triples=triples, + )) + if streaming: + await respond(AgentResponse( + chunk_type="explain", content="", + explain_id=uri, explain_graph=GRAPH_RETRIEVAL, + )) + + async def emit_finding_triples( + self, flow, session_id, index, goal, answer_text, user, collection, + respond, streaming, + ): + """Emit provenance for a subagent finding.""" + uri = agent_finding_uri(session_id, index) + decomposition_uri = agent_decomposition_uri(session_id) + + doc_id = f"urn:trustgraph:agent:{session_id}/finding/{index}/doc" + try: + await self.processor.save_answer_content( + doc_id=doc_id, user=user, + content=answer_text, + title=f"Finding: {goal[:60]}", + ) + except Exception as e: + logger.warning(f"Failed to save finding to librarian: {e}") + doc_id = None + + triples = set_graph( + agent_finding_triples(uri, decomposition_uri, goal, doc_id), + GRAPH_RETRIEVAL, + ) + await flow("explainability").send(Triples( + metadata=Metadata(id=uri, user=user, collection=collection), + triples=triples, + )) + if streaming: + await respond(AgentResponse( + chunk_type="explain", content="", + explain_id=uri, explain_graph=GRAPH_RETRIEVAL, + )) + + async def emit_plan_triples( + self, flow, session_id, session_uri, steps, user, collection, + respond, streaming, + ): + """Emit provenance for a plan creation.""" + uri = agent_plan_uri(session_id) + triples = set_graph( + agent_plan_triples(uri, session_uri, steps), + GRAPH_RETRIEVAL, + ) + await flow("explainability").send(Triples( + metadata=Metadata(id=uri, user=user, collection=collection), + triples=triples, + )) + if streaming: + await respond(AgentResponse( + chunk_type="explain", content="", + explain_id=uri, explain_graph=GRAPH_RETRIEVAL, + )) + + async def emit_step_result_triples( + self, flow, session_id, index, goal, answer_text, user, collection, + respond, streaming, + ): + """Emit provenance for a plan step result.""" + uri = agent_step_result_uri(session_id, index) + plan_uri = agent_plan_uri(session_id) + + doc_id = f"urn:trustgraph:agent:{session_id}/step/{index}/doc" + try: + await self.processor.save_answer_content( + doc_id=doc_id, user=user, + content=answer_text, + title=f"Step result: {goal[:60]}", + ) + except Exception as e: + logger.warning(f"Failed to save step result to librarian: {e}") + doc_id = None + + triples = set_graph( + agent_step_result_triples(uri, plan_uri, goal, doc_id), + GRAPH_RETRIEVAL, + ) + await flow("explainability").send(Triples( + metadata=Metadata(id=uri, user=user, collection=collection), + triples=triples, + )) + if streaming: + await respond(AgentResponse( + chunk_type="explain", content="", + explain_id=uri, explain_graph=GRAPH_RETRIEVAL, + )) + + async def emit_synthesis_triples( + self, flow, session_id, previous_uri, answer_text, user, collection, + respond, streaming, + ): + """Emit provenance for a synthesis answer.""" + uri = agent_synthesis_uri(session_id) + + doc_id = f"urn:trustgraph:agent:{session_id}/synthesis/doc" + try: + await self.processor.save_answer_content( + doc_id=doc_id, user=user, + content=answer_text, + title="Synthesis", + ) + except Exception as e: + logger.warning(f"Failed to save synthesis to librarian: {e}") + doc_id = None + + triples = set_graph( + agent_synthesis_triples(uri, previous_uri, doc_id), + GRAPH_RETRIEVAL, + ) + await flow("explainability").send(Triples( + metadata=Metadata(id=uri, user=user, collection=collection), + triples=triples, + )) + if streaming: + await respond(AgentResponse( + chunk_type="explain", content="", + explain_id=uri, explain_graph=GRAPH_RETRIEVAL, + )) + # ---- Response helpers --------------------------------------------------- async def prompt_as_answer(self, client, prompt_id, variables, diff --git a/trustgraph-flow/trustgraph/agent/orchestrator/plan_pattern.py b/trustgraph-flow/trustgraph/agent/orchestrator/plan_pattern.py index 4c61039f..d6abb058 100644 --- a/trustgraph-flow/trustgraph/agent/orchestrator/plan_pattern.py +++ b/trustgraph-flow/trustgraph/agent/orchestrator/plan_pattern.py @@ -11,7 +11,7 @@ import uuid from ... schema import AgentRequest, AgentResponse, AgentStep, PlanStep -from ..react.types import Action + from . pattern_base import PatternBase @@ -126,6 +126,13 @@ class PlanThenExecutePattern(PatternBase): thought_text = f"Created plan with {len(plan_steps)} steps" await think(thought_text, is_final=True) + # Emit plan provenance + step_goals = [ps.get("goal", "") for ps in plan_steps] + await self.emit_plan_triples( + flow, session_id, session_uri, step_goals, + request.user, collection, respond, streaming, + ) + # Build PlanStep objects plan_agent_steps = [ PlanStep( @@ -263,16 +270,10 @@ class PlanThenExecutePattern(PatternBase): result=step_result, ) - # Emit iteration provenance - prov_act = Action( - thought=f"Plan step {pending_idx}: {goal}", - name=tool_name, - arguments=tool_arguments, - observation=step_result, - ) - await self.emit_iteration_triples( - flow, session_id, iteration_num, session_uri, - prov_act, request, respond, streaming, + # Emit step result provenance + await self.emit_step_result_triples( + flow, session_id, pending_idx, goal, step_result, + request.user, collection, respond, streaming, ) # Build execution step for history @@ -340,9 +341,12 @@ class PlanThenExecutePattern(PatternBase): streaming=streaming, ) - await self.emit_final_triples( - flow, session_id, iteration_num, session_uri, - response_text, request, respond, streaming, + # Emit synthesis provenance (links back to last step result) + from trustgraph.provenance import agent_step_result_uri + last_step_uri = agent_step_result_uri(session_id, len(plan) - 1) + await self.emit_synthesis_triples( + flow, session_id, last_step_uri, + response_text, request.user, collection, respond, streaming, ) if self.is_subagent(request): diff --git a/trustgraph-flow/trustgraph/agent/orchestrator/service.py b/trustgraph-flow/trustgraph/agent/orchestrator/service.py index 9c9980d4..9ca3fe59 100644 --- a/trustgraph-flow/trustgraph/agent/orchestrator/service.py +++ b/trustgraph-flow/trustgraph/agent/orchestrator/service.py @@ -427,6 +427,7 @@ class Processor(AgentService): correlation_id = request.correlation_id subagent_goal = getattr(request, 'subagent_goal', '') + parent_session_id = getattr(request, 'parent_session_id', '') # Extract the answer from the completion step answer_text = "" @@ -451,13 +452,26 @@ class Processor(AgentService): ) return + # Emit finding provenance for this subagent + template = self.aggregator.get_original_request(correlation_id) + if template and parent_session_id: + entry = self.aggregator.correlations.get(correlation_id) + finding_index = len(entry["results"]) - 1 if entry else 0 + collection = getattr(template, 'collection', 'default') + + await self.supervisor_pattern.emit_finding_triples( + flow, parent_session_id, finding_index, + subagent_goal, answer_text, + template.user, collection, + respond, template.streaming, + ) + if all_done: logger.info( f"All subagents complete for {correlation_id}, " f"dispatching synthesis" ) - template = self.aggregator.get_original_request(correlation_id) if template is None: logger.error( f"No template for correlation {correlation_id}" diff --git a/trustgraph-flow/trustgraph/agent/orchestrator/supervisor_pattern.py b/trustgraph-flow/trustgraph/agent/orchestrator/supervisor_pattern.py index 51c2d500..8588e400 100644 --- a/trustgraph-flow/trustgraph/agent/orchestrator/supervisor_pattern.py +++ b/trustgraph-flow/trustgraph/agent/orchestrator/supervisor_pattern.py @@ -16,7 +16,7 @@ import uuid from ... schema import AgentRequest, AgentResponse, AgentStep -from ..react.types import Action, Final +from trustgraph.provenance import agent_finding_uri from . pattern_base import PatternBase @@ -121,15 +121,9 @@ class SupervisorPattern(PatternBase): correlation_id = str(uuid.uuid4()) # Emit decomposition provenance - decompose_act = Action( - thought=f"Decomposed into {len(goals)} sub-goals", - name="decompose", - arguments={"goals": json.dumps(goals), "correlation_id": correlation_id}, - observation=f"Fanning out {len(goals)} subagents", - ) - await self.emit_iteration_triples( - flow, session_id, iteration_num, session_uri, - decompose_act, request, respond, streaming, + await self.emit_decomposition_triples( + flow, session_id, session_uri, goals, + request.user, collection, respond, streaming, ) # Fan out: emit a subagent request for each goal @@ -207,10 +201,15 @@ class SupervisorPattern(PatternBase): streaming=streaming, ) - await self.emit_final_triples( - flow, session_id, iteration_num, session_uri, - response_text, request, respond, streaming, + # Emit synthesis provenance (links back to last finding) + last_finding_uri = agent_finding_uri( + session_id, len(subagent_results) - 1 ) + await self.emit_synthesis_triples( + flow, session_id, last_finding_uri, + response_text, request.user, collection, respond, streaming, + ) + await self.send_final_response( respond, streaming, response_text, already_streamed=streaming, )