Add multi-pattern orchestrator with plan-then-execute and supervisor

Introduce an agent orchestrator service that supports three
execution patterns (ReAct, plan-then-execute, supervisor) with
LLM-based meta-routing to select the appropriate pattern and task
type per request. Update the agent schema to support
orchestration fields (correlation, sub-agents, plan steps) and
remove legacy response fields (answer, thought, observation).
Author: Cyber MacGeddon
Date: 2026-03-23 19:46:44 +00:00
Commit: ade5606a92 (parent 7af1d60db8)
21 changed files with 3006 additions and 172 deletions

# TrustGraph Agent Orchestration — Technical Specification
## Overview
This specification describes the extension of TrustGraph's agent architecture
from a single ReACT execution pattern to a multi-pattern orchestration
model. The existing Pulsar-based self-queuing loop is pattern-agnostic — the
same infrastructure supports ReACT, Plan-then-Execute, Supervisor/Subagent
fan-out, and other execution strategies without changes to the message
transport. The extension adds a routing layer that selects the appropriate
pattern for each task, a set of pattern implementations that share common
iteration infrastructure, and a fan-out/fan-in mechanism for multi-agent
coordination.
The central design principle is that
**trust and explainability are structural properties of the architecture**,
achieved by constraining LLM decisions to
graph-defined option sets and recording those constraints in the execution
trace.
---
## Background
### Existing Architecture
The current agent manager is built on the ReACT pattern (Reasoning + Acting)
with these properties:
- **Self-queuing loop**: Each iteration emits a new Pulsar message carrying
the accumulated history. The agent manager picks this up and runs the next
iteration.
- **Stateless agent manager**: No in-process state. All state lives in the
message payload.
- **Natural parallelism**: Multiple independent agent requests are handled
concurrently across Pulsar consumers.
- **Durability**: Crash recovery is inherent — the message survives process
failure.
- **Real-time feedback**: Streaming thought, action, observation and answer
chunks are emitted as iterations complete.
- **Tool calling and MCP invocation**: Tool calls into knowledge graphs,
external services, and MCP-connected systems.
- **Decision traces written to the knowledge graph**: Every iteration records
PROV-O triples — session, analysis, and conclusion entities — forming the
basis of explainability.
### Current Message Flow
```
AgentRequest arrives (question, history=[], state, group, session_id)
        │
        ▼
Filter tools by group/state
        │
        ▼
AgentManager.react() → LLM call → parse → Action or Final
        │                                    │
        │ [Action]                           │ [Final]
        ▼                                    ▼
Execute tool, capture observation     Emit conclusion triples
Emit iteration triples                Send AgentResponse
Append to history                     (end_of_dialog=True)
Emit new AgentRequest → "next" topic
   └── (picked up again by consumer, loop continues)
```
The key insight is that this loop structure is not ReACT-specific. The
plumbing — receive message, do work, emit next message — is the same
regardless of what the "work" step does. The payload and the pattern logic
define the behaviour; the infrastructure remains constant.
### Current Limitations
- Only one execution pattern (ReACT) is available regardless of task
characteristics.
- No mechanism for one agent to spawn and coordinate subagents.
- Pattern selection is implicit — every task gets the same treatment.
- The provenance model assumes a linear iteration chain (analysis N derives
from analysis N-1), with no support for parallel branches.
---
## Design Goals
- **Pattern-agnostic iteration infrastructure**: The self-queuing loop, tool
filtering, provenance emission, and streaming feedback should be shared
across all patterns.
- **Graph-constrained pattern selection**: The LLM selects patterns from a
graph-defined set, not from unconstrained reasoning. This makes the
selection auditable and explainable.
- **Genuinely parallel fan-out**: Subagent tasks execute concurrently on the
Pulsar queue, not sequentially in a single process.
- **Stateless coordination**: Fan-in uses the knowledge graph as coordination
substrate. The agent manager remains stateless.
- **Additive change**: The existing ReACT flow continues to work
unchanged. New patterns are added alongside it, not in place of it.
---
## Patterns
### ReACT as One Pattern Among Many
ReACT is one point in a wider space of agent execution strategies:
| Pattern | Structure | Strengths |
|---|---|---|
| **ReACT** | Interleaved reasoning and action | Adaptive, good for open-ended tasks |
| **Plan-then-Execute** | Decompose into a step DAG, then execute | More predictable, auditable plan |
| **Reflexion** | ReACT + self-critique after each action | Agents improve within the episode |
| **Supervisor/Subagent** | One agent orchestrates others | Parallel decomposition, synthesis |
| **Debate/Ensemble** | Multiple agents reason independently | Diverse perspectives, reconciliation |
| **LLM-as-router** | No reasoning loop, pure dispatch | Fast classification and routing |
Not all of these need to be implemented at once. The architecture should
support them; the initial implementation delivers ReACT (already exists),
Plan-then-Execute, and Supervisor/Subagent.
### Pattern Storage
Patterns are stored as configuration items via the config API. They are
finite in number, mechanically well-defined, have enumerable properties,
and change slowly. Each pattern is a JSON object stored under the
`agent-pattern` config type.
```json
Config type: "agent-pattern"
Config key: "react"
Value: {
"name": "react",
"description": "ReACT — Reasoning + Acting",
"when_to_use": "Adaptive, good for open-ended tasks"
}
```
These are written at deployment time and change rarely. If the architecture
later benefits from graph-based pattern storage (e.g. for richer ontological
relationships), the config items can be migrated to graph nodes — the
meta-router's selection logic is the same regardless of backend.
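As a concrete sketch, pattern lookup against the config API might look like
the following. The `InMemoryConfig` class is a stand-in for the real config
API client, whose interface may differ:

```python
import json

class InMemoryConfig:
    """Stand-in for the config API: maps config type -> key -> JSON value."""

    def __init__(self, items):
        self.items = items

    def list(self, config_type):
        return list(self.items.get(config_type, {}))

    def get(self, config_type, key):
        return json.loads(self.items[config_type][key])

config = InMemoryConfig({
    "agent-pattern": {
        "react": json.dumps({
            "name": "react",
            "description": "ReACT — Reasoning + Acting",
            "when_to_use": "Adaptive, good for open-ended tasks",
        }),
    }
})

# The meta-router loads every pattern definition in one pass.
patterns = {key: config.get("agent-pattern", key)
            for key in config.list("agent-pattern")}
```

Because the selection logic only needs `list` and `get`, a later migration to
graph-backed storage changes the client, not the router.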
---
## Task Types
### What a Task Type Represents
A **task type** characterises the problem domain — what the agent is being
asked to accomplish, and how a domain expert would frame it analytically.
- Carries domain-specific methodology (e.g. "intelligence analysis always
applies structured analytic techniques")
- Pre-populates initial reasoning context via a framing prompt
- Constrains which patterns are valid for this class of problem
- Can define domain-specific termination criteria
### Identification
Task types are identified from plain-text task descriptions by the
LLM. Building a formal ontology over task descriptions is premature — natural
language is too varied and context-dependent. The LLM reads the description;
the graph provides the structure downstream.
### Task Type Storage
Task types are stored as configuration items via the config API under the
`agent-task-type` config type. Each task type is a JSON object that
references valid patterns by name.
```json
Config type: "agent-task-type"
Config key: "risk-assessment"
Value: {
"name": "risk-assessment",
"description": "Due Diligence / Risk Assessment",
"framing_prompt": "Analyse across financial, reputational, legal and operational dimensions using structured analytic techniques.",
"valid_patterns": ["supervisor", "plan-then-execute", "react"],
"when_to_use": "Multi-dimensional analysis requiring structured assessment"
}
```
The `valid_patterns` list defines the constrained decision space — the LLM
can only select patterns that the task type's configuration says are valid.
This is the many-to-many relationship between task types and patterns,
expressed as configuration rather than graph edges.
### Selection Flow
```
Task Description (plain text, from AgentRequest.question)
│ [LLM interprets, constrained by available task types from config]
Task Type (config item — domain framing and methodology)
│ [config lookup — valid_patterns list]
Pattern Candidates (config items)
│ [LLM selects within constrained set,
│ informed by task description signals:
│ complexity, urgency, scope]
Selected Pattern
```
The task description may carry modulating signals (complexity, urgency, scope)
that influence which pattern is selected within the constrained set. But the
raw description never directly selects a pattern — it always passes through
the task type layer first.
---
## Explainability Through Constrained Decision Spaces
A central principle of TrustGraph's explainability architecture is that
**explainability comes from constrained decision spaces**.
When a decision is made from an unconstrained space — a raw LLM call with no
guardrails — the reasoning is opaque even if the LLM produces a rationale,
because that rationale is post-hoc and unverifiable.
When a decision is made from a **constrained set defined in configuration**,
you can always answer:
- What valid options were available
- What criteria narrowed the set
- What signal made the final selection within that set
This principle already governs the existing decision trace architecture and
extends naturally to pattern selection. The routing decision — which task type
and which pattern — is itself recorded as a provenance node, making the first
decision in the execution trace auditable.
**Trust becomes a structural property of the architecture, not a claimed
property of the model.**
---
## Orchestration Architecture
### The Meta-Router
The meta-router is the entry point for all agent requests. It runs as a
pre-processing step before the pattern-specific iteration loop begins. Its
job is to determine the task type and select the execution pattern.
**When it runs**: On receipt of an `AgentRequest` with empty history (i.e. a
new task, not a continuation). Requests with non-empty history are already
mid-iteration and bypass the meta-router.
**What it does**:
1. Lists all available task types from the config API
(`config.list("agent-task-type")`).
2. Presents these to the LLM alongside the task description. The LLM
identifies which task type applies (or "general" as a fallback).
3. Reads the selected task type's configuration to get the `valid_patterns`
list.
4. Loads the candidate pattern definitions from config and presents them to
the LLM. The LLM selects one, influenced by signals in the task
description (complexity, number of independent dimensions, urgency).
5. Records the routing decision as a provenance node (see Provenance Model
below).
6. Populates the `AgentRequest` with the selected pattern, task type framing
prompt, and any pattern-specific configuration, then emits it onto the
queue.
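A minimal sketch of steps 1–4, with the two LLM calls stubbed as plain
functions. The names `identify_task_type`, `select_pattern`, and `route` are
illustrative, not the implementation's API:

```python
# Task type definitions as they might be read from the config API.
TASK_TYPES = {
    "general": {"valid_patterns": ["react"], "framing_prompt": ""},
    "risk-assessment": {
        "valid_patterns": ["supervisor", "plan-then-execute", "react"],
        "framing_prompt": "Analyse across financial, reputational, "
                          "legal and operational dimensions.",
    },
}

def identify_task_type(question: str) -> str:
    # Stub for the first LLM call; falls back to "general".
    return "risk-assessment" if "risk" in question.lower() else "general"

def select_pattern(question: str, candidates: list) -> str:
    # Stub for the second LLM call, constrained to the candidate set.
    return candidates[0]

def route(question: str):
    task_type = identify_task_type(question)
    cfg = TASK_TYPES.get(task_type, TASK_TYPES["general"])
    candidates = cfg["valid_patterns"]
    pattern = select_pattern(question, candidates)
    if pattern not in candidates:   # the constraint is enforced,
        pattern = candidates[0]     # not merely suggested to the LLM
    return task_type, pattern, cfg["framing_prompt"]
```

The key property is that the second call can only ever return a member of
`valid_patterns`, which is what makes the routing decision auditable.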
**Where it lives**: The meta-router is a phase within the agent-orchestrator,
not a separate service. The agent-orchestrator is a new executable that
uses the same service identity as the existing agent-manager-react, making
it a drop-in replacement on the same Pulsar queues. It includes the full
ReACT implementation alongside the new orchestration patterns. The
distinction between "route" and "iterate" is determined by whether the
request already has a pattern set.
### Pattern Dispatch
Once the meta-router has annotated the request with a pattern, the agent
manager dispatches to the appropriate pattern implementation. This is a
straightforward branch on the pattern field:
```
request arrives
├── history is empty → meta-router → annotate with pattern → re-emit
└── history is non-empty (or pattern is set)
├── pattern = "react" → ReACT iteration
├── pattern = "plan-then-execute" → PtE iteration
├── pattern = "supervisor" → Supervisor iteration
└── (no pattern) → ReACT iteration (default)
```
Each pattern implementation follows the same contract: receive a request, do
one iteration of work, then either emit a "next" message (continue) or emit a
response (done). The self-queuing loop doesn't change.
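The branch above can be expressed as a small dispatch function. The request
is shown as a plain dict for brevity; the handler names are illustrative:

```python
def dispatch(request: dict) -> str:
    """Mirror the routing tree: name the handler for a given request."""
    if not request.get("history") and not request.get("pattern"):
        return "meta-router"
    handlers = {
        "react": "react",
        "plan-then-execute": "plan-then-execute",
        "supervisor": "supervisor",
    }
    # Unknown or empty pattern falls back to the ReACT default.
    return handlers.get(request.get("pattern", ""), "react")
```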
### Pattern Implementations
#### ReACT (Existing)
No changes. The existing `AgentManager.react()` path continues to work
as-is.
#### Plan-then-Execute
Two-phase pattern:
**Planning phase** (first iteration):
- LLM receives the question plus task type framing.
- Produces a structured plan: an ordered list of steps, each with a goal,
expected tool, and dependencies on prior steps.
- The plan is recorded in the history as a special "plan" step.
- Emits a "next" message to begin execution.
**Execution phase** (subsequent iterations):
- Reads the plan from history.
- Identifies the next unexecuted step.
- Executes that step (tool call + observation), similar to a single ReACT
action.
- Records the result against the plan step.
- If all steps complete, synthesises a final answer.
- If a step fails or produces unexpected results, the LLM can revise the
remaining plan (bounded re-planning, not a full restart).
The plan lives in the history, so it travels with the message. No external
state is needed.
#### Supervisor/Subagent
The supervisor pattern introduces fan-out and fan-in. This is the most
architecturally significant addition.
**Supervisor planning iteration**:
- LLM receives the question plus task type framing.
- Decomposes the task into independent subagent goals.
- For each subagent, emits a new `AgentRequest` with:
- A focused question (the subagent's specific goal)
- A shared correlation ID tying it to the parent task
- The subagent's own pattern (typically ReACT, but could be anything)
- Relevant context sliced from the parent request
**Subagent execution**:
- Each subagent request is picked up by an agent manager instance and runs its
own independent iteration loop.
- Subagents are ordinary agent executions — they self-queue, use tools, emit
provenance, stream feedback.
- When a subagent reaches a Final answer, it writes a completion record to the
knowledge graph under the shared correlation ID.
**Fan-in and synthesis**:
- An aggregator detects when all sibling subagents for a correlation ID have
completed.
- It emits a synthesis request to the supervisor carrying the correlation ID.
- The supervisor queries the graph for subagent results, reasons across them,
and decides whether to emit a final answer or iterate again.
**Supervisor re-iteration**:
- After synthesis, the supervisor may determine that the results are
incomplete, contradictory, or reveal gaps requiring further investigation.
- Rather than emitting a final answer, it can fan out again with new or
refined subagent goals under a new correlation ID. This is the same
self-queuing loop — the supervisor emits new subagent requests and stops,
the aggregator detects completion, and synthesis runs again.
- The supervisor's iteration count (planning + synthesis rounds) is bounded
to prevent unbounded looping.
This is detailed further in the Fan-Out / Fan-In section below.
---
## Message Schema Evolution
### Shared Schema Principle
The `AgentRequest` and `AgentResponse` schemas are the shared contract
between the agent-manager (existing ReACT execution) and the
agent-orchestrator (meta-routing, supervisor, plan-then-execute). Both
services consume from the same *agent request* topic using the same
schema. Any schema changes must be reflected in both — the schema is
the integration point, not the service implementation.
This means the orchestrator does not introduce separate message types for
its own use. Subagent requests, synthesis triggers, and meta-router
outputs are all `AgentRequest` messages with different field values. The
agent-manager ignores orchestration fields it doesn't use.
### New Fields
The `AgentRequest` schema needs new fields to carry orchestration
metadata.
```python
from __future__ import annotations

from dataclasses import dataclass, field

@dataclass
class AgentRequest:
# Existing fields (unchanged)
question: str = ""
state: str = ""
group: list[str] | None = None
history: list[AgentStep] = field(default_factory=list)
user: str = ""
collection: str = "default"
streaming: bool = False
session_id: str = ""
# New orchestration fields
conversation_id: str = "" # Optional caller-generated ID grouping related requests
pattern: str = "" # "react", "plan-then-execute", "supervisor", ""
task_type: str = "" # Identified task type name
framing: str = "" # Task type framing prompt injected into LLM context
correlation_id: str = "" # Shared ID linking subagents to parent
parent_session_id: str = "" # Parent's session_id (for subagents)
subagent_goal: str = "" # Focused goal for this subagent
expected_siblings: int = 0 # How many sibling subagents exist
```
The `AgentStep` schema also extends to accommodate non-ReACT iteration types:
```python
from __future__ import annotations

from dataclasses import dataclass, field

@dataclass
class AgentStep:
# Existing fields (unchanged)
thought: str = ""
action: str = ""
arguments: dict[str, str] = field(default_factory=dict)
observation: str = ""
user: str = ""
# New fields
step_type: str = "" # "react", "plan", "execute", "supervise", "synthesise"
plan: list[PlanStep] | None = None # For plan-then-execute: the structured plan
subagent_results: dict | None = None # For supervisor: collected subagent outputs
```
The `PlanStep` structure for Plan-then-Execute:
```python
from dataclasses import dataclass, field

@dataclass
class PlanStep:
goal: str = "" # What this step should accomplish
tool_hint: str = "" # Suggested tool (advisory, not binding)
depends_on: list[int] = field(default_factory=list) # Indices of prerequisite steps
status: str = "pending" # "pending", "complete", "failed", "revised"
result: str = "" # Observation from execution
```
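The execution phase's "next unexecuted step" selection follows directly from
`depends_on` and `status`. A sketch, assuming the `next_step` helper name:

```python
from dataclasses import dataclass, field

@dataclass
class PlanStep:
    goal: str = ""
    tool_hint: str = ""
    depends_on: list = field(default_factory=list)
    status: str = "pending"
    result: str = ""

def next_step(plan):
    """Index of the first pending step whose prerequisites are all
    complete, or None when there is nothing left to execute."""
    for i, step in enumerate(plan):
        if step.status == "pending" and all(
            plan[d].status == "complete" for d in step.depends_on
        ):
            return i
    return None
```

Because the plan travels in the message history, this check needs no state
beyond the request payload itself.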
---
## Fan-Out and Fan-In
### Why This Matters
Fan-out is the mechanism that makes multi-agent coordination genuinely
parallel rather than simulated. With Pulsar, emitting multiple messages means
multiple consumers can pick them up concurrently. This is not threading or
async simulation — it is real distributed parallelism across agent manager
instances.
### Fan-Out: Supervisor Emits Subagent Requests
When a supervisor iteration decides to decompose a task, it:
1. Generates a **correlation ID** — a UUID that groups the sibling subagents.
2. For each subagent, constructs a new `AgentRequest`:
- `question` = the subagent's focused goal (from `subagent_goal`)
- `correlation_id` = the shared correlation ID
- `parent_session_id` = the supervisor's session_id
- `pattern` = typically "react", but the supervisor can specify any pattern
- `session_id` = a new unique ID for this subagent's own provenance chain
- `expected_siblings` = total number of sibling subagents
- `history` = empty (fresh start, but framing context inherited)
- `group`, `user`, `collection` = inherited from parent
3. Emits each subagent request onto the agent request topic.
4. Records the fan-out decision in the provenance graph (see below).
The supervisor then **stops**. It does not wait. It does not poll. It has
emitted its messages and its iteration is complete. The graph and the
aggregator handle the rest.
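The fan-out construction can be sketched as follows. Requests are shown as
plain dicts; the real messages are `AgentRequest` instances:

```python
import uuid

def fan_out(parent: dict, goals: list) -> list:
    """Build one subagent request per goal under a shared correlation ID."""
    correlation_id = str(uuid.uuid4())
    return [{
        "question": goal,
        "subagent_goal": goal,
        "pattern": "react",               # supervisor may choose per-subagent
        "correlation_id": correlation_id,
        "parent_session_id": parent["session_id"],
        "session_id": str(uuid.uuid4()),  # fresh provenance chain per subagent
        "expected_siblings": len(goals),
        "history": [],
        "group": parent.get("group"),
        "user": parent.get("user", ""),
        "collection": parent.get("collection", "default"),
    } for goal in goals]
```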
### Fan-In: Graph-Based Completion Detection
When a subagent reaches its Final answer, it writes a **completion node** to
the knowledge graph:
```
Completion node:
rdf:type tg:SubagentCompletion
tg:correlationId <shared correlation ID>
tg:subagentSessionId <this subagent's session_id>
tg:parentSessionId <supervisor's session_id>
tg:subagentGoal <what this subagent was asked to do>
tg:result → <document URI in librarian>
prov:wasGeneratedBy → <this subagent's conclusion entity>
```
The **aggregator** is a component that watches for completion nodes. When it
detects that all expected siblings for a correlation ID have written
completion nodes, it:
1. Collects the sibling completion records (goals and result references)
   from the graph.
2. Constructs a **synthesis request** — a new `AgentRequest` addressed to the supervisor flow:
 - `session_id` = the original supervisor's session_id
 - `pattern` = "supervisor"
 - `step_type` = "synthesise" (carried in history)
 - `subagent_results` = the collected result references (the supervisor fetches the full result documents from the librarian)
 - `history` = the supervisor's history up to the fan-out point, plus the synthesis step
3. Emits this onto the agent request topic.
The supervisor picks this up, reasons across the aggregated findings, and
produces its final answer.
### Aggregator Design
The aggregator is event-driven, consistent with TrustGraph's Pulsar-based
architecture. Polling would be an anti-pattern in a system where all
coordination is message-driven.
**Mechanism**: The aggregator is a Pulsar consumer on the explainability
topic. Subagent completion nodes are emitted as triples on this topic as
part of the existing provenance flow. When the aggregator receives a
`tg:SubagentCompletion` triple, it:
1. Extracts the `tg:correlationId` from the completion node.
2. Queries the graph to count how many siblings for that correlation ID
have completed.
3. If all `expected_siblings` are present, triggers fan-in immediately —
collects results and emits the synthesis request.
**State**: The aggregator is stateless in the same sense as the agent
manager — it holds no essential in-memory state. The graph is the source
of truth for completion counts. If the aggregator restarts, it can
re-process unacknowledged completion messages from Pulsar and re-check the
graph. No coordination state is lost.
**Consistency**: Because the completion check queries the graph rather than
relying on an in-memory counter, the aggregator is tolerant of duplicate
messages, out-of-order delivery, and restarts. The graph query is
idempotent — asking "are all siblings complete?" gives the same answer
regardless of how many times or in what order the events arrive.
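The idempotent check can be sketched as a pure function over completion
records. Records are shown as dicts here; in the real system this is a graph
query:

```python
def all_siblings_complete(completions, correlation_id, expected):
    """True once `expected` distinct siblings have completed.

    Counting distinct subagent session IDs (a set, not a tally) is what
    makes duplicate and out-of-order delivery harmless."""
    done = {c["subagent_session_id"] for c in completions
            if c["correlation_id"] == correlation_id}
    return len(done) >= expected
```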
### Timeout and Failure
- **Subagent timeout**: The aggregator records the timestamp of the first
sibling completion (from the graph). A periodic timeout check (the one
concession to polling — but over local state, not the graph) detects
stalled correlation IDs. If `expected_siblings` completions are not
reached within a configurable timeout, the aggregator emits a partial
synthesis request with whatever results are available, flagging the
incomplete subagents.
- **Subagent failure**: If a subagent errors out, it writes an error
completion node (with `tg:status = "error"` and an error message). The
aggregator treats this as a completion — the supervisor receives the error
in its synthesis input and can reason about partial results.
- **Supervisor iteration limit**: The supervisor's own iteration count
(planning + synthesis) is bounded by `max_iterations` just like any other
pattern.
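The timeout check from the first bullet can be sketched over the aggregator's
local bookkeeping, a map of correlation ID to the timestamp of the first
sibling completion. The function name and shapes are illustrative:

```python
def stalled_correlations(first_seen, timeout_s, now):
    """Correlation IDs whose first sibling completion is older than the
    timeout and which should receive a partial synthesis request."""
    return [cid for cid, ts in first_seen.items() if now - ts > timeout_s]
```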
---
## Provenance Model Extensions
### Routing Decision
The meta-router's task type and pattern selection is recorded as the first
provenance node in the session:
```
Routing node:
rdf:type prov:Entity, tg:RoutingDecision
prov:wasGeneratedBy → session (Question) activity
tg:taskType → TaskType node URI
tg:selectedPattern → Pattern node URI
tg:candidatePatterns → [Pattern node URIs] (what was available)
tg:routingRationale → document URI in librarian (LLM's reasoning)
```
This captures the constrained decision space: what candidates existed, which
was selected, and why. The candidates are graph-derived; the rationale is
LLM-generated but verifiable against the candidates.
### Fan-Out Provenance
When a supervisor fans out, the provenance records the decomposition:
```
FanOut node:
rdf:type prov:Entity, tg:FanOut
prov:wasDerivedFrom → supervisor's routing or planning iteration
tg:correlationId <correlation ID>
tg:subagentGoals → [document URIs for each subagent goal]
tg:expectedSiblings <count>
```
Each subagent's provenance chain is independent (its own session, iterations,
conclusion) but linked back to the parent via:
```
Subagent session:
rdf:type prov:Activity, tg:Question, tg:AgentQuestion
tg:parentCorrelationId <correlation ID>
tg:parentSessionId <supervisor session URI>
```
### Fan-In Provenance
The synthesis step links back to all subagent conclusions:
```
Synthesis node:
rdf:type prov:Entity, tg:Synthesis
prov:wasDerivedFrom → [all subagent Conclusion entities]
tg:correlationId <correlation ID>
```
This creates a DAG in the provenance graph: the supervisor's routing fans out
to N parallel subagent chains, which fan back in to a synthesis node. The
entire multi-agent execution is traceable from a single correlation ID.
### URI Scheme
Extending the existing `urn:trustgraph:agent:{session_id}` pattern:
| Entity | URI Pattern |
|---|---|
| Session (existing) | `urn:trustgraph:agent:{session_id}` |
| Iteration (existing) | `urn:trustgraph:agent:{session_id}/i{n}` |
| Conclusion (existing) | `urn:trustgraph:agent:{session_id}/answer` |
| Routing decision | `urn:trustgraph:agent:{session_id}/routing` |
| Fan-out record | `urn:trustgraph:agent:{session_id}/fanout/{correlation_id}` |
| Subagent completion | `urn:trustgraph:agent:{session_id}/completion` |
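A helper that builds these URIs keeps the scheme in one place. This is a
sketch; the function name is an assumption:

```python
def agent_uri(session_id, kind="session", n=None, correlation_id=None):
    """Build provenance URIs per the scheme in the table above."""
    base = f"urn:trustgraph:agent:{session_id}"
    suffixes = {
        "session": "",
        "iteration": f"/i{n}",
        "conclusion": "/answer",
        "routing": "/routing",
        "fanout": f"/fanout/{correlation_id}",
        "completion": "/completion",
    }
    return base + suffixes[kind]
```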
---
## Storage Responsibilities
Pattern and task type definitions live in the config API. Runtime state and
provenance live in the knowledge graph. The division is:
| Role | Storage | When Written | Content |
|---|---|---|---|
| Pattern definitions | Config API | At design time | Pattern properties, descriptions |
| Task type definitions | Config API | At design time | Domain framing, valid pattern lists |
| Routing decision trace | Knowledge graph | At request arrival | Why this task type and pattern were selected |
| Iteration decision trace | Knowledge graph | During execution | Each think/act/observe cycle, per existing model |
| Fan-out coordination | Knowledge graph | During fan-out | Subagent goals, correlation ID, expected count |
| Subagent completion | Knowledge graph | During fan-in | Per-subagent results under shared correlation ID |
| Execution audit trail | Knowledge graph | Post-execution | Full multi-agent reasoning trace as a DAG |
The config API holds the definitions that constrain decisions. The knowledge
graph holds the runtime decisions and their provenance. The fan-in
coordination state is part of the provenance automatically — subagent
completion nodes are both coordination signals and audit trail entries.
---
## Worked Example: Partner Risk Assessment
**Request**: "Assess the risk profile of Company X as a potential partner"
**1. Request arrives** on the *agent request* topic with empty history.
The agent manager picks it up.
**2. Meta-router**:
- Queries config API, finds task types: *Risk Assessment*, *Research*,
*Summarisation*, *General*.
- LLM identifies *Risk Assessment*. Framing prompt loaded: "analyse across
financial, reputational, legal and operational dimensions using structured
analytic techniques."
- Valid patterns for *Risk Assessment*: [*Supervisor/Subagent*,
*Plan-then-Execute*, *ReACT*].
- LLM selects *Supervisor/Subagent* — task has four independent investigative
dimensions, well-suited to parallel decomposition.
- Routing decision written to graph. Request re-emitted on the
*agent request* topic with `pattern="supervisor"`, framing populated.
**3. Supervisor iteration** (picked up from *agent request* topic):
- LLM receives question + framing. Reasons that four independent investigative
threads are required.
- Generates correlation ID `corr-abc123`.
- Emits four subagent requests on the *agent request* topic:
- Financial analysis (pattern="react", subagent_goal="Analyse financial
health and stability of Company X")
- Legal analysis (pattern="react", subagent_goal="Review regulatory filings,
sanctions, and legal exposure for Company X")
- Reputational analysis (pattern="react", subagent_goal="Analyse news
sentiment and public reputation of Company X")
- Operational analysis (pattern="react", subagent_goal="Assess supply chain
dependencies and operational risks for Company X")
- Fan-out node written to graph.
**4. Four subagents run in parallel** (each picked up from the *agent
request* topic by agent manager instances), each as an independent ReACT
loop:
- Financial — queries financial data services and knowledge graph
relationships
- Legal — searches regulatory filings and sanctions lists
- Reputational — searches news, analyses sentiment
- Operational — queries supply chain databases
Each self-queues its iterations on the *agent request* topic. Each writes
its own decision trace to the graph as it progresses. Each completes
independently.
**5. Fan-in**:
- Each subagent writes a `tg:SubagentCompletion` node to the graph on
completion, emitted on the *explainability* topic. The completion node
references the subagent's result document in the librarian.
- Aggregator (consuming the *explainability* topic) sees each completion
event. It queries the graph for the fan-out node to get the expected
sibling count, then checks how many completions exist for
`corr-abc123`.
- When all four siblings are complete, the aggregator emits a synthesis
  request on the *agent request* topic carrying the correlation ID. It
  bundles only the result references, not the result content — the
  supervisor queries the graph and librarian for that in the next step.
**6. Supervisor synthesis** (picked up from *agent request* topic):
- Receives the synthesis trigger carrying the correlation ID.
- Queries the graph for `tg:SubagentCompletion` nodes under
`corr-abc123`, retrieving each subagent's goal and result document
reference.
- Fetches the result documents from the librarian.
- Reasons across all four dimensions, produces a structured risk
assessment with confidence scores.
- Emits final answer on the *agent response* topic and writes conclusion
provenance to the graph.
**7. Response delivered** — the supervisor's synthesis streams on the
*agent response* topic as the LLM generates it, with `end_of_dialog`
on the final chunk. The collated answer is saved to the librarian and
referenced from conclusion provenance in the graph. The graph now holds
a complete, human-readable trace of the entire multi-agent execution —
from pattern selection through four parallel investigations to final
synthesis.
---
## Class Hierarchy
The agent-orchestrator executable uses the same service identity as
agent-manager-react, making it a drop-in replacement.
The pattern dispatch model suggests a class hierarchy where shared iteration
infrastructure lives in a base class and pattern-specific logic is in
subclasses:
```
AgentService (base — Pulsar consumer/producer specs, request handling)
└── Processor (agent-orchestrator service)
├── MetaRouter — task type identification, pattern selection
├── PatternBase — shared: tool filtering, provenance, streaming, history
│ ├── ReactPattern — existing ReACT logic (extract from current AgentManager)
│ ├── PlanThenExecutePattern — plan phase + execute phase
│ └── SupervisorPattern — fan-out, synthesis
└── Aggregator — fan-in completion detection
```
`PatternBase` captures what is currently spread across `Processor` and
`AgentManager`: tool filtering, LLM invocation, provenance triple emission,
streaming callbacks, history management. The pattern subclasses implement only
the decision logic specific to their execution strategy — what to do with the
LLM output, when to terminate, whether to fan out.
This refactoring is not strictly necessary for the first iteration — the
meta-router and pattern dispatch could be added as branches within the
existing `Processor.agent_request()` method. But the class hierarchy clarifies
where shared vs. pattern-specific logic lives and will prevent duplication as
more patterns are added.
---
## Configuration
### Config API Seeding
Pattern and task type definitions are stored via the config API and need to
be seeded at deployment time. This is analogous to how flow blueprints and
parameter types are loaded: a bootstrap step that writes the initial
configuration.

The initial seed includes:

**Patterns** (config type `agent-pattern`):

- `react` — interleaved reasoning and action
- `plan-then-execute` — structured plan followed by step execution
- `supervisor` — decomposition, fan-out to subagents, synthesis

**Task types** (config type `agent-task-type`, initial set, expected to grow):

- `general` — no specific domain framing, all patterns valid
- `research` — open-ended investigation, valid patterns: react, plan-then-execute
- `risk-assessment` — multi-dimensional analysis, valid patterns: supervisor,
  plan-then-execute, react
- `summarisation` — condense information, valid patterns: react

The seed data is configuration, not code. It can be extended via the config
API (or the configuration UI) without redeploying the agent manager.
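
For illustration, seed entries might look like the following. Values are
stored as JSON strings under the config API; the field names (`description`,
`valid_patterns`, `framing`) are the ones the meta-router reads, while the
framing text itself is a made-up example.

```python
# Illustrative seed data for the two config types described above.
import json

SEED = {
    "agent-pattern": {
        "react": json.dumps({
            "name": "react",
            "description": "Interleaved reasoning and action",
        }),
        "plan-then-execute": json.dumps({
            "name": "plan-then-execute",
            "description": "Structured plan followed by step execution",
        }),
        "supervisor": json.dumps({
            "name": "supervisor",
            "description": "Decomposition, fan-out to subagents, synthesis",
        }),
    },
    "agent-task-type": {
        "research": json.dumps({
            "name": "research",
            "description": "Open-ended investigation",
            "valid_patterns": ["react", "plan-then-execute"],
            # Hypothetical framing text, injected into pattern prompts.
            "framing": "You are investigating an open-ended question.",
        }),
    },
}
```
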
### Migration Path
The config API provides a practical starting point. If richer ontological
relationships between patterns, task types, and domain knowledge become
valuable, the definitions can be migrated to graph storage. The meta-router's
selection logic queries an abstract set of task types and patterns — the
storage backend is an implementation detail.
### Fallback Behaviour
If the config contains no patterns or task types:
- Task type defaults to `general`.
- Pattern defaults to `react`.
- The system degrades gracefully to existing behaviour.
---
## Design Decisions
| Decision | Resolution | Rationale |
|---|---|---|
| Task type identification | LLM interprets from plain text | Natural language too varied to formalise prematurely |
| Pattern/task type storage | Config API initially, graph later if needed | Avoids graph model complexity upfront; config API already has UI support; migration path is straightforward |
| Meta-router location | Phase within agent manager, not separate service | Avoids an extra network hop; routing is fast |
| Fan-in mechanism | Event-driven via explainability topic | Consistent with Pulsar-based architecture; graph query for completion count is idempotent and restart-safe |
| Aggregator deployment | Separate lightweight process | Decoupled from agent manager lifecycle |
| Subagent pattern selection | Supervisor specifies per-subagent | Supervisor has task context to make this choice |
| Plan storage | In message history | No external state needed; plan travels with message |
| Default pattern | Empty pattern field → ReACT | Sensible default when meta-router is not configured |
---
## Streaming Protocol
### Current Model
The existing agent response schema has two levels:
- **`end_of_message`** — marks the end of a complete thought, observation,
or answer. Chunks belonging to the same message arrive sequentially.
- **`end_of_dialog`** — marks the end of the entire agent execution. No
  more messages will follow.

This works because the current system produces messages serially — one
thought at a time, one agent at a time.
### Fan-Out Breaks Serial Assumptions
With supervisor/subagent fan-out, multiple subagents stream chunks
concurrently on the same *agent response* topic. The caller receives
interleaved chunks from different sources and needs to demultiplex them.
### Resolution: Message ID
Each chunk carries a `message_id` — a per-message UUID generated when
the agent begins streaming a new thought, observation, or answer. The
caller groups chunks by `message_id` and assembles each message
independently.
```
Response chunk fields:
message_id UUID for this message (groups chunks)
session_id Which agent session produced this chunk
chunk_type "thought" | "observation" | "answer" | ...
content The chunk text
end_of_message True on the final chunk of this message
end_of_dialog True on the final message of the entire execution
```
A single subagent emits multiple messages (thought, observation, thought,
answer), each with a distinct `message_id`. The `session_id` identifies
which subagent the message belongs to. The caller can display, group, or
filter by either.
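
A caller-side demultiplexer following this scheme can be sketched as below.
This is illustrative, not part of a shipped client; the chunk field names
match the schema above.

```python
# Groups interleaved chunks by message_id; emits a complete message when
# its end_of_message chunk arrives.
from collections import defaultdict

class Demux:
    def __init__(self):
        self.buffers = defaultdict(list)

    def feed(self, chunk):
        """Returns (session_id, chunk_type, full_text) when a message
        completes, else None."""
        self.buffers[chunk["message_id"]].append(chunk["content"])
        if chunk["end_of_message"]:
            parts = self.buffers.pop(chunk["message_id"])
            return chunk["session_id"], chunk["chunk_type"], "".join(parts)
        return None
```

Chunks from concurrent subagents can arrive in any interleaving; each
message reassembles independently because its chunks share a `message_id`.
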
### Provenance Trigger
`end_of_message` is the trigger for provenance storage. When a complete
message has been assembled from its chunks:
1. The collated text is saved to the librarian as a single document.
2. A provenance node is written to the graph referencing the document URI.

This follows the pattern established by GraphRAG, where streaming synthesis
chunks are delivered live but the stored provenance references the collated
answer text. Streaming is for the caller; provenance needs complete messages.
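
The trigger logic amounts to a few lines. In this sketch the storage calls
are stand-ins for the librarian and graph services, and the document URI
format follows the `urn:trustgraph:agent:` scheme used elsewhere in this
commit.

```python
def on_message_complete(session_id, iteration, kind, text,
                        save_document, emit_triples):
    """Persist a completed message and announce its provenance node."""
    doc_id = f"urn:trustgraph:agent:{session_id}/i{iteration}/{kind}"
    save_document(doc_id, text)   # 1. collated text -> librarian
    emit_triples(doc_id)          # 2. provenance node referencing the URI
    return doc_id
```
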
---
## Open Questions
- **Re-planning depth** (resolved): Runtime parameter on the
agent-orchestrator executable, default 2. Bounds how many times
Plan-then-Execute can revise its plan before forcing termination.
- **Nested fan-out** (phase B): A subagent can itself be a supervisor
that fans out further. The architecture supports this — correlation IDs
are independent and the aggregator is stateless. The protocols and
message schema should not preclude nested fan-out, but implementation
is deferred. Depth limits will need to be enforced to prevent runaway
decomposition.
- **Task type evolution** (resolved): Manually curated for now. See
Future Directions below for automated discovery.
- **Cost attribution** (deferred): Costs are measured at the
text-completion queue level as they are today. Per-request attribution
across subagents is not yet implemented and is not a blocker for
orchestration.
- **Conversation ID** (resolved): An optional `conversation_id` field on
`AgentRequest`, generated by the caller. When present, all objects
created during the execution (provenance nodes, librarian documents,
subagent completion records) are tagged with the conversation ID. This
enables querying all interactions in a conversation with a single
lookup, and provides the foundation for conversation-scoped memory.
No explicit open/close — the first request with a new conversation ID
implicitly starts the conversation. Omit for one-shot queries.
- **Tool scoping per subagent** (resolved): Subagents inherit the
parent's tool group by default. The supervisor can optionally override
the group per subagent to constrain capabilities (e.g. financial
subagent gets only financial tools). The `group` field on
`AgentRequest` already supports this — the supervisor just sets it
when constructing subagent requests.
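
For illustration, a supervisor constructing its fan-out might look like
this. The `AgentRequest` here is trimmed to the fields involved, and
`fan_out` is a hypothetical helper, not a function in this commit.

```python
# Each subagent request carries the shared correlation_id, a per-subagent
# goal, and (optionally) a narrower tool group.
import uuid
from dataclasses import dataclass, field

@dataclass
class AgentRequest:   # trimmed to the orchestration fields used here
    question: str = ""
    group: list = field(default_factory=list)
    conversation_id: str = ""
    correlation_id: str = ""
    parent_session_id: str = ""
    subagent_goal: str = ""
    expected_siblings: int = 0

def fan_out(question, goals, parent_session_id, conversation_id, parent_group):
    correlation_id = str(uuid.uuid4())
    requests = []
    for goal, group_override in goals:
        requests.append(AgentRequest(
            question=question,
            # Inherit the parent's tool group unless overridden.
            group=group_override or parent_group,
            conversation_id=conversation_id,
            correlation_id=correlation_id,
            parent_session_id=parent_session_id,
            subagent_goal=goal,
            expected_siblings=len(goals),
        ))
    return requests
```
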
---
## Future Directions
### Automated Task Type Discovery
Task types are manually curated in the initial implementation. However,
the architecture is well-suited to automated discovery because all agent
requests and their execution traces flow through Pulsar topics. A
learning service could consume these messages and analyse patterns in
how tasks are framed, which patterns are selected, and how successfully
they execute. Over time, it could propose new task types based on
clusters of similar requests that don't map well to existing types, or
suggest refinements to framing prompts based on which framings lead to
better outcomes. This service would write proposed task types to the
config API for human review — automated discovery, manual approval. The
agent-orchestrator does not need to change; it always reads task types
from config regardless of how they got there.


@ -87,10 +87,11 @@ def sample_message_data():
"history": []
},
"AgentResponse": {
"answer": "Machine learning is a subset of AI.",
"chunk_type": "answer",
"content": "Machine learning is a subset of AI.",
"end_of_message": True,
"end_of_dialog": True,
"error": None,
"thought": "I need to provide information about machine learning.",
"observation": None
},
"Metadata": {
"id": "test-doc-123",


@ -212,10 +212,11 @@ class TestAgentMessageContracts:
# Test required fields
response = AgentResponse(**response_data)
assert hasattr(response, 'answer')
assert hasattr(response, 'chunk_type')
assert hasattr(response, 'content')
assert hasattr(response, 'end_of_message')
assert hasattr(response, 'end_of_dialog')
assert hasattr(response, 'error')
assert hasattr(response, 'thought')
assert hasattr(response, 'observation')
def test_agent_step_schema_contract(self):
"""Test AgentStep schema contract"""


@ -188,12 +188,10 @@ class TestAgentTranslatorCompletionFlags:
# Arrange
translator = TranslatorRegistry.get_response_translator("agent")
response = AgentResponse(
answer="4",
error=None,
thought=None,
observation=None,
chunk_type="answer",
content="4",
end_of_message=True,
end_of_dialog=True
end_of_dialog=True,
)
# Act
@ -201,7 +199,7 @@ class TestAgentTranslatorCompletionFlags:
# Assert
assert is_final is True, "is_final must be True when end_of_dialog=True"
assert response_dict["answer"] == "4"
assert response_dict["content"] == "4"
assert response_dict["end_of_dialog"] is True
def test_agent_translator_is_final_with_end_of_dialog_false(self):
@ -212,12 +210,10 @@ class TestAgentTranslatorCompletionFlags:
# Arrange
translator = TranslatorRegistry.get_response_translator("agent")
response = AgentResponse(
answer=None,
error=None,
thought="I need to solve this.",
observation=None,
chunk_type="thought",
content="I need to solve this.",
end_of_message=True,
end_of_dialog=False
end_of_dialog=False,
)
# Act
@ -225,31 +221,9 @@ class TestAgentTranslatorCompletionFlags:
# Assert
assert is_final is False, "is_final must be False when end_of_dialog=False"
assert response_dict["thought"] == "I need to solve this."
assert response_dict["content"] == "I need to solve this."
assert response_dict["end_of_dialog"] is False
def test_agent_translator_is_final_fallback_with_answer(self):
"""
Test that AgentResponseTranslator returns is_final=True
when answer is present (fallback for legacy responses).
"""
# Arrange
translator = TranslatorRegistry.get_response_translator("agent")
# Legacy response without end_of_dialog flag
response = AgentResponse(
answer="4",
error=None,
thought=None,
observation=None
)
# Act
response_dict, is_final = translator.from_response_with_completion(response)
# Assert
assert is_final is True, "is_final must be True when answer is present (legacy fallback)"
assert response_dict["answer"] == "4"
def test_agent_translator_intermediate_message_is_not_final(self):
"""
Test that intermediate messages (thought/observation) return is_final=False.
@ -259,12 +233,10 @@ class TestAgentTranslatorCompletionFlags:
# Test thought message
thought_response = AgentResponse(
answer=None,
error=None,
thought="Processing...",
observation=None,
chunk_type="thought",
content="Processing...",
end_of_message=True,
end_of_dialog=False
end_of_dialog=False,
)
# Act
@ -275,12 +247,10 @@ class TestAgentTranslatorCompletionFlags:
# Test observation message
observation_response = AgentResponse(
answer=None,
error=None,
thought=None,
observation="Result found",
chunk_type="observation",
content="Result found",
end_of_message=True,
end_of_dialog=False
end_of_dialog=False,
)
# Act
@ -302,10 +272,6 @@ class TestAgentTranslatorCompletionFlags:
content="",
end_of_message=True,
end_of_dialog=True,
answer=None,
error=None,
thought=None,
observation=None
)
# Act


@ -82,16 +82,16 @@ class TestAgentServiceNonStreaming:
# Check thought message
thought_response = sent_responses[0]
assert isinstance(thought_response, AgentResponse)
assert thought_response.thought == "I need to solve this."
assert thought_response.answer is None
assert thought_response.chunk_type == "thought"
assert thought_response.content == "I need to solve this."
assert thought_response.end_of_message is True, "Thought message must have end_of_message=True"
assert thought_response.end_of_dialog is False, "Thought message must have end_of_dialog=False"
# Check observation message
observation_response = sent_responses[1]
assert isinstance(observation_response, AgentResponse)
assert observation_response.observation == "The answer is 4."
assert observation_response.answer is None
assert observation_response.chunk_type == "observation"
assert observation_response.content == "The answer is 4."
assert observation_response.end_of_message is True, "Observation message must have end_of_message=True"
assert observation_response.end_of_dialog is False, "Observation message must have end_of_dialog=False"
@ -161,9 +161,8 @@ class TestAgentServiceNonStreaming:
# Check final answer message
answer_response = sent_responses[0]
assert isinstance(answer_response, AgentResponse)
assert answer_response.answer == "4"
assert answer_response.thought is None
assert answer_response.observation is None
assert answer_response.chunk_type == "answer"
assert answer_response.content == "4"
assert answer_response.end_of_message is True, "Final answer must have end_of_message=True"
assert answer_response.end_of_dialog is True, "Final answer must have end_of_dialog=True"


@ -402,23 +402,6 @@ class SocketClient:
content=resp.get("content", ""),
end_of_message=resp.get("end_of_message", False)
)
# Non-streaming agent format: chunk_type is empty but has thought/observation/answer fields
elif resp.get("thought"):
return AgentThought(
content=resp.get("thought", ""),
end_of_message=resp.get("end_of_message", False)
)
elif resp.get("observation"):
return AgentObservation(
content=resp.get("observation", ""),
end_of_message=resp.get("end_of_message", False)
)
elif resp.get("answer"):
return AgentAnswer(
content=resp.get("answer", ""),
end_of_message=resp.get("end_of_message", False),
end_of_dialog=resp.get("end_of_dialog", False)
)
else:
content = resp.get("response", resp.get("chunk", resp.get("text", "")))
return RAGChunk(


@ -57,8 +57,7 @@ class AgentClient(RequestResponse):
await self.request(
AgentRequest(
question = question,
plan = plan,
state = state,
state = state or "",
history = history,
),
recipient=recipient,


@ -90,9 +90,6 @@ class AgentService(FlowProcessor):
type = "agent-error",
message = str(e),
),
thought = None,
observation = None,
answer = None,
end_of_message = True,
end_of_dialog = True,
),


@ -16,6 +16,14 @@ class AgentRequestTranslator(MessageTranslator):
collection=data.get("collection", "default"),
streaming=data.get("streaming", False),
session_id=data.get("session_id", ""),
conversation_id=data.get("conversation_id", ""),
pattern=data.get("pattern", ""),
task_type=data.get("task_type", ""),
framing=data.get("framing", ""),
correlation_id=data.get("correlation_id", ""),
parent_session_id=data.get("parent_session_id", ""),
subagent_goal=data.get("subagent_goal", ""),
expected_siblings=data.get("expected_siblings", 0),
)
def from_pulsar(self, obj: AgentRequest) -> Dict[str, Any]:
@ -28,6 +36,14 @@ class AgentRequestTranslator(MessageTranslator):
"collection": getattr(obj, "collection", "default"),
"streaming": getattr(obj, "streaming", False),
"session_id": getattr(obj, "session_id", ""),
"conversation_id": getattr(obj, "conversation_id", ""),
"pattern": getattr(obj, "pattern", ""),
"task_type": getattr(obj, "task_type", ""),
"framing": getattr(obj, "framing", ""),
"correlation_id": getattr(obj, "correlation_id", ""),
"parent_session_id": getattr(obj, "parent_session_id", ""),
"subagent_goal": getattr(obj, "subagent_goal", ""),
"expected_siblings": getattr(obj, "expected_siblings", 0),
}
@ -40,24 +56,15 @@ class AgentResponseTranslator(MessageTranslator):
def from_pulsar(self, obj: AgentResponse) -> Dict[str, Any]:
result = {}
# Check if this is a streaming response (has chunk_type)
if hasattr(obj, 'chunk_type') and obj.chunk_type:
if obj.chunk_type:
result["chunk_type"] = obj.chunk_type
if obj.content:
result["content"] = obj.content
result["end_of_message"] = getattr(obj, "end_of_message", False)
result["end_of_dialog"] = getattr(obj, "end_of_dialog", False)
else:
# Legacy format (non-streaming)
if obj.answer:
result["answer"] = obj.answer
if obj.thought:
result["thought"] = obj.thought
if obj.observation:
result["observation"] = obj.observation
# Include completion flags for legacy format too
result["end_of_message"] = getattr(obj, "end_of_message", False)
result["end_of_dialog"] = getattr(obj, "end_of_dialog", False)
if getattr(obj, "message_id", ""):
result["message_id"] = obj.message_id
# Include explainability fields if present
explain_id = getattr(obj, "explain_id", None)
@ -76,11 +83,5 @@ class AgentResponseTranslator(MessageTranslator):
def from_response_with_completion(self, obj: AgentResponse) -> Tuple[Dict[str, Any], bool]:
"""Returns (response_dict, is_final)"""
# For streaming responses, check end_of_dialog
if hasattr(obj, 'chunk_type') and obj.chunk_type:
is_final = getattr(obj, 'end_of_dialog', False)
else:
# For legacy responses, check if answer is present
is_final = (obj.answer is not None)
return self.from_pulsar(obj), is_final


@ -1,5 +1,6 @@
from dataclasses import dataclass, field
from typing import Optional
from ..core.topic import topic
from ..core.primitives import Error
@ -8,6 +9,14 @@ from ..core.primitives import Error
# Prompt services, abstract the prompt generation
@dataclass
class PlanStep:
goal: str = ""
tool_hint: str = "" # Suggested tool for this step
depends_on: list[int] = field(default_factory=list) # Indices of prerequisite steps
status: str = "pending" # pending, running, completed, failed
result: str = "" # Result of step execution
@dataclass
class AgentStep:
thought: str = ""
@ -15,6 +24,9 @@ class AgentStep:
arguments: dict[str, str] = field(default_factory=dict)
observation: str = ""
user: str = "" # User context for the step
step_type: str = "" # "react", "plan", "execute", "decompose", "synthesise"
plan: list[PlanStep] = field(default_factory=list) # Plan steps (for plan-then-execute)
subagent_results: dict[str, str] = field(default_factory=dict) # Subagent results keyed by goal
@dataclass
class AgentRequest:
@ -27,6 +39,16 @@ class AgentRequest:
streaming: bool = False # Enable streaming response delivery (default false)
session_id: str = "" # For provenance tracking across iterations
# Orchestration fields
conversation_id: str = "" # Groups related requests into a conversation
pattern: str = "" # Selected pattern: "react", "plan-then-execute", "supervisor"
task_type: str = "" # Task type from config: "general", "research", etc.
framing: str = "" # Domain framing text injected into prompts
correlation_id: str = "" # Links fan-out subagents to parent for fan-in
parent_session_id: str = "" # Session ID of the supervisor that spawned this subagent
subagent_goal: str = "" # Specific goal for a subagent (set by supervisor)
expected_siblings: int = 0 # Number of sibling subagents in this fan-out
@dataclass
class AgentResponse:
# Streaming-first design
@ -39,11 +61,10 @@ class AgentResponse:
explain_id: str | None = None # Provenance URI (announced as created)
explain_graph: str | None = None # Named graph where explain was stored
# Legacy fields (deprecated but kept for backward compatibility)
answer: str = ""
# Orchestration fields
message_id: str = "" # Unique ID for this response message
error: Error | None = None
thought: str = ""
observation: str = ""
############################################################################


@ -56,6 +56,7 @@ Homepage = "https://github.com/trustgraph-ai/trustgraph"
[project.scripts]
agent-manager-react = "trustgraph.agent.react:run"
agent-orchestrator = "trustgraph.agent.orchestrator:run"
api-gateway = "trustgraph.gateway:run"
chunker-recursive = "trustgraph.chunking.recursive:run"
chunker-token = "trustgraph.chunking.token:run"


@ -0,0 +1,2 @@
from . service import *


@ -0,0 +1,6 @@
#!/usr/bin/env python3
from . service import run
if __name__ == '__main__':
run()


@ -0,0 +1,157 @@
"""
Aggregator monitors the explainability topic for subagent completions
and triggers synthesis when all siblings in a fan-out have completed.
The aggregator watches for tg:Conclusion triples that carry a
correlation_id. When it detects that all expected siblings have
completed, it emits a synthesis AgentRequest on the agent request topic.
"""
import asyncio
import json
import logging
import time
import uuid
from ... schema import AgentRequest, AgentStep
logger = logging.getLogger(__name__)
# How long to wait for stalled correlations before giving up (seconds)
DEFAULT_TIMEOUT = 300
class Aggregator:
"""
Tracks in-flight fan-out correlations and triggers synthesis
when all subagents complete.
State is held in-memory; if the process restarts, in-flight
correlations are lost (acceptable for v1).
"""
def __init__(self, timeout=DEFAULT_TIMEOUT):
self.timeout = timeout
# correlation_id -> {
# "parent_session_id": str,
# "expected": int,
# "results": {goal: answer},
# "request_template": AgentRequest or None,
# "created_at": float,
# }
self.correlations = {}
def register_fanout(self, correlation_id, parent_session_id,
expected_siblings, request_template=None):
"""
Register a new fan-out. Called by the supervisor after emitting
subagent requests.
"""
self.correlations[correlation_id] = {
"parent_session_id": parent_session_id,
"expected": expected_siblings,
"results": {},
"request_template": request_template,
"created_at": time.time(),
}
logger.info(
f"Aggregator: registered fan-out {correlation_id}, "
f"expecting {expected_siblings} subagents"
)
def record_completion(self, correlation_id, subagent_goal, result):
"""
Record a subagent completion.
Returns:
True if all siblings are now complete, False otherwise.
Returns None if the correlation_id is unknown.
"""
if correlation_id not in self.correlations:
logger.warning(
f"Aggregator: unknown correlation_id {correlation_id}"
)
return None
entry = self.correlations[correlation_id]
entry["results"][subagent_goal] = result
completed = len(entry["results"])
expected = entry["expected"]
logger.info(
            f"Aggregator: {correlation_id}: "
            f"{completed}/{expected} subagents complete"
)
return completed >= expected
def get_results(self, correlation_id):
"""Get all results for a correlation and remove the tracking entry."""
entry = self.correlations.pop(correlation_id, None)
if entry is None:
return None, None, None
return (
entry["results"],
entry["parent_session_id"],
entry["request_template"],
)
def build_synthesis_request(self, correlation_id, original_question,
user, collection):
"""
Build the AgentRequest that triggers the synthesis phase.
"""
results, parent_session_id, template = self.get_results(correlation_id)
if results is None:
raise RuntimeError(
f"No results for correlation_id {correlation_id}"
)
# Build history with decompose step + results
synthesis_step = AgentStep(
thought="All subagents completed",
action="aggregate",
arguments={},
observation=json.dumps(results),
step_type="synthesise",
subagent_results=results,
)
history = []
if template and template.history:
history = list(template.history)
history.append(synthesis_step)
return AgentRequest(
question=original_question,
state="",
group=template.group if template else [],
history=history,
user=user,
collection=collection,
streaming=template.streaming if template else False,
session_id=parent_session_id,
conversation_id=template.conversation_id if template else "",
pattern="supervisor",
task_type=template.task_type if template else "",
framing=template.framing if template else "",
correlation_id=correlation_id,
parent_session_id="",
subagent_goal="",
expected_siblings=0,
)
def cleanup_stale(self):
"""Remove correlations that have timed out."""
now = time.time()
stale = [
cid for cid, entry in self.correlations.items()
if now - entry["created_at"] > self.timeout
]
for cid in stale:
logger.warning(f"Aggregator: timing out stale correlation {cid}")
self.correlations.pop(cid, None)
return stale


@ -0,0 +1,168 @@
"""
MetaRouter selects the task type and execution pattern for a query.
Uses the config API to look up available task types and patterns, then
asks the LLM to classify the query and select the best pattern.
Falls back to ("react", "general", "") if config is empty.
"""
import json
import logging
logger = logging.getLogger(__name__)
DEFAULT_PATTERN = "react"
DEFAULT_TASK_TYPE = "general"
DEFAULT_FRAMING = ""
class MetaRouter:
def __init__(self, config=None):
"""
Args:
config: The full config dict from the config service.
May contain "agent-pattern" and "agent-task-type" keys.
"""
self.patterns = {}
self.task_types = {}
if config:
# Load from config API
if "agent-pattern" in config:
for pid, pval in config["agent-pattern"].items():
try:
self.patterns[pid] = json.loads(pval)
except (json.JSONDecodeError, TypeError):
self.patterns[pid] = {"name": pid}
if "agent-task-type" in config:
for tid, tval in config["agent-task-type"].items():
try:
self.task_types[tid] = json.loads(tval)
except (json.JSONDecodeError, TypeError):
self.task_types[tid] = {"name": tid}
# If config has no patterns/task-types, default to react/general
if not self.patterns:
self.patterns = {
"react": {"name": "react", "description": "Interleaved reasoning and action"},
}
if not self.task_types:
self.task_types = {
"general": {"name": "general", "description": "General queries", "valid_patterns": ["react"], "framing": ""},
}
async def identify_task_type(self, question, context):
"""
Use the LLM to classify the question into one of the known task types.
Args:
question: The user's query.
context: UserAwareContext (flow wrapper).
Returns:
(task_type_id, framing) tuple.
"""
if len(self.task_types) <= 1:
tid = next(iter(self.task_types), DEFAULT_TASK_TYPE)
framing = self.task_types.get(tid, {}).get("framing", DEFAULT_FRAMING)
return tid, framing
try:
client = context("prompt-request")
response = await client.prompt(
id="task-type-classify",
variables={
"question": question,
"task_types": [
{"name": tid, "description": tdata.get("description", tid)}
for tid, tdata in self.task_types.items()
],
},
)
selected = response.strip().lower().replace('"', '').replace("'", "")
if selected in self.task_types:
framing = self.task_types[selected].get("framing", DEFAULT_FRAMING)
logger.info(f"MetaRouter: identified task type '{selected}'")
return selected, framing
else:
logger.warning(
f"MetaRouter: LLM returned unknown task type '{selected}', "
f"falling back to '{DEFAULT_TASK_TYPE}'"
)
except Exception as e:
logger.warning(f"MetaRouter: task type classification failed: {e}")
framing = self.task_types.get(DEFAULT_TASK_TYPE, {}).get(
"framing", DEFAULT_FRAMING
)
return DEFAULT_TASK_TYPE, framing
async def select_pattern(self, question, task_type, context):
"""
Use the LLM to select the best execution pattern for this task type.
Args:
question: The user's query.
task_type: The identified task type ID.
context: UserAwareContext (flow wrapper).
Returns:
Pattern ID string.
"""
task_config = self.task_types.get(task_type, {})
valid_patterns = task_config.get("valid_patterns", list(self.patterns.keys()))
if len(valid_patterns) <= 1:
return valid_patterns[0] if valid_patterns else DEFAULT_PATTERN
try:
client = context("prompt-request")
response = await client.prompt(
id="pattern-select",
variables={
"question": question,
"task_type": task_type,
"task_type_description": task_config.get("description", task_type),
"patterns": [
{"name": pid, "description": self.patterns.get(pid, {}).get("description", pid)}
for pid in valid_patterns
if pid in self.patterns
],
},
)
selected = response.strip().lower().replace('"', '').replace("'", "")
if selected in valid_patterns:
logger.info(f"MetaRouter: selected pattern '{selected}'")
return selected
else:
logger.warning(
f"MetaRouter: LLM returned invalid pattern '{selected}', "
f"falling back to '{valid_patterns[0]}'"
)
return valid_patterns[0]
except Exception as e:
logger.warning(f"MetaRouter: pattern selection failed: {e}")
return valid_patterns[0] if valid_patterns else DEFAULT_PATTERN
async def route(self, question, context):
"""
Full routing pipeline: identify task type, then select pattern.
Args:
question: The user's query.
context: UserAwareContext (flow wrapper).
Returns:
(pattern, task_type, framing) tuple.
"""
task_type, framing = await self.identify_task_type(question, context)
pattern = await self.select_pattern(question, task_type, context)
logger.info(
f"MetaRouter: route result — "
f"pattern={pattern}, task_type={task_type}, framing={framing!r}"
)
return pattern, task_type, framing


@ -0,0 +1,428 @@
"""
Base class for agent patterns.
Provides shared infrastructure used by all patterns: tool filtering,
provenance emission, streaming callbacks, history management, and
librarian integration.
"""
import json
import logging
import uuid
from datetime import datetime
from ... schema import AgentRequest, AgentResponse, AgentStep, Error
from ... schema import Triples, Metadata
from trustgraph.provenance import (
agent_session_uri,
agent_iteration_uri,
agent_thought_uri,
agent_observation_uri,
agent_final_uri,
agent_session_triples,
agent_iteration_triples,
agent_final_triples,
set_graph,
GRAPH_RETRIEVAL,
)
from ..react.types import Action, Final
from ..tool_filter import filter_tools_by_group_and_state, get_next_state
logger = logging.getLogger(__name__)
class UserAwareContext:
"""Wraps flow interface to inject user context for tools that need it."""
def __init__(self, flow, user):
self._flow = flow
self._user = user
def __call__(self, service_name):
client = self._flow(service_name)
if service_name in (
"structured-query-request",
"row-embeddings-query-request",
):
client._current_user = self._user
return client
class PatternBase:
"""
Shared infrastructure for all agent patterns.
Subclasses implement iterate() to perform one iteration of their
pattern-specific logic.
"""
def __init__(self, processor):
self.processor = processor
def filter_tools(self, tools, request):
"""Apply group/state filtering to the tool set."""
return filter_tools_by_group_and_state(
tools=tools,
requested_groups=getattr(request, 'group', None),
current_state=getattr(request, 'state', None),
)
def make_context(self, flow, user):
"""Create a user-aware context wrapper."""
return UserAwareContext(flow, user)
def build_history(self, request):
"""Convert AgentStep history into Action objects."""
if not request.history:
return []
return [
Action(
thought=h.thought,
name=h.action,
arguments=h.arguments,
observation=h.observation,
)
for h in request.history
]
# ---- Streaming callbacks ------------------------------------------------
def make_think_callback(self, respond, streaming):
"""Create the think callback for streaming/non-streaming."""
async def think(x, is_final=False):
logger.debug(f"Think: {x} (is_final={is_final})")
if streaming:
r = AgentResponse(
chunk_type="thought",
content=x,
end_of_message=is_final,
end_of_dialog=False,
)
else:
r = AgentResponse(
chunk_type="thought",
content=x,
end_of_message=True,
end_of_dialog=False,
)
await respond(r)
return think
def make_observe_callback(self, respond, streaming):
"""Create the observe callback for streaming/non-streaming."""
async def observe(x, is_final=False):
logger.debug(f"Observe: {x} (is_final={is_final})")
if streaming:
r = AgentResponse(
chunk_type="observation",
content=x,
end_of_message=is_final,
end_of_dialog=False,
)
else:
r = AgentResponse(
chunk_type="observation",
content=x,
end_of_message=True,
end_of_dialog=False,
)
await respond(r)
return observe
def make_answer_callback(self, respond, streaming):
"""Create the answer callback for streaming/non-streaming."""
async def answer(x):
logger.debug(f"Answer: {x}")
if streaming:
r = AgentResponse(
chunk_type="answer",
content=x,
end_of_message=False,
end_of_dialog=False,
)
else:
r = AgentResponse(
chunk_type="answer",
content=x,
end_of_message=True,
end_of_dialog=False,
)
await respond(r)
return answer
# ---- Provenance emission ------------------------------------------------
async def emit_session_triples(self, flow, session_uri, question, user,
collection, respond, streaming):
"""Emit provenance triples for a new session."""
timestamp = datetime.utcnow().isoformat() + "Z"
triples = set_graph(
agent_session_triples(session_uri, question, timestamp),
GRAPH_RETRIEVAL,
)
await flow("explainability").send(Triples(
metadata=Metadata(
id=session_uri,
user=user,
collection=collection,
),
triples=triples,
))
logger.debug(f"Emitted session triples for {session_uri}")
if streaming:
await respond(AgentResponse(
chunk_type="explain",
content="",
explain_id=session_uri,
explain_graph=GRAPH_RETRIEVAL,
))
async def emit_iteration_triples(self, flow, session_id, iteration_num,
session_uri, act, request, respond,
streaming):
"""Emit provenance triples for an iteration and save to librarian."""
iteration_uri = agent_iteration_uri(session_id, iteration_num)
if iteration_num > 1:
iter_question_uri = None
iter_previous_uri = agent_iteration_uri(session_id, iteration_num - 1)
else:
iter_question_uri = session_uri
iter_previous_uri = None
# Save thought to librarian
thought_doc_id = None
if act.thought:
thought_doc_id = (
f"urn:trustgraph:agent:{session_id}/i{iteration_num}/thought"
)
try:
await self.processor.save_answer_content(
doc_id=thought_doc_id,
user=request.user,
content=act.thought,
title=f"Agent Thought: {act.name}",
)
except Exception as e:
logger.warning(f"Failed to save thought to librarian: {e}")
thought_doc_id = None
# Save observation to librarian
observation_doc_id = None
if act.observation:
observation_doc_id = (
f"urn:trustgraph:agent:{session_id}/i{iteration_num}/observation"
)
try:
await self.processor.save_answer_content(
doc_id=observation_doc_id,
user=request.user,
content=act.observation,
title=f"Agent Observation: {act.name}",
)
except Exception as e:
logger.warning(f"Failed to save observation to librarian: {e}")
observation_doc_id = None
thought_entity_uri = agent_thought_uri(session_id, iteration_num)
observation_entity_uri = agent_observation_uri(session_id, iteration_num)
iter_triples = set_graph(
agent_iteration_triples(
iteration_uri,
question_uri=iter_question_uri,
previous_uri=iter_previous_uri,
action=act.name,
arguments=act.arguments,
thought_uri=thought_entity_uri if thought_doc_id else None,
thought_document_id=thought_doc_id,
observation_uri=observation_entity_uri if observation_doc_id else None,
observation_document_id=observation_doc_id,
),
GRAPH_RETRIEVAL,
)
await flow("explainability").send(Triples(
metadata=Metadata(
id=iteration_uri,
user=request.user,
collection=getattr(request, 'collection', 'default'),
),
triples=iter_triples,
))
logger.debug(f"Emitted iteration triples for {iteration_uri}")
if streaming:
await respond(AgentResponse(
chunk_type="explain",
content="",
explain_id=iteration_uri,
explain_graph=GRAPH_RETRIEVAL,
))
async def emit_final_triples(self, flow, session_id, iteration_num,
session_uri, answer_text, request, respond,
streaming):
"""Emit provenance triples for the final answer and save to librarian."""
final_uri = agent_final_uri(session_id)
if iteration_num > 1:
final_question_uri = None
final_previous_uri = agent_iteration_uri(session_id, iteration_num - 1)
else:
final_question_uri = session_uri
final_previous_uri = None
# Save answer to librarian
answer_doc_id = None
if answer_text:
answer_doc_id = f"urn:trustgraph:agent:{session_id}/answer"
try:
await self.processor.save_answer_content(
doc_id=answer_doc_id,
user=request.user,
content=answer_text,
title=f"Agent Answer: {request.question[:50]}...",
)
logger.debug(f"Saved answer to librarian: {answer_doc_id}")
except Exception as e:
logger.warning(f"Failed to save answer to librarian: {e}")
answer_doc_id = None
final_triples = set_graph(
agent_final_triples(
final_uri,
question_uri=final_question_uri,
previous_uri=final_previous_uri,
document_id=answer_doc_id,
),
GRAPH_RETRIEVAL,
)
await flow("explainability").send(Triples(
metadata=Metadata(
id=final_uri,
user=request.user,
collection=getattr(request, 'collection', 'default'),
),
triples=final_triples,
))
logger.debug(f"Emitted final triples for {final_uri}")
if streaming:
await respond(AgentResponse(
chunk_type="explain",
content="",
explain_id=final_uri,
explain_graph=GRAPH_RETRIEVAL,
))
# ---- Response helpers ---------------------------------------------------
async def prompt_as_answer(self, client, prompt_id, variables,
respond, streaming):
"""Call a prompt template, forwarding chunks as answer
AgentResponse messages when streaming is enabled.
Returns the full accumulated answer text (needed for provenance).
"""
if streaming:
accumulated = []
async def on_chunk(text, end_of_stream):
if text:
accumulated.append(text)
await respond(AgentResponse(
chunk_type="answer",
content=text,
end_of_message=False,
end_of_dialog=False,
))
await client.prompt(
id=prompt_id,
variables=variables,
streaming=True,
chunk_callback=on_chunk,
)
return "".join(accumulated)
else:
return await client.prompt(
id=prompt_id,
variables=variables,
)
async def send_final_response(self, respond, streaming, answer_text,
already_streamed=False):
"""Send the answer content and end-of-dialog marker.
Args:
already_streamed: If True, answer chunks were already sent
via streaming callbacks (e.g. ReactPattern). Only the
end-of-dialog marker is emitted.
"""
if streaming and not already_streamed:
# Answer wasn't streamed yet — send it as a chunk first
if answer_text:
await respond(AgentResponse(
chunk_type="answer",
content=answer_text,
end_of_message=False,
end_of_dialog=False,
))
if streaming:
# End-of-dialog marker
await respond(AgentResponse(
chunk_type="answer",
content="",
end_of_message=True,
end_of_dialog=True,
))
else:
await respond(AgentResponse(
chunk_type="answer",
content=answer_text,
end_of_message=True,
end_of_dialog=True,
))
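The branching above yields two different chunk sequences on the wire. A minimal sketch of that protocol, using a plain dict as a stand-in for the real AgentResponse schema object:

```python
import asyncio

def response(content, end_of_message, end_of_dialog):
    # Stand-in for the AgentResponse schema object.
    return {"chunk_type": "answer", "content": content,
            "end_of_message": end_of_message, "end_of_dialog": end_of_dialog}

async def send_final(respond, streaming, answer_text, already_streamed=False):
    # Mirrors send_final_response: already-streamed answers get only a
    # bare end-of-dialog marker; non-streaming sends one complete chunk.
    if streaming and not already_streamed and answer_text:
        await respond(response(answer_text, False, False))
    if streaming:
        await respond(response("", True, True))
    else:
        await respond(response(answer_text, True, True))

async def demo():
    out = []
    async def respond(r):
        out.append(r)
    await send_final(respond, streaming=True, answer_text="42")
    return out

chunks = asyncio.run(demo())
# Two chunks: the answer content, then an empty end-of-dialog marker.
```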
def build_next_request(self, request, history, session_id, collection,
streaming, next_state):
"""Build the AgentRequest for the next iteration."""
return AgentRequest(
question=request.question,
state=next_state,
group=getattr(request, 'group', []),
history=[
AgentStep(
thought=h.thought,
action=h.name,
arguments={k: str(v) for k, v in h.arguments.items()},
observation=h.observation,
)
for h in history
],
user=request.user,
collection=collection,
streaming=streaming,
session_id=session_id,
# Preserve orchestration fields
conversation_id=getattr(request, 'conversation_id', ''),
pattern=getattr(request, 'pattern', ''),
task_type=getattr(request, 'task_type', ''),
framing=getattr(request, 'framing', ''),
correlation_id=getattr(request, 'correlation_id', ''),
parent_session_id=getattr(request, 'parent_session_id', ''),
subagent_goal=getattr(request, 'subagent_goal', ''),
expected_siblings=getattr(request, 'expected_siblings', 0),
)
async def iterate(self, request, respond, next, flow):
"""
Perform one iteration of this pattern.
Must be implemented by subclasses.
"""
raise NotImplementedError
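A pattern implementation only needs to satisfy the iterate() contract. A trivial sketch, with dict requests and a list-collecting respond callback standing in for the real schema types and Pulsar plumbing:

```python
import asyncio

class PatternBase:
    # Minimal stand-in for the base class above.
    async def iterate(self, request, respond, next, flow):
        raise NotImplementedError

class EchoPattern(PatternBase):
    # Answers immediately; a real pattern would instead call next(r)
    # to self-queue another iteration with accumulated history.
    async def iterate(self, request, respond, next, flow):
        await respond({"chunk_type": "answer",
                       "content": request["question"],
                       "end_of_message": True, "end_of_dialog": True})

async def demo():
    out = []
    async def respond(r):
        out.append(r)
    await EchoPattern().iterate({"question": "ping"}, respond, None, None)
    return out

result = asyncio.run(demo())
```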


@ -0,0 +1,349 @@
"""
PlanThenExecutePattern: structured planning followed by step execution.
Phase 1 (planning): LLM produces a structured plan of steps.
Phase 2 (execution): Each step is executed via single-shot tool call.
"""
import json
import logging
import uuid
from ... schema import AgentRequest, AgentResponse, AgentStep, PlanStep
from ..react.types import Action
from . pattern_base import PatternBase
logger = logging.getLogger(__name__)
class PlanThenExecutePattern(PatternBase):
"""
Plan-then-Execute pattern.
History tracks the current phase via AgentStep.step_type:
- "plan" step: contains the plan in step.plan
- "execute" step: a normal react iteration executing a plan step
On the first call (empty history), a planning iteration is run.
Subsequent calls execute the next pending plan step via ReACT.
"""
async def iterate(self, request, respond, next, flow):
streaming = getattr(request, 'streaming', False)
session_id = getattr(request, 'session_id', '') or str(uuid.uuid4())
collection = getattr(request, 'collection', 'default')
history = self.build_history(request)
iteration_num = len(request.history) + 1
session_uri = self.processor.provenance_session_uri(session_id)
# Emit session provenance on first iteration
if iteration_num == 1:
await self.emit_session_triples(
flow, session_uri, request.question,
request.user, collection, respond, streaming,
)
logger.info(
f"PlanThenExecutePattern iteration {iteration_num}: "
f"{request.question}"
)
if iteration_num >= self.processor.max_iterations:
raise RuntimeError("Too many agent iterations")
# Determine current phase by checking history for a plan step
plan = self._extract_plan(request.history)
if plan is None:
await self._planning_iteration(
request, respond, next, flow,
session_id, collection, streaming, session_uri,
iteration_num,
)
else:
await self._execution_iteration(
request, respond, next, flow,
session_id, collection, streaming, session_uri,
iteration_num, plan,
)
def _extract_plan(self, history):
"""Find the most recent plan from history.
Checks execute steps first (they carry the updated plan with
completion statuses), then falls back to the original plan step.
"""
if not history:
return None
for step in reversed(history):
if step.plan:
return list(step.plan)
return None
def _find_next_pending_step(self, plan):
"""Return index of the next pending step, or None if all done."""
for i, step in enumerate(plan):
if getattr(step, 'status', 'pending') == 'pending':
return i
return None
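Phase detection and step selection reduce to linear scans over the plan. A sketch of the pending-step scan, with plain dicts in place of PlanStep objects:

```python
def find_next_pending(plan):
    # Return the index of the first step still pending, else None.
    for i, step in enumerate(plan):
        if step.get("status", "pending") == "pending":
            return i
    return None

plan = [
    {"goal": "a", "status": "completed"},
    {"goal": "b", "status": "pending"},
    {"goal": "c", "status": "pending"},
]
# Steps run in order; completed steps are skipped, and an all-completed
# plan signals that synthesis should run.
```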
async def _planning_iteration(self, request, respond, next, flow,
session_id, collection, streaming,
session_uri, iteration_num):
"""Ask the LLM to produce a structured plan."""
think = self.make_think_callback(respond, streaming)
tools = self.filter_tools(self.processor.agent.tools, request)
framing = getattr(request, 'framing', '')
context = self.make_context(flow, request.user)
client = context("prompt-request")
# Use the plan-create prompt template
plan_steps = await client.prompt(
id="plan-create",
variables={
"question": request.question,
"framing": framing,
"tools": [
{"name": t.name, "description": t.description}
for t in tools.values()
],
},
)
# Validate we got a list
if not isinstance(plan_steps, list) or not plan_steps:
logger.warning("plan-create returned invalid result, falling back to single step")
plan_steps = [{"goal": "Answer the question directly", "tool_hint": "", "depends_on": []}]
# Emit thought about the plan
thought_text = f"Created plan with {len(plan_steps)} steps"
await think(thought_text, is_final=True)
# Build PlanStep objects
plan_agent_steps = [
PlanStep(
goal=ps.get("goal", ""),
tool_hint=ps.get("tool_hint", ""),
depends_on=ps.get("depends_on", []),
status="pending",
result="",
)
for ps in plan_steps
]
# Create a plan step in history
plan_history_step = AgentStep(
thought=thought_text,
action="plan",
arguments={},
observation=json.dumps(plan_steps),
step_type="plan",
plan=plan_agent_steps,
)
# Build next request with plan in history
new_history = list(request.history) + [plan_history_step]
r = AgentRequest(
question=request.question,
state=request.state,
group=getattr(request, 'group', []),
history=new_history,
user=request.user,
collection=collection,
streaming=streaming,
session_id=session_id,
conversation_id=getattr(request, 'conversation_id', ''),
pattern=getattr(request, 'pattern', ''),
task_type=getattr(request, 'task_type', ''),
framing=getattr(request, 'framing', ''),
correlation_id=getattr(request, 'correlation_id', ''),
parent_session_id=getattr(request, 'parent_session_id', ''),
subagent_goal=getattr(request, 'subagent_goal', ''),
expected_siblings=getattr(request, 'expected_siblings', 0),
)
await next(r)
async def _execution_iteration(self, request, respond, next, flow,
session_id, collection, streaming,
session_uri, iteration_num, plan):
"""Execute the next pending plan step via single-shot tool call."""
pending_idx = self._find_next_pending_step(plan)
if pending_idx is None:
# All steps done — synthesise final answer
await self._synthesise(
request, respond, next, flow,
session_id, collection, streaming,
session_uri, iteration_num, plan,
)
return
current_step = plan[pending_idx]
goal = getattr(current_step, 'goal', '') or str(current_step)
logger.info(f"Executing plan step {pending_idx}: {goal}")
think = self.make_think_callback(respond, streaming)
observe = self.make_observe_callback(respond, streaming)
# Gather results from dependencies
previous_results = []
depends_on = getattr(current_step, 'depends_on', [])
if depends_on:
for dep_idx in depends_on:
if 0 <= dep_idx < len(plan):
dep_step = plan[dep_idx]
dep_result = getattr(dep_step, 'result', '')
if dep_result:
previous_results.append({
"index": dep_idx,
"result": dep_result,
})
tools = self.filter_tools(self.processor.agent.tools, request)
context = self.make_context(flow, request.user)
client = context("prompt-request")
# Single-shot: ask LLM which tool + arguments to use for this goal
tool_call = await client.prompt(
id="plan-step-execute",
variables={
"goal": goal,
"previous_results": previous_results,
"tools": [
{
"name": t.name,
"description": t.description,
"arguments": [
{"name": a.name, "type": a.type, "description": a.description}
for a in t.arguments
],
}
for t in tools.values()
],
},
)
tool_name = tool_call.get("tool", "")
tool_arguments = tool_call.get("arguments", {})
await think(
f"Step {pending_idx}: {goal} → calling {tool_name}",
is_final=True,
)
# Invoke the tool directly
if tool_name in tools:
tool = tools[tool_name]
resp = await tool.implementation(context).invoke(**tool_arguments)
step_result = resp.strip() if isinstance(resp, str) else str(resp).strip()
else:
logger.warning(
f"Plan step {pending_idx}: LLM selected unknown tool "
f"'{tool_name}', available: {list(tools.keys())}"
)
step_result = f"Error: tool '{tool_name}' not found"
await observe(step_result, is_final=True)
# Update plan step status
plan[pending_idx] = PlanStep(
goal=goal,
tool_hint=getattr(current_step, 'tool_hint', ''),
depends_on=getattr(current_step, 'depends_on', []),
status="completed",
result=step_result,
)
# Emit iteration provenance
prov_act = Action(
thought=f"Plan step {pending_idx}: {goal}",
name=tool_name,
arguments=tool_arguments,
observation=step_result,
)
await self.emit_iteration_triples(
flow, session_id, iteration_num, session_uri,
prov_act, request, respond, streaming,
)
# Build execution step for history
exec_step = AgentStep(
thought=f"Executing plan step {pending_idx}: {goal}",
action=tool_name,
arguments={k: str(v) for k, v in tool_arguments.items()},
observation=step_result,
step_type="execute",
plan=plan,
)
new_history = list(request.history) + [exec_step]
r = AgentRequest(
question=request.question,
state=request.state,
group=getattr(request, 'group', []),
history=new_history,
user=request.user,
collection=collection,
streaming=streaming,
session_id=session_id,
conversation_id=getattr(request, 'conversation_id', ''),
pattern=getattr(request, 'pattern', ''),
task_type=getattr(request, 'task_type', ''),
framing=getattr(request, 'framing', ''),
correlation_id=getattr(request, 'correlation_id', ''),
parent_session_id=getattr(request, 'parent_session_id', ''),
subagent_goal=getattr(request, 'subagent_goal', ''),
expected_siblings=getattr(request, 'expected_siblings', 0),
)
await next(r)
async def _synthesise(self, request, respond, next, flow,
session_id, collection, streaming,
session_uri, iteration_num, plan):
"""Synthesise a final answer from all completed plan step results."""
think = self.make_think_callback(respond, streaming)
framing = getattr(request, 'framing', '')
context = self.make_context(flow, request.user)
client = context("prompt-request")
# Use the plan-synthesise prompt template
steps_data = []
for i, step in enumerate(plan):
steps_data.append({
"index": i,
"goal": getattr(step, 'goal', f'Step {i}'),
"result": getattr(step, 'result', ''),
})
await think("Synthesising final answer from plan results", is_final=True)
response_text = await self.prompt_as_answer(
client, "plan-synthesise",
variables={
"question": request.question,
"framing": framing,
"steps": steps_data,
},
respond=respond,
streaming=streaming,
)
await self.emit_final_triples(
flow, session_id, iteration_num, session_uri,
response_text, request, respond, streaming,
)
await self.send_final_response(
respond, streaming, response_text, already_streamed=streaming,
)


@ -0,0 +1,134 @@
"""
ReactPattern: extracted from the existing agent_manager.py.
Implements the ReACT (Reasoning + Acting) loop: think, select a tool,
observe the result, repeat until a final answer is produced.
"""
import json
import logging
import uuid
from ... schema import AgentRequest, AgentResponse, AgentStep
from ..react.agent_manager import AgentManager
from ..react.types import Action, Final
from ..tool_filter import get_next_state
from . pattern_base import PatternBase
logger = logging.getLogger(__name__)
class ReactPattern(PatternBase):
"""
ReACT pattern: interleaved reasoning and action.
Each iterate() call performs one reason/act cycle. If the LLM
produces a Final answer the dialog completes; otherwise the action
result is appended to history and a next-request is emitted.
"""
async def iterate(self, request, respond, next, flow):
streaming = getattr(request, 'streaming', False)
session_id = getattr(request, 'session_id', '') or str(uuid.uuid4())
collection = getattr(request, 'collection', 'default')
history = self.build_history(request)
iteration_num = len(history) + 1
session_uri = self.processor.provenance_session_uri(session_id)
# Emit session provenance on first iteration
if iteration_num == 1:
await self.emit_session_triples(
flow, session_uri, request.question,
request.user, collection, respond, streaming,
)
logger.info(f"ReactPattern iteration {iteration_num}: {request.question}")
if len(history) >= self.processor.max_iterations:
raise RuntimeError("Too many agent iterations")
# Build callbacks
think = self.make_think_callback(respond, streaming)
observe = self.make_observe_callback(respond, streaming)
answer_cb = self.make_answer_callback(respond, streaming)
# Filter tools
filtered_tools = self.filter_tools(
self.processor.agent.tools, request,
)
logger.info(
f"Filtered from {len(self.processor.agent.tools)} "
f"to {len(filtered_tools)} available tools"
)
# Create temporary agent with filtered tools and optional framing
additional_context = self.processor.agent.additional_context
framing = getattr(request, 'framing', '')
if framing:
if additional_context:
additional_context = f"{additional_context}\n\n{framing}"
else:
additional_context = framing
temp_agent = AgentManager(
tools=filtered_tools,
additional_context=additional_context,
)
context = self.make_context(flow, request.user)
act = await temp_agent.react(
question=request.question,
history=history,
think=think,
observe=observe,
answer=answer_cb,
context=context,
streaming=streaming,
)
logger.debug(f"Action: {act}")
if isinstance(act, Final):
if isinstance(act.final, str):
f = act.final
else:
f = json.dumps(act.final)
# Emit final provenance
await self.emit_final_triples(
flow, session_id, iteration_num, session_uri,
f, request, respond, streaming,
)
await self.send_final_response(
respond, streaming, f, already_streamed=streaming,
)
return
# Not final — emit iteration provenance and send next request
await self.emit_iteration_triples(
flow, session_id, iteration_num, session_uri,
act, request, respond, streaming,
)
history.append(act)
# Handle state transitions
next_state = request.state
if act.name in filtered_tools:
executed_tool = filtered_tools[act.name]
next_state = get_next_state(executed_tool, request.state or "undefined")
r = self.build_next_request(
request, history, session_id, collection,
streaming, next_state,
)
await next(r)
logger.debug("ReactPattern iteration complete")


@ -0,0 +1,511 @@
"""
Agent orchestrator service: a multi-pattern drop-in replacement for
agent-manager-react.
Uses the same service identity and Pulsar queues. Adds meta-routing
to select between ReactPattern, PlanThenExecutePattern, and
SupervisorPattern at runtime.
"""
import asyncio
import base64
import json
import functools
import logging
import uuid
from datetime import datetime
from ... base import AgentService, TextCompletionClientSpec, PromptClientSpec
from ... base import GraphRagClientSpec, ToolClientSpec, StructuredQueryClientSpec
from ... base import RowEmbeddingsQueryClientSpec, EmbeddingsClientSpec
from ... base import ProducerSpec
from ... base import Consumer, Producer
from ... base import ConsumerMetrics, ProducerMetrics
from ... schema import AgentRequest, AgentResponse, AgentStep, Error
from ... schema import Triples, Metadata
from ... schema import LibrarianRequest, LibrarianResponse, DocumentMetadata
from ... schema import librarian_request_queue, librarian_response_queue
from trustgraph.provenance import (
agent_session_uri,
GRAPH_RETRIEVAL,
)
from ..react.tools import (
KnowledgeQueryImpl, TextCompletionImpl, McpToolImpl, PromptImpl,
StructuredQueryImpl, RowEmbeddingsQueryImpl, ToolServiceImpl,
)
from ..react.agent_manager import AgentManager
from ..tool_filter import validate_tool_config
from ..react.types import Final, Action, Tool, Argument
from . meta_router import MetaRouter
from . pattern_base import PatternBase, UserAwareContext
from . react_pattern import ReactPattern
from . plan_pattern import PlanThenExecutePattern
from . supervisor_pattern import SupervisorPattern
from . aggregator import Aggregator
logger = logging.getLogger(__name__)
default_ident = "agent-manager"
default_max_iterations = 10
default_librarian_request_queue = librarian_request_queue
default_librarian_response_queue = librarian_response_queue
class Processor(AgentService):
def __init__(self, **params):
id = params.get("id")
self.max_iterations = int(
params.get("max_iterations", default_max_iterations)
)
self.config_key = params.get("config_type", "agent")
super(Processor, self).__init__(
**params | {
"id": id,
"max_iterations": self.max_iterations,
"config_type": self.config_key,
}
)
self.agent = AgentManager(
tools={},
additional_context="",
)
self.tool_service_clients = {}
# Patterns
self.react_pattern = ReactPattern(self)
self.plan_pattern = PlanThenExecutePattern(self)
self.supervisor_pattern = SupervisorPattern(self)
# Aggregator for supervisor fan-in
self.aggregator = Aggregator()
# Meta-router (initialised on first config load)
self.meta_router = None
self.config_handlers.append(self.on_tools_config)
self.register_specification(
TextCompletionClientSpec(
request_name="text-completion-request",
response_name="text-completion-response",
)
)
self.register_specification(
GraphRagClientSpec(
request_name="graph-rag-request",
response_name="graph-rag-response",
)
)
self.register_specification(
PromptClientSpec(
request_name="prompt-request",
response_name="prompt-response",
)
)
self.register_specification(
ToolClientSpec(
request_name="mcp-tool-request",
response_name="mcp-tool-response",
)
)
self.register_specification(
StructuredQueryClientSpec(
request_name="structured-query-request",
response_name="structured-query-response",
)
)
self.register_specification(
EmbeddingsClientSpec(
request_name="embeddings-request",
response_name="embeddings-response",
)
)
self.register_specification(
RowEmbeddingsQueryClientSpec(
request_name="row-embeddings-query-request",
response_name="row-embeddings-query-response",
)
)
# Explainability producer
self.register_specification(
ProducerSpec(
name="explainability",
schema=Triples,
)
)
# Librarian client
librarian_request_q = params.get(
"librarian_request_queue", default_librarian_request_queue
)
librarian_response_q = params.get(
"librarian_response_queue", default_librarian_response_queue
)
librarian_request_metrics = ProducerMetrics(
processor=id, flow=None, name="librarian-request"
)
self.librarian_request_producer = Producer(
backend=self.pubsub,
topic=librarian_request_q,
schema=LibrarianRequest,
metrics=librarian_request_metrics,
)
librarian_response_metrics = ConsumerMetrics(
processor=id, flow=None, name="librarian-response"
)
self.librarian_response_consumer = Consumer(
taskgroup=self.taskgroup,
backend=self.pubsub,
flow=None,
topic=librarian_response_q,
subscriber=f"{id}-librarian",
schema=LibrarianResponse,
handler=self.on_librarian_response,
metrics=librarian_response_metrics,
)
self.pending_librarian_requests = {}
async def start(self):
await super(Processor, self).start()
await self.librarian_request_producer.start()
await self.librarian_response_consumer.start()
async def on_librarian_response(self, msg, consumer, flow):
response = msg.value()
request_id = msg.properties().get("id")
if request_id in self.pending_librarian_requests:
future = self.pending_librarian_requests.pop(request_id)
future.set_result(response)
async def save_answer_content(self, doc_id, user, content, title=None,
timeout=120):
request_id = str(uuid.uuid4())
doc_metadata = DocumentMetadata(
id=doc_id,
user=user,
kind="text/plain",
title=title or "Agent Answer",
document_type="answer",
)
request = LibrarianRequest(
operation="add-document",
document_id=doc_id,
document_metadata=doc_metadata,
content=base64.b64encode(content.encode("utf-8")).decode("utf-8"),
user=user,
)
future = asyncio.get_event_loop().create_future()
self.pending_librarian_requests[request_id] = future
try:
await self.librarian_request_producer.send(
request, properties={"id": request_id}
)
response = await asyncio.wait_for(future, timeout=timeout)
if response.error:
raise RuntimeError(
f"Librarian error saving answer: "
f"{response.error.type}: {response.error.message}"
)
return doc_id
except asyncio.TimeoutError:
self.pending_librarian_requests.pop(request_id, None)
raise RuntimeError(f"Timeout saving answer document {doc_id}")
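The librarian round-trip above is a standard request/response correlation over a message bus: a future keyed by a fresh request id, resolved when the response consumer sees that id. A minimal in-process sketch of the same mechanism, with no Pulsar involved:

```python
import asyncio
import uuid

pending = {}

async def on_response(request_id, payload):
    # Response-consumer side: resolve the waiting future, if any.
    fut = pending.pop(request_id, None)
    if fut is not None:
        fut.set_result(payload)

async def request(send, timeout=5):
    # Requester side: register a future under a fresh id, send, wait.
    request_id = str(uuid.uuid4())
    fut = asyncio.get_running_loop().create_future()
    pending[request_id] = fut
    await send(request_id)
    try:
        return await asyncio.wait_for(fut, timeout=timeout)
    except asyncio.TimeoutError:
        # Drop the orphaned future so the table cannot grow unbounded.
        pending.pop(request_id, None)
        raise

async def demo():
    async def send(request_id):
        # Simulate the broker delivering the response asynchronously.
        asyncio.create_task(on_response(request_id, "stored"))
    return await request(send)

result = asyncio.run(demo())
```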
def provenance_session_uri(self, session_id):
return agent_session_uri(session_id)
async def on_tools_config(self, config, version):
logger.info(f"Loading configuration version {version}")
try:
tools = {}
# Load tool-service configurations
tool_services = {}
if "tool-service" in config:
for service_id, service_value in config["tool-service"].items():
service_data = json.loads(service_value)
tool_services[service_id] = service_data
logger.debug(f"Loaded tool-service config: {service_id}")
logger.info(
f"Loaded {len(tool_services)} tool-service configurations"
)
# Load tool configurations
if "tool" in config:
for tool_id, tool_value in config["tool"].items():
data = json.loads(tool_value)
impl_id = data.get("type")
name = data.get("name")
if impl_id == "knowledge-query":
impl = functools.partial(
KnowledgeQueryImpl,
collection=data.get("collection"),
)
arguments = KnowledgeQueryImpl.get_arguments()
elif impl_id == "text-completion":
impl = TextCompletionImpl
arguments = TextCompletionImpl.get_arguments()
elif impl_id == "mcp-tool":
config_args = data.get("arguments", [])
arguments = [
Argument(
name=arg.get("name"),
type=arg.get("type"),
description=arg.get("description"),
)
for arg in config_args
]
impl = functools.partial(
McpToolImpl,
mcp_tool_id=data.get("mcp-tool"),
arguments=arguments,
)
elif impl_id == "prompt":
config_args = data.get("arguments", [])
arguments = [
Argument(
name=arg.get("name"),
type=arg.get("type"),
description=arg.get("description"),
)
for arg in config_args
]
impl = functools.partial(
PromptImpl,
template_id=data.get("template"),
arguments=arguments,
)
elif impl_id == "structured-query":
impl = functools.partial(
StructuredQueryImpl,
collection=data.get("collection"),
user=None,
)
arguments = StructuredQueryImpl.get_arguments()
elif impl_id == "row-embeddings-query":
impl = functools.partial(
RowEmbeddingsQueryImpl,
schema_name=data.get("schema-name"),
collection=data.get("collection"),
user=None,
index_name=data.get("index-name"),
limit=int(data.get("limit", 10)),
)
arguments = RowEmbeddingsQueryImpl.get_arguments()
elif impl_id == "tool-service":
service_ref = data.get("service")
if not service_ref:
raise RuntimeError(
f"Tool {name} has type 'tool-service' "
f"but no 'service' reference"
)
if service_ref not in tool_services:
raise RuntimeError(
f"Tool {name} references unknown "
f"tool-service '{service_ref}'"
)
service_config = tool_services[service_ref]
request_queue = service_config.get("request-queue")
response_queue = service_config.get("response-queue")
if not request_queue or not response_queue:
raise RuntimeError(
f"Tool-service '{service_ref}' must define "
f"'request-queue' and 'response-queue'"
)
config_params = service_config.get("config-params", [])
config_values = {}
for param in config_params:
param_name = (
param.get("name")
if isinstance(param, dict) else param
)
if param_name in data:
config_values[param_name] = data[param_name]
elif (
isinstance(param, dict)
and param.get("required", False)
):
raise RuntimeError(
f"Tool {name} missing required config "
f"param '{param_name}'"
)
config_args = data.get("arguments", [])
arguments = [
Argument(
name=arg.get("name"),
type=arg.get("type"),
description=arg.get("description"),
)
for arg in config_args
]
impl = functools.partial(
ToolServiceImpl,
request_queue=request_queue,
response_queue=response_queue,
config_values=config_values,
arguments=arguments,
processor=self,
)
else:
raise RuntimeError(
f"Tool type {impl_id} not known"
)
validate_tool_config(data)
tools[name] = Tool(
name=name,
description=data.get("description"),
implementation=impl,
config=data,
arguments=arguments,
)
# Load additional context from agent config
additional = None
if self.config_key in config:
agent_config = config[self.config_key]
additional = agent_config.get("additional-context", None)
self.agent = AgentManager(
tools=tools,
additional_context=additional,
)
# Re-initialise meta-router with config
self.meta_router = MetaRouter(config=config)
logger.info(f"Loaded {len(tools)} tools")
logger.info("Tool configuration reloaded.")
except Exception as e:
logger.error(
f"on_tools_config Exception: {e}", exc_info=True
)
logger.error("Configuration reload failed")
async def agent_request(self, request, respond, next, flow):
try:
pattern = getattr(request, 'pattern', '') or ''
# If no pattern set and this is the first iteration, route
if not pattern and not request.history:
context = UserAwareContext(flow, request.user)
if self.meta_router:
pattern, task_type, framing = await self.meta_router.route(
request.question, context,
)
else:
pattern = "react"
task_type = "general"
framing = ""
# Update request with routing decision
request.pattern = pattern
request.task_type = task_type
request.framing = framing
logger.info(
f"Routed to pattern={pattern}, "
f"task_type={task_type}"
)
# Dispatch to the selected pattern
if pattern == "plan-then-execute":
await self.plan_pattern.iterate(
request, respond, next, flow,
)
elif pattern == "supervisor":
await self.supervisor_pattern.iterate(
request, respond, next, flow,
)
else:
# Default to react
await self.react_pattern.iterate(
request, respond, next, flow,
)
except Exception as e:
logger.error(
f"agent_request Exception: {e}", exc_info=True
)
logger.debug("Send error response...")
error_obj = Error(
type="agent-error",
message=str(e),
)
r = AgentResponse(
chunk_type="error",
content=str(e),
end_of_message=True,
end_of_dialog=True,
error=error_obj,
)
await respond(r)
@staticmethod
def add_args(parser):
AgentService.add_args(parser)
parser.add_argument(
'--max-iterations',
default=default_max_iterations,
help=f'Maximum number of react iterations '
f'(default: {default_max_iterations})',
)
parser.add_argument(
'--config-type',
default="agent",
help='Configuration key for prompts (default: agent)',
)
def run():
Processor.launch(default_ident, __doc__)
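The dispatch inside agent_request reduces to a lookup over the three pattern names, with ReACT as the fallback for empty or unknown values. A sketch (the string values stand in for the pattern instances held on the processor):

```python
def dispatch(pattern_name, patterns):
    # Unknown or empty pattern names fall back to "react", mirroring
    # the orchestrator's else-branch.
    return patterns.get(pattern_name, patterns["react"])

patterns = {
    "react": "ReactPattern",
    "plan-then-execute": "PlanThenExecutePattern",
    "supervisor": "SupervisorPattern",
}
```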


@ -0,0 +1,214 @@
"""
SupervisorPattern decomposes a query into subagent goals, fans out,
then synthesises results when all subagents complete.
Phase 1 (decompose): LLM breaks the query into independent sub-goals.
Fan-out: Each sub-goal is emitted as a new AgentRequest on the agent
request topic, carrying a correlation_id and parent_session_id.
Phase 2 (synthesise): Triggered when the aggregator detects all
subagents have completed. The supervisor fetches results and
produces the final answer.
"""
import json
import logging
import uuid
from ... schema import AgentRequest, AgentResponse, AgentStep
from ..react.types import Action, Final
from . pattern_base import PatternBase
logger = logging.getLogger(__name__)
MAX_SUBAGENTS = 5
class SupervisorPattern(PatternBase):
"""
Supervisor pattern: decompose, fan-out, synthesise.
History tracks phase via AgentStep.step_type:
- "decompose": the decomposition step (subagent goals in arguments)
- "synthesise": triggered by aggregator with results in subagent_results
"""
async def iterate(self, request, respond, next, flow):
streaming = getattr(request, 'streaming', False)
session_id = getattr(request, 'session_id', '') or str(uuid.uuid4())
collection = getattr(request, 'collection', 'default')
iteration_num = len(request.history) + 1
session_uri = self.processor.provenance_session_uri(session_id)
# Emit session provenance on first iteration
if iteration_num == 1:
await self.emit_session_triples(
flow, session_uri, request.question,
request.user, collection, respond, streaming,
)
logger.info(
f"SupervisorPattern iteration {iteration_num}: {request.question}"
)
# Check if this is a synthesis request (has subagent_results)
has_results = bool(
request.history
and any(
getattr(h, 'step_type', '') == 'decompose'
for h in request.history
)
and any(
getattr(h, 'subagent_results', None)
for h in request.history
)
)
if has_results:
await self._synthesise(
request, respond, next, flow,
session_id, collection, streaming,
session_uri, iteration_num,
)
else:
await self._decompose_and_fanout(
request, respond, next, flow,
session_id, collection, streaming,
session_uri, iteration_num,
)
async def _decompose_and_fanout(self, request, respond, next, flow,
session_id, collection, streaming,
session_uri, iteration_num):
"""Decompose the question into sub-goals and fan out subagents."""
think = self.make_think_callback(respond, streaming)
framing = getattr(request, 'framing', '')
tools = self.filter_tools(self.processor.agent.tools, request)
context = self.make_context(flow, request.user)
client = context("prompt-request")
# Use the supervisor-decompose prompt template
goals = await client.prompt(
id="supervisor-decompose",
variables={
"question": request.question,
"framing": framing,
"max_subagents": MAX_SUBAGENTS,
"tools": [
{"name": t.name, "description": t.description}
for t in tools.values()
],
},
)
# Validate result
if not isinstance(goals, list):
goals = []
goals = [g for g in goals if isinstance(g, str)]
goals = goals[:MAX_SUBAGENTS]
if not goals:
goals = [request.question]
await think(
f"Decomposed into {len(goals)} sub-goals: {goals}",
is_final=True,
)
# Generate correlation ID for this fan-out
correlation_id = str(uuid.uuid4())
# Emit decomposition provenance
decompose_act = Action(
thought=f"Decomposed into {len(goals)} sub-goals",
name="decompose",
arguments={"goals": json.dumps(goals), "correlation_id": correlation_id},
observation=f"Fanning out {len(goals)} subagents",
)
await self.emit_iteration_triples(
flow, session_id, iteration_num, session_uri,
decompose_act, request, respond, streaming,
)
# Fan out: emit a subagent request for each goal
for i, goal in enumerate(goals):
subagent_session = str(uuid.uuid4())
sub_request = AgentRequest(
question=goal,
state="",
group=getattr(request, 'group', []),
history=[],
user=request.user,
collection=collection,
streaming=False, # Subagents don't stream
session_id=subagent_session,
conversation_id=getattr(request, 'conversation_id', ''),
pattern="react", # Subagents use react by default
task_type=getattr(request, 'task_type', ''),
framing=getattr(request, 'framing', ''),
correlation_id=correlation_id,
parent_session_id=session_id,
subagent_goal=goal,
expected_siblings=len(goals),
)
await next(sub_request)
logger.info(f"Fan-out: emitted subagent {i} for goal: {goal}")
# NOTE: The supervisor stops here. The aggregator will detect
# when all subagents complete and emit a synthesis request
# with the results populated.
logger.info(
f"Supervisor fan-out complete: {len(goals)} subagents, "
f"correlation_id={correlation_id}"
)
async def _synthesise(self, request, respond, next, flow,
session_id, collection, streaming,
session_uri, iteration_num):
"""Synthesise final answer from subagent results."""
think = self.make_think_callback(respond, streaming)
framing = getattr(request, 'framing', '')
# Collect subagent results from history
subagent_results = {}
for step in request.history:
results = getattr(step, 'subagent_results', None)
if results:
subagent_results.update(results)
if not subagent_results:
logger.warning("Synthesis called with no subagent results")
subagent_results = {"(no results)": "No subagent results available"}
context = self.make_context(flow, request.user)
client = context("prompt-request")
await think("Synthesising final answer from subagent results", is_final=True)
response_text = await self.prompt_as_answer(
client, "supervisor-synthesise",
variables={
"question": request.question,
"framing": framing,
"results": [
{"goal": goal, "result": result}
for goal, result in subagent_results.items()
],
},
respond=respond,
streaming=streaming,
)
await self.emit_final_triples(
flow, session_id, iteration_num, session_uri,
response_text, request, respond, streaming,
)
await self.send_final_response(
respond, streaming, response_text, already_streamed=streaming,
)

View file

@ -485,25 +485,16 @@ class Processor(AgentService):
logger.debug(f"Think: {x} (is_final={is_final})")
if streaming:
# Streaming format
r = AgentResponse(
chunk_type="thought",
content=x,
end_of_message=is_final,
end_of_dialog=False,
)
else:
# Non-streaming format
r = AgentResponse(
chunk_type="thought",
content=x,
end_of_message=True,
end_of_dialog=False,
)
@ -515,25 +506,16 @@ class Processor(AgentService):
logger.debug(f"Observe: {x} (is_final={is_final})")
if streaming:
# Streaming format
r = AgentResponse(
chunk_type="observation",
content=x,
end_of_message=is_final,
end_of_dialog=False,
)
else:
# Non-streaming format
r = AgentResponse(
chunk_type="observation",
content=x,
end_of_message=True,
end_of_dialog=False,
)
@ -545,25 +527,16 @@ class Processor(AgentService):
logger.debug(f"Answer: {x}")
if streaming:
# Streaming format
r = AgentResponse(
chunk_type="answer",
content=x,
end_of_message=False,
end_of_dialog=False,
)
else:
# Non-streaming format - shouldn't normally be called
r = AgentResponse(
chunk_type="answer",
content=x,
end_of_message=True,
end_of_dialog=False,
)
@ -677,25 +650,17 @@ class Processor(AgentService):
))
if streaming:
# Streaming format - send end-of-dialog marker
# End-of-dialog marker — answer chunks already sent via callback
r = AgentResponse(
chunk_type="answer",
content="",
end_of_message=True,
end_of_dialog=True,
)
else:
# Non-streaming format - send complete answer
r = AgentResponse(
chunk_type="answer",
content=f,
end_of_message=True,
end_of_dialog=True,
)
@ -833,19 +798,11 @@ class Processor(AgentService):
# Check if streaming was enabled (may not be set if error occurred early)
streaming = getattr(request, 'streaming', False) if 'request' in locals() else False
if streaming:
# Streaming format
r = AgentResponse(
chunk_type="error",
content=str(e),
end_of_message=True,
end_of_dialog=True,
error=error_obj,
)
else:
# Legacy format
r = AgentResponse(
error=error_obj,
)