From e65ea217a207cce476c77edfdd85078dcea13a53 Mon Sep 17 00:00:00 2001 From: cybermaggedon Date: Tue, 31 Mar 2026 11:24:30 +0100 Subject: [PATCH] agent-orchestrator improvements (#743) agent-orchestrator improvements: - Improve agent trace - Improve queue dumping - Fixing supervisor pattern - Fix synthesis step to remove loop Minor dev environment improvements: - Improve queue dump output for JSON - Reduce dev container rebuild --- Makefile | 4 +- trustgraph-cli/trustgraph/cli/dump_queues.py | 84 ++++++++++------ .../agent/orchestrator/aggregator.py | 21 ++-- .../agent/orchestrator/pattern_base.py | 41 ++++++++ .../agent/orchestrator/plan_pattern.py | 10 +- .../agent/orchestrator/react_pattern.py | 9 +- .../trustgraph/agent/orchestrator/service.py | 65 ++++++++++++ .../agent/orchestrator/supervisor_pattern.py | 18 ++-- .../trustgraph/agent/react/agent_manager.py | 98 +++++-------------- 9 files changed, 225 insertions(+), 125 deletions(-) diff --git a/Makefile b/Makefile index 4d79f554..197a6c63 100644 --- a/Makefile +++ b/Makefile @@ -77,8 +77,8 @@ some-containers: -t ${CONTAINER_BASE}/trustgraph-base:${VERSION} . ${DOCKER} build -f containers/Containerfile.flow \ -t ${CONTAINER_BASE}/trustgraph-flow:${VERSION} . - ${DOCKER} build -f containers/Containerfile.unstructured \ - -t ${CONTAINER_BASE}/trustgraph-unstructured:${VERSION} . +# ${DOCKER} build -f containers/Containerfile.unstructured \ +# -t ${CONTAINER_BASE}/trustgraph-unstructured:${VERSION} . # ${DOCKER} build -f containers/Containerfile.vertexai \ # -t ${CONTAINER_BASE}/trustgraph-vertexai:${VERSION} . # ${DOCKER} build -f containers/Containerfile.mcp \ diff --git a/trustgraph-cli/trustgraph/cli/dump_queues.py b/trustgraph-cli/trustgraph/cli/dump_queues.py index 0a298450..4df61cc3 100644 --- a/trustgraph-cli/trustgraph/cli/dump_queues.py +++ b/trustgraph-cli/trustgraph/cli/dump_queues.py @@ -19,43 +19,67 @@ import argparse from trustgraph.base.subscriber import Subscriber from trustgraph.base.pubsub import get_pubsub +def decode_json_strings(obj): + """Recursively decode JSON-encoded string values within a dict/list.""" + if isinstance(obj, dict): + return {k: decode_json_strings(v) for k, v in obj.items()} + if isinstance(obj, list): + return [decode_json_strings(v) for v in obj] + if isinstance(obj, str): + try: + parsed = json.loads(obj) + if isinstance(parsed, (dict, list)): + return decode_json_strings(parsed) + except (json.JSONDecodeError, TypeError): + pass + return obj + + +def to_dict(value): + """Recursively convert a value to a JSON-serialisable structure.""" + + if value is None or isinstance(value, (bool, int, float)): + return value + + if isinstance(value, bytes): + value = value.decode('utf-8') + + if isinstance(value, str): + try: + return json.loads(value) + except (json.JSONDecodeError, TypeError): + return value + + if isinstance(value, dict): + return {k: to_dict(v) for k, v in value.items()} + + if isinstance(value, (list, tuple)): + return [to_dict(v) for v in value] + + # Pulsar schema objects expose fields via __dict__ + if hasattr(value, '__dict__'): + return { + k: to_dict(v) for k, v in value.__dict__.items() + if not k.startswith('_') + } + + return str(value) + + def format_message(queue_name, msg): """Format a message with timestamp and queue name.""" timestamp = datetime.now().isoformat() - # Try to parse as JSON and pretty-print try: - # Handle both Message objects and raw bytes - if hasattr(msg, 'value'): - # Message object with .value() method - value = msg.value() - else: - # Raw bytes from schema-less subscription - value = msg + value = msg.value() if hasattr(msg, 'value') else msg + parsed = to_dict(value) - # If it's bytes, decode it - if isinstance(value, bytes): - value = value.decode('utf-8') - - # If it's a string, try to parse as JSON - if isinstance(value, str): - try: - parsed = json.loads(value) - body = json.dumps(parsed, indent=2) - except (json.JSONDecodeError, TypeError): - body = value + # Unwrap nested JSON strings (e.g. terms values) + if isinstance(parsed, (dict, list)): + parsed = decode_json_strings(parsed) + body = json.dumps(parsed, indent=2, default=str) else: - # Try to convert to dict for pretty printing - try: - # Pulsar schema objects have __dict__ or similar - if hasattr(value, '__dict__'): - parsed = {k: v for k, v in value.__dict__.items() - if not k.startswith('_')} - else: - parsed = str(value) - body = json.dumps(parsed, indent=2, default=str) - except (TypeError, AttributeError): - body = str(value) + body = str(parsed) except Exception as e: body = f"\n{str(msg)}" diff --git a/trustgraph-flow/trustgraph/agent/orchestrator/aggregator.py b/trustgraph-flow/trustgraph/agent/orchestrator/aggregator.py index bff8822c..9187f21e 100644 --- a/trustgraph-flow/trustgraph/agent/orchestrator/aggregator.py +++ b/trustgraph-flow/trustgraph/agent/orchestrator/aggregator.py @@ -1,10 +1,12 @@ """ -Aggregator — monitors the explainability topic for subagent completions -and triggers synthesis when all siblings in a fan-out have completed. +Aggregator — tracks in-flight fan-out correlations and triggers +synthesis when all subagents have completed. -The aggregator watches for tg:Conclusion triples that carry a -correlation_id. When it detects that all expected siblings have -completed, it emits a synthesis AgentRequest on the agent request topic. +Subagent completions arrive as AgentRequest messages on the agent +request queue with step_type="subagent-completion". The orchestrator +intercepts these and feeds them to the aggregator. When all expected +siblings for a correlation ID have reported, the aggregator builds +a synthesis request for the supervisor pattern. """ import asyncio @@ -87,6 +89,13 @@ class Aggregator: return completed >= expected + def get_original_request(self, correlation_id): + """Peek at the stored request template without consuming it.""" + entry = self.correlations.get(correlation_id) + if entry is None: + return None + return entry["request_template"] + def get_results(self, correlation_id): """Get all results for a correlation and remove the tracking entry.""" entry = self.correlations.pop(correlation_id, None) @@ -138,7 +147,7 @@ class Aggregator: pattern="supervisor", task_type=template.task_type if template else "", framing=template.framing if template else "", - correlation_id=correlation_id, + correlation_id="", parent_session_id="", subagent_goal="", expected_siblings=0, diff --git a/trustgraph-flow/trustgraph/agent/orchestrator/pattern_base.py b/trustgraph-flow/trustgraph/agent/orchestrator/pattern_base.py index fc07e745..b66bc4f5 100644 --- a/trustgraph-flow/trustgraph/agent/orchestrator/pattern_base.py +++ b/trustgraph-flow/trustgraph/agent/orchestrator/pattern_base.py @@ -61,6 +61,47 @@ class PatternBase: def __init__(self, processor): self.processor = processor + def is_subagent(self, request): + """Check if this request is running as a subagent of a supervisor.""" + return bool(getattr(request, 'correlation_id', '')) + + async def emit_subagent_completion(self, request, next, answer_text): + """Signal completion back to the orchestrator via the agent request + queue. Instead of sending the final answer to the client, send a + completion message so the aggregator can collect it.""" + + completion_step = AgentStep( + thought="Subagent completed", + action="complete", + arguments={}, + observation=answer_text, + step_type="subagent-completion", + ) + + completion_request = AgentRequest( + question=request.question, + state="", + group=getattr(request, 'group', []), + history=[completion_step], + user=request.user, + collection=getattr(request, 'collection', 'default'), + streaming=False, + session_id=getattr(request, 'session_id', ''), + conversation_id=getattr(request, 'conversation_id', ''), + pattern="", + correlation_id=request.correlation_id, + parent_session_id=getattr(request, 'parent_session_id', ''), + subagent_goal=getattr(request, 'subagent_goal', ''), + expected_siblings=getattr(request, 'expected_siblings', 0), + ) + + await next(completion_request) + logger.info( + f"Subagent completion emitted for " + f"correlation={request.correlation_id}, " + f"goal={getattr(request, 'subagent_goal', '')}" + ) + def filter_tools(self, tools, request): """Apply group/state filtering to the tool set.""" return filter_tools_by_group_and_state( diff --git a/trustgraph-flow/trustgraph/agent/orchestrator/plan_pattern.py b/trustgraph-flow/trustgraph/agent/orchestrator/plan_pattern.py index d5f667c8..4c61039f 100644 --- a/trustgraph-flow/trustgraph/agent/orchestrator/plan_pattern.py +++ b/trustgraph-flow/trustgraph/agent/orchestrator/plan_pattern.py @@ -344,6 +344,10 @@ class PlanThenExecutePattern(PatternBase): flow, session_id, iteration_num, session_uri, response_text, request, respond, streaming, ) - await self.send_final_response( - respond, streaming, response_text, already_streamed=streaming, - ) + + if self.is_subagent(request): + await self.emit_subagent_completion(request, next, response_text) + else: + await self.send_final_response( + respond, streaming, response_text, already_streamed=streaming, + ) diff --git a/trustgraph-flow/trustgraph/agent/orchestrator/react_pattern.py b/trustgraph-flow/trustgraph/agent/orchestrator/react_pattern.py index c0e481f7..a03dc194 100644 --- a/trustgraph-flow/trustgraph/agent/orchestrator/react_pattern.py +++ b/trustgraph-flow/trustgraph/agent/orchestrator/react_pattern.py @@ -106,9 +106,12 @@ class ReactPattern(PatternBase): f, request, respond, streaming, ) - await self.send_final_response( - respond, streaming, f, already_streamed=streaming, - ) + if self.is_subagent(request): + await self.emit_subagent_completion(request, next, f) + else: + await self.send_final_response( + respond, streaming, f, already_streamed=streaming, + ) return # Not final — emit iteration provenance and send next request diff --git a/trustgraph-flow/trustgraph/agent/orchestrator/service.py b/trustgraph-flow/trustgraph/agent/orchestrator/service.py index f7418e60..9c9980d4 100644 --- a/trustgraph-flow/trustgraph/agent/orchestrator/service.py +++ b/trustgraph-flow/trustgraph/agent/orchestrator/service.py @@ -422,9 +422,74 @@ class Processor(AgentService): ) logger.error("Configuration reload failed") + async def _handle_subagent_completion(self, request, respond, next, flow): + """Handle a subagent completion by feeding it to the aggregator.""" + + correlation_id = request.correlation_id + subagent_goal = getattr(request, 'subagent_goal', '') + + # Extract the answer from the completion step + answer_text = "" + for step in request.history: + if getattr(step, 'step_type', '') == 'subagent-completion': + answer_text = step.observation + break + + logger.info( + f"Received subagent completion: " + f"correlation={correlation_id}, goal={subagent_goal}" + ) + + all_done = self.aggregator.record_completion( + correlation_id, subagent_goal, answer_text + ) + + if all_done is None: + logger.warning( + f"Unknown correlation_id {correlation_id} — " + f"possibly timed out or duplicate" + ) + return + + if all_done: + logger.info( + f"All subagents complete for {correlation_id}, " + f"dispatching synthesis" + ) + + template = self.aggregator.get_original_request(correlation_id) + if template is None: + logger.error( + f"No template for correlation {correlation_id}" + ) + return + + synthesis_request = self.aggregator.build_synthesis_request( + correlation_id, + original_question=template.question, + user=template.user, + collection=getattr(template, 'collection', 'default'), + ) + + await next(synthesis_request) + async def agent_request(self, request, respond, next, flow): try: + + # Intercept subagent completion messages + correlation_id = getattr(request, 'correlation_id', '') + if correlation_id and request.history: + is_completion = any( + getattr(h, 'step_type', '') == 'subagent-completion' + for h in request.history + ) + if is_completion: + await self._handle_subagent_completion( + request, respond, next, flow + ) + return + pattern = getattr(request, 'pattern', '') or '' # If no pattern set and this is the first iteration, route diff --git a/trustgraph-flow/trustgraph/agent/orchestrator/supervisor_pattern.py b/trustgraph-flow/trustgraph/agent/orchestrator/supervisor_pattern.py index 9070a393..51c2d500 100644 --- a/trustgraph-flow/trustgraph/agent/orchestrator/supervisor_pattern.py +++ b/trustgraph-flow/trustgraph/agent/orchestrator/supervisor_pattern.py @@ -57,11 +57,8 @@ class SupervisorPattern(PatternBase): has_results = bool( request.history and any( - getattr(h, 'step_type', '') == 'decompose' - for h in request.history - ) - and any( - getattr(h, 'subagent_results', None) + getattr(h, 'step_type', '') == 'synthesise' + and getattr(h, 'subagent_results', None) for h in request.history ) ) @@ -159,9 +156,14 @@ class SupervisorPattern(PatternBase): await next(sub_request) logger.info(f"Fan-out: emitted subagent {i} for goal: {goal}") - # NOTE: The supervisor stops here. The aggregator will detect - # when all subagents complete and emit a synthesis request - # with the results populated. + # Register with aggregator for fan-in tracking + self.processor.aggregator.register_fanout( + correlation_id=correlation_id, + parent_session_id=session_id, + expected_siblings=len(goals), + request_template=request, + ) + logger.info( f"Supervisor fan-out complete: {len(goals)} subagents, " f"correlation_id={correlation_id}" diff --git a/trustgraph-flow/trustgraph/agent/react/agent_manager.py b/trustgraph-flow/trustgraph/agent/react/agent_manager.py index 87cee33d..18598b38 100644 --- a/trustgraph-flow/trustgraph/agent/react/agent_manager.py +++ b/trustgraph-flow/trustgraph/agent/react/agent_manager.py @@ -16,37 +16,37 @@ class AgentManager: def parse_react_response(self, text): """Parse text-based ReAct response format. - + Expected format: Thought: [reasoning about what to do next] Action: [tool_name] Args: { "param": "value" } - + OR - + Thought: [reasoning about the final answer] Final Answer: [the answer] """ if not isinstance(text, str): raise ValueError(f"Expected string response, got {type(text)}") - + # Remove any markdown code blocks that might wrap the response text = re.sub(r'^```[^\n]*\n', '', text.strip()) text = re.sub(r'\n```$', '', text.strip()) - + lines = text.strip().split('\n') - + thought = None action = None args = None final_answer = None - + i = 0 while i < len(lines): line = lines[i].strip() - + # Parse Thought if line.startswith("Thought:"): thought = line[8:].strip() @@ -59,19 +59,19 @@ class AgentManager: thought += " " + next_line i += 1 continue - + # Parse Final Answer if line.startswith("Final Answer:"): final_answer = line[13:].strip() # Handle multi-line final answers (including JSON) i += 1 - + # Check if the answer might be JSON if final_answer.startswith('{') or (i < len(lines) and lines[i].strip().startswith('{')): # Collect potential JSON answer json_text = final_answer if final_answer.startswith('{') else "" brace_count = json_text.count('{') - json_text.count('}') - + while i < len(lines) and (brace_count > 0 or not json_text): current_line = lines[i].strip() if current_line.startswith(("Thought:", "Action:")) and brace_count == 0: @@ -79,7 +79,7 @@ class AgentManager: json_text += ("\n" if json_text else "") + current_line brace_count += current_line.count('{') - current_line.count('}') i += 1 - + # Try to parse as JSON # try: # final_answer = json.loads(json_text) @@ -95,13 +95,13 @@ class AgentManager: break final_answer += " " + next_line i += 1 - + # If we have a final answer, return Final object return Final( thought=thought or "", final=final_answer ) - + # Parse Action if line.startswith("Action:"): action = line[7:].strip() @@ -112,7 +112,7 @@ class AgentManager: while action and action[-1] == '"': action = action[:-1] - + # Parse Args if line.startswith("Args:"): # Check if JSON starts on the same line @@ -123,15 +123,15 @@ class AgentManager: else: args_text = "" brace_count = 0 - + # Collect all lines that form the JSON arguments i += 1 started = bool(args_on_same_line and '{' in args_on_same_line) - + while i < len(lines) and (not started or brace_count > 0): current_line = lines[i] args_text += ("\n" if args_text else "") + current_line - + # Count braces to determine when JSON is complete for char in current_line: if char == '{': @@ -139,22 +139,22 @@ class AgentManager: started = True elif char == '}': brace_count -= 1 - + # If we've started and braces are balanced, we're done if started and brace_count == 0: break - + i += 1 - + # Parse the JSON arguments try: args = json.loads(args_text.strip()) except json.JSONDecodeError as e: logger.error(f"Failed to parse JSON arguments: {args_text}") raise ValueError(f"Invalid JSON in Args: {e}") - + i += 1 - + # If we have an action, return Action object if action: return Action( @@ -163,11 +163,11 @@ class AgentManager: arguments=args or {}, observation="" ) - + # If we only have a thought but no action or final answer if thought and not action and not final_answer: raise ValueError(f"Response has thought but no action or final answer: {text}") - + raise ValueError(f"Could not parse response: {text}") async def reason(self, question, history, context, streaming=False, think=None, observe=None, answer=None): @@ -176,15 +176,10 @@ class AgentManager: tools = self.tools - logger.debug("in reason") - logger.debug(f"tools: {tools}") - tool_names = ",".join([ t for t in self.tools.keys() ]) - logger.debug(f"Tool names: {tool_names}") - variables = { "question": question, "tools": [ @@ -218,17 +213,10 @@ class AgentManager: logger.debug(f"Variables: {json.dumps(variables, indent=4)}") - logger.info(f"prompt: {variables}") - - logger.info(f"DEBUG: streaming={streaming}, think={think is not None}") - # Streaming path - use StreamingReActParser if streaming and think: - logger.info("DEBUG: Entering streaming path") from .streaming_parser import StreamingReActParser - logger.info("DEBUG: Creating StreamingReActParser") - # Collect chunks to send via async callbacks thought_chunks = [] answer_chunks = [] @@ -238,24 +226,19 @@ class AgentManager: on_thought_chunk=lambda chunk: thought_chunks.append(chunk), on_answer_chunk=lambda chunk: answer_chunks.append(chunk), ) - logger.info("DEBUG: StreamingReActParser created") # Create async chunk callback that feeds parser and sends collected chunks async def on_chunk(text, end_of_stream): - logger.info(f"DEBUG: on_chunk called with {len(text)} chars, end_of_stream={end_of_stream}") # Track what we had before prev_thought_count = len(thought_chunks) prev_answer_count = len(answer_chunks) # Feed the parser (synchronous) - logger.info(f"DEBUG: About to call parser.feed") parser.feed(text) - logger.info(f"DEBUG: parser.feed returned") # Send any new thought chunks for i in range(prev_thought_count, len(thought_chunks)): - logger.info(f"DEBUG: Sending thought chunk {i}") # Mark last chunk as final if parser has moved out of THOUGHT state is_last = (i == len(thought_chunks) - 1) is_thought_complete = parser.state.value != "thought" @@ -264,72 +247,52 @@ class AgentManager: # Send any new answer chunks for i in range(prev_answer_count, len(answer_chunks)): - logger.info(f"DEBUG: Sending answer chunk {i}") if answer: await answer(answer_chunks[i]) else: await think(answer_chunks[i]) - logger.info("DEBUG: Getting prompt-request client from context") client = context("prompt-request") - logger.info(f"DEBUG: Got client: {client}") - logger.info("DEBUG: About to call agent_react with streaming=True") # Get streaming response response_text = await client.agent_react( variables=variables, streaming=True, chunk_callback=on_chunk ) - logger.info(f"DEBUG: agent_react returned, got {len(response_text) if response_text else 0} chars") # Finalize parser - logger.info("DEBUG: Finalizing parser") parser.finalize() - logger.info("DEBUG: Parser finalized") # Get result - logger.info("DEBUG: Getting result from parser") result = parser.get_result() if result is None: raise RuntimeError("Parser failed to produce a result") - logger.info(f"Parsed result: {result}") return result else: - logger.info("DEBUG: Entering NON-streaming path") # Non-streaming path - get complete text and parse - logger.info("DEBUG: Getting prompt-request client from context") client = context("prompt-request") - logger.info(f"DEBUG: Got client: {client}") - logger.info("DEBUG: About to call agent_react with streaming=False") response_text = await client.agent_react( variables=variables, streaming=False ) - logger.info(f"DEBUG: agent_react returned, got response") logger.debug(f"Response text:\n{response_text}") - logger.info(f"response: {response_text}") - # Parse the text response try: result = self.parse_react_response(response_text) - logger.info(f"Parsed result: {result}") return result except ValueError as e: logger.error(f"Failed to parse response: {e}") - # Try to provide a helpful error message logger.error(f"Response was: {response_text}") raise RuntimeError(f"Failed to parse agent response: {e}") async def react(self, question, history, think, observe, context, streaming=False, answer=None): - logger.info(f"question: {question}") - act = await self.reason( question = question, history = history, @@ -339,7 +302,6 @@ class AgentManager: observe = observe, answer = answer, ) - logger.info(f"act: {act}") if isinstance(act, Final): @@ -358,16 +320,11 @@ class AgentManager: logger.debug(f"ACTION: {act.name}") - logger.debug(f"Tools: {self.tools.keys()}") - if act.name in self.tools: action = self.tools[act.name] else: - logger.debug(f"Tools: {self.tools}") raise RuntimeError(f"No action for {act.name}!") - logger.debug(f"TOOL>>> {act}") - resp = await action.implementation(context).invoke( **act.arguments ) @@ -378,13 +335,8 @@ class AgentManager: resp = str(resp) resp = resp.strip() - logger.info(f"resp: {resp}") - await observe(resp, is_final=True) act.observation = resp - logger.info(f"iter: {act}") - return act -