Minor agent-orchestrator updates (#746)

Tidy agent-orchestrator logs

Added CLI support for selecting the pattern...

  tg-invoke-agent -q "What is the document about?" -p supervisor -v
  tg-invoke-agent -q "What is the document about?" -p plan-then-execute -v
  tg-invoke-agent -q "What is the document about?" -p react -v

Added new event types to tg-show-explain-trace
This commit is contained in:
cybermaggedon 2026-03-31 13:29:04 +01:00 committed by GitHub
parent 816a8cfcf6
commit 89e13a756a
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 152 additions and 75 deletions

View file

@ -999,8 +999,6 @@ class ExplainabilityClient:
trace = { trace = {
"question": None, "question": None,
"steps": [], "steps": [],
"iterations": [], # Backwards compatibility for ReAct
"conclusion": None,
} }
# Fetch question/session # Fetch question/session
@ -1015,11 +1013,6 @@ class ExplainabilityClient:
is_first=True, max_depth=50, is_first=True, max_depth=50,
) )
# Backwards compat: populate iterations from steps
trace["iterations"] = [
s for s in trace["steps"] if isinstance(s, Analysis)
]
return trace return trace
def _follow_provenance_chain( def _follow_provenance_chain(
@ -1081,7 +1074,6 @@ class ExplainabilityClient:
elif isinstance(entity, (Conclusion, Synthesis)): elif isinstance(entity, (Conclusion, Synthesis)):
trace["steps"].append(entity) trace["steps"].append(entity)
trace["conclusion"] = entity
def list_sessions( def list_sessions(
self, self,

View file

@ -267,7 +267,8 @@ def question_explainable(
def question( def question(
url, question, flow_id, user, collection, url, question, flow_id, user, collection,
plan=None, state=None, group=None, verbose=False, streaming=True, plan=None, state=None, group=None, pattern=None,
verbose=False, streaming=True,
token=None, explainable=False, debug=False token=None, explainable=False, debug=False
): ):
# Explainable mode uses the API to capture and process provenance events # Explainable mode uses the API to capture and process provenance events
@ -307,6 +308,8 @@ def question(
request_params["state"] = state request_params["state"] = state
if group is not None: if group is not None:
request_params["group"] = group request_params["group"] = group
if pattern is not None:
request_params["pattern"] = pattern
try: try:
# Call agent # Call agent
@ -430,6 +433,12 @@ def main():
help=f'Agent plan (default: unspecified)' help=f'Agent plan (default: unspecified)'
) )
parser.add_argument(
'-p', '--pattern',
choices=['react', 'plan-then-execute', 'supervisor'],
help='Force execution pattern (default: auto-selected by meta-router)'
)
parser.add_argument( parser.add_argument(
'-s', '--state', '-s', '--state',
help=f'Agent initial state (default: unspecified)' help=f'Agent initial state (default: unspecified)'
@ -478,6 +487,7 @@ def main():
plan = args.plan, plan = args.plan,
state = args.state, state = args.state,
group = args.group, group = args.group,
pattern = args.pattern,
verbose = args.verbose, verbose = args.verbose,
streaming = not args.no_streaming, streaming = not args.no_streaming,
token = args.token, token = args.token,

View file

@ -27,6 +27,10 @@ from trustgraph.api import (
Synthesis, Synthesis,
Analysis, Analysis,
Conclusion, Conclusion,
Decomposition,
Finding,
Plan,
StepResult,
) )
default_url = os.getenv("TRUSTGRAPH_URL", 'http://localhost:8088/') default_url = os.getenv("TRUSTGRAPH_URL", 'http://localhost:8088/')
@ -297,6 +301,23 @@ def print_docrag_text(trace, explain_client, api, user):
print("No synthesis data found") print("No synthesis data found")
def _print_document_content(explain_client, api, user, document_uri, label="Answer"):
"""Fetch and print document content, or fall back to URI."""
if not document_uri:
return
content = ""
if api:
content = explain_client.fetch_document_content(
document_uri, api, user
)
if content:
print(f"{label}:")
for line in content.split("\n"):
print(f" {line}")
else:
print(f"Document: {document_uri}")
def print_agent_text(trace, explain_client, api, user): def print_agent_text(trace, explain_client, api, user):
"""Print Agent trace in text format.""" """Print Agent trace in text format."""
question = trace.get("question") question = trace.get("question")
@ -310,82 +331,143 @@ def print_agent_text(trace, explain_client, api, user):
print(f"Time: {question.timestamp}") print(f"Time: {question.timestamp}")
print() print()
# Analysis steps # Walk the steps list which contains all entity types
print("--- Analysis ---") steps = trace.get("steps", [])
iterations = trace.get("iterations", [])
if iterations:
for i, analysis in enumerate(iterations, 1):
print(f"Analysis {i}:")
print(f" Thought: {analysis.thought or 'N/A'}")
print(f" Action: {analysis.action or 'N/A'}")
for step in steps:
if analysis.arguments: if isinstance(step, Decomposition):
# Try to pretty-print JSON arguments print("--- Decomposition ---")
print(f"Decomposed into {len(step.goals)} research threads:")
for i, goal in enumerate(step.goals):
print(f" {i}: {goal}")
print()
elif isinstance(step, Finding):
print("--- Finding ---")
print(f"Goal: {step.goal}")
_print_document_content(
explain_client, api, user, step.document, "Result",
)
print()
elif isinstance(step, Plan):
print("--- Plan ---")
print(f"Plan with {len(step.steps)} steps:")
for i, s in enumerate(step.steps):
print(f" {i}: {s}")
print()
elif isinstance(step, StepResult):
print("--- Step Result ---")
print(f"Step: {step.step}")
_print_document_content(
explain_client, api, user, step.document, "Result",
)
print()
elif isinstance(step, Analysis):
print("--- Analysis ---")
print(f" Action: {step.action or 'N/A'}")
if step.arguments:
try: try:
args_obj = json.loads(analysis.arguments) args_obj = json.loads(step.arguments)
args_str = json.dumps(args_obj, indent=4) args_str = json.dumps(args_obj, indent=4)
print(f" Arguments:") print(f" Arguments:")
for line in args_str.split('\n'): for line in args_str.split('\n'):
print(f" {line}") print(f" {line}")
except Exception: except Exception:
print(f" Arguments: {analysis.arguments}") print(f" Arguments: {step.arguments}")
else:
print(f" Arguments: N/A")
obs = analysis.observation or 'N/A' obs = step.observation or 'N/A'
if obs and len(obs) > 200: if obs and len(obs) > 200:
obs = obs[:200] + "... [truncated]" obs = obs[:200] + "... [truncated]"
print(f" Observation: {obs}") print(f" Observation: {obs}")
print() print()
else:
print("No analysis steps recorded")
print()
# Conclusion elif isinstance(step, Synthesis):
print("--- Conclusion ---") print("--- Synthesis ---")
conclusion = trace.get("conclusion") _print_document_content(
if conclusion: explain_client, api, user, step.document, "Answer",
content = ""
if conclusion.document and api:
content = explain_client.fetch_document_content(
conclusion.document, api, user
) )
if content: print()
print("Answer:")
for line in content.split("\n"): elif isinstance(step, Conclusion):
print(f" {line}") print("--- Conclusion ---")
elif conclusion.document: _print_document_content(
print(f"Document: {conclusion.document}") explain_client, api, user, step.document, "Answer",
else: )
print("No conclusion recorded") print()
else:
print("No conclusion recorded") if not steps:
print("No trace steps recorded")
print()
def trace_to_dict(trace, trace_type): def trace_to_dict(trace, trace_type):
"""Convert trace entities to JSON-serializable dict.""" """Convert trace entities to JSON-serializable dict."""
if trace_type == "agent": if trace_type == "agent":
question = trace.get("question") question = trace.get("question")
def _step_to_dict(step):
if isinstance(step, Decomposition):
return {
"type": "decomposition",
"id": step.uri,
"goals": step.goals,
}
elif isinstance(step, Finding):
return {
"type": "finding",
"id": step.uri,
"goal": step.goal,
"document": step.document,
}
elif isinstance(step, Plan):
return {
"type": "plan",
"id": step.uri,
"steps": step.steps,
}
elif isinstance(step, StepResult):
return {
"type": "step-result",
"id": step.uri,
"step": step.step,
"document": step.document,
}
elif isinstance(step, Analysis):
return {
"type": "analysis",
"id": step.uri,
"action": step.action,
"arguments": step.arguments,
"thought": step.thought,
"observation": step.observation,
}
elif isinstance(step, Synthesis):
return {
"type": "synthesis",
"id": step.uri,
"document": step.document,
}
elif isinstance(step, Conclusion):
return {
"type": "conclusion",
"id": step.uri,
"document": step.document,
}
return {"type": step.entity_type, "id": step.uri}
steps = trace.get("steps", [])
return { return {
"type": "agent", "type": "agent",
"session_id": question.uri if question else None, "session_id": question.uri if question else None,
"question": question.query if question else None, "question": question.query if question else None,
"time": question.timestamp if question else None, "time": question.timestamp if question else None,
"iterations": [ "steps": [_step_to_dict(s) for s in steps],
{
"id": a.uri,
"thought": a.thought,
"action": a.action,
"arguments": a.arguments,
"observation": a.observation,
}
for a in trace.get("iterations", [])
],
"conclusion": {
"id": trace["conclusion"].uri,
"document": trace["conclusion"].document,
} if trace.get("conclusion") else None,
} }
elif trace_type == "docrag": elif trace_type == "docrag":
question = trace.get("question") question = trace.get("question")

View file

@ -57,7 +57,7 @@ class Aggregator:
"request_template": request_template, "request_template": request_template,
"created_at": time.time(), "created_at": time.time(),
} }
logger.info( logger.debug(
f"Aggregator: registered fan-out {correlation_id}, " f"Aggregator: registered fan-out {correlation_id}, "
f"expecting {expected_siblings} subagents" f"expecting {expected_siblings} subagents"
) )
@ -82,7 +82,7 @@ class Aggregator:
completed = len(entry["results"]) completed = len(entry["results"])
expected = entry["expected"] expected = entry["expected"]
logger.info( logger.debug(
f"Aggregator: {correlation_id}" f"Aggregator: {correlation_id}"
f"{completed}/{expected} subagents complete" f"{completed}/{expected} subagents complete"
) )

View file

@ -106,7 +106,7 @@ class PatternBase:
) )
await next(completion_request) await next(completion_request)
logger.info( logger.debug(
f"Subagent completion emitted for " f"Subagent completion emitted for "
f"correlation={request.correlation_id}, " f"correlation={request.correlation_id}, "
f"goal={getattr(request, 'subagent_goal', '')}" f"goal={getattr(request, 'subagent_goal', '')}"

View file

@ -60,10 +60,6 @@ class ReactPattern(PatternBase):
filtered_tools = self.filter_tools( filtered_tools = self.filter_tools(
self.processor.agent.tools, request, self.processor.agent.tools, request,
) )
logger.info(
f"Filtered from {len(self.processor.agent.tools)} "
f"to {len(filtered_tools)} available tools"
)
# Create temporary agent with filtered tools and optional framing # Create temporary agent with filtered tools and optional framing
additional_context = self.processor.agent.additional_context additional_context = self.processor.agent.additional_context

View file

@ -414,7 +414,6 @@ class Processor(AgentService):
self.meta_router = MetaRouter(config=config) self.meta_router = MetaRouter(config=config)
logger.info(f"Loaded {len(tools)} tools") logger.info(f"Loaded {len(tools)} tools")
logger.info("Tool configuration reloaded.")
except Exception as e: except Exception as e:
logger.error( logger.error(
@ -436,7 +435,7 @@ class Processor(AgentService):
answer_text = step.observation answer_text = step.observation
break break
logger.info( logger.debug(
f"Received subagent completion: " f"Received subagent completion: "
f"correlation={correlation_id}, goal={subagent_goal}" f"correlation={correlation_id}, goal={subagent_goal}"
) )

View file

@ -550,8 +550,6 @@ class Processor(AgentService):
current_state=getattr(request, 'state', None) current_state=getattr(request, 'state', None)
) )
logger.info(f"Filtered from {len(self.agent.tools)} to {len(filtered_tools)} available tools")
# Create temporary agent with filtered tools # Create temporary agent with filtered tools
temp_agent = AgentManager( temp_agent = AgentManager(
tools=filtered_tools, tools=filtered_tools,

View file

@ -34,7 +34,7 @@ def filter_tools_by_group_and_state(
if current_state is None or current_state == "": if current_state is None or current_state == "":
current_state = "undefined" current_state = "undefined"
logger.info(f"Filtering tools with groups={requested_groups}, state={current_state}") logger.debug(f"Filtering tools with groups={requested_groups}, state={current_state}")
filtered_tools = {} filtered_tools = {}
@ -44,7 +44,7 @@ def filter_tools_by_group_and_state(
else: else:
logger.debug(f"Tool {tool_name} filtered out") logger.debug(f"Tool {tool_name} filtered out")
logger.info(f"Filtered {len(tools)} tools to {len(filtered_tools)} available tools") logger.debug(f"Filtered {len(tools)} tools to {len(filtered_tools)} available tools")
return filtered_tools return filtered_tools