agent-orchestrator improvements (#743)

agent-orchestrator improvements:
- Improve agent trace
- Improve queue dumping
- Fix supervisor pattern
- Fix synthesis step to remove loop

Minor dev environment improvements:
- Improve queue dump output for JSON
- Reduce dev container rebuild
This commit is contained in:
cybermaggedon 2026-03-31 11:24:30 +01:00 committed by GitHub
parent 81ca7bbc11
commit e65ea217a2
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 225 additions and 125 deletions

View file

@ -77,8 +77,8 @@ some-containers:
-t ${CONTAINER_BASE}/trustgraph-base:${VERSION} .
${DOCKER} build -f containers/Containerfile.flow \
-t ${CONTAINER_BASE}/trustgraph-flow:${VERSION} .
${DOCKER} build -f containers/Containerfile.unstructured \
-t ${CONTAINER_BASE}/trustgraph-unstructured:${VERSION} .
# ${DOCKER} build -f containers/Containerfile.unstructured \
# -t ${CONTAINER_BASE}/trustgraph-unstructured:${VERSION} .
# ${DOCKER} build -f containers/Containerfile.vertexai \
# -t ${CONTAINER_BASE}/trustgraph-vertexai:${VERSION} .
# ${DOCKER} build -f containers/Containerfile.mcp \

View file

@ -19,43 +19,67 @@ import argparse
from trustgraph.base.subscriber import Subscriber
from trustgraph.base.pubsub import get_pubsub
def decode_json_strings(obj):
    """Walk a structure and expand any string that is itself JSON.

    Strings that parse to a dict or list are replaced by the parsed
    (and recursively decoded) structure; strings that parse to a
    scalar, or fail to parse, are left untouched.
    """
    if isinstance(obj, str):
        try:
            inner = json.loads(obj)
        except (json.JSONDecodeError, TypeError):
            return obj
        # Only containers are worth unwrapping; "42" stays a string.
        return decode_json_strings(inner) if isinstance(inner, (dict, list)) else obj
    if isinstance(obj, dict):
        return {key: decode_json_strings(val) for key, val in obj.items()}
    if isinstance(obj, list):
        return [decode_json_strings(item) for item in obj]
    return obj
def to_dict(value):
    """Coerce an arbitrary value into a JSON-serialisable structure.

    Handles primitives, UTF-8 bytes, JSON-encoded strings, containers,
    and objects (e.g. Pulsar schema records) that expose their fields
    through ``__dict__``; anything else is stringified.
    """
    # Scalars need no conversion. bool is checked implicitly via int.
    if value is None or isinstance(value, (bool, int, float)):
        return value
    # Bytes are assumed UTF-8 text and handled by the string path below.
    if isinstance(value, bytes):
        value = value.decode('utf-8')
    if isinstance(value, str):
        try:
            return json.loads(value)
        except (json.JSONDecodeError, TypeError):
            return value
    if isinstance(value, dict):
        return {key: to_dict(item) for key, item in value.items()}
    if isinstance(value, (list, tuple)):
        return [to_dict(item) for item in value]
    if hasattr(value, '__dict__'):
        # Object with attributes: keep only public fields.
        return {
            key: to_dict(item)
            for key, item in vars(value).items()
            if not key.startswith('_')
        }
    return str(value)
def format_message(queue_name, msg):
"""Format a message with timestamp and queue name."""
timestamp = datetime.now().isoformat()
# Try to parse as JSON and pretty-print
try:
# Handle both Message objects and raw bytes
if hasattr(msg, 'value'):
# Message object with .value() method
value = msg.value()
else:
# Raw bytes from schema-less subscription
value = msg
value = msg.value() if hasattr(msg, 'value') else msg
parsed = to_dict(value)
# If it's bytes, decode it
if isinstance(value, bytes):
value = value.decode('utf-8')
# If it's a string, try to parse as JSON
if isinstance(value, str):
try:
parsed = json.loads(value)
body = json.dumps(parsed, indent=2)
except (json.JSONDecodeError, TypeError):
body = value
else:
# Try to convert to dict for pretty printing
try:
# Pulsar schema objects have __dict__ or similar
if hasattr(value, '__dict__'):
parsed = {k: v for k, v in value.__dict__.items()
if not k.startswith('_')}
else:
parsed = str(value)
# Unwrap nested JSON strings (e.g. terms values)
if isinstance(parsed, (dict, list)):
parsed = decode_json_strings(parsed)
body = json.dumps(parsed, indent=2, default=str)
except (TypeError, AttributeError):
body = str(value)
else:
body = str(parsed)
except Exception as e:
body = f"<Error formatting message: {e}>\n{str(msg)}"

View file

@ -1,10 +1,12 @@
"""
Aggregator monitors the explainability topic for subagent completions
and triggers synthesis when all siblings in a fan-out have completed.
Aggregator tracks in-flight fan-out correlations and triggers
synthesis when all subagents have completed.
The aggregator watches for tg:Conclusion triples that carry a
correlation_id. When it detects that all expected siblings have
completed, it emits a synthesis AgentRequest on the agent request topic.
Subagent completions arrive as AgentRequest messages on the agent
request queue with step_type="subagent-completion". The orchestrator
intercepts these and feeds them to the aggregator. When all expected
siblings for a correlation ID have reported, the aggregator builds
a synthesis request for the supervisor pattern.
"""
import asyncio
@ -87,6 +89,13 @@ class Aggregator:
return completed >= expected
def get_original_request(self, correlation_id):
    """Return the stored request template for a correlation, or None.

    Unlike get_results, this is a read-only peek: the tracking entry
    stays in place.
    """
    record = self.correlations.get(correlation_id)
    return None if record is None else record["request_template"]
def get_results(self, correlation_id):
"""Get all results for a correlation and remove the tracking entry."""
entry = self.correlations.pop(correlation_id, None)
@ -138,7 +147,7 @@ class Aggregator:
pattern="supervisor",
task_type=template.task_type if template else "",
framing=template.framing if template else "",
correlation_id=correlation_id,
correlation_id="",
parent_session_id="",
subagent_goal="",
expected_siblings=0,

View file

@ -61,6 +61,47 @@ class PatternBase:
def __init__(self, processor):
    # Keep a reference to the owning processor so pattern methods can
    # reach shared services through it.
    self.processor = processor
def is_subagent(self, request):
    """Return True when the request carries a correlation_id, i.e. it
    was fanned out by a supervisor and runs as a subagent."""
    correlation = getattr(request, 'correlation_id', '')
    return bool(correlation)
async def emit_subagent_completion(self, request, next, answer_text):
    """Report a subagent's final answer back through the agent request
    queue instead of to the client, so the aggregator can fan it in.

    The answer travels as the observation of a single history step
    tagged step_type="subagent-completion"; the orchestrator intercepts
    requests carrying such a step.
    """
    goal = getattr(request, 'subagent_goal', '')
    step = AgentStep(
        thought="Subagent completed",
        action="complete",
        arguments={},
        observation=answer_text,
        step_type="subagent-completion",
    )
    # Mirror the identifying fields of the originating request so the
    # aggregator can match this completion to its fan-out.
    completion = AgentRequest(
        question=request.question,
        state="",
        group=getattr(request, 'group', []),
        history=[step],
        user=request.user,
        collection=getattr(request, 'collection', 'default'),
        streaming=False,
        session_id=getattr(request, 'session_id', ''),
        conversation_id=getattr(request, 'conversation_id', ''),
        pattern="",
        correlation_id=request.correlation_id,
        parent_session_id=getattr(request, 'parent_session_id', ''),
        subagent_goal=goal,
        expected_siblings=getattr(request, 'expected_siblings', 0),
    )
    await next(completion)
    logger.info(
        f"Subagent completion emitted for "
        f"correlation={request.correlation_id}, "
        f"goal={goal}"
    )
def filter_tools(self, tools, request):
"""Apply group/state filtering to the tool set."""
return filter_tools_by_group_and_state(

View file

@ -344,6 +344,10 @@ class PlanThenExecutePattern(PatternBase):
flow, session_id, iteration_num, session_uri,
response_text, request, respond, streaming,
)
if self.is_subagent(request):
await self.emit_subagent_completion(request, next, response_text)
else:
await self.send_final_response(
respond, streaming, response_text, already_streamed=streaming,
)

View file

@ -106,6 +106,9 @@ class ReactPattern(PatternBase):
f, request, respond, streaming,
)
if self.is_subagent(request):
await self.emit_subagent_completion(request, next, f)
else:
await self.send_final_response(
respond, streaming, f, already_streamed=streaming,
)

View file

@ -422,9 +422,74 @@ class Processor(AgentService):
)
logger.error("Configuration reload failed")
async def _handle_subagent_completion(self, request, respond, next, flow):
    """Handle a subagent completion by feeding it to the aggregator.

    Extracts the answer from the "subagent-completion" history step,
    records it against the request's correlation_id, and — once all
    expected siblings have reported — dispatches a synthesis request
    built from the stored template.

    respond and flow are unused here but kept to match the handler
    signature used by agent_request.
    """
    correlation_id = request.correlation_id
    subagent_goal = getattr(request, 'subagent_goal', '')
    # The subagent's final answer rides in the observation of the
    # completion step; take the first such step.
    answer_text = ""
    for step in request.history:
        if getattr(step, 'step_type', '') == 'subagent-completion':
            answer_text = step.observation
            break
    logger.info(
        f"Received subagent completion: "
        f"correlation={correlation_id}, goal={subagent_goal}"
    )
    all_done = self.aggregator.record_completion(
        correlation_id, subagent_goal, answer_text
    )
    if all_done is None:
        # Fix: the two f-string fragments previously concatenated with
        # no separator, logging "...{id}possibly timed out...".
        logger.warning(
            f"Unknown correlation_id {correlation_id}; "
            f"possibly timed out or duplicate"
        )
        return
    if all_done:
        logger.info(
            f"All subagents complete for {correlation_id}, "
            f"dispatching synthesis"
        )
        # Peek at the template before get_results consumes the entry.
        template = self.aggregator.get_original_request(correlation_id)
        if template is None:
            logger.error(
                f"No template for correlation {correlation_id}"
            )
            return
        synthesis_request = self.aggregator.build_synthesis_request(
            correlation_id,
            original_question=template.question,
            user=template.user,
            collection=getattr(template, 'collection', 'default'),
        )
        await next(synthesis_request)
async def agent_request(self, request, respond, next, flow):
try:
# Intercept subagent completion messages
correlation_id = getattr(request, 'correlation_id', '')
if correlation_id and request.history:
is_completion = any(
getattr(h, 'step_type', '') == 'subagent-completion'
for h in request.history
)
if is_completion:
await self._handle_subagent_completion(
request, respond, next, flow
)
return
pattern = getattr(request, 'pattern', '') or ''
# If no pattern set and this is the first iteration, route

View file

@ -57,11 +57,8 @@ class SupervisorPattern(PatternBase):
has_results = bool(
request.history
and any(
getattr(h, 'step_type', '') == 'decompose'
for h in request.history
)
and any(
getattr(h, 'subagent_results', None)
getattr(h, 'step_type', '') == 'synthesise'
and getattr(h, 'subagent_results', None)
for h in request.history
)
)
@ -159,9 +156,14 @@ class SupervisorPattern(PatternBase):
await next(sub_request)
logger.info(f"Fan-out: emitted subagent {i} for goal: {goal}")
# NOTE: The supervisor stops here. The aggregator will detect
# when all subagents complete and emit a synthesis request
# with the results populated.
# Register with aggregator for fan-in tracking
self.processor.aggregator.register_fanout(
correlation_id=correlation_id,
parent_session_id=session_id,
expected_siblings=len(goals),
request_template=request,
)
logger.info(
f"Supervisor fan-out complete: {len(goals)} subagents, "
f"correlation_id={correlation_id}"

View file

@ -176,15 +176,10 @@ class AgentManager:
tools = self.tools
logger.debug("in reason")
logger.debug(f"tools: {tools}")
tool_names = ",".join([
t for t in self.tools.keys()
])
logger.debug(f"Tool names: {tool_names}")
variables = {
"question": question,
"tools": [
@ -218,17 +213,10 @@ class AgentManager:
logger.debug(f"Variables: {json.dumps(variables, indent=4)}")
logger.info(f"prompt: {variables}")
logger.info(f"DEBUG: streaming={streaming}, think={think is not None}")
# Streaming path - use StreamingReActParser
if streaming and think:
logger.info("DEBUG: Entering streaming path")
from .streaming_parser import StreamingReActParser
logger.info("DEBUG: Creating StreamingReActParser")
# Collect chunks to send via async callbacks
thought_chunks = []
answer_chunks = []
@ -238,24 +226,19 @@ class AgentManager:
on_thought_chunk=lambda chunk: thought_chunks.append(chunk),
on_answer_chunk=lambda chunk: answer_chunks.append(chunk),
)
logger.info("DEBUG: StreamingReActParser created")
# Create async chunk callback that feeds parser and sends collected chunks
async def on_chunk(text, end_of_stream):
logger.info(f"DEBUG: on_chunk called with {len(text)} chars, end_of_stream={end_of_stream}")
# Track what we had before
prev_thought_count = len(thought_chunks)
prev_answer_count = len(answer_chunks)
# Feed the parser (synchronous)
logger.info(f"DEBUG: About to call parser.feed")
parser.feed(text)
logger.info(f"DEBUG: parser.feed returned")
# Send any new thought chunks
for i in range(prev_thought_count, len(thought_chunks)):
logger.info(f"DEBUG: Sending thought chunk {i}")
# Mark last chunk as final if parser has moved out of THOUGHT state
is_last = (i == len(thought_chunks) - 1)
is_thought_complete = parser.state.value != "thought"
@ -264,72 +247,52 @@ class AgentManager:
# Send any new answer chunks
for i in range(prev_answer_count, len(answer_chunks)):
logger.info(f"DEBUG: Sending answer chunk {i}")
if answer:
await answer(answer_chunks[i])
else:
await think(answer_chunks[i])
logger.info("DEBUG: Getting prompt-request client from context")
client = context("prompt-request")
logger.info(f"DEBUG: Got client: {client}")
logger.info("DEBUG: About to call agent_react with streaming=True")
# Get streaming response
response_text = await client.agent_react(
variables=variables,
streaming=True,
chunk_callback=on_chunk
)
logger.info(f"DEBUG: agent_react returned, got {len(response_text) if response_text else 0} chars")
# Finalize parser
logger.info("DEBUG: Finalizing parser")
parser.finalize()
logger.info("DEBUG: Parser finalized")
# Get result
logger.info("DEBUG: Getting result from parser")
result = parser.get_result()
if result is None:
raise RuntimeError("Parser failed to produce a result")
logger.info(f"Parsed result: {result}")
return result
else:
logger.info("DEBUG: Entering NON-streaming path")
# Non-streaming path - get complete text and parse
logger.info("DEBUG: Getting prompt-request client from context")
client = context("prompt-request")
logger.info(f"DEBUG: Got client: {client}")
logger.info("DEBUG: About to call agent_react with streaming=False")
response_text = await client.agent_react(
variables=variables,
streaming=False
)
logger.info(f"DEBUG: agent_react returned, got response")
logger.debug(f"Response text:\n{response_text}")
logger.info(f"response: {response_text}")
# Parse the text response
try:
result = self.parse_react_response(response_text)
logger.info(f"Parsed result: {result}")
return result
except ValueError as e:
logger.error(f"Failed to parse response: {e}")
# Try to provide a helpful error message
logger.error(f"Response was: {response_text}")
raise RuntimeError(f"Failed to parse agent response: {e}")
async def react(self, question, history, think, observe, context, streaming=False, answer=None):
logger.info(f"question: {question}")
act = await self.reason(
question = question,
history = history,
@ -339,7 +302,6 @@ class AgentManager:
observe = observe,
answer = answer,
)
logger.info(f"act: {act}")
if isinstance(act, Final):
@ -358,16 +320,11 @@ class AgentManager:
logger.debug(f"ACTION: {act.name}")
logger.debug(f"Tools: {self.tools.keys()}")
if act.name in self.tools:
action = self.tools[act.name]
else:
logger.debug(f"Tools: {self.tools}")
raise RuntimeError(f"No action for {act.name}!")
logger.debug(f"TOOL>>> {act}")
resp = await action.implementation(context).invoke(
**act.arguments
)
@ -378,13 +335,8 @@ class AgentManager:
resp = str(resp)
resp = resp.strip()
logger.info(f"resp: {resp}")
await observe(resp, is_final=True)
act.observation = resp
logger.info(f"iter: {act}")
return act