Agent improvements (#200)

* moved agent tool creation functions and helpers to separate file

* refactored the while loop into smaller functions

* added structured context passing while handing off

* fixed zod issues

* fixed tool calls issue in task and pipeline agents

* Allow pipeline to be set as start agent

* fixed pipeline agent looping issue when made start agent

* fix to show correct agent name after handoffs

* addressed review comments on not touching workflow types

* filter out empty agent messages

* partial: make task agents not loop when set as start agent

* Resolve merge conflicts after rebase - keep both onSetMainAgent prop and HTMLDivElement ref type

* Allow pipeline to be set as start agent

* Add infinite loop protection and remove JSON formatting from agent instructions

* remove whitespace diffs

---------

Co-authored-by: Ramnique Singh <30795890+ramnique@users.noreply.github.com>
This commit is contained in:
arkml 2025-08-14 19:22:43 +05:30 committed by GitHub
parent 6ce96f942f
commit 852e02e49e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 1965 additions and 736 deletions

View file

@ -0,0 +1,292 @@
// Agent handoffs using OpenAI Agents SDK native capabilities
import { Agent, handoff, Handoff } from "@openai/agents";
import { z } from "zod";
import { PrefixLogger } from "./utils";
import {
WorkflowAgent
} from "./types/workflow_types";
import {
HandoffContext,
PipelineContext,
TaskContext,
PipelineExecutionState
} from "./agents";
// Types for handoff input data (from SDK)
export interface HandoffInputData {
inputHistory: string | any[];
preHandoffItems: any[];
newItems: any[];
runContext?: any;
}
export type HandoffContextType = 'pipeline' | 'task' | 'direct';
export interface AgentHandoffConfig {
inputSchema?: z.ZodObject<any>;
onHandoff?: (context: any, input: any) => void;
inputFilter?: (data: HandoffInputData) => HandoffInputData;
logger?: PrefixLogger;
}
// Get default schema based on context type
function getDefaultSchemaForContext(contextType: HandoffContextType): z.ZodObject<any> {
switch (contextType) {
case 'pipeline':
return PipelineContext;
case 'task':
return TaskContext;
case 'direct':
default:
return HandoffContext;
}
}
// Create context-aware input filter
function createDefaultInputFilter(contextType: HandoffContextType) {
return (data: HandoffInputData): HandoffInputData => {
switch (contextType) {
case 'pipeline':
return filterForPipeline(data);
case 'task':
return filterForTask(data);
case 'direct':
default:
return data; // Pass through all context for direct handoffs
}
};
}
// Filter context for pipeline execution
function filterForPipeline(data: HandoffInputData): HandoffInputData {
// Keep recent context relevant to pipeline execution
const maxHistoryItems = 10; // Configurable limit
return {
...data,
inputHistory: Array.isArray(data.inputHistory)
? data.inputHistory.slice(-maxHistoryItems)
: data.inputHistory,
// Filter out non-pipeline related tool calls
preHandoffItems: data.preHandoffItems.filter(item =>
!item.type ||
item.type === 'message' ||
item.type === 'tool_call' && item.name?.includes('pipeline')
)
};
}
// Filter context for task delegation
function filterForTask(data: HandoffInputData): HandoffInputData {
// Keep task-relevant context only
const maxHistoryItems = 20; // Tasks may need more context
return {
...data,
inputHistory: Array.isArray(data.inputHistory)
? data.inputHistory.slice(-maxHistoryItems)
: data.inputHistory,
// Keep all items for task context
preHandoffItems: data.preHandoffItems
};
}
// Create SDK-native handoff with rich context
export function createAgentHandoff(
targetAgent: Agent,
contextType: HandoffContextType,
config: AgentHandoffConfig = {}
): Handoff {
const inputSchema = config.inputSchema || getDefaultSchemaForContext(contextType);
const logger = config.logger;
logger?.log(`Creating handoff to ${targetAgent.name} with context type: ${contextType}`);
// Create OpenAI API compliant tool name
const sanitizedAgentName = targetAgent.name
.replace(/[^a-zA-Z0-9_-]/g, '_') // Replace invalid chars with underscore
.replace(/_+/g, '_') // Replace multiple underscores with single
.replace(/^_+|_+$/g, '') // Remove leading/trailing underscores
.substring(0, 50); // Limit length
const toolName = `handoff_to_${sanitizedAgentName}`;
logger?.log(`Creating handoff tool: ${toolName} -> ${targetAgent.name}`);
return handoff(targetAgent, {
inputType: inputSchema,
toolNameOverride: toolName,
toolDescriptionOverride: `Transfer control to ${targetAgent.name} with structured context data`,
onHandoff: async (runContext, inputString) => {
try {
const inputStr = typeof inputString === 'string' ? inputString : '{}';
let input = JSON.parse(inputStr || '{}');
// Validate and enrich the parsed input with defaults
const schema = config.inputSchema || getDefaultSchemaForContext(contextType);
const validationResult = schema.safeParse(input);
if (!validationResult.success) {
logger?.log(`Handoff input validation failed for ${targetAgent.name}, enriching with defaults:`, validationResult.error.issues.map(i => i.path.join('.') + ': ' + i.message));
// Parse with defaults to get a valid object
input = schema.parse({});
logger?.log(`Using default context for handoff to ${targetAgent.name}`);
} else {
logger?.log(`Handoff input validation succeeded for ${targetAgent.name}`);
input = validationResult.data;
}
logger?.log(`Handoff to ${targetAgent.name} with input:`, input);
// Execute custom handoff logic
config.onHandoff?.(runContext, input);
// Log the handoff for debugging
logHandoffEvent(targetAgent.name, contextType, input, logger);
} catch (error) {
logger?.log(`Error in handoff to ${targetAgent.name}:`, error);
throw error;
}
},
inputFilter: config.inputFilter || createDefaultInputFilter(contextType)
});
}
// Create handoff for pipeline execution
export function createPipelineHandoff(
targetAgent: Agent,
pipelineState: z.infer<typeof PipelineExecutionState>,
logger?: PrefixLogger
): Handoff {
const pipelineContext = {
reason: 'pipeline_execution' as const,
parentAgent: pipelineState.callingAgent,
transferCount: 0,
pipelineName: pipelineState.pipelineName,
currentStep: pipelineState.currentStep,
totalSteps: pipelineState.totalSteps,
isLastStep: pipelineState.currentStep >= pipelineState.totalSteps - 1,
pipelineData: pipelineState.pipelineData || null,
stepResults: pipelineState.stepResults || null
};
return createAgentHandoff(targetAgent, 'pipeline', {
inputSchema: PipelineContext,
onHandoff: (context, input) => {
logger?.log(`Pipeline step ${pipelineState.currentStep + 1}/${pipelineState.totalSteps} - handing off to ${targetAgent.name}`);
// Store pipeline state for the target agent
storePipelineStateForAgent(targetAgent.name, pipelineState);
},
inputFilter: (data) => {
// Inject pipeline context into the conversation
const contextMessage = createPipelineContextMessage(pipelineContext);
return {
...data,
newItems: [
...data.newItems,
{
type: 'message',
role: 'system',
content: contextMessage
}
]
};
},
logger
});
}
// Create handoff for task delegation
export function createTaskHandoff(
targetAgent: Agent,
taskContext: {
taskType: string;
priority: 'low' | 'medium' | 'high';
parentAgent: string;
requirements?: string[];
resources?: Record<string, any>;
},
logger?: PrefixLogger
): Handoff {
return createAgentHandoff(targetAgent, 'task', {
inputSchema: TaskContext,
onHandoff: (context, input) => {
logger?.log(`Task delegation to ${targetAgent.name}:`, {
taskType: taskContext.taskType,
priority: taskContext.priority
});
},
logger
});
}
// Get schema based on agent configuration
export function getSchemaForAgent(agentConfig: z.infer<typeof WorkflowAgent>): z.ZodObject<any> {
// Always start with basic HandoffContext - more specific contexts are used
// only when explicitly creating pipeline or task handoffs
return HandoffContext;
// NOTE: PipelineContext and TaskContext are used only in specific creation functions
// like createPipelineHandoff() and createTaskHandoff(), not for general agent handoffs
}
// Create context filter based on agent configuration
export function createContextFilterForAgent(agentConfig: z.infer<typeof WorkflowAgent>) {
return (data: HandoffInputData): HandoffInputData => {
// Use basic passthrough filtering for regular handoffs
// Specific filtering is handled by createPipelineHandoff and createTaskHandoff
return data;
};
}
// Helper functions
function logHandoffEvent(
targetAgent: string,
contextType: string,
input: any,
logger?: PrefixLogger
) {
logger?.log(`🔄 SDK HANDOFF: -> ${targetAgent} (${contextType})`, {
targetAgent,
contextType,
hasContext: !!input && Object.keys(input).length > 0
});
}
// Simple storage for pipeline state (in production, use proper state management)
const pipelineStates = new Map<string, z.infer<typeof PipelineExecutionState>>();
function storePipelineStateForAgent(
agentName: string,
state: z.infer<typeof PipelineExecutionState>
) {
pipelineStates.set(agentName, state);
}
export function getPipelineStateForAgent(
agentName: string
): z.infer<typeof PipelineExecutionState> | null {
return pipelineStates.get(agentName) || null;
}
function createPipelineContextMessage(context: any): string {
return `## Pipeline Execution Context
Pipeline: ${context.pipelineName}
Step: ${context.currentStep + 1}/${context.totalSteps}
${context.isLastStep ? '**Final Step**: Provide complete results.' : '**Continue**: Pass results to next step.'}
${context.stepResults && context.stepResults.length > 0
? `Previous Results:\n${JSON.stringify(context.stepResults, null, 2)}`
: 'No previous results.'
}
${context.pipelineData
? `Pipeline Data:\n${JSON.stringify(context.pipelineData, null, 2)}`
: ''
}`;
}

View file

@ -0,0 +1,511 @@
// External dependencies
import { tool, Tool } from "@openai/agents";
import { createOpenAI } from "@ai-sdk/openai";
import { embed, generateText } from "ai";
import { ObjectId } from "mongodb";
import { z } from "zod";
import { composio } from "./composio/composio";
import { SignJWT } from "jose";
import crypto from "crypto";
// Internal dependencies
import { embeddingModel } from '../lib/embedding';
import { getMcpClient } from "./mcp";
import { dataSourceDocsCollection, dataSourcesCollection, projectsCollection } from "./mongodb";
import { qdrantClient } from '../lib/qdrant';
import { EmbeddingRecord } from "./types/datasource_types";
import { WorkflowAgent, WorkflowTool } from "./types/workflow_types";
import { PrefixLogger } from "./utils";
// Provider configuration
const PROVIDER_API_KEY = process.env.PROVIDER_API_KEY || process.env.OPENAI_API_KEY || '';
const PROVIDER_BASE_URL = process.env.PROVIDER_BASE_URL || undefined;
const MODEL = process.env.PROVIDER_DEFAULT_MODEL || 'gpt-4o';
const openai = createOpenAI({
apiKey: PROVIDER_API_KEY,
baseURL: PROVIDER_BASE_URL,
});
// Helper to handle mock tool responses
export async function invokeMockTool(
logger: PrefixLogger,
toolName: string,
args: string,
description: string,
mockInstructions: string
): Promise<string> {
logger = logger.child(`invokeMockTool`);
logger.log(`toolName: ${toolName}`);
logger.log(`args: ${args}`);
logger.log(`description: ${description}`);
logger.log(`mockInstructions: ${mockInstructions}`);
const messages: Parameters<typeof generateText>[0]['messages'] = [{
role: "system" as const,
content: `You are simulating the execution of a tool called '${toolName}'. Here is the description of the tool: ${description}. Here are the instructions for the mock tool: ${mockInstructions}. Generate a realistic response as if the tool was actually executed with the given parameters.`
}, {
role: "user" as const,
content: `Generate a realistic response for the tool '${toolName}' with these parameters: ${args}. The response should be concise and focused on what the tool would actually return.`
}];
const { text } = await generateText({
model: openai(MODEL),
messages,
});
logger.log(`generated text: ${text}`);
return text;
}
// Helper to handle RAG tool calls
export async function invokeRagTool(
logger: PrefixLogger,
projectId: string,
query: string,
sourceIds: string[],
returnType: 'chunks' | 'content',
k: number
): Promise<{
title: string;
name: string;
content: string;
docId: string;
sourceId: string;
}[]> {
logger = logger.child(`invokeRagTool`);
logger.log(`projectId: ${projectId}`);
logger.log(`query: ${query}`);
logger.log(`sourceIds: ${sourceIds.join(', ')}`);
logger.log(`returnType: ${returnType}`);
logger.log(`k: ${k}`);
// Create embedding for question
const { embedding } = await embed({
model: embeddingModel,
value: query,
});
// Fetch all data sources for this project
const sources = await dataSourcesCollection.find({
projectId: projectId,
active: true,
}).toArray();
const validSourceIds = sources
.filter(s => sourceIds.includes(s._id.toString())) // id should be in sourceIds
.filter(s => s.active) // should be active
.map(s => s._id.toString());
logger.log(`valid source ids: ${validSourceIds.join(', ')}`);
// if no sources found, return empty response
if (validSourceIds.length === 0) {
logger.log(`no valid source ids found, returning empty response`);
return [];
}
// Perform vector search
const qdrantResults = await qdrantClient.query("embeddings", {
query: embedding,
filter: {
must: [
{ key: "projectId", match: { value: projectId } },
{ key: "sourceId", match: { any: validSourceIds } },
],
},
limit: k,
with_payload: true,
});
logger.log(`found ${qdrantResults.points.length} results`);
// if return type is chunks, return the chunks
let results = qdrantResults.points.map((point) => {
const { title, name, content, docId, sourceId } = point.payload as z.infer<typeof EmbeddingRecord>['payload'];
return {
title,
name,
content,
docId,
sourceId,
};
});
if (returnType === 'chunks') {
logger.log(`returning chunks`);
return results;
}
// otherwise, fetch the doc contents from mongodb
const docs = await dataSourceDocsCollection.find({
_id: { $in: results.map(r => new ObjectId(r.docId)) },
}).toArray();
logger.log(`fetched docs: ${docs.length}`);
// map the results to the docs
results = results.map(r => {
const doc = docs.find(d => d._id.toString() === r.docId);
return {
...r,
content: doc?.content || '',
};
});
return results;
}
export async function invokeWebhookTool(
logger: PrefixLogger,
projectId: string,
name: string,
input: any,
): Promise<unknown> {
logger = logger.child(`invokeWebhookTool`);
logger.log(`projectId: ${projectId}`);
logger.log(`name: ${name}`);
logger.log(`input: ${JSON.stringify(input)}`);
const project = await projectsCollection.findOne({
"_id": projectId,
});
if (!project) {
throw new Error('Project not found');
}
if (!project.webhookUrl) {
throw new Error('Webhook URL not found');
}
// prepare request body
const toolCall = {
id: crypto.randomUUID(),
type: "function" as const,
function: {
name,
arguments: JSON.stringify(input),
},
}
const content = JSON.stringify({
toolCall,
});
const requestId = crypto.randomUUID();
const bodyHash = crypto
.createHash('sha256')
.update(content, 'utf8')
.digest('hex');
// sign request
const jwt = await new SignJWT({
requestId,
projectId,
bodyHash,
})
.setProtectedHeader({
alg: 'HS256',
typ: 'JWT',
})
.setIssuer('rowboat')
.setAudience(project.webhookUrl)
.setSubject(`tool-call-${toolCall.id}`)
.setJti(requestId)
.setIssuedAt()
.setExpirationTime("5 minutes")
.sign(new TextEncoder().encode(project.secret));
// make request
const request = {
requestId,
content,
};
const response = await fetch(project.webhookUrl, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'x-signature-jwt': jwt,
},
body: JSON.stringify(request),
});
if (!response.ok) {
throw new Error(`Failed to call webhook: ${response.status}: ${response.statusText}`);
}
const responseBody = await response.json();
return responseBody;
}
// Helper to handle MCP tool calls
export async function invokeMcpTool(
logger: PrefixLogger,
projectId: string,
name: string,
input: any,
mcpServerName: string
) {
logger = logger.child(`invokeMcpTool`);
logger.log(`projectId: ${projectId}`);
logger.log(`name: ${name}`);
logger.log(`input: ${JSON.stringify(input)}`);
logger.log(`mcpServerName: ${mcpServerName}`);
// Get project configuration
const project = await projectsCollection.findOne({ _id: projectId });
if (!project) {
throw new Error(`project ${projectId} not found`);
}
// get server url from project data
const mcpServerURL = project.customMcpServers?.[mcpServerName]?.serverUrl;
if (!mcpServerURL) {
throw new Error(`mcp server url not found for project ${projectId} and server ${mcpServerName}`);
}
const client = await getMcpClient(mcpServerURL, mcpServerName);
const result = await client.callTool({
name,
arguments: input,
});
logger.log(`mcp tool result: ${JSON.stringify(result)}`);
await client.close();
return result;
}
// Helper to handle composio tool calls
export async function invokeComposioTool(
logger: PrefixLogger,
projectId: string,
name: string,
composioData: z.infer<typeof WorkflowTool>['composioData'] & {},
input: any,
) {
logger = logger.child(`invokeComposioTool`);
logger.log(`projectId: ${projectId}`);
logger.log(`name: ${name}`);
logger.log(`input: ${JSON.stringify(input)}`);
const { slug, toolkitSlug, noAuth } = composioData;
let connectedAccountId: string | undefined = undefined;
if (!noAuth) {
const project = await projectsCollection.findOne({ _id: projectId });
if (!project) {
throw new Error(`project ${projectId} not found`);
}
connectedAccountId = project.composioConnectedAccounts?.[toolkitSlug]?.id;
if (!connectedAccountId) {
throw new Error(`connected account id not found for project ${projectId} and toolkit ${toolkitSlug}`);
}
}
const result = await composio.tools.execute(slug, {
userId: projectId,
arguments: input,
connectedAccountId: connectedAccountId,
});
logger.log(`composio tool result: ${JSON.stringify(result)}`);
return result.data;
}
// Helper to create RAG tool
export function createRagTool(
logger: PrefixLogger,
config: z.infer<typeof WorkflowAgent>,
projectId: string
): Tool {
if (!config.ragDataSources?.length) {
throw new Error(`data sources not found for agent ${config.name}`);
}
return tool({
name: "rag_search",
description: config.description,
parameters: z.object({
query: z.string().describe("The query to search for")
}),
async execute(input: { query: string }) {
const results = await invokeRagTool(
logger,
projectId,
input.query,
config.ragDataSources || [],
config.ragReturnType || 'chunks',
config.ragK || 3
);
return JSON.stringify({
results,
});
}
});
}
// Helper to create a mock tool
export function createMockTool(
logger: PrefixLogger,
config: z.infer<typeof WorkflowTool>,
): Tool {
return tool({
name: config.name,
description: config.description,
strict: false,
parameters: {
type: 'object',
properties: config.parameters.properties,
required: config.parameters.required || [],
additionalProperties: true,
},
async execute(input: any) {
try {
const result = await invokeMockTool(
logger,
config.name,
JSON.stringify(input),
config.description,
config.mockInstructions || ''
);
return JSON.stringify({
result,
});
} catch (error) {
logger.log(`Error executing mock tool ${config.name}:`, error);
return JSON.stringify({
error: `Mock tool execution failed: ${error}`,
});
}
}
});
}
// Helper to create a webhook tool
export function createWebhookTool(
logger: PrefixLogger,
config: z.infer<typeof WorkflowTool>,
projectId: string,
): Tool {
const { name, description, parameters } = config;
return tool({
name,
description,
strict: false,
parameters: {
type: 'object',
properties: parameters.properties,
required: parameters.required || [],
additionalProperties: true,
},
async execute(input: any) {
try {
const result = await invokeWebhookTool(logger, projectId, name, input);
return JSON.stringify({
result,
});
} catch (error) {
logger.log(`Error executing webhook tool ${config.name}:`, error);
return JSON.stringify({
error: `Tool execution failed: ${error}`,
});
}
}
});
}
// Helper to create an mcp tool
export function createMcpTool(
logger: PrefixLogger,
config: z.infer<typeof WorkflowTool>,
projectId: string
): Tool {
const { name, description, parameters, mcpServerName } = config;
return tool({
name,
description,
strict: false,
parameters: {
type: 'object',
properties: parameters.properties,
required: parameters.required || [],
additionalProperties: true,
},
async execute(input: any) {
try {
const result = await invokeMcpTool(logger, projectId, name, input, mcpServerName || '');
return JSON.stringify({
result,
});
} catch (error) {
logger.log(`Error executing mcp tool ${name}:`, error);
return JSON.stringify({
error: `Tool execution failed: ${error}`,
});
}
}
});
}
// Helper to create a composio tool
export function createComposioTool(
logger: PrefixLogger,
config: z.infer<typeof WorkflowTool>,
projectId: string
): Tool {
const { name, description, parameters, composioData } = config;
if (!composioData) {
throw new Error(`composio data not found for tool ${name}`);
}
return tool({
name,
description,
strict: false,
parameters: {
type: 'object',
properties: parameters.properties,
required: parameters.required || [],
additionalProperties: true,
},
async execute(input: any) {
try {
const result = await invokeComposioTool(logger, projectId, name, composioData, input);
return JSON.stringify({
result,
});
} catch (error) {
logger.log(`Error executing composio tool ${name}:`, error);
return JSON.stringify({
error: `Tool execution failed: ${error}`,
});
}
}
});
}
export function createTools(
logger: PrefixLogger,
projectId: string,
workflow: { tools: z.infer<typeof WorkflowTool>[] },
toolConfig: Record<string, z.infer<typeof WorkflowTool>>,
): Record<string, Tool> {
const tools: Record<string, Tool> = {};
const toolLogger = logger.child('createTools');
toolLogger.log(`=== CREATING ${Object.keys(toolConfig).length} TOOLS ===`);
for (const [toolName, config] of Object.entries(toolConfig)) {
toolLogger.log(`creating tool: ${toolName} (type: ${config.mockTool ? 'mock' : config.isMcp ? 'mcp' : config.isComposio ? 'composio' : 'webhook'})`);
if (config.mockTool) {
tools[toolName] = createMockTool(logger, config);
toolLogger.log(`✓ created mock tool: ${toolName}`);
} else if (config.isMcp) {
tools[toolName] = createMcpTool(logger, config, projectId);
toolLogger.log(`✓ created mcp tool: ${toolName} (server: ${config.mcpServerName || 'unknown'})`);
} else if (config.isComposio) {
tools[toolName] = createComposioTool(logger, config, projectId);
toolLogger.log(`✓ created composio tool: ${toolName}`);
} else {
tools[toolName] = createWebhookTool(logger, config, projectId);
toolLogger.log(`✓ created webhook tool: ${toolName} (fallback)`);
}
}
toolLogger.log(`=== TOOL CREATION COMPLETE ===`);
return tools;
}

View file

@ -121,11 +121,9 @@ export const CONVERSATION_TYPE_INSTRUCTIONS = (): string => `
export const TASK_TYPE_INSTRUCTIONS = (): string => `
- You are an agent that is part of a workflow of (one or more) interconnected agents that work together to be an assistant.
- Use the JSON format to convey your responses. The JSON should have 3 keys.
- The first key in the JSON response should be your "thought" - analysizing what has happened till now and what you need to do in this turn.
- The second key should be your "response". While you will put out a message, your response will not be shown directly to the user. Instead, your response will be used by the agent that might have invoked you and (possibly) other agents in the workflow. Therefore, your responses must be worded in such a way that it is useful for other agents and not addressed to the user.
- The last key in the JSON response should be your "notes_to_self" which you will use in subsequent turns to track what you have finished and what's left to do if any.
- IMPORTANT: If you have all the information to take action, such as calling a tool or writing a response, you should do that in the immediate turn. Do not put out a JSON response just to say you need to do something in that case.
- Your response will not be shown directly to the user. Instead, your response will be used by the agent that might have invoked you and (possibly) other agents in the workflow. Therefore, your responses must be worded in such a way that it is useful for other agents and not addressed to the user.
- Provide clear, direct responses that other agents can easily understand and act upon.
- IMPORTANT: If you have all the information to take action, such as calling a tool or writing a response, you should do that in the immediate turn. Do not delay action unnecessarily.
- Reading the messages in the chat history will give you context about the conversation.
- Seeing the tool calls that transfer / handoff control will help you understand the flow of the conversation and which agent produced each message.
- These are high level instructions only. The user will provide more specific instructions which will be below.
@ -138,10 +136,7 @@ export const PIPELINE_TYPE_INSTRUCTIONS = (): string => `
- Your output will be passed to the next step in the pipeline (or returned as the final result if you're the last step).
- CRITICAL: You CANNOT transfer to other agents or pipelines. You can only use tools to complete your specific task.
- Focus ONLY on your designated role in the pipeline. Process the input, perform your specific task, and provide clear output.
- Use the JSON format to convey your responses. The JSON should have 3 keys:
- "thought": Analyze the input from the previous pipeline step and plan what you need to do
- "response": Your processed output that will be passed to the next pipeline step. Make this clear and actionable.
- "pipeline_context": Brief notes about what you accomplished for the pipeline flow
- Provide clear, actionable output that the next pipeline step can easily understand and work with.
- Do NOT attempt to handle tasks outside your specific pipeline role.
- Do NOT mention other agents or the pipeline structure to users.
- Your response should be self-contained and ready to be consumed by the next pipeline step.

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,322 @@
// Pipeline State Manager for handling complex pipeline execution flow
import { Agent } from "@openai/agents";
import { z } from "zod";
import { WorkflowPipeline, WorkflowAgent } from "./types/workflow_types";
import { PipelineExecutionState } from "./agents";
import { PrefixLogger } from "./utils";
import { createPipelineHandoff } from "./agent-handoffs";
export interface PipelineExecutionResult {
action: 'handoff' | 'complete' | 'error';
nextAgent?: string;
handoff?: any; // SDK Handoff object
context?: any;
results?: any;
returnToAgent?: string;
error?: string;
}
export class PipelineStateManager {
private pipelineStates = new Map<string, z.infer<typeof PipelineExecutionState>>();
private logger: PrefixLogger;
constructor(logger: PrefixLogger) {
this.logger = logger.child('PipelineStateManager');
}
// Initialize a new pipeline execution
initializePipelineExecution(
pipelineName: string,
callingAgent: string,
pipelineConfig: z.infer<typeof WorkflowPipeline>,
initialData?: Record<string, any>
): z.infer<typeof PipelineExecutionState> {
const state: z.infer<typeof PipelineExecutionState> = {
pipelineName,
currentStep: 0,
totalSteps: pipelineConfig.agents.length,
callingAgent,
pipelineData: initialData || null,
stepResults: null,
currentStepResult: null,
startTime: new Date().toISOString(),
metadata: {
pipelineDescription: pipelineConfig.description
}
};
// Store initial state for the first agent
const firstAgent = pipelineConfig.agents[0];
this.storePipelineState(firstAgent, state);
this.logger.log(`🚀 Initialized pipeline "${pipelineName}" with ${state.totalSteps} steps`);
this.logger.log(`First agent: ${firstAgent}, called by: ${callingAgent}`);
return state;
}
// Handle pipeline execution step
async handlePipelineExecution(
currentAgentName: string,
pipelineConfig: Record<string, z.infer<typeof WorkflowPipeline>>,
agents: Record<string, Agent>,
stepResult?: Record<string, any>
): Promise<PipelineExecutionResult> {
const state = this.getPipelineState(currentAgentName);
if (!state) {
return {
action: 'error',
error: `No pipeline state found for agent ${currentAgentName}`
};
}
const pipeline = pipelineConfig[state.pipelineName];
if (!pipeline) {
return {
action: 'error',
error: `Pipeline ${state.pipelineName} not found in configuration`
};
}
// Store current step result
if (stepResult) {
// Safely handle stepResults as flexible union type
const existingResults = Array.isArray(state.stepResults) ? state.stepResults : [];
state.stepResults = [...existingResults, stepResult];
state.currentStepResult = stepResult;
// Update pipeline data if result contains data to pass forward
if (stepResult.pipelineData) {
// Safely handle pipelineData as flexible union type
const existingData = (typeof state.pipelineData === 'object' && state.pipelineData !== null) ? state.pipelineData : {};
const newData = (typeof stepResult.pipelineData === 'object' && stepResult.pipelineData !== null) ? stepResult.pipelineData : {};
state.pipelineData = {
...existingData,
...newData
};
}
}
this.logger.log(`📊 Pipeline "${state.pipelineName}" step ${state.currentStep + 1}/${state.totalSteps} completed by ${currentAgentName}`);
// Check if this is the last step
if (state.currentStep >= pipeline.agents.length - 1) {
// Pipeline complete - return to calling agent
this.logger.log(`✅ Pipeline "${state.pipelineName}" completed, returning to ${state.callingAgent}`);
const finalResults = {
pipelineName: state.pipelineName,
totalSteps: state.totalSteps,
stepResults: state.stepResults,
finalData: state.pipelineData,
completionTime: new Date().toISOString(),
duration: Date.now() - new Date(state.startTime).getTime()
};
// Clean up state
this.clearPipelineState(currentAgentName);
return {
action: 'complete',
results: finalResults,
returnToAgent: state.callingAgent
};
}
// Continue to next step
const nextStepIndex = state.currentStep + 1;
const nextAgentName = pipeline.agents[nextStepIndex];
if (!agents[nextAgentName]) {
return {
action: 'error',
error: `Next agent ${nextAgentName} not found in agents configuration`
};
}
// Update state for next step
const nextState: z.infer<typeof PipelineExecutionState> = {
...state,
currentStep: nextStepIndex,
currentStepResult: null // Reset for next step
};
// Store state for next agent
this.storePipelineState(nextAgentName, nextState);
// Create SDK handoff with rich context
const handoff = createPipelineHandoff(
agents[nextAgentName],
nextState,
this.logger
);
this.logger.log(`➡️ Pipeline "${state.pipelineName}": ${currentAgentName} -> ${nextAgentName} (step ${nextStepIndex + 1}/${state.totalSteps})`);
return {
action: 'handoff',
nextAgent: nextAgentName,
handoff,
context: {
reason: 'pipeline_execution',
pipelineName: state.pipelineName,
currentStep: nextStepIndex,
totalSteps: state.totalSteps,
isLastStep: nextStepIndex >= state.totalSteps - 1,
pipelineData: nextState.pipelineData,
stepResults: nextState.stepResults
}
};
}
// Store pipeline state for an agent
storePipelineState(agentName: string, state: z.infer<typeof PipelineExecutionState>): void {
this.pipelineStates.set(agentName, state);
this.logger.log(`💾 Stored pipeline state for ${agentName}: step ${state.currentStep + 1}/${state.totalSteps}`);
}
// Retrieve pipeline state for an agent
getPipelineState(agentName: string): z.infer<typeof PipelineExecutionState> | null {
return this.pipelineStates.get(agentName) || null;
}
// Clear pipeline state (cleanup)
clearPipelineState(agentName: string): void {
this.pipelineStates.delete(agentName);
this.logger.log(`🗑️ Cleared pipeline state for ${agentName}`);
}
// Check if agent is in a pipeline
isAgentInPipeline(agentName: string): boolean {
return this.pipelineStates.has(agentName);
}
// Get all active pipelines (for debugging)
getActivePipelines(): Array<{agentName: string, state: z.infer<typeof PipelineExecutionState>}> {
return Array.from(this.pipelineStates.entries()).map(([agentName, state]) => ({
agentName,
state
}));
}
// Inject pipeline context into agent instructions
injectPipelineContext(
agent: Agent,
agentName: string,
originalInstructions: string
): string {
const state = this.getPipelineState(agentName);
if (!state) {
return originalInstructions;
}
const contextPrompt = this.createPipelineContextPrompt(state);
const enhancedInstructions = `${originalInstructions}\n\n${contextPrompt}`;
this.logger.log(`📝 Injected pipeline context for ${agentName} in pipeline "${state.pipelineName}"`);
return enhancedInstructions;
}
// Create pipeline context prompt
private createPipelineContextPrompt(state: z.infer<typeof PipelineExecutionState>): string {
const stepInfo = `Step ${state.currentStep + 1} of ${state.totalSteps}`;
const isLast = state.currentStep >= state.totalSteps - 1;
let contextPrompt = `## 🔄 Pipeline Execution Context
**Pipeline**: ${state.pipelineName}
**Current Step**: ${stepInfo}
**Status**: ${isLast ? 'FINAL STEP - Provide complete results' : 'Intermediate step - Pass results forward'}
`;
if (state.stepResults && Array.isArray(state.stepResults) && state.stepResults.length > 0) {
contextPrompt += `**Previous Step Results**:
\`\`\`json
${JSON.stringify(state.stepResults, null, 2)}
\`\`\`
`;
}
if (state.pipelineData && typeof state.pipelineData === 'object' && state.pipelineData !== null && Object.keys(state.pipelineData).length > 0) {
contextPrompt += `**Pipeline Data**:
\`\`\`json
${JSON.stringify(state.pipelineData, null, 2)}
\`\`\`
`;
}
if (isLast) {
contextPrompt += `⚠️ **IMPORTANT**: This is the final step in the pipeline. Your response will be returned to the calling agent "${state.callingAgent}". Provide comprehensive results.
`;
} else {
contextPrompt += `➡️ **NEXT**: After completing your task, results will automatically flow to the next step in the pipeline.
`;
}
return contextPrompt;
}
// Error recovery - handle pipeline failures
handlePipelineError(
agentName: string,
error: string | Error,
shouldReturnToCaller: boolean = true
): PipelineExecutionResult {
const state = this.getPipelineState(agentName);
const errorMessage = typeof error === 'string' ? error : error.message;
this.logger.log(`❌ Pipeline error in agent ${agentName}: ${errorMessage}`);
if (state && shouldReturnToCaller) {
// Clean up and return to caller with error
this.clearPipelineState(agentName);
return {
action: 'complete',
results: {
pipelineName: state.pipelineName,
error: errorMessage,
completedSteps: state.currentStep,
totalSteps: state.totalSteps,
stepResults: state.stepResults
},
returnToAgent: state.callingAgent
};
}
return {
action: 'error',
error: errorMessage
};
}
// Get pipeline statistics (for monitoring)
getPipelineStats(): {
activePipelines: number;
pipelinesByName: Record<string, number>;
averageStepsCompleted: number;
} {
const pipelines = this.getActivePipelines();
const pipelinesByName: Record<string, number> = {};
let totalSteps = 0;
pipelines.forEach(({state}) => {
pipelinesByName[state.pipelineName] = (pipelinesByName[state.pipelineName] || 0) + 1;
totalSteps += state.currentStep + 1;
});
return {
activePipelines: pipelines.length,
pipelinesByName,
averageStepsCompleted: pipelines.length > 0 ? totalSteps / pipelines.length : 0
};
}
}

View file

@ -280,6 +280,7 @@ interface PipelineCardProps {
onDeletePipeline: (name: string) => void;
onDeleteAgent: (name: string) => void;
onAddAgentToPipeline: (pipelineName: string) => void;
onSetMainAgent: (name: string) => void;
selectedRef: React.RefObject<HTMLDivElement | null>;
startAgentName: string | null;
dragHandle?: React.ReactNode;
@ -294,6 +295,7 @@ const PipelineCard = ({
onDeletePipeline,
onDeleteAgent,
onAddAgentToPipeline,
onSetMainAgent,
selectedRef,
startAgentName,
dragHandle,
@ -347,6 +349,11 @@ const PipelineCard = ({
<div className="flex items-center gap-2">
<span className="text-xs">{pipeline.name}</span>
<span className="text-xs text-gray-500">({pipelineAgents.length} steps)</span>
{startAgentName === pipeline.name && (
<span className="text-xs bg-green-100 dark:bg-green-900/30 text-green-700 dark:text-green-400 px-2 py-0.5 rounded font-medium">
START
</span>
)}
</div>
</button>
@ -362,10 +369,19 @@ const PipelineCard = ({
onAction={(key) => {
if (key === 'delete') {
onDeletePipeline(pipeline.name);
} else if (key === 'set-main-agent') {
onSetMainAgent(pipeline.name);
}
}}
>
<DropdownItem key="delete" className="text-danger">Delete Pipeline</DropdownItem>
{startAgentName !== pipeline.name ? (
<>
<DropdownItem key="set-main-agent">Set as start agent</DropdownItem>
<DropdownItem key="delete" className="text-danger">Delete Pipeline</DropdownItem>
</>
) : (
<DropdownItem key="delete" className="text-danger">Delete Pipeline</DropdownItem>
)}
</DropdownMenu>
</Dropdown>
</div>
@ -740,6 +756,7 @@ export const EntityList = forwardRef<
onDeletePipeline={onDeletePipeline}
onDeleteAgent={onDeleteAgent}
onAddAgentToPipeline={onAddAgentToPipeline}
onSetMainAgent={onSetMainAgent}
selectedRef={selectedRef}
startAgentName={startAgentName}
/>
@ -1663,6 +1680,7 @@ const SortablePipelineItem = ({
onDeletePipeline,
onDeleteAgent,
onAddAgentToPipeline,
onSetMainAgent,
selectedRef,
startAgentName
}: {
@ -1677,6 +1695,7 @@ const SortablePipelineItem = ({
onDeletePipeline: (name: string) => void;
onDeleteAgent: (name: string) => void;
onAddAgentToPipeline: (pipelineName: string) => void;
onSetMainAgent: (name: string) => void;
selectedRef: React.RefObject<HTMLDivElement | null>;
startAgentName: string | null;
}) => {
@ -1706,6 +1725,7 @@ const SortablePipelineItem = ({
onDeletePipeline={onDeletePipeline}
onDeleteAgent={onDeleteAgent}
onAddAgentToPipeline={onAddAgentToPipeline}
onSetMainAgent={onSetMainAgent}
selectedRef={selectedRef}
startAgentName={startAgentName}
dragHandle={