diff --git a/docs/tech-specs/agent-orchestration.md b/docs/tech-specs/agent-orchestration.md new file mode 100644 index 00000000..c93388ed --- /dev/null +++ b/docs/tech-specs/agent-orchestration.md @@ -0,0 +1,939 @@ +# TrustGraph Agent Orchestration — Technical Specification + +## Overview + +This specification describes the extension of TrustGraph's agent architecture +from a single ReACT execution pattern to a multi-pattern orchestration +model. The existing Pulsar-based self-queuing loop is pattern-agnostic — the +same infrastructure supports ReACT, Plan-then-Execute, Supervisor/Subagent +fan-out, and other execution strategies without changes to the message +transport. The extension adds a routing layer that selects the appropriate +pattern for each task, a set of pattern implementations that share common +iteration infrastructure, and a fan-out/fan-in mechanism for multi-agent +coordination. + +The central design principle is that +**trust and explainability are structural properties of the architecture**, +achieved by constraining LLM decisions to +graph-defined option sets and recording those constraints in the execution +trace. + +--- + +## Background + +### Existing Architecture + +The current agent manager is built on the ReACT pattern (Reasoning + Acting) +with these properties: + +- **Self-queuing loop**: Each iteration emits a new Pulsar message carrying + the accumulated history. The agent manager picks this up and runs the next + iteration. +- **Stateless agent manager**: No in-process state. All state lives in the + message payload. +- **Natural parallelism**: Multiple independent agent requests are handled + concurrently across Pulsar consumers. +- **Durability**: Crash recovery is inherent — the message survives process + failure. +- **Real-time feedback**: Streaming thought, action, observation and answer + chunks are emitted as iterations complete. 
+- **Tool calling and MCP invocation**: Tool calls into knowledge graphs, + external services, and MCP-connected systems. +- **Decision traces written to the knowledge graph**: Every iteration records + PROV-O triples — session, analysis, and conclusion entities — forming the + basis of explainability. + +### Current Message Flow + +``` +AgentRequest arrives (question, history=[], state, group, session_id) + │ + ▼ + Filter tools by group/state + │ + ▼ + AgentManager.react() → LLM call → parse → Action or Final + │ │ + │ [Action] │ [Final] + ▼ ▼ + Execute tool, capture observation Emit conclusion triples + Emit iteration triples Send AgentResponse + Append to history (end_of_dialog=True) + Emit new AgentRequest → "next" topic + │ + └── (picked up again by consumer, loop continues) +``` + +The key insight is that this loop structure is not ReACT-specific. The +plumbing — receive message, do work, emit next message — is the same +regardless of what the "work" step does. The payload and the pattern logic +define the behaviour; the infrastructure remains constant. + +### Current Limitations + +- Only one execution pattern (ReACT) is available regardless of task + characteristics. +- No mechanism for one agent to spawn and coordinate subagents. +- Pattern selection is implicit — every task gets the same treatment. +- The provenance model assumes a linear iteration chain (analysis N derives + from analysis N-1), with no support for parallel branches. + +--- + +## Design Goals + +- **Pattern-agnostic iteration infrastructure**: The self-queuing loop, tool + filtering, provenance emission, and streaming feedback should be shared + across all patterns. +- **Graph-constrained pattern selection**: The LLM selects patterns from a + graph-defined set, not from unconstrained reasoning. This makes the + selection auditable and explainable. +- **Genuinely parallel fan-out**: Subagent tasks execute concurrently on the + Pulsar queue, not sequentially in a single process. 
+- **Stateless coordination**: Fan-in uses the knowledge graph as coordination + substrate. The agent manager remains stateless. +- **Additive change**: The existing ReACT flow continues to work + unchanged. New patterns are added alongside it, not in place of it. + +--- + +## Patterns + +### ReACT as One Pattern Among Many + +ReACT is one point in a wider space of agent execution strategies: + +| Pattern | Structure | Strengths | +|---|---|---| +| **ReACT** | Interleaved reasoning and action | Adaptive, good for open-ended tasks | +| **Plan-then-Execute** | Decompose into a step DAG, then execute | More predictable, auditable plan | +| **Reflexion** | ReACT + self-critique after each action | Agents improve within the episode | +| **Supervisor/Subagent** | One agent orchestrates others | Parallel decomposition, synthesis | +| **Debate/Ensemble** | Multiple agents reason independently | Diverse perspectives, reconciliation | +| **LLM-as-router** | No reasoning loop, pure dispatch | Fast classification and routing | + +Not all of these need to be implemented at once. The architecture should +support them; the initial implementation delivers ReACT (already exists), +Plan-then-Execute, and Supervisor/Subagent. + +### Pattern Storage + +Patterns are stored as configuration items via the config API. They are +finite in number, mechanically well-defined, have enumerable properties, +and change slowly. Each pattern is a JSON object stored under the +`agent-pattern` config type. + +```json +Config type: "agent-pattern" +Config key: "react" +Value: { + "name": "react", + "description": "ReACT — Reasoning + Acting", + "when_to_use": "Adaptive, good for open-ended tasks" +} +``` + +These are written at deployment time and change rarely. If the architecture +later benefits from graph-based pattern storage (e.g. for richer ontological +relationships), the config items can be migrated to graph nodes — the +meta-router's selection logic is the same regardless of backend. 
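The config round-trip can be sketched with a plain dict standing in for the config store — the real config API client and its method names are not defined in this spec, so this is an illustration of the shape of the data, not of the actual interface:

```python
# Pattern definitions keyed by (config type, config key), mirroring the
# "agent-pattern" entries shown above. The dict stands in for the config API.
PATTERN_STORE = {
    ("agent-pattern", "react"): {
        "name": "react",
        "description": "ReACT — Reasoning + Acting",
        "when_to_use": "Adaptive, good for open-ended tasks",
    },
    ("agent-pattern", "plan-then-execute"): {
        "name": "plan-then-execute",
        "description": "Decompose into a step DAG, then execute",
        "when_to_use": "Predictable, auditable multi-step work",
    },
}


def list_patterns(store: dict) -> list[dict]:
    """Return every definition stored under the agent-pattern config type."""
    return [
        value
        for (config_type, _key), value in sorted(store.items())
        if config_type == "agent-pattern"
    ]
```

Because patterns are finite and slow-changing, the meta-router can load the full list on each routing decision without caching concerns.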
+ +--- + +## Task Types + +### What a Task Type Represents + +A **task type** characterises the problem domain — what the agent is being +asked to accomplish, and how a domain expert would frame it analytically. + +- Carries domain-specific methodology (e.g. "intelligence analysis always + applies structured analytic techniques") +- Pre-populates initial reasoning context via a framing prompt +- Constrains which patterns are valid for this class of problem +- Can define domain-specific termination criteria + +### Identification + +Task types are identified from plain-text task descriptions by the +LLM. Building a formal ontology over task descriptions is premature — natural +language is too varied and context-dependent. The LLM reads the description; +the graph provides the structure downstream. + +### Task Type Storage + +Task types are stored as configuration items via the config API under the +`agent-task-type` config type. Each task type is a JSON object that +references valid patterns by name. + +```json +Config type: "agent-task-type" +Config key: "risk-assessment" +Value: { + "name": "risk-assessment", + "description": "Due Diligence / Risk Assessment", + "framing_prompt": "Analyse across financial, reputational, legal and operational dimensions using structured analytic techniques.", + "valid_patterns": ["supervisor", "plan-then-execute", "react"], + "when_to_use": "Multi-dimensional analysis requiring structured assessment" +} +``` + +The `valid_patterns` list defines the constrained decision space — the LLM +can only select patterns that the task type's configuration says are valid. +This is the many-to-many relationship between task types and patterns, +expressed as configuration rather than graph edges. 
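The constraint can be enforced mechanically before the LLM is ever consulted. A minimal sketch (the helper name and dict shapes are illustrative, not part of the spec):

```python
def candidate_patterns(task_type: dict, patterns: dict[str, dict]) -> list[dict]:
    """Return only the pattern definitions the task type declares valid.

    The LLM is shown nothing outside this list, so the eventual selection
    is auditable against the task type's configuration.
    """
    return [
        patterns[name]
        for name in task_type.get("valid_patterns", [])
        if name in patterns
    ]


RISK_ASSESSMENT = {
    "name": "risk-assessment",
    "valid_patterns": ["supervisor", "plan-then-execute", "react"],
}

AVAILABLE = {
    "react": {"name": "react"},
    "plan-then-execute": {"name": "plan-then-execute"},
    "supervisor": {"name": "supervisor"},
    "debate": {"name": "debate"},  # installed, but not valid for this task type
}
```

Here `debate` is deployed but excluded: the task type's configuration, not the LLM, decides what is on the menu.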
+ +### Selection Flow + +``` +Task Description (plain text, from AgentRequest.question) + │ + │ [LLM interprets, constrained by available task types from config] + ▼ +Task Type (config item — domain framing and methodology) + │ + │ [config lookup — valid_patterns list] + ▼ +Pattern Candidates (config items) + │ + │ [LLM selects within constrained set, + │ informed by task description signals: + │ complexity, urgency, scope] + ▼ +Selected Pattern +``` + +The task description may carry modulating signals (complexity, urgency, scope) +that influence which pattern is selected within the constrained set. But the +raw description never directly selects a pattern — it always passes through +the task type layer first. + +--- + +## Explainability Through Constrained Decision Spaces + +A central principle of TrustGraph's explainability architecture is that +**explainability comes from constrained decision spaces**. + +When a decision is made from an unconstrained space — a raw LLM call with no +guardrails — the reasoning is opaque even if the LLM produces a rationale, +because that rationale is post-hoc and unverifiable. + +When a decision is made from a **constrained set defined in configuration**, +you can always answer: +- What valid options were available +- What criteria narrowed the set +- What signal made the final selection within that set + +This principle already governs the existing decision trace architecture and +extends naturally to pattern selection. The routing decision — which task type +and which pattern — is itself recorded as a provenance node, making the first +decision in the execution trace auditable. + +**Trust becomes a structural property of the architecture, not a claimed +property of the model.** + +--- + +## Orchestration Architecture + +### The Meta-Router + +The meta-router is the entry point for all agent requests. It runs as a +pre-processing step before the pattern-specific iteration loop begins. 
Its +job is to determine the task type and select the execution pattern. + +**When it runs**: On receipt of an `AgentRequest` with empty history (i.e. a +new task, not a continuation). Requests with non-empty history are already +mid-iteration and bypass the meta-router. + +**What it does**: + +1. Lists all available task types from the config API + (`config.list("agent-task-type")`). +2. Presents these to the LLM alongside the task description. The LLM + identifies which task type applies (or "general" as a fallback). +3. Reads the selected task type's configuration to get the `valid_patterns` + list. +4. Loads the candidate pattern definitions from config and presents them to + the LLM. The LLM selects one, influenced by signals in the task + description (complexity, number of independent dimensions, urgency). +5. Records the routing decision as a provenance node (see Provenance Model + below). +6. Populates the `AgentRequest` with the selected pattern, task type framing + prompt, and any pattern-specific configuration, then emits it onto the + queue. + +**Where it lives**: The meta-router is a phase within the agent-orchestrator, +not a separate service. The agent-orchestrator is a new executable that +uses the same service identity as the existing agent-manager-react, making +it a drop-in replacement on the same Pulsar queues. It includes the full +ReACT implementation alongside the new orchestration patterns. The +distinction between "route" and "iterate" is determined by whether the +request already has a pattern set. + +### Pattern Dispatch + +Once the meta-router has annotated the request with a pattern, the agent +manager dispatches to the appropriate pattern implementation. 
This is a +straightforward branch on the pattern field: + +``` +request arrives + │ + ├── history is empty → meta-router → annotate with pattern → re-emit + │ + └── history is non-empty (or pattern is set) + │ + ├── pattern = "react" → ReACT iteration + ├── pattern = "plan-then-execute" → PtE iteration + ├── pattern = "supervisor" → Supervisor iteration + └── (no pattern) → ReACT iteration (default) +``` + +Each pattern implementation follows the same contract: receive a request, do +one iteration of work, then either emit a "next" message (continue) or emit a +response (done). The self-queuing loop doesn't change. + +### Pattern Implementations + +#### ReACT (Existing) + +No changes. The existing `AgentManager.react()` path continues to work +as-is. + +#### Plan-then-Execute + +Two-phase pattern: + +**Planning phase** (first iteration): +- LLM receives the question plus task type framing. +- Produces a structured plan: an ordered list of steps, each with a goal, + expected tool, and dependencies on prior steps. +- The plan is recorded in the history as a special "plan" step. +- Emits a "next" message to begin execution. + +**Execution phase** (subsequent iterations): +- Reads the plan from history. +- Identifies the next unexecuted step. +- Executes that step (tool call + observation), similar to a single ReACT + action. +- Records the result against the plan step. +- If all steps complete, synthesises a final answer. +- If a step fails or produces unexpected results, the LLM can revise the + remaining plan (bounded re-planning, not a full restart). + +The plan lives in the history, so it travels with the message. No external +state is needed. + +#### Supervisor/Subagent + +The supervisor pattern introduces fan-out and fan-in. This is the most +architecturally significant addition. + +**Supervisor planning iteration**: +- LLM receives the question plus task type framing. +- Decomposes the task into independent subagent goals. 
+- For each subagent, emits a new `AgentRequest` with: + - A focused question (the subagent's specific goal) + - A shared correlation ID tying it to the parent task + - The subagent's own pattern (typically ReACT, but could be anything) + - Relevant context sliced from the parent request + +**Subagent execution**: +- Each subagent request is picked up by an agent manager instance and runs its + own independent iteration loop. +- Subagents are ordinary agent executions — they self-queue, use tools, emit + provenance, stream feedback. +- When a subagent reaches a Final answer, it writes a completion record to the + knowledge graph under the shared correlation ID. + +**Fan-in and synthesis**: +- An aggregator detects when all sibling subagents for a correlation ID have + completed. +- It emits a synthesis request to the supervisor carrying the correlation ID. +- The supervisor queries the graph for subagent results, reasons across them, + and decides whether to emit a final answer or iterate again. + +**Supervisor re-iteration**: +- After synthesis, the supervisor may determine that the results are + incomplete, contradictory, or reveal gaps requiring further investigation. +- Rather than emitting a final answer, it can fan out again with new or + refined subagent goals under a new correlation ID. This is the same + self-queuing loop — the supervisor emits new subagent requests and stops, + the aggregator detects completion, and synthesis runs again. +- The supervisor's iteration count (planning + synthesis rounds) is bounded + to prevent unbounded looping. + +This is detailed further in the Fan-Out / Fan-In section below. + +--- + +## Message Schema Evolution + +### Shared Schema Principle + +The `AgentRequest` and `AgentResponse` schemas are the shared contract +between the agent-manager (existing ReACT execution) and the +agent-orchestrator (meta-routing, supervisor, plan-then-execute). 
Both +services consume from the same *agent request* topic using the same +schema. Any schema changes must be reflected in both — the schema is +the integration point, not the service implementation. + +This means the orchestrator does not introduce separate message types for +its own use. Subagent requests, synthesis triggers, and meta-router +outputs are all `AgentRequest` messages with different field values. The +agent-manager ignores orchestration fields it doesn't use. + +### New Fields + +The `AgentRequest` schema needs new fields to carry orchestration +metadata. + +```python +@dataclass +class AgentRequest: + # Existing fields (unchanged) + question: str = "" + state: str = "" + group: list[str] | None = None + history: list[AgentStep] = field(default_factory=list) + user: str = "" + collection: str = "default" + streaming: bool = False + session_id: str = "" + + # New orchestration fields + conversation_id: str = "" # Optional caller-generated ID grouping related requests + pattern: str = "" # "react", "plan-then-execute", "supervisor", "" + task_type: str = "" # Identified task type name + framing: str = "" # Task type framing prompt injected into LLM context + correlation_id: str = "" # Shared ID linking subagents to parent + parent_session_id: str = "" # Parent's session_id (for subagents) + subagent_goal: str = "" # Focused goal for this subagent + expected_siblings: int = 0 # How many sibling subagents exist +``` + +The `AgentStep` schema also extends to accommodate non-ReACT iteration types: + +```python +@dataclass +class AgentStep: + # Existing fields (unchanged) + thought: str = "" + action: str = "" + arguments: dict[str, str] = field(default_factory=dict) + observation: str = "" + user: str = "" + + # New fields + step_type: str = "" # "react", "plan", "execute", "supervise", "synthesise" + plan: list[PlanStep] | None = None # For plan-then-execute: the structured plan + subagent_results: dict | None = None # For supervisor: collected subagent 
outputs +``` + +The `PlanStep` structure for Plan-then-Execute: + +```python +@dataclass +class PlanStep: + goal: str = "" # What this step should accomplish + tool_hint: str = "" # Suggested tool (advisory, not binding) + depends_on: list[int] = field(default_factory=list) # Indices of prerequisite steps + status: str = "pending" # "pending", "complete", "failed", "revised" + result: str = "" # Observation from execution +``` + +--- + +## Fan-Out and Fan-In + +### Why This Matters + +Fan-out is the mechanism that makes multi-agent coordination genuinely +parallel rather than simulated. With Pulsar, emitting multiple messages means +multiple consumers can pick them up concurrently. This is not threading or +async simulation — it is real distributed parallelism across agent manager +instances. + +### Fan-Out: Supervisor Emits Subagent Requests + +When a supervisor iteration decides to decompose a task, it: + +1. Generates a **correlation ID** — a UUID that groups the sibling subagents. +2. For each subagent, constructs a new `AgentRequest`: + - `question` = the subagent's focused goal (from `subagent_goal`) + - `correlation_id` = the shared correlation ID + - `parent_session_id` = the supervisor's session_id + - `pattern` = typically "react", but the supervisor can specify any pattern + - `session_id` = a new unique ID for this subagent's own provenance chain + - `expected_siblings` = total number of sibling subagents + - `history` = empty (fresh start, but framing context inherited) + - `group`, `user`, `collection` = inherited from parent +3. Emits each subagent request onto the agent request topic. +4. Records the fan-out decision in the provenance graph (see below). + +The supervisor then **stops**. It does not wait. It does not poll. It has +emitted its messages and its iteration is complete. The graph and the +aggregator handle the rest. 
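The fan-out construction can be sketched against a trimmed-down `AgentRequest` (only the fields relevant to the linkage; the full schema is above). The helper name `fan_out` is illustrative:

```python
import uuid
from dataclasses import dataclass, field


@dataclass
class AgentRequest:
    # Minimal subset of the schema above, enough to show the linkage.
    question: str = ""
    pattern: str = ""
    session_id: str = ""
    correlation_id: str = ""
    parent_session_id: str = ""
    expected_siblings: int = 0
    history: list = field(default_factory=list)


def fan_out(parent: AgentRequest, goals: list[str]) -> list[AgentRequest]:
    """Build one subagent request per goal, tied together by a fresh
    correlation ID. The supervisor emits these onto the topic and stops."""
    correlation_id = str(uuid.uuid4())
    return [
        AgentRequest(
            question=goal,                 # the subagent's focused goal
            pattern="react",               # supervisor may choose per subagent
            session_id=str(uuid.uuid4()),  # subagent's own provenance chain
            correlation_id=correlation_id,
            parent_session_id=parent.session_id,
            expected_siblings=len(goals),
            history=[],                    # fresh start
        )
        for goal in goals
    ]
```

Every sibling shares one correlation ID and carries the same `expected_siblings` count, which is what lets the aggregator later decide completion without any supervisor-side state.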

### Fan-In: Graph-Based Completion Detection

When a subagent reaches its Final answer, it writes a **completion node** to
the knowledge graph:

```
Completion node:
  rdf:type tg:SubagentCompletion
  tg:correlationId
  tg:subagentSessionId
  tg:parentSessionId
  tg:subagentGoal
  tg:result → result document URI in librarian
  prov:wasGeneratedBy → subagent session activity
```

The **aggregator** is a component that watches for completion nodes. When it
detects that all expected siblings for a correlation ID have written
completion nodes, it:

1. Confirms against the graph that every expected sibling has completed.
2. Constructs a **synthesis request** — a new `AgentRequest` addressed to the supervisor flow:
   - `session_id` = the original supervisor's session_id
   - `pattern` = "supervisor"
   - `step_type` = "synthesise" (carried in history)
   - `correlation_id` = the shared correlation ID; the supervisor uses it to
     retrieve the subagent results from the graph and librarian
   - `history` = the supervisor's history up to the fan-out point, plus the synthesis step
3. Emits this onto the agent request topic.

The supervisor picks this up, retrieves the subagent findings under the
correlation ID, reasons across them, and produces its final answer.

### Aggregator Design

The aggregator is event-driven, consistent with TrustGraph's Pulsar-based
architecture. Polling would be an anti-pattern in a system where all
coordination is message-driven.

**Mechanism**: The aggregator is a Pulsar consumer on the explainability
topic. Subagent completion nodes are emitted as triples on this topic as
part of the existing provenance flow. When the aggregator receives a
`tg:SubagentCompletion` triple, it:

1. Extracts the `tg:correlationId` from the completion node.
2. Queries the graph to count how many siblings for that correlation ID
   have completed.
3. If all `expected_siblings` are present, triggers fan-in immediately by
   emitting the synthesis request. It does not fetch or bundle the subagent
   results itself — the supervisor queries the graph for those.

**State**: The aggregator is stateless in the same sense as the agent
manager — it holds no essential in-memory state.
The graph is the source +of truth for completion counts. If the aggregator restarts, it can +re-process unacknowledged completion messages from Pulsar and re-check the +graph. No coordination state is lost. + +**Consistency**: Because the completion check queries the graph rather than +relying on an in-memory counter, the aggregator is tolerant of duplicate +messages, out-of-order delivery, and restarts. The graph query is +idempotent — asking "are all siblings complete?" gives the same answer +regardless of how many times or in what order the events arrive. + +### Timeout and Failure + +- **Subagent timeout**: The aggregator records the timestamp of the first + sibling completion (from the graph). A periodic timeout check (the one + concession to polling — but over local state, not the graph) detects + stalled correlation IDs. If `expected_siblings` completions are not + reached within a configurable timeout, the aggregator emits a partial + synthesis request with whatever results are available, flagging the + incomplete subagents. +- **Subagent failure**: If a subagent errors out, it writes an error + completion node (with `tg:status = "error"` and an error message). The + aggregator treats this as a completion — the supervisor receives the error + in its synthesis input and can reason about partial results. +- **Supervisor iteration limit**: The supervisor's own iteration count + (planning + synthesis) is bounded by `max_iterations` just like any other + pattern. 
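The idempotent completion check can be sketched as a pure function over graph query results — here plain dicts stand in for whatever row shape the graph query actually returns:

```python
def all_siblings_complete(rows: list[dict], correlation_id: str,
                          expected_siblings: int) -> bool:
    """Idempotent fan-in check. Counting *distinct* subagent sessions makes
    the answer stable under duplicate or out-of-order event delivery."""
    done = {
        row["subagent_session_id"]
        for row in rows
        if row["correlation_id"] == correlation_id
    }
    return len(done) >= expected_siblings
```

A redelivered completion event adds nothing to the set, so re-processing after an aggregator restart cannot trigger a premature or duplicate synthesis decision from this check alone.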
+ +--- + +## Provenance Model Extensions + +### Routing Decision + +The meta-router's task type and pattern selection is recorded as the first +provenance node in the session: + +``` +Routing node: + rdf:type prov:Entity, tg:RoutingDecision + prov:wasGeneratedBy → session (Question) activity + tg:taskType → TaskType node URI + tg:selectedPattern → Pattern node URI + tg:candidatePatterns → [Pattern node URIs] (what was available) + tg:routingRationale → document URI in librarian (LLM's reasoning) +``` + +This captures the constrained decision space: what candidates existed, which +was selected, and why. The candidates are graph-derived; the rationale is +LLM-generated but verifiable against the candidates. + +### Fan-Out Provenance + +When a supervisor fans out, the provenance records the decomposition: + +``` +FanOut node: + rdf:type prov:Entity, tg:FanOut + prov:wasDerivedFrom → supervisor's routing or planning iteration + tg:correlationId + tg:subagentGoals → [document URIs for each subagent goal] + tg:expectedSiblings +``` + +Each subagent's provenance chain is independent (its own session, iterations, +conclusion) but linked back to the parent via: + +``` +Subagent session: + rdf:type prov:Activity, tg:Question, tg:AgentQuestion + tg:parentCorrelationId + tg:parentSessionId +``` + +### Fan-In Provenance + +The synthesis step links back to all subagent conclusions: + +``` +Synthesis node: + rdf:type prov:Entity, tg:Synthesis + prov:wasDerivedFrom → [all subagent Conclusion entities] + tg:correlationId +``` + +This creates a DAG in the provenance graph: the supervisor's routing fans out +to N parallel subagent chains, which fan back in to a synthesis node. The +entire multi-agent execution is traceable from a single correlation ID. 
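Emitting the fan-out record can be sketched as a triple builder. The URI follows the scheme in the table below; representing the `tg:subagentGoals` list as one triple per goal URI is an assumption of this sketch, not something the spec fixes:

```python
def fan_out_triples(session_id: str, correlation_id: str,
                    goal_uris: list[str],
                    derived_from: str) -> list[tuple[str, str, str]]:
    """Build the provenance triples for a supervisor fan-out record."""
    node = f"urn:trustgraph:agent:{session_id}/fanout/{correlation_id}"
    triples = [
        (node, "rdf:type", "prov:Entity"),
        (node, "rdf:type", "tg:FanOut"),
        (node, "prov:wasDerivedFrom", derived_from),
        (node, "tg:correlationId", correlation_id),
        (node, "tg:expectedSiblings", str(len(goal_uris))),
    ]
    triples += [(node, "tg:subagentGoals", uri) for uri in goal_uris]
    return triples
```

The same node URI later anchors the aggregator's lookup of the expected sibling count, so coordination state and audit trail come from one write.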
+ +### URI Scheme + +Extending the existing `urn:trustgraph:agent:{session_id}` pattern: + +| Entity | URI Pattern | +|---|---| +| Session (existing) | `urn:trustgraph:agent:{session_id}` | +| Iteration (existing) | `urn:trustgraph:agent:{session_id}/i{n}` | +| Conclusion (existing) | `urn:trustgraph:agent:{session_id}/answer` | +| Routing decision | `urn:trustgraph:agent:{session_id}/routing` | +| Fan-out record | `urn:trustgraph:agent:{session_id}/fanout/{correlation_id}` | +| Subagent completion | `urn:trustgraph:agent:{session_id}/completion` | + +--- + +## Storage Responsibilities + +Pattern and task type definitions live in the config API. Runtime state and +provenance live in the knowledge graph. The division is: + +| Role | Storage | When Written | Content | +|---|---|---|---| +| Pattern definitions | Config API | At design time | Pattern properties, descriptions | +| Task type definitions | Config API | At design time | Domain framing, valid pattern lists | +| Routing decision trace | Knowledge graph | At request arrival | Why this task type and pattern were selected | +| Iteration decision trace | Knowledge graph | During execution | Each think/act/observe cycle, per existing model | +| Fan-out coordination | Knowledge graph | During fan-out | Subagent goals, correlation ID, expected count | +| Subagent completion | Knowledge graph | During fan-in | Per-subagent results under shared correlation ID | +| Execution audit trail | Knowledge graph | Post-execution | Full multi-agent reasoning trace as a DAG | + +The config API holds the definitions that constrain decisions. The knowledge +graph holds the runtime decisions and their provenance. The fan-in +coordination state is part of the provenance automatically — subagent +completion nodes are both coordination signals and audit trail entries. + +--- + +## Worked Example: Partner Risk Assessment + +**Request**: "Assess the risk profile of Company X as a potential partner" + +**1. 
Request arrives** on the *agent request* topic with empty history. +The agent manager picks it up. + +**2. Meta-router**: +- Queries config API, finds task types: *Risk Assessment*, *Research*, + *Summarisation*, *General*. +- LLM identifies *Risk Assessment*. Framing prompt loaded: "analyse across + financial, reputational, legal and operational dimensions using structured + analytic techniques." +- Valid patterns for *Risk Assessment*: [*Supervisor/Subagent*, + *Plan-then-Execute*, *ReACT*]. +- LLM selects *Supervisor/Subagent* — task has four independent investigative + dimensions, well-suited to parallel decomposition. +- Routing decision written to graph. Request re-emitted on the + *agent request* topic with `pattern="supervisor"`, framing populated. + +**3. Supervisor iteration** (picked up from *agent request* topic): +- LLM receives question + framing. Reasons that four independent investigative + threads are required. +- Generates correlation ID `corr-abc123`. +- Emits four subagent requests on the *agent request* topic: + - Financial analysis (pattern="react", subagent_goal="Analyse financial + health and stability of Company X") + - Legal analysis (pattern="react", subagent_goal="Review regulatory filings, + sanctions, and legal exposure for Company X") + - Reputational analysis (pattern="react", subagent_goal="Analyse news + sentiment and public reputation of Company X") + - Operational analysis (pattern="react", subagent_goal="Assess supply chain + dependencies and operational risks for Company X") +- Fan-out node written to graph. + +**4. 
Four subagents run in parallel** (each picked up from the *agent +request* topic by agent manager instances), each as an independent ReACT +loop: +- Financial — queries financial data services and knowledge graph + relationships +- Legal — searches regulatory filings and sanctions lists +- Reputational — searches news, analyses sentiment +- Operational — queries supply chain databases + +Each self-queues its iterations on the *agent request* topic. Each writes +its own decision trace to the graph as it progresses. Each completes +independently. + +**5. Fan-in**: +- Each subagent writes a `tg:SubagentCompletion` node to the graph on + completion, emitted on the *explainability* topic. The completion node + references the subagent's result document in the librarian. +- Aggregator (consuming the *explainability* topic) sees each completion + event. It queries the graph for the fan-out node to get the expected + sibling count, then checks how many completions exist for + `corr-abc123`. +- When all four siblings are complete, the aggregator emits a synthesis + request on the *agent request* topic with the correlation ID. It does + not fetch or bundle subagent results — the supervisor will query the + graph for those. + +**6. Supervisor synthesis** (picked up from *agent request* topic): +- Receives the synthesis trigger carrying the correlation ID. +- Queries the graph for `tg:SubagentCompletion` nodes under + `corr-abc123`, retrieving each subagent's goal and result document + reference. +- Fetches the result documents from the librarian. +- Reasons across all four dimensions, produces a structured risk + assessment with confidence scores. +- Emits final answer on the *agent response* topic and writes conclusion + provenance to the graph. + +**7. Response delivered** — the supervisor's synthesis streams on the +*agent response* topic as the LLM generates it, with `end_of_dialog` +on the final chunk. 
The collated answer is saved to the librarian and +referenced from conclusion provenance in the graph. The graph now holds +a complete, human-readable trace of the entire multi-agent execution — +from pattern selection through four parallel investigations to final +synthesis. + +--- + +## Class Hierarchy + +The agent-orchestrator executable (`agent-orchestrator`) uses the same +service identity as agent-manager-react, making it a drop-in replacement. +The pattern dispatch model suggests a class hierarchy where shared iteration +infrastructure lives in a base class and pattern-specific logic is in +subclasses: + +``` +AgentService (base — Pulsar consumer/producer specs, request handling) + │ + └── Processor (agent-orchestrator service) + │ + ├── MetaRouter — task type identification, pattern selection + │ + ├── PatternBase — shared: tool filtering, provenance, streaming, history + │ ├── ReactPattern — existing ReACT logic (extract from current AgentManager) + │ ├── PlanThenExecutePattern — plan phase + execute phase + │ └── SupervisorPattern — fan-out, synthesis + │ + └── Aggregator — fan-in completion detection +``` + +`PatternBase` captures what is currently spread across `Processor` and +`AgentManager`: tool filtering, LLM invocation, provenance triple emission, +streaming callbacks, history management. The pattern subclasses implement only +the decision logic specific to their execution strategy — what to do with the +LLM output, when to terminate, whether to fan out. + +This refactoring is not strictly necessary for the first iteration — the +meta-router and pattern dispatch could be added as branches within the +existing `Processor.agent_request()` method. But the class hierarchy clarifies +where shared vs. pattern-specific logic lives and will prevent duplication as +more patterns are added. + +--- + +## Configuration + +### Config API Seeding + +Pattern and task type definitions are stored via the config API and need to +be seeded at deployment time. 
This is analogous to how flow blueprints and +parameter types are loaded — a bootstrap step that writes the initial +configuration. + +The initial seed includes: + +**Patterns** (config type `agent-pattern`): +- `react` — interleaved reasoning and action +- `plan-then-execute` — structured plan followed by step execution +- `supervisor` — decomposition, fan-out to subagents, synthesis + +**Task types** (config type `agent-task-type`, initial set, expected to grow): +- `general` — no specific domain framing, all patterns valid +- `research` — open-ended investigation, valid patterns: react, plan-then-execute +- `risk-assessment` — multi-dimensional analysis, valid patterns: supervisor, + plan-then-execute, react +- `summarisation` — condense information, valid patterns: react + +The seed data is configuration, not code. It can be extended via the config +API (or the configuration UI) without redeploying the agent manager. + +### Migration Path + +The config API provides a practical starting point. If richer ontological +relationships between patterns, task types, and domain knowledge become +valuable, the definitions can be migrated to graph storage. The meta-router's +selection logic queries an abstract set of task types and patterns — the +storage backend is an implementation detail. + +### Fallback Behaviour + +If the config contains no patterns or task types: +- Task type defaults to `general`. +- Pattern defaults to `react`. +- The system degrades gracefully to existing behaviour. 
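The fallback rules can be condensed into one resolution step. This sketch also treats an invalid pattern selection (one not in the task type's `valid_patterns`) as a fallback to ReACT, which is an extrapolation of the stated rules rather than something the spec mandates:

```python
GENERAL = {
    "name": "general",
    "valid_patterns": ["react", "plan-then-execute", "supervisor"],
}


def resolve_routing(task_types: dict[str, dict], identified_type: str,
                    selected_pattern: str) -> tuple[str, str]:
    """Degrade gracefully: missing config falls back to general/react,
    preserving the existing ReACT behaviour."""
    task_type = task_types.get(identified_type, GENERAL)
    valid = task_type.get("valid_patterns") or ["react"]
    pattern = selected_pattern if selected_pattern in valid else "react"
    return task_type["name"], pattern
```

With an empty config store every request resolves to `("general", "react")`, i.e. the system behaves exactly as it does today.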
+ +--- + +## Design Decisions + +| Decision | Resolution | Rationale | +|---|---|---| +| Task type identification | LLM interprets from plain text | Natural language too varied to formalise prematurely | +| Pattern/task type storage | Config API initially, graph later if needed | Avoids graph model complexity upfront; config API already has UI support; migration path is straightforward | +| Meta-router location | Phase within agent manager, not separate service | Avoids an extra network hop; routing is fast | +| Fan-in mechanism | Event-driven via explainability topic | Consistent with Pulsar-based architecture; graph query for completion count is idempotent and restart-safe | +| Aggregator deployment | Separate lightweight process | Decoupled from agent manager lifecycle | +| Subagent pattern selection | Supervisor specifies per-subagent | Supervisor has task context to make this choice | +| Plan storage | In message history | No external state needed; plan travels with message | +| Default pattern | Empty pattern field → ReACT | Sensible default when meta-router is not configured | + +--- + +## Streaming Protocol + +### Current Model + +The existing agent response schema has two levels: + +- **`end_of_message`** — marks the end of a complete thought, observation, + or answer. Chunks belonging to the same message arrive sequentially. +- **`end_of_dialog`** — marks the end of the entire agent execution. No + more messages will follow. + +This works because the current system produces messages serially — one +thought at a time, one agent at a time. + +### Fan-Out Breaks Serial Assumptions + +With supervisor/subagent fan-out, multiple subagents stream chunks +concurrently on the same *agent response* topic. The caller receives +interleaved chunks from different sources and needs to demultiplex them. + +### Resolution: Message ID + +Each chunk carries a `message_id` — a per-message UUID generated when +the agent begins streaming a new thought, observation, or answer. 
The +caller groups chunks by `message_id` and assembles each message +independently. + +``` +Response chunk fields: + message_id UUID for this message (groups chunks) + session_id Which agent session produced this chunk + chunk_type "thought" | "observation" | "answer" | ... + content The chunk text + end_of_message True on the final chunk of this message + end_of_dialog True on the final message of the entire execution +``` + +A single subagent emits multiple messages (thought, observation, thought, +answer), each with a distinct `message_id`. The `session_id` identifies +which subagent the message belongs to. The caller can display, group, or +filter by either. + +### Provenance Trigger + +`end_of_message` is the trigger for provenance storage. When a complete +message has been assembled from its chunks: + +1. The collated text is saved to the librarian as a single document. +2. A provenance node is written to the graph referencing the document URI. + +This follows the pattern established by GraphRAG, where streaming synthesis +chunks are delivered live but the stored provenance references the collated +answer text. Streaming is for the caller; provenance needs complete messages. + +--- + +## Open Questions + +- **Re-planning depth** (resolved): Runtime parameter on the + agent-orchestrator executable, default 2. Bounds how many times + Plan-then-Execute can revise its plan before forcing termination. +- **Nested fan-out** (phase B): A subagent can itself be a supervisor + that fans out further. The architecture supports this — correlation IDs + are independent and the aggregator is stateless. The protocols and + message schema should not preclude nested fan-out, but implementation + is deferred. Depth limits will need to be enforced to prevent runaway + decomposition. +- **Task type evolution** (resolved): Manually curated for now. See + Future Directions below for automated discovery. 
+- **Cost attribution** (deferred): Costs are measured at the + text-completion queue level as they are today. Per-request attribution + across subagents is not yet implemented and is not a blocker for + orchestration. +- **Conversation ID** (resolved): An optional `conversation_id` field on + `AgentRequest`, generated by the caller. When present, all objects + created during the execution (provenance nodes, librarian documents, + subagent completion records) are tagged with the conversation ID. This + enables querying all interactions in a conversation with a single + lookup, and provides the foundation for conversation-scoped memory. + No explicit open/close — the first request with a new conversation ID + implicitly starts the conversation. Omit for one-shot queries. +- **Tool scoping per subagent** (resolved): Subagents inherit the + parent's tool group by default. The supervisor can optionally override + the group per subagent to constrain capabilities (e.g. financial + subagent gets only financial tools). The `group` field on + `AgentRequest` already supports this — the supervisor just sets it + when constructing subagent requests. + +--- + +## Future Directions + +### Automated Task Type Discovery + +Task types are manually curated in the initial implementation. However, +the architecture is well-suited to automated discovery because all agent +requests and their execution traces flow through Pulsar topics. A +learning service could consume these messages and analyse patterns in +how tasks are framed, which patterns are selected, and how successfully +they execute. Over time, it could propose new task types based on +clusters of similar requests that don't map well to existing types, or +suggest refinements to framing prompts based on which framings lead to +better outcomes. This service would write proposed task types to the +config API for human review — automated discovery, manual approval. 
The +agent-orchestrator does not need to change; it always reads task types +from config regardless of how they got there. diff --git a/tests/contract/conftest.py b/tests/contract/conftest.py index c474af29..15082437 100644 --- a/tests/contract/conftest.py +++ b/tests/contract/conftest.py @@ -87,10 +87,11 @@ def sample_message_data(): "history": [] }, "AgentResponse": { - "answer": "Machine learning is a subset of AI.", + "chunk_type": "answer", + "content": "Machine learning is a subset of AI.", + "end_of_message": True, + "end_of_dialog": True, "error": None, - "thought": "I need to provide information about machine learning.", - "observation": None }, "Metadata": { "id": "test-doc-123", diff --git a/tests/contract/test_message_contracts.py b/tests/contract/test_message_contracts.py index 695fef14..bc5bece1 100644 --- a/tests/contract/test_message_contracts.py +++ b/tests/contract/test_message_contracts.py @@ -212,10 +212,11 @@ class TestAgentMessageContracts: # Test required fields response = AgentResponse(**response_data) - assert hasattr(response, 'answer') + assert hasattr(response, 'chunk_type') + assert hasattr(response, 'content') + assert hasattr(response, 'end_of_message') + assert hasattr(response, 'end_of_dialog') assert hasattr(response, 'error') - assert hasattr(response, 'thought') - assert hasattr(response, 'observation') def test_agent_step_schema_contract(self): """Test AgentStep schema contract""" diff --git a/tests/contract/test_translator_completion_flags.py b/tests/contract/test_translator_completion_flags.py index dc7d5748..a22e1c41 100644 --- a/tests/contract/test_translator_completion_flags.py +++ b/tests/contract/test_translator_completion_flags.py @@ -188,12 +188,10 @@ class TestAgentTranslatorCompletionFlags: # Arrange translator = TranslatorRegistry.get_response_translator("agent") response = AgentResponse( - answer="4", - error=None, - thought=None, - observation=None, + chunk_type="answer", + content="4", end_of_message=True, - 
end_of_dialog=True + end_of_dialog=True, ) # Act @@ -201,7 +199,7 @@ class TestAgentTranslatorCompletionFlags: # Assert assert is_final is True, "is_final must be True when end_of_dialog=True" - assert response_dict["answer"] == "4" + assert response_dict["content"] == "4" assert response_dict["end_of_dialog"] is True def test_agent_translator_is_final_with_end_of_dialog_false(self): @@ -212,12 +210,10 @@ class TestAgentTranslatorCompletionFlags: # Arrange translator = TranslatorRegistry.get_response_translator("agent") response = AgentResponse( - answer=None, - error=None, - thought="I need to solve this.", - observation=None, + chunk_type="thought", + content="I need to solve this.", end_of_message=True, - end_of_dialog=False + end_of_dialog=False, ) # Act @@ -225,31 +221,9 @@ class TestAgentTranslatorCompletionFlags: # Assert assert is_final is False, "is_final must be False when end_of_dialog=False" - assert response_dict["thought"] == "I need to solve this." + assert response_dict["content"] == "I need to solve this." assert response_dict["end_of_dialog"] is False - def test_agent_translator_is_final_fallback_with_answer(self): - """ - Test that AgentResponseTranslator returns is_final=True - when answer is present (fallback for legacy responses). - """ - # Arrange - translator = TranslatorRegistry.get_response_translator("agent") - # Legacy response without end_of_dialog flag - response = AgentResponse( - answer="4", - error=None, - thought=None, - observation=None - ) - - # Act - response_dict, is_final = translator.from_response_with_completion(response) - - # Assert - assert is_final is True, "is_final must be True when answer is present (legacy fallback)" - assert response_dict["answer"] == "4" - def test_agent_translator_intermediate_message_is_not_final(self): """ Test that intermediate messages (thought/observation) return is_final=False. 
@@ -259,12 +233,10 @@ class TestAgentTranslatorCompletionFlags: # Test thought message thought_response = AgentResponse( - answer=None, - error=None, - thought="Processing...", - observation=None, + chunk_type="thought", + content="Processing...", end_of_message=True, - end_of_dialog=False + end_of_dialog=False, ) # Act @@ -275,12 +247,10 @@ class TestAgentTranslatorCompletionFlags: # Test observation message observation_response = AgentResponse( - answer=None, - error=None, - thought=None, - observation="Result found", + chunk_type="observation", + content="Result found", end_of_message=True, - end_of_dialog=False + end_of_dialog=False, ) # Act @@ -302,10 +272,6 @@ class TestAgentTranslatorCompletionFlags: content="", end_of_message=True, end_of_dialog=True, - answer=None, - error=None, - thought=None, - observation=None ) # Act diff --git a/tests/unit/test_agent/test_agent_service_non_streaming.py b/tests/unit/test_agent/test_agent_service_non_streaming.py index 2ef64e96..ff630325 100644 --- a/tests/unit/test_agent/test_agent_service_non_streaming.py +++ b/tests/unit/test_agent/test_agent_service_non_streaming.py @@ -82,16 +82,16 @@ class TestAgentServiceNonStreaming: # Check thought message thought_response = sent_responses[0] assert isinstance(thought_response, AgentResponse) - assert thought_response.thought == "I need to solve this." - assert thought_response.answer is None + assert thought_response.chunk_type == "thought" + assert thought_response.content == "I need to solve this." assert thought_response.end_of_message is True, "Thought message must have end_of_message=True" assert thought_response.end_of_dialog is False, "Thought message must have end_of_dialog=False" # Check observation message observation_response = sent_responses[1] assert isinstance(observation_response, AgentResponse) - assert observation_response.observation == "The answer is 4." 
- assert observation_response.answer is None + assert observation_response.chunk_type == "observation" + assert observation_response.content == "The answer is 4." assert observation_response.end_of_message is True, "Observation message must have end_of_message=True" assert observation_response.end_of_dialog is False, "Observation message must have end_of_dialog=False" @@ -161,9 +161,8 @@ class TestAgentServiceNonStreaming: # Check final answer message answer_response = sent_responses[0] assert isinstance(answer_response, AgentResponse) - assert answer_response.answer == "4" - assert answer_response.thought is None - assert answer_response.observation is None + assert answer_response.chunk_type == "answer" + assert answer_response.content == "4" assert answer_response.end_of_message is True, "Final answer must have end_of_message=True" assert answer_response.end_of_dialog is True, "Final answer must have end_of_dialog=True" diff --git a/trustgraph-base/trustgraph/api/socket_client.py b/trustgraph-base/trustgraph/api/socket_client.py index 40769fa0..e5f63c79 100644 --- a/trustgraph-base/trustgraph/api/socket_client.py +++ b/trustgraph-base/trustgraph/api/socket_client.py @@ -402,23 +402,6 @@ class SocketClient: content=resp.get("content", ""), end_of_message=resp.get("end_of_message", False) ) - # Non-streaming agent format: chunk_type is empty but has thought/observation/answer fields - elif resp.get("thought"): - return AgentThought( - content=resp.get("thought", ""), - end_of_message=resp.get("end_of_message", False) - ) - elif resp.get("observation"): - return AgentObservation( - content=resp.get("observation", ""), - end_of_message=resp.get("end_of_message", False) - ) - elif resp.get("answer"): - return AgentAnswer( - content=resp.get("answer", ""), - end_of_message=resp.get("end_of_message", False), - end_of_dialog=resp.get("end_of_dialog", False) - ) else: content = resp.get("response", resp.get("chunk", resp.get("text", ""))) return RAGChunk( diff --git 
a/trustgraph-base/trustgraph/base/agent_client.py b/trustgraph-base/trustgraph/base/agent_client.py index f48fd024..d73d03b9 100644 --- a/trustgraph-base/trustgraph/base/agent_client.py +++ b/trustgraph-base/trustgraph/base/agent_client.py @@ -57,8 +57,7 @@ class AgentClient(RequestResponse): await self.request( AgentRequest( question = question, - plan = plan, - state = state, + state = state or "", history = history, ), recipient=recipient, diff --git a/trustgraph-base/trustgraph/base/agent_service.py b/trustgraph-base/trustgraph/base/agent_service.py index 0e5524fe..cbb15183 100644 --- a/trustgraph-base/trustgraph/base/agent_service.py +++ b/trustgraph-base/trustgraph/base/agent_service.py @@ -90,9 +90,6 @@ class AgentService(FlowProcessor): type = "agent-error", message = str(e), ), - thought = None, - observation = None, - answer = None, end_of_message = True, end_of_dialog = True, ), diff --git a/trustgraph-base/trustgraph/messaging/translators/agent.py b/trustgraph-base/trustgraph/messaging/translators/agent.py index 378bdb41..b245a83e 100644 --- a/trustgraph-base/trustgraph/messaging/translators/agent.py +++ b/trustgraph-base/trustgraph/messaging/translators/agent.py @@ -16,6 +16,14 @@ class AgentRequestTranslator(MessageTranslator): collection=data.get("collection", "default"), streaming=data.get("streaming", False), session_id=data.get("session_id", ""), + conversation_id=data.get("conversation_id", ""), + pattern=data.get("pattern", ""), + task_type=data.get("task_type", ""), + framing=data.get("framing", ""), + correlation_id=data.get("correlation_id", ""), + parent_session_id=data.get("parent_session_id", ""), + subagent_goal=data.get("subagent_goal", ""), + expected_siblings=data.get("expected_siblings", 0), ) def from_pulsar(self, obj: AgentRequest) -> Dict[str, Any]: @@ -28,6 +36,14 @@ class AgentRequestTranslator(MessageTranslator): "collection": getattr(obj, "collection", "default"), "streaming": getattr(obj, "streaming", False), "session_id": 
getattr(obj, "session_id", ""), + "conversation_id": getattr(obj, "conversation_id", ""), + "pattern": getattr(obj, "pattern", ""), + "task_type": getattr(obj, "task_type", ""), + "framing": getattr(obj, "framing", ""), + "correlation_id": getattr(obj, "correlation_id", ""), + "parent_session_id": getattr(obj, "parent_session_id", ""), + "subagent_goal": getattr(obj, "subagent_goal", ""), + "expected_siblings": getattr(obj, "expected_siblings", 0), } @@ -40,24 +56,15 @@ class AgentResponseTranslator(MessageTranslator): def from_pulsar(self, obj: AgentResponse) -> Dict[str, Any]: result = {} - # Check if this is a streaming response (has chunk_type) - if hasattr(obj, 'chunk_type') and obj.chunk_type: + if obj.chunk_type: result["chunk_type"] = obj.chunk_type - if obj.content: - result["content"] = obj.content - result["end_of_message"] = getattr(obj, "end_of_message", False) - result["end_of_dialog"] = getattr(obj, "end_of_dialog", False) - else: - # Legacy format (non-streaming) - if obj.answer: - result["answer"] = obj.answer - if obj.thought: - result["thought"] = obj.thought - if obj.observation: - result["observation"] = obj.observation - # Include completion flags for legacy format too - result["end_of_message"] = getattr(obj, "end_of_message", False) - result["end_of_dialog"] = getattr(obj, "end_of_dialog", False) + if obj.content: + result["content"] = obj.content + result["end_of_message"] = getattr(obj, "end_of_message", False) + result["end_of_dialog"] = getattr(obj, "end_of_dialog", False) + + if getattr(obj, "message_id", ""): + result["message_id"] = obj.message_id # Include explainability fields if present explain_id = getattr(obj, "explain_id", None) @@ -76,11 +83,5 @@ class AgentResponseTranslator(MessageTranslator): def from_response_with_completion(self, obj: AgentResponse) -> Tuple[Dict[str, Any], bool]: """Returns (response_dict, is_final)""" - # For streaming responses, check end_of_dialog - if hasattr(obj, 'chunk_type') and obj.chunk_type: - 
is_final = getattr(obj, 'end_of_dialog', False) - else: - # For legacy responses, check if answer is present - is_final = (obj.answer is not None) - + is_final = getattr(obj, 'end_of_dialog', False) return self.from_pulsar(obj), is_final \ No newline at end of file diff --git a/trustgraph-base/trustgraph/schema/services/agent.py b/trustgraph-base/trustgraph/schema/services/agent.py index 91179047..fdb9e391 100644 --- a/trustgraph-base/trustgraph/schema/services/agent.py +++ b/trustgraph-base/trustgraph/schema/services/agent.py @@ -1,5 +1,6 @@ from dataclasses import dataclass, field +from typing import Optional from ..core.topic import topic from ..core.primitives import Error @@ -8,6 +9,14 @@ from ..core.primitives import Error # Prompt services, abstract the prompt generation +@dataclass +class PlanStep: + goal: str = "" + tool_hint: str = "" # Suggested tool for this step + depends_on: list[int] = field(default_factory=list) # Indices of prerequisite steps + status: str = "pending" # pending, running, completed, failed + result: str = "" # Result of step execution + @dataclass class AgentStep: thought: str = "" @@ -15,6 +24,9 @@ class AgentStep: arguments: dict[str, str] = field(default_factory=dict) observation: str = "" user: str = "" # User context for the step + step_type: str = "" # "react", "plan", "execute", "decompose", "synthesise" + plan: list[PlanStep] = field(default_factory=list) # Plan steps (for plan-then-execute) + subagent_results: dict[str, str] = field(default_factory=dict) # Subagent results keyed by goal @dataclass class AgentRequest: @@ -27,6 +39,16 @@ class AgentRequest: streaming: bool = False # Enable streaming response delivery (default false) session_id: str = "" # For provenance tracking across iterations + # Orchestration fields + conversation_id: str = "" # Groups related requests into a conversation + pattern: str = "" # Selected pattern: "react", "plan-then-execute", "supervisor" + task_type: str = "" # Task type from config: 
"general", "research", etc. + framing: str = "" # Domain framing text injected into prompts + correlation_id: str = "" # Links fan-out subagents to parent for fan-in + parent_session_id: str = "" # Session ID of the supervisor that spawned this subagent + subagent_goal: str = "" # Specific goal for a subagent (set by supervisor) + expected_siblings: int = 0 # Number of sibling subagents in this fan-out + @dataclass class AgentResponse: # Streaming-first design @@ -39,11 +61,10 @@ class AgentResponse: explain_id: str | None = None # Provenance URI (announced as created) explain_graph: str | None = None # Named graph where explain was stored - # Legacy fields (deprecated but kept for backward compatibility) - answer: str = "" + # Orchestration fields + message_id: str = "" # Unique ID for this response message + error: Error | None = None - thought: str = "" - observation: str = "" ############################################################################ diff --git a/trustgraph-flow/pyproject.toml b/trustgraph-flow/pyproject.toml index 4aeb9199..66363305 100644 --- a/trustgraph-flow/pyproject.toml +++ b/trustgraph-flow/pyproject.toml @@ -56,6 +56,7 @@ Homepage = "https://github.com/trustgraph-ai/trustgraph" [project.scripts] agent-manager-react = "trustgraph.agent.react:run" +agent-orchestrator = "trustgraph.agent.orchestrator:run" api-gateway = "trustgraph.gateway:run" chunker-recursive = "trustgraph.chunking.recursive:run" chunker-token = "trustgraph.chunking.token:run" diff --git a/trustgraph-flow/trustgraph/agent/orchestrator/__init__.py b/trustgraph-flow/trustgraph/agent/orchestrator/__init__.py new file mode 100644 index 00000000..214f7272 --- /dev/null +++ b/trustgraph-flow/trustgraph/agent/orchestrator/__init__.py @@ -0,0 +1,2 @@ + +from . 
service import * diff --git a/trustgraph-flow/trustgraph/agent/orchestrator/__main__.py b/trustgraph-flow/trustgraph/agent/orchestrator/__main__.py new file mode 100644 index 00000000..da5a9021 --- /dev/null +++ b/trustgraph-flow/trustgraph/agent/orchestrator/__main__.py @@ -0,0 +1,6 @@ +#!/usr/bin/env python3 + +from . service import run + +if __name__ == '__main__': + run() diff --git a/trustgraph-flow/trustgraph/agent/orchestrator/aggregator.py b/trustgraph-flow/trustgraph/agent/orchestrator/aggregator.py new file mode 100644 index 00000000..bff8822c --- /dev/null +++ b/trustgraph-flow/trustgraph/agent/orchestrator/aggregator.py @@ -0,0 +1,157 @@ +""" +Aggregator — monitors the explainability topic for subagent completions +and triggers synthesis when all siblings in a fan-out have completed. + +The aggregator watches for tg:Conclusion triples that carry a +correlation_id. When it detects that all expected siblings have +completed, it emits a synthesis AgentRequest on the agent request topic. +""" + +import asyncio +import json +import logging +import time +import uuid + +from ... schema import AgentRequest, AgentStep + +logger = logging.getLogger(__name__) + +# How long to wait for stalled correlations before giving up (seconds) +DEFAULT_TIMEOUT = 300 + + +class Aggregator: + """ + Tracks in-flight fan-out correlations and triggers synthesis + when all subagents complete. + + State is held in-memory; if the process restarts, in-flight + correlations are lost (acceptable for v1). + """ + + def __init__(self, timeout=DEFAULT_TIMEOUT): + self.timeout = timeout + + # correlation_id -> { + # "parent_session_id": str, + # "expected": int, + # "results": {goal: answer}, + # "request_template": AgentRequest or None, + # "created_at": float, + # } + self.correlations = {} + + def register_fanout(self, correlation_id, parent_session_id, + expected_siblings, request_template=None): + """ + Register a new fan-out. 
Called by the supervisor after emitting + subagent requests. + """ + self.correlations[correlation_id] = { + "parent_session_id": parent_session_id, + "expected": expected_siblings, + "results": {}, + "request_template": request_template, + "created_at": time.time(), + } + logger.info( + f"Aggregator: registered fan-out {correlation_id}, " + f"expecting {expected_siblings} subagents" + ) + + def record_completion(self, correlation_id, subagent_goal, result): + """ + Record a subagent completion. + + Returns: + True if all siblings are now complete, False otherwise. + Returns None if the correlation_id is unknown. + """ + if correlation_id not in self.correlations: + logger.warning( + f"Aggregator: unknown correlation_id {correlation_id}" + ) + return None + + entry = self.correlations[correlation_id] + entry["results"][subagent_goal] = result + + completed = len(entry["results"]) + expected = entry["expected"] + + logger.info( + f"Aggregator: {correlation_id} — " + f"{completed}/{expected} subagents complete" + ) + + return completed >= expected + + def get_results(self, correlation_id): + """Get all results for a correlation and remove the tracking entry.""" + entry = self.correlations.pop(correlation_id, None) + if entry is None: + return None, None, None + return ( + entry["results"], + entry["parent_session_id"], + entry["request_template"], + ) + + def build_synthesis_request(self, correlation_id, original_question, + user, collection): + """ + Build the AgentRequest that triggers the synthesis phase. 
+ """ + results, parent_session_id, template = self.get_results(correlation_id) + + if results is None: + raise RuntimeError( + f"No results for correlation_id {correlation_id}" + ) + + # Build history with decompose step + results + synthesis_step = AgentStep( + thought="All subagents completed", + action="aggregate", + arguments={}, + observation=json.dumps(results), + step_type="synthesise", + subagent_results=results, + ) + + history = [] + if template and template.history: + history = list(template.history) + history.append(synthesis_step) + + return AgentRequest( + question=original_question, + state="", + group=template.group if template else [], + history=history, + user=user, + collection=collection, + streaming=template.streaming if template else False, + session_id=parent_session_id, + conversation_id=template.conversation_id if template else "", + pattern="supervisor", + task_type=template.task_type if template else "", + framing=template.framing if template else "", + correlation_id=correlation_id, + parent_session_id="", + subagent_goal="", + expected_siblings=0, + ) + + def cleanup_stale(self): + """Remove correlations that have timed out.""" + now = time.time() + stale = [ + cid for cid, entry in self.correlations.items() + if now - entry["created_at"] > self.timeout + ] + for cid in stale: + logger.warning(f"Aggregator: timing out stale correlation {cid}") + self.correlations.pop(cid, None) + return stale diff --git a/trustgraph-flow/trustgraph/agent/orchestrator/meta_router.py b/trustgraph-flow/trustgraph/agent/orchestrator/meta_router.py new file mode 100644 index 00000000..c3b1afa6 --- /dev/null +++ b/trustgraph-flow/trustgraph/agent/orchestrator/meta_router.py @@ -0,0 +1,168 @@ +""" +MetaRouter — selects the task type and execution pattern for a query. + +Uses the config API to look up available task types and patterns, then +asks the LLM to classify the query and select the best pattern. 
+Falls back to ("react", "general", "") if config is empty. +""" + +import json +import logging + +logger = logging.getLogger(__name__) + +DEFAULT_PATTERN = "react" +DEFAULT_TASK_TYPE = "general" +DEFAULT_FRAMING = "" + + +class MetaRouter: + + def __init__(self, config=None): + """ + Args: + config: The full config dict from the config service. + May contain "agent-pattern" and "agent-task-type" keys. + """ + self.patterns = {} + self.task_types = {} + + if config: + # Load from config API + if "agent-pattern" in config: + for pid, pval in config["agent-pattern"].items(): + try: + self.patterns[pid] = json.loads(pval) + except (json.JSONDecodeError, TypeError): + self.patterns[pid] = {"name": pid} + + if "agent-task-type" in config: + for tid, tval in config["agent-task-type"].items(): + try: + self.task_types[tid] = json.loads(tval) + except (json.JSONDecodeError, TypeError): + self.task_types[tid] = {"name": tid} + + # If config has no patterns/task-types, default to react/general + if not self.patterns: + self.patterns = { + "react": {"name": "react", "description": "Interleaved reasoning and action"}, + } + if not self.task_types: + self.task_types = { + "general": {"name": "general", "description": "General queries", "valid_patterns": ["react"], "framing": ""}, + } + + async def identify_task_type(self, question, context): + """ + Use the LLM to classify the question into one of the known task types. + + Args: + question: The user's query. + context: UserAwareContext (flow wrapper). + + Returns: + (task_type_id, framing) tuple. 
+ """ + if len(self.task_types) <= 1: + tid = next(iter(self.task_types), DEFAULT_TASK_TYPE) + framing = self.task_types.get(tid, {}).get("framing", DEFAULT_FRAMING) + return tid, framing + + try: + client = context("prompt-request") + response = await client.prompt( + id="task-type-classify", + variables={ + "question": question, + "task_types": [ + {"name": tid, "description": tdata.get("description", tid)} + for tid, tdata in self.task_types.items() + ], + }, + ) + selected = response.strip().lower().replace('"', '').replace("'", "") + + if selected in self.task_types: + framing = self.task_types[selected].get("framing", DEFAULT_FRAMING) + logger.info(f"MetaRouter: identified task type '{selected}'") + return selected, framing + else: + logger.warning( + f"MetaRouter: LLM returned unknown task type '{selected}', " + f"falling back to '{DEFAULT_TASK_TYPE}'" + ) + except Exception as e: + logger.warning(f"MetaRouter: task type classification failed: {e}") + + framing = self.task_types.get(DEFAULT_TASK_TYPE, {}).get( + "framing", DEFAULT_FRAMING + ) + return DEFAULT_TASK_TYPE, framing + + async def select_pattern(self, question, task_type, context): + """ + Use the LLM to select the best execution pattern for this task type. + + Args: + question: The user's query. + task_type: The identified task type ID. + context: UserAwareContext (flow wrapper). + + Returns: + Pattern ID string. 
+ """ + task_config = self.task_types.get(task_type, {}) + valid_patterns = task_config.get("valid_patterns", list(self.patterns.keys())) + + if len(valid_patterns) <= 1: + return valid_patterns[0] if valid_patterns else DEFAULT_PATTERN + + try: + client = context("prompt-request") + response = await client.prompt( + id="pattern-select", + variables={ + "question": question, + "task_type": task_type, + "task_type_description": task_config.get("description", task_type), + "patterns": [ + {"name": pid, "description": self.patterns.get(pid, {}).get("description", pid)} + for pid in valid_patterns + if pid in self.patterns + ], + }, + ) + selected = response.strip().lower().replace('"', '').replace("'", "") + + if selected in valid_patterns: + logger.info(f"MetaRouter: selected pattern '{selected}'") + return selected + else: + logger.warning( + f"MetaRouter: LLM returned invalid pattern '{selected}', " + f"falling back to '{valid_patterns[0]}'" + ) + return valid_patterns[0] + except Exception as e: + logger.warning(f"MetaRouter: pattern selection failed: {e}") + return valid_patterns[0] if valid_patterns else DEFAULT_PATTERN + + async def route(self, question, context): + """ + Full routing pipeline: identify task type, then select pattern. + + Args: + question: The user's query. + context: UserAwareContext (flow wrapper). + + Returns: + (pattern, task_type, framing) tuple. 
+ """ + task_type, framing = await self.identify_task_type(question, context) + pattern = await self.select_pattern(question, task_type, context) + logger.info( + f"MetaRouter: route result — " + f"pattern={pattern}, task_type={task_type}, framing={framing!r}" + ) + return pattern, task_type, framing diff --git a/trustgraph-flow/trustgraph/agent/orchestrator/pattern_base.py b/trustgraph-flow/trustgraph/agent/orchestrator/pattern_base.py new file mode 100644 index 00000000..fc07e745 --- /dev/null +++ b/trustgraph-flow/trustgraph/agent/orchestrator/pattern_base.py @@ -0,0 +1,428 @@ +""" +Base class for agent patterns. + +Provides shared infrastructure used by all patterns: tool filtering, +provenance emission, streaming callbacks, history management, and +librarian integration. +""" + +import json +import logging +import uuid +from datetime import datetime + +from ... schema import AgentRequest, AgentResponse, AgentStep, Error +from ... schema import Triples, Metadata + +from trustgraph.provenance import ( + agent_session_uri, + agent_iteration_uri, + agent_thought_uri, + agent_observation_uri, + agent_final_uri, + agent_session_triples, + agent_iteration_triples, + agent_final_triples, + set_graph, + GRAPH_RETRIEVAL, +) + +from ..react.types import Action, Final +from ..tool_filter import filter_tools_by_group_and_state, get_next_state + +logger = logging.getLogger(__name__) + + +class UserAwareContext: + """Wraps flow interface to inject user context for tools that need it.""" + + def __init__(self, flow, user): + self._flow = flow + self._user = user + + def __call__(self, service_name): + client = self._flow(service_name) + if service_name in ( + "structured-query-request", + "row-embeddings-query-request", + ): + client._current_user = self._user + return client + + +class PatternBase: + """ + Shared infrastructure for all agent patterns. + + Subclasses implement iterate() to perform one iteration of their + pattern-specific logic. 
+ """ + + def __init__(self, processor): + self.processor = processor + + def filter_tools(self, tools, request): + """Apply group/state filtering to the tool set.""" + return filter_tools_by_group_and_state( + tools=tools, + requested_groups=getattr(request, 'group', None), + current_state=getattr(request, 'state', None), + ) + + def make_context(self, flow, user): + """Create a user-aware context wrapper.""" + return UserAwareContext(flow, user) + + def build_history(self, request): + """Convert AgentStep history into Action objects.""" + if not request.history: + return [] + return [ + Action( + thought=h.thought, + name=h.action, + arguments=h.arguments, + observation=h.observation, + ) + for h in request.history + ] + + # ---- Streaming callbacks ------------------------------------------------ + + def make_think_callback(self, respond, streaming): + """Create the think callback for streaming/non-streaming.""" + async def think(x, is_final=False): + logger.debug(f"Think: {x} (is_final={is_final})") + if streaming: + r = AgentResponse( + chunk_type="thought", + content=x, + end_of_message=is_final, + end_of_dialog=False, + ) + else: + r = AgentResponse( + chunk_type="thought", + content=x, + end_of_message=True, + end_of_dialog=False, + ) + await respond(r) + return think + + def make_observe_callback(self, respond, streaming): + """Create the observe callback for streaming/non-streaming.""" + async def observe(x, is_final=False): + logger.debug(f"Observe: {x} (is_final={is_final})") + if streaming: + r = AgentResponse( + chunk_type="observation", + content=x, + end_of_message=is_final, + end_of_dialog=False, + ) + else: + r = AgentResponse( + chunk_type="observation", + content=x, + end_of_message=True, + end_of_dialog=False, + ) + await respond(r) + return observe + + def make_answer_callback(self, respond, streaming): + """Create the answer callback for streaming/non-streaming.""" + async def answer(x): + logger.debug(f"Answer: {x}") + if streaming: + r = 
AgentResponse( + chunk_type="answer", + content=x, + end_of_message=False, + end_of_dialog=False, + ) + else: + r = AgentResponse( + chunk_type="answer", + content=x, + end_of_message=True, + end_of_dialog=False, + ) + await respond(r) + return answer + + # ---- Provenance emission ------------------------------------------------ + + async def emit_session_triples(self, flow, session_uri, question, user, + collection, respond, streaming): + """Emit provenance triples for a new session.""" + timestamp = datetime.utcnow().isoformat() + "Z" + triples = set_graph( + agent_session_triples(session_uri, question, timestamp), + GRAPH_RETRIEVAL, + ) + await flow("explainability").send(Triples( + metadata=Metadata( + id=session_uri, + user=user, + collection=collection, + ), + triples=triples, + )) + logger.debug(f"Emitted session triples for {session_uri}") + + if streaming: + await respond(AgentResponse( + chunk_type="explain", + content="", + explain_id=session_uri, + explain_graph=GRAPH_RETRIEVAL, + )) + + async def emit_iteration_triples(self, flow, session_id, iteration_num, + session_uri, act, request, respond, + streaming): + """Emit provenance triples for an iteration and save to librarian.""" + iteration_uri = agent_iteration_uri(session_id, iteration_num) + + if iteration_num > 1: + iter_question_uri = None + iter_previous_uri = agent_iteration_uri(session_id, iteration_num - 1) + else: + iter_question_uri = session_uri + iter_previous_uri = None + + # Save thought to librarian + thought_doc_id = None + if act.thought: + thought_doc_id = ( + f"urn:trustgraph:agent:{session_id}/i{iteration_num}/thought" + ) + try: + await self.processor.save_answer_content( + doc_id=thought_doc_id, + user=request.user, + content=act.thought, + title=f"Agent Thought: {act.name}", + ) + except Exception as e: + logger.warning(f"Failed to save thought to librarian: {e}") + thought_doc_id = None + + # Save observation to librarian + observation_doc_id = None + if act.observation: + 
observation_doc_id = ( + f"urn:trustgraph:agent:{session_id}/i{iteration_num}/observation" + ) + try: + await self.processor.save_answer_content( + doc_id=observation_doc_id, + user=request.user, + content=act.observation, + title=f"Agent Observation: {act.name}", + ) + except Exception as e: + logger.warning(f"Failed to save observation to librarian: {e}") + observation_doc_id = None + + thought_entity_uri = agent_thought_uri(session_id, iteration_num) + observation_entity_uri = agent_observation_uri(session_id, iteration_num) + + iter_triples = set_graph( + agent_iteration_triples( + iteration_uri, + question_uri=iter_question_uri, + previous_uri=iter_previous_uri, + action=act.name, + arguments=act.arguments, + thought_uri=thought_entity_uri if thought_doc_id else None, + thought_document_id=thought_doc_id, + observation_uri=observation_entity_uri if observation_doc_id else None, + observation_document_id=observation_doc_id, + ), + GRAPH_RETRIEVAL, + ) + await flow("explainability").send(Triples( + metadata=Metadata( + id=iteration_uri, + user=request.user, + collection=getattr(request, 'collection', 'default'), + ), + triples=iter_triples, + )) + logger.debug(f"Emitted iteration triples for {iteration_uri}") + + if streaming: + await respond(AgentResponse( + chunk_type="explain", + content="", + explain_id=iteration_uri, + explain_graph=GRAPH_RETRIEVAL, + )) + + async def emit_final_triples(self, flow, session_id, iteration_num, + session_uri, answer_text, request, respond, + streaming): + """Emit provenance triples for the final answer and save to librarian.""" + final_uri = agent_final_uri(session_id) + + if iteration_num > 1: + final_question_uri = None + final_previous_uri = agent_iteration_uri(session_id, iteration_num - 1) + else: + final_question_uri = session_uri + final_previous_uri = None + + # Save answer to librarian + answer_doc_id = None + if answer_text: + answer_doc_id = f"urn:trustgraph:agent:{session_id}/answer" + try: + await 
self.processor.save_answer_content( + doc_id=answer_doc_id, + user=request.user, + content=answer_text, + title=f"Agent Answer: {request.question[:50]}...", + ) + logger.debug(f"Saved answer to librarian: {answer_doc_id}") + except Exception as e: + logger.warning(f"Failed to save answer to librarian: {e}") + answer_doc_id = None + + final_triples = set_graph( + agent_final_triples( + final_uri, + question_uri=final_question_uri, + previous_uri=final_previous_uri, + document_id=answer_doc_id, + ), + GRAPH_RETRIEVAL, + ) + await flow("explainability").send(Triples( + metadata=Metadata( + id=final_uri, + user=request.user, + collection=getattr(request, 'collection', 'default'), + ), + triples=final_triples, + )) + logger.debug(f"Emitted final triples for {final_uri}") + + if streaming: + await respond(AgentResponse( + chunk_type="explain", + content="", + explain_id=final_uri, + explain_graph=GRAPH_RETRIEVAL, + )) + + # ---- Response helpers --------------------------------------------------- + + async def prompt_as_answer(self, client, prompt_id, variables, + respond, streaming): + """Call a prompt template, forwarding chunks as answer + AgentResponse messages when streaming is enabled. + + Returns the full accumulated answer text (needed for provenance). + """ + if streaming: + accumulated = [] + + async def on_chunk(text, end_of_stream): + if text: + accumulated.append(text) + await respond(AgentResponse( + chunk_type="answer", + content=text, + end_of_message=False, + end_of_dialog=False, + )) + + await client.prompt( + id=prompt_id, + variables=variables, + streaming=True, + chunk_callback=on_chunk, + ) + + return "".join(accumulated) + else: + return await client.prompt( + id=prompt_id, + variables=variables, + ) + + async def send_final_response(self, respond, streaming, answer_text, + already_streamed=False): + """Send the answer content and end-of-dialog marker. 
+ + Args: + already_streamed: If True, answer chunks were already sent + via streaming callbacks (e.g. ReactPattern). Only the + end-of-dialog marker is emitted. + """ + if streaming and not already_streamed: + # Answer wasn't streamed yet — send it as a chunk first + if answer_text: + await respond(AgentResponse( + chunk_type="answer", + content=answer_text, + end_of_message=False, + end_of_dialog=False, + )) + if streaming: + # End-of-dialog marker + await respond(AgentResponse( + chunk_type="answer", + content="", + end_of_message=True, + end_of_dialog=True, + )) + else: + await respond(AgentResponse( + chunk_type="answer", + content=answer_text, + end_of_message=True, + end_of_dialog=True, + )) + + def build_next_request(self, request, history, session_id, collection, + streaming, next_state): + """Build the AgentRequest for the next iteration.""" + return AgentRequest( + question=request.question, + state=next_state, + group=getattr(request, 'group', []), + history=[ + AgentStep( + thought=h.thought, + action=h.name, + arguments={k: str(v) for k, v in h.arguments.items()}, + observation=h.observation, + ) + for h in history + ], + user=request.user, + collection=collection, + streaming=streaming, + session_id=session_id, + # Preserve orchestration fields + conversation_id=getattr(request, 'conversation_id', ''), + pattern=getattr(request, 'pattern', ''), + task_type=getattr(request, 'task_type', ''), + framing=getattr(request, 'framing', ''), + correlation_id=getattr(request, 'correlation_id', ''), + parent_session_id=getattr(request, 'parent_session_id', ''), + subagent_goal=getattr(request, 'subagent_goal', ''), + expected_siblings=getattr(request, 'expected_siblings', 0), + ) + + async def iterate(self, request, respond, next, flow): + """ + Perform one iteration of this pattern. + + Must be implemented by subclasses. 
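For illustration, a minimal hypothetical pattern satisfying this contract — it answers immediately with the question text, using plain dicts in place of the real `AgentRequest`/`AgentResponse` schema objects:

```python
import asyncio

class PatternBaseSketch:
    """Stand-in for PatternBase: subclasses must override iterate()."""
    async def iterate(self, request, respond, next_, flow):
        raise NotImplementedError

class EchoPattern(PatternBaseSketch):
    """Trivial one-iteration pattern: the final answer is the question."""
    async def iterate(self, request, respond, next_, flow):
        await respond({
            "chunk_type": "answer",
            "content": request["question"],
            "end_of_message": True,
            "end_of_dialog": True,   # completes the dialog; no next-request
        })

async def demo():
    sent = []
    async def respond(r):
        sent.append(r)
    # A real pattern would call next_(new_request) to continue the loop;
    # EchoPattern finishes in one iteration, so next_ and flow go unused.
    await EchoPattern().iterate({"question": "hi"}, respond, None, None)
    return sent
```

A pattern that does not finish would instead emit a next-request via `next_`, which is what keeps the self-queuing Pulsar loop running.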
+        """
+        raise NotImplementedError
diff --git a/trustgraph-flow/trustgraph/agent/orchestrator/plan_pattern.py b/trustgraph-flow/trustgraph/agent/orchestrator/plan_pattern.py
new file mode 100644
index 00000000..d5f667c8
--- /dev/null
+++ b/trustgraph-flow/trustgraph/agent/orchestrator/plan_pattern.py
@@ -0,0 +1,349 @@
+"""
+PlanThenExecutePattern — structured planning followed by step execution.
+
+Phase 1 (planning): LLM produces a structured plan of steps.
+Phase 2 (execution): Each step is executed via a single-shot tool call.
+"""
+
+import json
+import logging
+import uuid
+
+from ... schema import AgentRequest, AgentResponse, AgentStep, PlanStep
+
+from ..react.types import Action
+
+from . pattern_base import PatternBase
+
+logger = logging.getLogger(__name__)
+
+
+class PlanThenExecutePattern(PatternBase):
+    """
+    Plan-then-Execute pattern.
+
+    History tracks the current phase via AgentStep.step_type:
+    - "plan" step: contains the plan in step.plan
+    - "execute" step: executes one plan step via a single-shot tool call
+
+    On the first call (empty history), a planning iteration is run.
+    Subsequent calls execute the next pending plan step, one per
+    iteration, until all steps are complete and a final answer is
+    synthesised.
+ """ + + async def iterate(self, request, respond, next, flow): + + streaming = getattr(request, 'streaming', False) + session_id = getattr(request, 'session_id', '') or str(uuid.uuid4()) + collection = getattr(request, 'collection', 'default') + + history = self.build_history(request) + iteration_num = len(request.history) + 1 + session_uri = self.processor.provenance_session_uri(session_id) + + # Emit session provenance on first iteration + if iteration_num == 1: + await self.emit_session_triples( + flow, session_uri, request.question, + request.user, collection, respond, streaming, + ) + + logger.info( + f"PlanThenExecutePattern iteration {iteration_num}: " + f"{request.question}" + ) + + if iteration_num >= self.processor.max_iterations: + raise RuntimeError("Too many agent iterations") + + # Determine current phase by checking history for a plan step + plan = self._extract_plan(request.history) + + if plan is None: + await self._planning_iteration( + request, respond, next, flow, + session_id, collection, streaming, session_uri, + iteration_num, + ) + else: + await self._execution_iteration( + request, respond, next, flow, + session_id, collection, streaming, session_uri, + iteration_num, plan, + ) + + def _extract_plan(self, history): + """Find the most recent plan from history. + + Checks execute steps first (they carry the updated plan with + completion statuses), then falls back to the original plan step. 
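The same lookup can be modelled standalone with plain dicts in place of `AgentStep`/`PlanStep` objects — a sketch of the bookkeeping, not the shipped code:

```python
def extract_plan(history):
    """Return the most recent plan in history, or None if not yet planned.

    Scanning in reverse means the latest execute step (which carries
    updated completion statuses) shadows the original plan step.
    """
    if not history:
        return None
    for step in reversed(history):
        if step.get("plan"):
            return list(step["plan"])
    return None

def find_next_pending(plan):
    """Index of the next pending step, or None when all are done."""
    for i, step in enumerate(plan):
        if step.get("status", "pending") == "pending":
            return i
    return None
```

A `None` plan means the next iteration must be a planning iteration; a `None` pending index means every step is complete and synthesis can run.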
+ """ + if not history: + return None + for step in reversed(history): + if step.plan: + return list(step.plan) + return None + + def _find_next_pending_step(self, plan): + """Return index of the next pending step, or None if all done.""" + for i, step in enumerate(plan): + if getattr(step, 'status', 'pending') == 'pending': + return i + return None + + async def _planning_iteration(self, request, respond, next, flow, + session_id, collection, streaming, + session_uri, iteration_num): + """Ask the LLM to produce a structured plan.""" + + think = self.make_think_callback(respond, streaming) + + tools = self.filter_tools(self.processor.agent.tools, request) + framing = getattr(request, 'framing', '') + + context = self.make_context(flow, request.user) + client = context("prompt-request") + + # Use the plan-create prompt template + plan_steps = await client.prompt( + id="plan-create", + variables={ + "question": request.question, + "framing": framing, + "tools": [ + {"name": t.name, "description": t.description} + for t in tools.values() + ], + }, + ) + + # Validate we got a list + if not isinstance(plan_steps, list) or not plan_steps: + logger.warning("plan-create returned invalid result, falling back to single step") + plan_steps = [{"goal": "Answer the question directly", "tool_hint": "", "depends_on": []}] + + # Emit thought about the plan + thought_text = f"Created plan with {len(plan_steps)} steps" + await think(thought_text, is_final=True) + + # Build PlanStep objects + plan_agent_steps = [ + PlanStep( + goal=ps.get("goal", ""), + tool_hint=ps.get("tool_hint", ""), + depends_on=ps.get("depends_on", []), + status="pending", + result="", + ) + for ps in plan_steps + ] + + # Create a plan step in history + plan_history_step = AgentStep( + thought=thought_text, + action="plan", + arguments={}, + observation=json.dumps(plan_steps), + step_type="plan", + plan=plan_agent_steps, + ) + + # Build next request with plan in history + new_history = list(request.history) + 
[plan_history_step] + r = AgentRequest( + question=request.question, + state=request.state, + group=getattr(request, 'group', []), + history=new_history, + user=request.user, + collection=collection, + streaming=streaming, + session_id=session_id, + conversation_id=getattr(request, 'conversation_id', ''), + pattern=getattr(request, 'pattern', ''), + task_type=getattr(request, 'task_type', ''), + framing=getattr(request, 'framing', ''), + correlation_id=getattr(request, 'correlation_id', ''), + parent_session_id=getattr(request, 'parent_session_id', ''), + subagent_goal=getattr(request, 'subagent_goal', ''), + expected_siblings=getattr(request, 'expected_siblings', 0), + ) + await next(r) + + async def _execution_iteration(self, request, respond, next, flow, + session_id, collection, streaming, + session_uri, iteration_num, plan): + """Execute the next pending plan step via single-shot tool call.""" + + pending_idx = self._find_next_pending_step(plan) + + if pending_idx is None: + # All steps done — synthesise final answer + await self._synthesise( + request, respond, next, flow, + session_id, collection, streaming, + session_uri, iteration_num, plan, + ) + return + + current_step = plan[pending_idx] + goal = getattr(current_step, 'goal', '') or str(current_step) + + logger.info(f"Executing plan step {pending_idx}: {goal}") + + think = self.make_think_callback(respond, streaming) + observe = self.make_observe_callback(respond, streaming) + + # Gather results from dependencies + previous_results = [] + depends_on = getattr(current_step, 'depends_on', []) + if depends_on: + for dep_idx in depends_on: + if 0 <= dep_idx < len(plan): + dep_step = plan[dep_idx] + dep_result = getattr(dep_step, 'result', '') + if dep_result: + previous_results.append({ + "index": dep_idx, + "result": dep_result, + }) + + tools = self.filter_tools(self.processor.agent.tools, request) + context = self.make_context(flow, request.user) + client = context("prompt-request") + + # Single-shot: 
ask LLM which tool + arguments to use for this goal + tool_call = await client.prompt( + id="plan-step-execute", + variables={ + "goal": goal, + "previous_results": previous_results, + "tools": [ + { + "name": t.name, + "description": t.description, + "arguments": [ + {"name": a.name, "type": a.type, "description": a.description} + for a in t.arguments + ], + } + for t in tools.values() + ], + }, + ) + + tool_name = tool_call.get("tool", "") + tool_arguments = tool_call.get("arguments", {}) + + await think( + f"Step {pending_idx}: {goal} → calling {tool_name}", + is_final=True, + ) + + # Invoke the tool directly + if tool_name in tools: + tool = tools[tool_name] + resp = await tool.implementation(context).invoke(**tool_arguments) + step_result = resp.strip() if isinstance(resp, str) else str(resp).strip() + else: + logger.warning( + f"Plan step {pending_idx}: LLM selected unknown tool " + f"'{tool_name}', available: {list(tools.keys())}" + ) + step_result = f"Error: tool '{tool_name}' not found" + + await observe(step_result, is_final=True) + + # Update plan step status + plan[pending_idx] = PlanStep( + goal=goal, + tool_hint=getattr(current_step, 'tool_hint', ''), + depends_on=getattr(current_step, 'depends_on', []), + status="completed", + result=step_result, + ) + + # Emit iteration provenance + prov_act = Action( + thought=f"Plan step {pending_idx}: {goal}", + name=tool_name, + arguments=tool_arguments, + observation=step_result, + ) + await self.emit_iteration_triples( + flow, session_id, iteration_num, session_uri, + prov_act, request, respond, streaming, + ) + + # Build execution step for history + exec_step = AgentStep( + thought=f"Executing plan step {pending_idx}: {goal}", + action=tool_name, + arguments={k: str(v) for k, v in tool_arguments.items()}, + observation=step_result, + step_type="execute", + plan=plan, + ) + + new_history = list(request.history) + [exec_step] + + r = AgentRequest( + question=request.question, + state=request.state, + 
group=getattr(request, 'group', []), + history=new_history, + user=request.user, + collection=collection, + streaming=streaming, + session_id=session_id, + conversation_id=getattr(request, 'conversation_id', ''), + pattern=getattr(request, 'pattern', ''), + task_type=getattr(request, 'task_type', ''), + framing=getattr(request, 'framing', ''), + correlation_id=getattr(request, 'correlation_id', ''), + parent_session_id=getattr(request, 'parent_session_id', ''), + subagent_goal=getattr(request, 'subagent_goal', ''), + expected_siblings=getattr(request, 'expected_siblings', 0), + ) + await next(r) + + async def _synthesise(self, request, respond, next, flow, + session_id, collection, streaming, + session_uri, iteration_num, plan): + """Synthesise a final answer from all completed plan step results.""" + + think = self.make_think_callback(respond, streaming) + framing = getattr(request, 'framing', '') + + context = self.make_context(flow, request.user) + client = context("prompt-request") + + # Use the plan-synthesise prompt template + steps_data = [] + for i, step in enumerate(plan): + steps_data.append({ + "index": i, + "goal": getattr(step, 'goal', f'Step {i}'), + "result": getattr(step, 'result', ''), + }) + + await think("Synthesising final answer from plan results", is_final=True) + + response_text = await self.prompt_as_answer( + client, "plan-synthesise", + variables={ + "question": request.question, + "framing": framing, + "steps": steps_data, + }, + respond=respond, + streaming=streaming, + ) + + await self.emit_final_triples( + flow, session_id, iteration_num, session_uri, + response_text, request, respond, streaming, + ) + await self.send_final_response( + respond, streaming, response_text, already_streamed=streaming, + ) diff --git a/trustgraph-flow/trustgraph/agent/orchestrator/react_pattern.py b/trustgraph-flow/trustgraph/agent/orchestrator/react_pattern.py new file mode 100644 index 00000000..c0e481f7 --- /dev/null +++ 
b/trustgraph-flow/trustgraph/agent/orchestrator/react_pattern.py @@ -0,0 +1,134 @@ +""" +ReactPattern — extracted from the existing agent_manager.py. + +Implements the ReACT (Reasoning + Acting) loop: think, select a tool, +observe the result, repeat until a final answer is produced. +""" + +import json +import logging +import uuid + +from ... schema import AgentRequest, AgentResponse, AgentStep + +from ..react.agent_manager import AgentManager +from ..react.types import Action, Final +from ..tool_filter import get_next_state + +from . pattern_base import PatternBase + +logger = logging.getLogger(__name__) + + +class ReactPattern(PatternBase): + """ + ReACT pattern: interleaved reasoning and action. + + Each iterate() call performs one reason/act cycle. If the LLM + produces a Final answer the dialog completes; otherwise the action + result is appended to history and a next-request is emitted. + """ + + async def iterate(self, request, respond, next, flow): + + streaming = getattr(request, 'streaming', False) + session_id = getattr(request, 'session_id', '') or str(uuid.uuid4()) + collection = getattr(request, 'collection', 'default') + + history = self.build_history(request) + iteration_num = len(history) + 1 + session_uri = self.processor.provenance_session_uri(session_id) + + # Emit session provenance on first iteration + if iteration_num == 1: + await self.emit_session_triples( + flow, session_uri, request.question, + request.user, collection, respond, streaming, + ) + + logger.info(f"ReactPattern iteration {iteration_num}: {request.question}") + + if len(history) >= self.processor.max_iterations: + raise RuntimeError("Too many agent iterations") + + # Build callbacks + think = self.make_think_callback(respond, streaming) + observe = self.make_observe_callback(respond, streaming) + answer_cb = self.make_answer_callback(respond, streaming) + + # Filter tools + filtered_tools = self.filter_tools( + self.processor.agent.tools, request, + ) + logger.info( + 
f"Filtered from {len(self.processor.agent.tools)} " + f"to {len(filtered_tools)} available tools" + ) + + # Create temporary agent with filtered tools and optional framing + additional_context = self.processor.agent.additional_context + framing = getattr(request, 'framing', '') + if framing: + if additional_context: + additional_context = f"{additional_context}\n\n{framing}" + else: + additional_context = framing + + temp_agent = AgentManager( + tools=filtered_tools, + additional_context=additional_context, + ) + + context = self.make_context(flow, request.user) + + act = await temp_agent.react( + question=request.question, + history=history, + think=think, + observe=observe, + answer=answer_cb, + context=context, + streaming=streaming, + ) + + logger.debug(f"Action: {act}") + + if isinstance(act, Final): + + if isinstance(act.final, str): + f = act.final + else: + f = json.dumps(act.final) + + # Emit final provenance + await self.emit_final_triples( + flow, session_id, iteration_num, session_uri, + f, request, respond, streaming, + ) + + await self.send_final_response( + respond, streaming, f, already_streamed=streaming, + ) + return + + # Not final — emit iteration provenance and send next request + await self.emit_iteration_triples( + flow, session_id, iteration_num, session_uri, + act, request, respond, streaming, + ) + + history.append(act) + + # Handle state transitions + next_state = request.state + if act.name in filtered_tools: + executed_tool = filtered_tools[act.name] + next_state = get_next_state(executed_tool, request.state or "undefined") + + r = self.build_next_request( + request, history, session_id, collection, + streaming, next_state, + ) + await next(r) + + logger.debug("ReactPattern iteration complete") diff --git a/trustgraph-flow/trustgraph/agent/orchestrator/service.py b/trustgraph-flow/trustgraph/agent/orchestrator/service.py new file mode 100644 index 00000000..f7418e60 --- /dev/null +++ 
b/trustgraph-flow/trustgraph/agent/orchestrator/service.py @@ -0,0 +1,511 @@ +""" +Agent orchestrator service — multi-pattern drop-in replacement for +agent-manager-react. + +Uses the same service identity and Pulsar queues. Adds meta-routing +to select between ReactPattern, PlanThenExecutePattern, and +SupervisorPattern at runtime. +""" + +import asyncio +import base64 +import json +import functools +import logging +import uuid +from datetime import datetime + +from ... base import AgentService, TextCompletionClientSpec, PromptClientSpec +from ... base import GraphRagClientSpec, ToolClientSpec, StructuredQueryClientSpec +from ... base import RowEmbeddingsQueryClientSpec, EmbeddingsClientSpec +from ... base import ProducerSpec +from ... base import Consumer, Producer +from ... base import ConsumerMetrics, ProducerMetrics + +from ... schema import AgentRequest, AgentResponse, AgentStep, Error +from ... schema import Triples, Metadata +from ... schema import LibrarianRequest, LibrarianResponse, DocumentMetadata +from ... schema import librarian_request_queue, librarian_response_queue + +from trustgraph.provenance import ( + agent_session_uri, + GRAPH_RETRIEVAL, +) + +from ..react.tools import ( + KnowledgeQueryImpl, TextCompletionImpl, McpToolImpl, PromptImpl, + StructuredQueryImpl, RowEmbeddingsQueryImpl, ToolServiceImpl, +) +from ..react.agent_manager import AgentManager +from ..tool_filter import validate_tool_config +from ..react.types import Final, Action, Tool, Argument + +from . meta_router import MetaRouter +from . pattern_base import PatternBase, UserAwareContext +from . react_pattern import ReactPattern +from . plan_pattern import PlanThenExecutePattern +from . supervisor_pattern import SupervisorPattern +from . 
aggregator import Aggregator + +logger = logging.getLogger(__name__) + +default_ident = "agent-manager" +default_max_iterations = 10 +default_librarian_request_queue = librarian_request_queue +default_librarian_response_queue = librarian_response_queue + + +class Processor(AgentService): + + def __init__(self, **params): + + id = params.get("id") + + self.max_iterations = int( + params.get("max_iterations", default_max_iterations) + ) + + self.config_key = params.get("config_type", "agent") + + super(Processor, self).__init__( + **params | { + "id": id, + "max_iterations": self.max_iterations, + "config_type": self.config_key, + } + ) + + self.agent = AgentManager( + tools={}, + additional_context="", + ) + + self.tool_service_clients = {} + + # Patterns + self.react_pattern = ReactPattern(self) + self.plan_pattern = PlanThenExecutePattern(self) + self.supervisor_pattern = SupervisorPattern(self) + + # Aggregator for supervisor fan-in + self.aggregator = Aggregator() + + # Meta-router (initialised on first config load) + self.meta_router = None + + self.config_handlers.append(self.on_tools_config) + + self.register_specification( + TextCompletionClientSpec( + request_name="text-completion-request", + response_name="text-completion-response", + ) + ) + + self.register_specification( + GraphRagClientSpec( + request_name="graph-rag-request", + response_name="graph-rag-response", + ) + ) + + self.register_specification( + PromptClientSpec( + request_name="prompt-request", + response_name="prompt-response", + ) + ) + + self.register_specification( + ToolClientSpec( + request_name="mcp-tool-request", + response_name="mcp-tool-response", + ) + ) + + self.register_specification( + StructuredQueryClientSpec( + request_name="structured-query-request", + response_name="structured-query-response", + ) + ) + + self.register_specification( + EmbeddingsClientSpec( + request_name="embeddings-request", + response_name="embeddings-response", + ) + ) + + self.register_specification( 
+ RowEmbeddingsQueryClientSpec( + request_name="row-embeddings-query-request", + response_name="row-embeddings-query-response", + ) + ) + + # Explainability producer + self.register_specification( + ProducerSpec( + name="explainability", + schema=Triples, + ) + ) + + # Librarian client + librarian_request_q = params.get( + "librarian_request_queue", default_librarian_request_queue + ) + librarian_response_q = params.get( + "librarian_response_queue", default_librarian_response_queue + ) + + librarian_request_metrics = ProducerMetrics( + processor=id, flow=None, name="librarian-request" + ) + + self.librarian_request_producer = Producer( + backend=self.pubsub, + topic=librarian_request_q, + schema=LibrarianRequest, + metrics=librarian_request_metrics, + ) + + librarian_response_metrics = ConsumerMetrics( + processor=id, flow=None, name="librarian-response" + ) + + self.librarian_response_consumer = Consumer( + taskgroup=self.taskgroup, + backend=self.pubsub, + flow=None, + topic=librarian_response_q, + subscriber=f"{id}-librarian", + schema=LibrarianResponse, + handler=self.on_librarian_response, + metrics=librarian_response_metrics, + ) + + self.pending_librarian_requests = {} + + async def start(self): + await super(Processor, self).start() + await self.librarian_request_producer.start() + await self.librarian_response_consumer.start() + + async def on_librarian_response(self, msg, consumer, flow): + response = msg.value() + request_id = msg.properties().get("id") + + if request_id in self.pending_librarian_requests: + future = self.pending_librarian_requests.pop(request_id) + future.set_result(response) + + async def save_answer_content(self, doc_id, user, content, title=None, + timeout=120): + request_id = str(uuid.uuid4()) + + doc_metadata = DocumentMetadata( + id=doc_id, + user=user, + kind="text/plain", + title=title or "Agent Answer", + document_type="answer", + ) + + request = LibrarianRequest( + operation="add-document", + document_id=doc_id, + 
document_metadata=doc_metadata, + content=base64.b64encode(content.encode("utf-8")).decode("utf-8"), + user=user, + ) + + future = asyncio.get_event_loop().create_future() + self.pending_librarian_requests[request_id] = future + + try: + await self.librarian_request_producer.send( + request, properties={"id": request_id} + ) + response = await asyncio.wait_for(future, timeout=timeout) + + if response.error: + raise RuntimeError( + f"Librarian error saving answer: " + f"{response.error.type}: {response.error.message}" + ) + return doc_id + + except asyncio.TimeoutError: + self.pending_librarian_requests.pop(request_id, None) + raise RuntimeError(f"Timeout saving answer document {doc_id}") + + def provenance_session_uri(self, session_id): + return agent_session_uri(session_id) + + async def on_tools_config(self, config, version): + + logger.info(f"Loading configuration version {version}") + + try: + tools = {} + + # Load tool-service configurations + tool_services = {} + if "tool-service" in config: + for service_id, service_value in config["tool-service"].items(): + service_data = json.loads(service_value) + tool_services[service_id] = service_data + logger.debug(f"Loaded tool-service config: {service_id}") + + logger.info( + f"Loaded {len(tool_services)} tool-service configurations" + ) + + # Load tool configurations + if "tool" in config: + for tool_id, tool_value in config["tool"].items(): + data = json.loads(tool_value) + impl_id = data.get("type") + name = data.get("name") + + if impl_id == "knowledge-query": + impl = functools.partial( + KnowledgeQueryImpl, + collection=data.get("collection"), + ) + arguments = KnowledgeQueryImpl.get_arguments() + elif impl_id == "text-completion": + impl = TextCompletionImpl + arguments = TextCompletionImpl.get_arguments() + elif impl_id == "mcp-tool": + config_args = data.get("arguments", []) + arguments = [ + Argument( + name=arg.get("name"), + type=arg.get("type"), + description=arg.get("description"), + ) + for arg in 
config_args + ] + impl = functools.partial( + McpToolImpl, + mcp_tool_id=data.get("mcp-tool"), + arguments=arguments, + ) + elif impl_id == "prompt": + config_args = data.get("arguments", []) + arguments = [ + Argument( + name=arg.get("name"), + type=arg.get("type"), + description=arg.get("description"), + ) + for arg in config_args + ] + impl = functools.partial( + PromptImpl, + template_id=data.get("template"), + arguments=arguments, + ) + elif impl_id == "structured-query": + impl = functools.partial( + StructuredQueryImpl, + collection=data.get("collection"), + user=None, + ) + arguments = StructuredQueryImpl.get_arguments() + elif impl_id == "row-embeddings-query": + impl = functools.partial( + RowEmbeddingsQueryImpl, + schema_name=data.get("schema-name"), + collection=data.get("collection"), + user=None, + index_name=data.get("index-name"), + limit=int(data.get("limit", 10)), + ) + arguments = RowEmbeddingsQueryImpl.get_arguments() + elif impl_id == "tool-service": + service_ref = data.get("service") + if not service_ref: + raise RuntimeError( + f"Tool {name} has type 'tool-service' " + f"but no 'service' reference" + ) + if service_ref not in tool_services: + raise RuntimeError( + f"Tool {name} references unknown " + f"tool-service '{service_ref}'" + ) + + service_config = tool_services[service_ref] + request_queue = service_config.get("request-queue") + response_queue = service_config.get("response-queue") + if not request_queue or not response_queue: + raise RuntimeError( + f"Tool-service '{service_ref}' must define " + f"'request-queue' and 'response-queue'" + ) + + config_params = service_config.get("config-params", []) + config_values = {} + for param in config_params: + param_name = ( + param.get("name") + if isinstance(param, dict) else param + ) + if param_name in data: + config_values[param_name] = data[param_name] + elif ( + isinstance(param, dict) + and param.get("required", False) + ): + raise RuntimeError( + f"Tool {name} missing required config 
" + f"param '{param_name}'" + ) + + config_args = data.get("arguments", []) + arguments = [ + Argument( + name=arg.get("name"), + type=arg.get("type"), + description=arg.get("description"), + ) + for arg in config_args + ] + + impl = functools.partial( + ToolServiceImpl, + request_queue=request_queue, + response_queue=response_queue, + config_values=config_values, + arguments=arguments, + processor=self, + ) + else: + raise RuntimeError( + f"Tool type {impl_id} not known" + ) + + validate_tool_config(data) + + tools[name] = Tool( + name=name, + description=data.get("description"), + implementation=impl, + config=data, + arguments=arguments, + ) + + # Load additional context from agent config + additional = None + if self.config_key in config: + agent_config = config[self.config_key] + additional = agent_config.get("additional-context", None) + + self.agent = AgentManager( + tools=tools, + additional_context=additional, + ) + + # Re-initialise meta-router with config + self.meta_router = MetaRouter(config=config) + + logger.info(f"Loaded {len(tools)} tools") + logger.info("Tool configuration reloaded.") + + except Exception as e: + logger.error( + f"on_tools_config Exception: {e}", exc_info=True + ) + logger.error("Configuration reload failed") + + async def agent_request(self, request, respond, next, flow): + + try: + pattern = getattr(request, 'pattern', '') or '' + + # If no pattern set and this is the first iteration, route + if not pattern and not request.history: + context = UserAwareContext(flow, request.user) + + if self.meta_router: + pattern, task_type, framing = await self.meta_router.route( + request.question, context, + ) + else: + pattern = "react" + task_type = "general" + framing = "" + + # Update request with routing decision + request.pattern = pattern + request.task_type = task_type + request.framing = framing + + logger.info( + f"Routed to pattern={pattern}, " + f"task_type={task_type}" + ) + + # Dispatch to the selected pattern + if pattern == 
"plan-then-execute": + await self.plan_pattern.iterate( + request, respond, next, flow, + ) + elif pattern == "supervisor": + await self.supervisor_pattern.iterate( + request, respond, next, flow, + ) + else: + # Default to react + await self.react_pattern.iterate( + request, respond, next, flow, + ) + + except Exception as e: + + logger.error( + f"agent_request Exception: {e}", exc_info=True + ) + + logger.debug("Send error response...") + + error_obj = Error( + type="agent-error", + message=str(e), + ) + + r = AgentResponse( + chunk_type="error", + content=str(e), + end_of_message=True, + end_of_dialog=True, + error=error_obj, + ) + + await respond(r) + + @staticmethod + def add_args(parser): + + AgentService.add_args(parser) + + parser.add_argument( + '--max-iterations', + default=default_max_iterations, + help=f'Maximum number of react iterations ' + f'(default: {default_max_iterations})', + ) + + parser.add_argument( + '--config-type', + default="agent", + help='Configuration key for prompts (default: agent)', + ) + + +def run(): + Processor.launch(default_ident, __doc__) diff --git a/trustgraph-flow/trustgraph/agent/orchestrator/supervisor_pattern.py b/trustgraph-flow/trustgraph/agent/orchestrator/supervisor_pattern.py new file mode 100644 index 00000000..9070a393 --- /dev/null +++ b/trustgraph-flow/trustgraph/agent/orchestrator/supervisor_pattern.py @@ -0,0 +1,214 @@ +""" +SupervisorPattern — decomposes a query into subagent goals, fans out, +then synthesises results when all subagents complete. + +Phase 1 (decompose): LLM breaks the query into independent sub-goals. +Fan-out: Each sub-goal is emitted as a new AgentRequest on the agent + request topic, carrying a correlation_id and parent_session_id. +Phase 2 (synthesise): Triggered when the aggregator detects all + subagents have completed. The supervisor fetches results and + produces the final answer. +""" + +import json +import logging +import uuid + +from ... 
schema import AgentRequest, AgentResponse, AgentStep + +from ..react.types import Action, Final + +from . pattern_base import PatternBase + +logger = logging.getLogger(__name__) + +MAX_SUBAGENTS = 5 + + +class SupervisorPattern(PatternBase): + """ + Supervisor pattern: decompose, fan-out, synthesise. + + History tracks phase via AgentStep.step_type: + - "decompose": the decomposition step (subagent goals in arguments) + - "synthesise": triggered by aggregator with results in subagent_results + """ + + async def iterate(self, request, respond, next, flow): + + streaming = getattr(request, 'streaming', False) + session_id = getattr(request, 'session_id', '') or str(uuid.uuid4()) + collection = getattr(request, 'collection', 'default') + iteration_num = len(request.history) + 1 + session_uri = self.processor.provenance_session_uri(session_id) + + # Emit session provenance on first iteration + if iteration_num == 1: + await self.emit_session_triples( + flow, session_uri, request.question, + request.user, collection, respond, streaming, + ) + + logger.info( + f"SupervisorPattern iteration {iteration_num}: {request.question}" + ) + + # Check if this is a synthesis request (has subagent_results) + has_results = bool( + request.history + and any( + getattr(h, 'step_type', '') == 'decompose' + for h in request.history + ) + and any( + getattr(h, 'subagent_results', None) + for h in request.history + ) + ) + + if has_results: + await self._synthesise( + request, respond, next, flow, + session_id, collection, streaming, + session_uri, iteration_num, + ) + else: + await self._decompose_and_fanout( + request, respond, next, flow, + session_id, collection, streaming, + session_uri, iteration_num, + ) + + async def _decompose_and_fanout(self, request, respond, next, flow, + session_id, collection, streaming, + session_uri, iteration_num): + """Decompose the question into sub-goals and fan out subagents.""" + + think = self.make_think_callback(respond, streaming) + framing = 
getattr(request, 'framing', '') + + tools = self.filter_tools(self.processor.agent.tools, request) + + context = self.make_context(flow, request.user) + client = context("prompt-request") + + # Use the supervisor-decompose prompt template + goals = await client.prompt( + id="supervisor-decompose", + variables={ + "question": request.question, + "framing": framing, + "max_subagents": MAX_SUBAGENTS, + "tools": [ + {"name": t.name, "description": t.description} + for t in tools.values() + ], + }, + ) + + # Validate result + if not isinstance(goals, list): + goals = [] + goals = [g for g in goals if isinstance(g, str)] + goals = goals[:MAX_SUBAGENTS] + + if not goals: + goals = [request.question] + + await think( + f"Decomposed into {len(goals)} sub-goals: {goals}", + is_final=True, + ) + + # Generate correlation ID for this fan-out + correlation_id = str(uuid.uuid4()) + + # Emit decomposition provenance + decompose_act = Action( + thought=f"Decomposed into {len(goals)} sub-goals", + name="decompose", + arguments={"goals": json.dumps(goals), "correlation_id": correlation_id}, + observation=f"Fanning out {len(goals)} subagents", + ) + await self.emit_iteration_triples( + flow, session_id, iteration_num, session_uri, + decompose_act, request, respond, streaming, + ) + + # Fan out: emit a subagent request for each goal + for i, goal in enumerate(goals): + subagent_session = str(uuid.uuid4()) + sub_request = AgentRequest( + question=goal, + state="", + group=getattr(request, 'group', []), + history=[], + user=request.user, + collection=collection, + streaming=False, # Subagents don't stream + session_id=subagent_session, + conversation_id=getattr(request, 'conversation_id', ''), + pattern="react", # Subagents use react by default + task_type=getattr(request, 'task_type', ''), + framing=getattr(request, 'framing', ''), + correlation_id=correlation_id, + parent_session_id=session_id, + subagent_goal=goal, + expected_siblings=len(goals), + ) + await next(sub_request) + 
logger.info(f"Fan-out: emitted subagent {i} for goal: {goal}") + + # NOTE: The supervisor stops here. The aggregator will detect + # when all subagents complete and emit a synthesis request + # with the results populated. + logger.info( + f"Supervisor fan-out complete: {len(goals)} subagents, " + f"correlation_id={correlation_id}" + ) + + async def _synthesise(self, request, respond, next, flow, + session_id, collection, streaming, + session_uri, iteration_num): + """Synthesise final answer from subagent results.""" + + think = self.make_think_callback(respond, streaming) + framing = getattr(request, 'framing', '') + + # Collect subagent results from history + subagent_results = {} + for step in request.history: + results = getattr(step, 'subagent_results', None) + if results: + subagent_results.update(results) + + if not subagent_results: + logger.warning("Synthesis called with no subagent results") + subagent_results = {"(no results)": "No subagent results available"} + + context = self.make_context(flow, request.user) + client = context("prompt-request") + + await think("Synthesising final answer from sub-agent results", is_final=True) + + response_text = await self.prompt_as_answer( + client, "supervisor-synthesise", + variables={ + "question": request.question, + "framing": framing, + "results": [ + {"goal": goal, "result": result} + for goal, result in subagent_results.items() + ], + }, + respond=respond, + streaming=streaming, + ) + + await self.emit_final_triples( + flow, session_id, iteration_num, session_uri, + response_text, request, respond, streaming, + ) + await self.send_final_response( + respond, streaming, response_text, already_streamed=streaming, + ) diff --git a/trustgraph-flow/trustgraph/agent/react/service.py b/trustgraph-flow/trustgraph/agent/react/service.py index c037c937..1bca9627 100755 --- a/trustgraph-flow/trustgraph/agent/react/service.py +++ b/trustgraph-flow/trustgraph/agent/react/service.py @@ -485,25 +485,16 @@ class 
Processor(AgentService): logger.debug(f"Think: {x} (is_final={is_final})") if streaming: - # Streaming format r = AgentResponse( chunk_type="thought", content=x, end_of_message=is_final, end_of_dialog=False, - # Legacy fields for backward compatibility - answer=None, - error=None, - thought=x, - observation=None, ) else: - # Non-streaming format r = AgentResponse( - answer=None, - error=None, - thought=x, - observation=None, + chunk_type="thought", + content=x, end_of_message=True, end_of_dialog=False, ) @@ -515,25 +506,16 @@ class Processor(AgentService): logger.debug(f"Observe: {x} (is_final={is_final})") if streaming: - # Streaming format r = AgentResponse( chunk_type="observation", content=x, end_of_message=is_final, end_of_dialog=False, - # Legacy fields for backward compatibility - answer=None, - error=None, - thought=None, - observation=x, ) else: - # Non-streaming format r = AgentResponse( - answer=None, - error=None, - thought=None, - observation=x, + chunk_type="observation", + content=x, end_of_message=True, end_of_dialog=False, ) @@ -545,25 +527,16 @@ class Processor(AgentService): logger.debug(f"Answer: {x}") if streaming: - # Streaming format r = AgentResponse( chunk_type="answer", content=x, - end_of_message=False, # More chunks may follow + end_of_message=False, end_of_dialog=False, - # Legacy fields for backward compatibility - answer=None, - error=None, - thought=None, - observation=None, ) else: - # Non-streaming format - shouldn't normally be called r = AgentResponse( - answer=x, - error=None, - thought=None, - observation=None, + chunk_type="answer", + content=x, end_of_message=True, end_of_dialog=False, ) @@ -677,25 +650,17 @@ class Processor(AgentService): )) if streaming: - # Streaming format - send end-of-dialog marker - # Answer chunks were already sent via answer() callback during parsing + # End-of-dialog marker — answer chunks already sent via callback r = AgentResponse( chunk_type="answer", - content="", # Empty content, just marking 
end of dialog
+                content="",
                 end_of_message=True,
                 end_of_dialog=True,
-                # Legacy fields set to None - answer already sent via streaming chunks
-                answer=None,
-                error=None,
-                thought=None,
             )
         else:
-            # Non-streaming format - send complete answer
             r = AgentResponse(
-                answer=act.final,
-                error=None,
-                thought=None,
-                observation=None,
+                chunk_type="answer",
+                content=act.final,
                 end_of_message=True,
                 end_of_dialog=True,
             )

@@ -833,21 +798,13 @@ class Processor(AgentService):

         # Check if streaming was enabled (may not be set if error occurred early)
         streaming = getattr(request, 'streaming', False) if 'request' in locals() else False

-        if streaming:
-            # Streaming format
-            r = AgentResponse(
-                chunk_type="error",
-                content=str(e),
-                end_of_message=True,
-                end_of_dialog=True,
-                # Legacy fields for backward compatibility
-                error=error_obj,
-            )
-        else:
-            # Legacy format
-            r = AgentResponse(
-                error=error_obj,
-            )
+        r = AgentResponse(
+            chunk_type="error",
+            content=str(e),
+            end_of_message=True,
+            end_of_dialog=True,
+            error=error_obj,
+        )

         await respond(r)
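
The supervisor's fan-out path above stops after emitting subagent requests and relies on an aggregator ("The aggregator will detect when all subagents complete") that is referenced but not included in this patch. A minimal sketch of that fan-in step, assuming completions are counted per `correlation_id` against the `expected_siblings` count carried on each subagent request (field names taken from the fan-out code; the `Aggregator` class and its method names are hypothetical, not part of this diff):

```python
# Hypothetical fan-in aggregator sketch. The correlation_id,
# expected_siblings, and subagent_goal fields match the fan-out code in
# SupervisorPattern; the Aggregator class itself is illustrative only.
from dataclasses import dataclass, field


@dataclass
class FanOutState:
    expected: int                                  # expected_siblings
    results: dict = field(default_factory=dict)    # goal -> final answer


class Aggregator:
    """Collects subagent completions; returns the full result set once
    all siblings for a correlation_id have reported in."""

    def __init__(self):
        self.pending = {}    # correlation_id -> FanOutState

    def on_subagent_complete(self, correlation_id, goal, answer, expected):
        state = self.pending.setdefault(
            correlation_id, FanOutState(expected=expected)
        )
        state.results[goal] = answer
        if len(state.results) >= state.expected:
            # All siblings done: release results for phase 2 (synthesise)
            return self.pending.pop(correlation_id).results
        return None          # still waiting on siblings
```

In the real service this logic would sit behind a Pulsar consumer watching subagent completions; when the set is complete it would emit a synthesis `AgentRequest` with `subagent_results` populated, which is exactly the shape the supervisor's `_synthesise` path reads back out of the history.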