From 2bcf375103700349b36bf2ddf7e84d8e4aa897d5 Mon Sep 17 00:00:00 2001 From: cybermaggedon Date: Wed, 1 Apr 2026 13:27:41 +0100 Subject: [PATCH] Wire message_id on all answer chunks, fix DAG structure (#748) Wire message_id on all answer chunks, fix DAG structure message_id: - Add message_id to AgentAnswer dataclass and propagate in socket_client._parse_chunk - Wire message_id into answer callbacks and send_final_response for all three patterns (react, plan-then-execute, supervisor) - Supervisor decomposition thought and synthesis answer chunks now carry message_id DAG structure fixes: - Observation derives from sub-trace Synthesis (not Analysis) when a tool produces a sub-trace; tracked via last_sub_explain_uri on context - Subagent sessions derive from parent's Decomposition via parent_uri on agent_session_triples - Findings derive from subagent Conclusions (not Decomposition) - Synthesis derives from all findings (multiple wasDerivedFrom) ensuring single terminal node - agent_synthesis_triples accepts list of parent URIs - Explainability chain walker follows from sub-trace terminal to find downstream Observation Emit Analysis before tool execution: - Add on_action callback to react() in agent_manager.py, called after reason() but before tool invocation - Orchestrator and old service emit Analysis+ToolUse triples via on_action so sub-traces appear after their parent in the stream --- .../trustgraph/api/explainability.py | 9 ++++ .../trustgraph/api/socket_client.py | 3 +- trustgraph-base/trustgraph/api/types.py | 1 + .../trustgraph/provenance/agent.py | 29 +++++++++-- .../agent/orchestrator/pattern_base.py | 49 ++++++++++++++----- .../agent/orchestrator/plan_pattern.py | 5 ++ .../agent/orchestrator/react_pattern.py | 14 +++++- .../trustgraph/agent/orchestrator/service.py | 3 ++ .../agent/orchestrator/supervisor_pattern.py | 29 ++++++++--- .../trustgraph/agent/react/service.py | 19 ++++++- .../trustgraph/agent/react/tools.py | 1 + 11 files changed, 134 insertions(+), 28 deletions(-) diff --git a/trustgraph-base/trustgraph/api/explainability.py b/trustgraph-base/trustgraph/api/explainability.py index fa6c4a0c..08d0b4e7 100644 --- a/trustgraph-base/trustgraph/api/explainability.py +++ b/trustgraph-base/trustgraph/api/explainability.py @@ -1095,6 +1095,15 @@ class ExplainabilityClient: "trace": sub_trace, }) + # Continue from the sub-trace's terminal entity + # (Observation may derive from Synthesis) + terminal = sub_trace.get("synthesis") + if terminal: + self._follow_provenance_chain( + terminal.uri, trace, graph, user, collection, + max_depth=max_depth - 1, + ) + elif isinstance(entity, (Conclusion, Synthesis)): trace["steps"].append(entity) diff --git a/trustgraph-base/trustgraph/api/socket_client.py b/trustgraph-base/trustgraph/api/socket_client.py index 3b463762..847513d3 100644 --- a/trustgraph-base/trustgraph/api/socket_client.py +++ b/trustgraph-base/trustgraph/api/socket_client.py @@ -397,7 +397,8 @@ class SocketClient: return AgentAnswer( content=resp.get("content", ""), end_of_message=resp.get("end_of_message", False), - end_of_dialog=resp.get("end_of_dialog", False) + end_of_dialog=resp.get("end_of_dialog", False), + message_id=resp.get("message_id", ""), ) elif chunk_type == "action": return AgentThought( diff --git a/trustgraph-base/trustgraph/api/types.py b/trustgraph-base/trustgraph/api/types.py index 3e3f1520..0715293b 100644 --- a/trustgraph-base/trustgraph/api/types.py +++ b/trustgraph-base/trustgraph/api/types.py @@ -188,6 +188,7 @@ class AgentAnswer(StreamingChunk): """ chunk_type: str = "final-answer" end_of_dialog: bool = False + message_id: str = "" @dataclasses.dataclass class RAGChunk(StreamingChunk): diff --git a/trustgraph-base/trustgraph/provenance/agent.py b/trustgraph-base/trustgraph/provenance/agent.py index 4fc1f2b5..7203174e 100644 --- a/trustgraph-base/trustgraph/provenance/agent.py +++ b/trustgraph-base/trustgraph/provenance/agent.py @@ -51,6 +51,7 @@ def agent_session_triples( session_uri: str, query: str, timestamp: Optional[str] = None, + parent_uri: Optional[str] = None, ) -> List[Triple]: """ Build triples for an agent session start (Question). @@ -58,11 +59,13 @@ def agent_session_triples( Creates: - Activity declaration with tg:Question type - Query text and timestamp + - wasDerivedFrom link to parent (for subagent sessions) Args: session_uri: URI of the session (from agent_session_uri) query: The user's query text timestamp: ISO timestamp (defaults to now) + parent_uri: URI of the parent entity (e.g. Decomposition) for subagents Returns: List of Triple objects @@ -70,7 +73,7 @@ def agent_session_triples( if timestamp is None: timestamp = datetime.utcnow().isoformat() + "Z" - return [ + triples = [ _triple(session_uri, RDF_TYPE, _iri(PROV_ENTITY)), _triple(session_uri, RDF_TYPE, _iri(TG_QUESTION)), _triple(session_uri, RDF_TYPE, _iri(TG_AGENT_QUESTION)), @@ -79,6 +82,13 @@ def agent_session_triples( _triple(session_uri, TG_QUERY, _literal(query)), ] + if parent_uri: + triples.append( + _triple(session_uri, PROV_WAS_DERIVED_FROM, _iri(parent_uri)) + ) + + return triples + def agent_iteration_triples( iteration_uri: str, @@ -308,17 +318,28 @@ def agent_step_result_triples( def agent_synthesis_triples( uri: str, - previous_uri: str, + previous_uris, document_id: Optional[str] = None, ) -> List[Triple]: - """Build triples for a synthesis answer.""" + """Build triples for a synthesis answer. + + Args: + uri: URI of the synthesis entity + previous_uris: Single URI string or list of URIs to derive from + document_id: Librarian document ID for the answer content + """ triples = [ _triple(uri, RDF_TYPE, _iri(PROV_ENTITY)), _triple(uri, RDF_TYPE, _iri(TG_SYNTHESIS)), _triple(uri, RDF_TYPE, _iri(TG_ANSWER_TYPE)), _triple(uri, RDFS_LABEL, _literal("Synthesis")), - _triple(uri, PROV_WAS_DERIVED_FROM, _iri(previous_uri)), ] + + if isinstance(previous_uris, str): + previous_uris = [previous_uris] + for prev in previous_uris: + triples.append(_triple(uri, PROV_WAS_DERIVED_FROM, _iri(prev))) + if document_id: triples.append(_triple(uri, TG_DOCUMENT, _iri(document_id))) return triples diff --git a/trustgraph-flow/trustgraph/agent/orchestrator/pattern_base.py b/trustgraph-flow/trustgraph/agent/orchestrator/pattern_base.py index f999b132..8849a206 100644 --- a/trustgraph-flow/trustgraph/agent/orchestrator/pattern_base.py +++ b/trustgraph-flow/trustgraph/agent/orchestrator/pattern_base.py @@ -53,6 +53,7 @@ class UserAwareContext: self.respond = respond self.streaming = streaming self.current_explain_uri = None + self.last_sub_explain_uri = None def __call__(self, service_name): client = self._flow(service_name) @@ -190,7 +191,7 @@ class PatternBase: await respond(r) return observe - def make_answer_callback(self, respond, streaming): + def make_answer_callback(self, respond, streaming, message_id=""): """Create the answer callback for streaming/non-streaming.""" async def answer(x): logger.debug(f"Answer: {x}") @@ -200,6 +201,7 @@ class PatternBase: content=x, end_of_message=False, end_of_dialog=False, + message_id=message_id, ) else: r = AgentResponse( @@ -207,6 +209,7 @@ class PatternBase: content=x, end_of_message=True, end_of_dialog=False, + message_id=message_id, ) await respond(r) return answer @@ -214,11 +217,15 @@ class PatternBase: # ---- Provenance emission ------------------------------------------------ async def emit_session_triples(self, flow, session_uri, question, user, - collection, respond, streaming): + collection, respond, streaming, + parent_uri=None): """Emit provenance triples for a new session.""" timestamp = datetime.utcnow().isoformat() + "Z" triples = set_graph( - agent_session_triples(session_uri, question, timestamp), + agent_session_triples( + session_uri, question, timestamp, + parent_uri=parent_uri, + ), GRAPH_RETRIEVAL, ) await flow("explainability").send(Triples( @@ -301,11 +308,18 @@ class PatternBase: )) async def emit_observation_triples(self, flow, session_id, iteration_num, - observation_text, request, respond): + observation_text, request, respond, + context=None): """Emit provenance triples for a standalone Observation entity.""" iteration_uri = agent_iteration_uri(session_id, iteration_num) observation_entity_uri = agent_observation_uri(session_id, iteration_num) + # Derive from the last sub-trace entity if available (e.g. Synthesis), + # otherwise fall back to the iteration (Analysis+ToolUse). + parent_uri = iteration_uri + if context and getattr(context, 'last_sub_explain_uri', None): + parent_uri = context.last_sub_explain_uri + # Save observation to librarian observation_doc_id = None if observation_text: @@ -326,7 +340,7 @@ class PatternBase: obs_triples = set_graph( agent_observation_triples( observation_entity_uri, - iteration_uri, + parent_uri, document_id=observation_doc_id, ), GRAPH_RETRIEVAL, @@ -427,11 +441,17 @@ class PatternBase: async def emit_finding_triples( self, flow, session_id, index, goal, answer_text, user, collection, - respond, streaming, + respond, streaming, subagent_session_id="", ): """Emit provenance for a subagent finding.""" uri = agent_finding_uri(session_id, index) - decomposition_uri = agent_decomposition_uri(session_id) + + # Derive from the subagent's conclusion if available, + # otherwise fall back to the decomposition. + if subagent_session_id: + parent_uri = agent_final_uri(subagent_session_id) + else: + parent_uri = agent_decomposition_uri(session_id) doc_id = f"urn:trustgraph:agent:{session_id}/finding/{index}/doc" try: @@ -445,7 +465,7 @@ class PatternBase: doc_id = None triples = set_graph( - agent_finding_triples(uri, decomposition_uri, goal, doc_id), + agent_finding_triples(uri, parent_uri, goal, doc_id), GRAPH_RETRIEVAL, ) await flow("explainability").send(Triples( @@ -509,7 +529,7 @@ class PatternBase: )) async def emit_synthesis_triples( - self, flow, session_id, previous_uri, answer_text, user, collection, + self, flow, session_id, previous_uris, answer_text, user, collection, respond, streaming, ): """Emit provenance for a synthesis answer.""" @@ -527,7 +547,7 @@ class PatternBase: doc_id = None triples = set_graph( - agent_synthesis_triples(uri, previous_uri, doc_id), + agent_synthesis_triples(uri, previous_uris, doc_id), GRAPH_RETRIEVAL, ) await flow("explainability").send(Triples( @@ -542,7 +562,7 @@ class PatternBase: # ---- Response helpers --------------------------------------------------- async def prompt_as_answer(self, client, prompt_id, variables, - respond, streaming): + respond, streaming, message_id=""): """Call a prompt template, forwarding chunks as answer AgentResponse messages when streaming is enabled. @@ -559,6 +579,7 @@ class PatternBase: content=text, end_of_message=False, end_of_dialog=False, + message_id=message_id, )) await client.prompt( @@ -576,13 +597,14 @@ class PatternBase: ) async def send_final_response(self, respond, streaming, answer_text, - already_streamed=False): + already_streamed=False, message_id=""): """Send the answer content and end-of-dialog marker. Args: already_streamed: If True, answer chunks were already sent via streaming callbacks (e.g. ReactPattern). Only the end-of-dialog marker is emitted. + message_id: Provenance URI for the answer entity. """ if streaming and not already_streamed: # Answer wasn't streamed yet — send it as a chunk first @@ -592,6 +614,7 @@ class PatternBase: content=answer_text, end_of_message=False, end_of_dialog=False, + message_id=message_id, )) if streaming: # End-of-dialog marker @@ -600,6 +623,7 @@ class PatternBase: content="", end_of_message=True, end_of_dialog=True, + message_id=message_id, )) else: await respond(AgentResponse( @@ -607,6 +631,7 @@ class PatternBase: content=answer_text, end_of_message=True, end_of_dialog=True, + message_id=message_id, )) def build_next_request(self, request, history, session_id, collection, diff --git a/trustgraph-flow/trustgraph/agent/orchestrator/plan_pattern.py b/trustgraph-flow/trustgraph/agent/orchestrator/plan_pattern.py index 4775212e..59d22929 100644 --- a/trustgraph-flow/trustgraph/agent/orchestrator/plan_pattern.py +++ b/trustgraph-flow/trustgraph/agent/orchestrator/plan_pattern.py @@ -15,6 +15,7 @@ from trustgraph.provenance import ( agent_step_result_uri as make_step_result_uri, agent_thought_uri, agent_observation_uri, + agent_synthesis_uri, ) from . pattern_base import PatternBase @@ -352,6 +353,8 @@ class PlanThenExecutePattern(PatternBase): await think("Synthesising final answer from plan results", is_final=True) + synthesis_msg_id = agent_synthesis_uri(session_id) + response_text = await self.prompt_as_answer( client, "plan-synthesise", variables={ @@ -361,6 +364,7 @@ class PlanThenExecutePattern(PatternBase): }, respond=respond, streaming=streaming, + message_id=synthesis_msg_id, ) # Emit synthesis provenance (links back to last step result) @@ -375,4 +379,5 @@ class PlanThenExecutePattern(PatternBase): else: await self.send_final_response( respond, streaming, response_text, already_streamed=streaming, + message_id=synthesis_msg_id, ) diff --git a/trustgraph-flow/trustgraph/agent/orchestrator/react_pattern.py b/trustgraph-flow/trustgraph/agent/orchestrator/react_pattern.py index f6af65c2..67ded823 100644 --- a/trustgraph-flow/trustgraph/agent/orchestrator/react_pattern.py +++ b/trustgraph-flow/trustgraph/agent/orchestrator/react_pattern.py @@ -15,6 +15,8 @@ from trustgraph.provenance import ( agent_iteration_uri, agent_thought_uri, agent_observation_uri, + agent_final_uri, + agent_decomposition_uri, ) from ..react.agent_manager import AgentManager @@ -47,9 +49,16 @@ class ReactPattern(PatternBase): # Emit session provenance on first iteration if iteration_num == 1: + # Subagents link back to the parent's decomposition + parent_session_id = getattr(request, 'parent_session_id', '') + parent_uri = ( + agent_decomposition_uri(parent_session_id) + if parent_session_id else None + ) await self.emit_session_triples( flow, session_uri, request.question, request.user, collection, respond, streaming, + parent_uri=parent_uri, ) logger.info(f"ReactPattern iteration {iteration_num}: {request.question}") @@ -60,11 +69,12 @@ class ReactPattern(PatternBase): # Compute URIs upfront for message_id thought_msg_id = agent_thought_uri(session_id, iteration_num) observation_msg_id = agent_observation_uri(session_id, iteration_num) + answer_msg_id = agent_final_uri(session_id) # Build callbacks think = self.make_think_callback(respond, streaming, message_id=thought_msg_id) observe = self.make_observe_callback(respond, streaming, message_id=observation_msg_id) - answer_cb = self.make_answer_callback(respond, streaming) + answer_cb = self.make_answer_callback(respond, streaming, message_id=answer_msg_id) # Filter tools filtered_tools = self.filter_tools( @@ -133,6 +143,7 @@ class ReactPattern(PatternBase): else: await self.send_final_response( respond, streaming, f, already_streamed=streaming, + message_id=answer_msg_id, ) return @@ -140,6 +151,7 @@ class ReactPattern(PatternBase): await self.emit_observation_triples( flow, session_id, iteration_num, act.observation, request, respond, + context=context, ) history.append(act) diff --git a/trustgraph-flow/trustgraph/agent/orchestrator/service.py b/trustgraph-flow/trustgraph/agent/orchestrator/service.py index ed4c3983..ea0afd60 100644 --- a/trustgraph-flow/trustgraph/agent/orchestrator/service.py +++ b/trustgraph-flow/trustgraph/agent/orchestrator/service.py @@ -458,11 +458,14 @@ class Processor(AgentService): finding_index = len(entry["results"]) - 1 if entry else 0 collection = getattr(template, 'collection', 'default') + subagent_session_id = getattr(request, 'session_id', '') + await self.supervisor_pattern.emit_finding_triples( flow, parent_session_id, finding_index, subagent_goal, answer_text, template.user, collection, respond, template.streaming, + subagent_session_id=subagent_session_id, ) if all_done: diff --git a/trustgraph-flow/trustgraph/agent/orchestrator/supervisor_pattern.py b/trustgraph-flow/trustgraph/agent/orchestrator/supervisor_pattern.py index 951063cf..d5537876 100644 --- a/trustgraph-flow/trustgraph/agent/orchestrator/supervisor_pattern.py +++ b/trustgraph-flow/trustgraph/agent/orchestrator/supervisor_pattern.py @@ -16,7 +16,11 @@ import uuid from ... schema import AgentRequest, AgentResponse, AgentStep -from trustgraph.provenance import agent_finding_uri +from trustgraph.provenance import ( + agent_finding_uri, + agent_decomposition_uri, + agent_synthesis_uri, +) from . pattern_base import PatternBase @@ -81,7 +85,10 @@ class SupervisorPattern(PatternBase): session_uri, iteration_num): """Decompose the question into sub-goals and fan out subagents.""" - think = self.make_think_callback(respond, streaming) + decompose_msg_id = agent_decomposition_uri(session_id) + think = self.make_think_callback( + respond, streaming, message_id=decompose_msg_id, + ) framing = getattr(request, 'framing', '') tools = self.filter_tools(self.processor.agent.tools, request) @@ -171,7 +178,10 @@ class SupervisorPattern(PatternBase): session_uri, iteration_num): """Synthesise final answer from subagent results.""" - think = self.make_think_callback(respond, streaming) + synthesis_msg_id = agent_synthesis_uri(session_id) + think = self.make_think_callback( + respond, streaming, message_id=synthesis_msg_id, + ) framing = getattr(request, 'framing', '') # Collect subagent results from history @@ -205,17 +215,20 @@ class SupervisorPattern(PatternBase): }, respond=respond, streaming=streaming, + message_id=synthesis_msg_id, ) - # Emit synthesis provenance (links back to last finding) - last_finding_uri = agent_finding_uri( - session_id, len(subagent_results) - 1 - ) + # Emit synthesis provenance (links back to all findings) + finding_uris = [ + agent_finding_uri(session_id, i) + for i in range(len(subagent_results)) + ] await self.emit_synthesis_triples( - flow, session_id, last_finding_uri, + flow, session_id, finding_uris, response_text, request.user, collection, respond, streaming, ) await self.send_final_response( respond, streaming, response_text, already_streamed=streaming, + message_id=synthesis_msg_id, ) diff --git a/trustgraph-flow/trustgraph/agent/react/service.py b/trustgraph-flow/trustgraph/agent/react/service.py index af088ec9..0e783349 100755 --- a/trustgraph-flow/trustgraph/agent/react/service.py +++ b/trustgraph-flow/trustgraph/agent/react/service.py @@ -529,6 +529,8 @@ class Processor(AgentService): await respond(r) + answer_msg_id = agent_final_uri(session_id) + async def answer(x): logger.debug(f"Answer: {x}") @@ -539,6 +541,7 @@ class Processor(AgentService): content=x, end_of_message=False, end_of_dialog=False, + message_id=answer_msg_id, ) else: r = AgentResponse( @@ -546,6 +549,7 @@ class Processor(AgentService): content=x, end_of_message=True, end_of_dialog=False, + message_id=answer_msg_id, ) await respond(r) @@ -571,6 +575,7 @@ class Processor(AgentService): def __init__(self, flow, user): self._flow = flow self._user = user + self.last_sub_explain_uri = None def __call__(self, service_name): client = self._flow(service_name) @@ -635,13 +640,15 @@ class Processor(AgentService): explain_graph=GRAPH_RETRIEVAL, )) + user_context = UserAwareContext(flow, request.user) + act = await temp_agent.react( question = request.question, history = history, think = think, observe = observe, answer = answer, - context = UserAwareContext(flow, request.user), + context = user_context, streaming = streaming, on_action = on_action, ) @@ -717,6 +724,7 @@ class Processor(AgentService): content="", end_of_message=True, end_of_dialog=True, + message_id=answer_msg_id, ) else: r = AgentResponse( @@ -724,6 +732,7 @@ class Processor(AgentService): content=f, end_of_message=True, end_of_dialog=True, + message_id=answer_msg_id, ) await respond(r) @@ -737,6 +746,12 @@ class Processor(AgentService): # Emit standalone observation provenance (iteration was emitted in on_action) iteration_uri = agent_iteration_uri(session_id, iteration_num) observation_entity_uri = agent_observation_uri(session_id, iteration_num) + + # Derive from last sub-trace entity if available, else iteration + obs_parent_uri = iteration_uri + if user_context.last_sub_explain_uri: + obs_parent_uri = user_context.last_sub_explain_uri + observation_doc_id = None if act.observation: observation_doc_id = f"urn:trustgraph:agent:{session_id}/i{iteration_num}/observation" @@ -755,7 +770,7 @@ class Processor(AgentService): obs_triples = set_graph( agent_observation_triples( observation_entity_uri, - iteration_uri, + obs_parent_uri, document_id=observation_doc_id, ), GRAPH_RETRIEVAL diff --git a/trustgraph-flow/trustgraph/agent/react/tools.py b/trustgraph-flow/trustgraph/agent/react/tools.py index 86b515e1..041558ec 100644 --- a/trustgraph-flow/trustgraph/agent/react/tools.py +++ b/trustgraph-flow/trustgraph/agent/react/tools.py @@ -40,6 +40,7 @@ class KnowledgeQueryImpl: from ... schema import AgentResponse async def explain_callback(explain_id, explain_graph): + self.context.last_sub_explain_uri = explain_id await respond(AgentResponse( chunk_type="explain", content="",