From 852e02e49e92bddb86310b65715297d95dce6630 Mon Sep 17 00:00:00 2001 From: arkml Date: Thu, 14 Aug 2025 19:22:43 +0530 Subject: [PATCH] Agent improvements (#200) * moved agent tool creation functions and helpers to separate file * refactored the while loop into smaller functions * added structured context passing while handing off * fixed zod issues * fixed tool calls issue in task and pipeline agents * Allow pipeline to be set as start agent * fixed pipeline agent looping issue when made start agent * fix to show correct agent name after handoffs * addressed review comments on not touching workflow types * filter out empty agent messages * partial: make task agents not loop when set as start agent * Resolve merge conflicts after rebase - keep both onSetMainAgent prop and HTMLDivElement ref type * Allow pipeline to be set as start agent * Add infinite loop protection and remove JSON formatting from agent instructions * remove whitespace diffs --------- Co-authored-by: Ramnique Singh <30795890+ramnique@users.noreply.github.com> --- apps/rowboat/app/lib/agent-handoffs.ts | 292 ++++ apps/rowboat/app/lib/agent-tools.ts | 511 ++++++ apps/rowboat/app/lib/agent_instructions.ts | 13 +- apps/rowboat/app/lib/agents.ts | 1541 +++++++++-------- .../rowboat/app/lib/pipeline-state-manager.ts | 322 ++++ .../[projectId]/workflow/entity_list.tsx | 22 +- 6 files changed, 1965 insertions(+), 736 deletions(-) create mode 100644 apps/rowboat/app/lib/agent-handoffs.ts create mode 100644 apps/rowboat/app/lib/agent-tools.ts create mode 100644 apps/rowboat/app/lib/pipeline-state-manager.ts diff --git a/apps/rowboat/app/lib/agent-handoffs.ts b/apps/rowboat/app/lib/agent-handoffs.ts new file mode 100644 index 00000000..72bb6861 --- /dev/null +++ b/apps/rowboat/app/lib/agent-handoffs.ts @@ -0,0 +1,292 @@ +// Agent handoffs using OpenAI Agents SDK native capabilities +import { Agent, handoff, Handoff } from "@openai/agents"; +import { z } from "zod"; +import { PrefixLogger } from "./utils"; +import { + WorkflowAgent +} from "./types/workflow_types"; +import { + HandoffContext, + PipelineContext, + TaskContext, + PipelineExecutionState +} from "./agents"; + +// Types for handoff input data (from SDK) +export interface HandoffInputData { + inputHistory: string | any[]; + preHandoffItems: any[]; + newItems: any[]; + runContext?: any; +} + +export type HandoffContextType = 'pipeline' | 'task' | 'direct'; + +export interface AgentHandoffConfig { + inputSchema?: z.ZodObject; + onHandoff?: (context: any, input: any) => void; + inputFilter?: (data: HandoffInputData) => HandoffInputData; + logger?: PrefixLogger; +} + +// Get default schema based on context type +function getDefaultSchemaForContext(contextType: HandoffContextType): z.ZodObject { + switch (contextType) { + case 'pipeline': + return PipelineContext; + case 'task': + return TaskContext; + case 'direct': + default: + return HandoffContext; + } +} + +// Create context-aware input filter +function createDefaultInputFilter(contextType: HandoffContextType) { + return (data: HandoffInputData): HandoffInputData => { + switch (contextType) { + case 'pipeline': + return filterForPipeline(data); + case 'task': + return filterForTask(data); + case 'direct': + default: + return data; // Pass through all context for direct handoffs + } + }; +} + +// Filter context for pipeline execution +function filterForPipeline(data: HandoffInputData): HandoffInputData { + // Keep recent context relevant to pipeline execution + const maxHistoryItems = 10; // Configurable limit + + return { + ...data, + inputHistory: Array.isArray(data.inputHistory) + ? data.inputHistory.slice(-maxHistoryItems) + : data.inputHistory, + // Filter out non-pipeline related tool calls + preHandoffItems: data.preHandoffItems.filter(item => + !item.type || + item.type === 'message' || + item.type === 'tool_call' && item.name?.includes('pipeline') + ) + }; +} + +// Filter context for task delegation +function filterForTask(data: HandoffInputData): HandoffInputData { + // Keep task-relevant context only + const maxHistoryItems = 20; // Tasks may need more context + + return { + ...data, + inputHistory: Array.isArray(data.inputHistory) + ? data.inputHistory.slice(-maxHistoryItems) + : data.inputHistory, + // Keep all items for task context + preHandoffItems: data.preHandoffItems + }; +} + +// Create SDK-native handoff with rich context +export function createAgentHandoff( + targetAgent: Agent, + contextType: HandoffContextType, + config: AgentHandoffConfig = {} +): Handoff { + const inputSchema = config.inputSchema || getDefaultSchemaForContext(contextType); + const logger = config.logger; + + logger?.log(`Creating handoff to ${targetAgent.name} with context type: ${contextType}`); + + // Create OpenAI API compliant tool name + const sanitizedAgentName = targetAgent.name + .replace(/[^a-zA-Z0-9_-]/g, '_') // Replace invalid chars with underscore + .replace(/_+/g, '_') // Replace multiple underscores with single + .replace(/^_+|_+$/g, '') // Remove leading/trailing underscores + .substring(0, 50); // Limit length + + const toolName = `handoff_to_${sanitizedAgentName}`; + + logger?.log(`Creating handoff tool: ${toolName} -> ${targetAgent.name}`); + + return handoff(targetAgent, { + inputType: inputSchema, + toolNameOverride: toolName, + toolDescriptionOverride: `Transfer control to ${targetAgent.name} with structured context data`, + + onHandoff: async (runContext, inputString) => { + try { + const inputStr = typeof inputString === 'string' ? inputString : '{}'; + let input = JSON.parse(inputStr || '{}'); + + // Validate and enrich the parsed input with defaults + const schema = config.inputSchema || getDefaultSchemaForContext(contextType); + const validationResult = schema.safeParse(input); + + if (!validationResult.success) { + logger?.log(`Handoff input validation failed for ${targetAgent.name}, enriching with defaults:`, validationResult.error.issues.map(i => i.path.join('.') + ': ' + i.message)); + // Parse with defaults to get a valid object + input = schema.parse({}); + logger?.log(`Using default context for handoff to ${targetAgent.name}`); + } else { + logger?.log(`Handoff input validation succeeded for ${targetAgent.name}`); + input = validationResult.data; + } + + logger?.log(`Handoff to ${targetAgent.name} with input:`, input); + + // Execute custom handoff logic + config.onHandoff?.(runContext, input); + + // Log the handoff for debugging + logHandoffEvent(targetAgent.name, contextType, input, logger); + + } catch (error) { + logger?.log(`Error in handoff to ${targetAgent.name}:`, error); + throw error; + } + }, + + inputFilter: config.inputFilter || createDefaultInputFilter(contextType) + }); +} + +// Create handoff for pipeline execution +export function createPipelineHandoff( + targetAgent: Agent, + pipelineState: z.infer, + logger?: PrefixLogger +): Handoff { + const pipelineContext = { + reason: 'pipeline_execution' as const, + parentAgent: pipelineState.callingAgent, + transferCount: 0, + pipelineName: pipelineState.pipelineName, + currentStep: pipelineState.currentStep, + totalSteps: pipelineState.totalSteps, + isLastStep: pipelineState.currentStep >= pipelineState.totalSteps - 1, + pipelineData: pipelineState.pipelineData || null, + stepResults: pipelineState.stepResults || null + }; + + return createAgentHandoff(targetAgent, 'pipeline', { + inputSchema: PipelineContext, + onHandoff: (context, input) => { + logger?.log(`Pipeline step ${pipelineState.currentStep + 1}/${pipelineState.totalSteps} - handing off to ${targetAgent.name}`); + + // Store pipeline state for the target agent + storePipelineStateForAgent(targetAgent.name, pipelineState); + }, + inputFilter: (data) => { + // Inject pipeline context into the conversation + const contextMessage = createPipelineContextMessage(pipelineContext); + + return { + ...data, + newItems: [ + ...data.newItems, + { + type: 'message', + role: 'system', + content: contextMessage + } + ] + }; + }, + logger + }); +} + +// Create handoff for task delegation +export function createTaskHandoff( + targetAgent: Agent, + taskContext: { + taskType: string; + priority: 'low' | 'medium' | 'high'; + parentAgent: string; + requirements?: string[]; + resources?: Record; + }, + logger?: PrefixLogger +): Handoff { + return createAgentHandoff(targetAgent, 'task', { + inputSchema: TaskContext, + onHandoff: (context, input) => { + logger?.log(`Task delegation to ${targetAgent.name}:`, { + taskType: taskContext.taskType, + priority: taskContext.priority + }); + }, + logger + }); +} + +// Get schema based on agent configuration +export function getSchemaForAgent(agentConfig: z.infer): z.ZodObject { + // Always start with basic HandoffContext - more specific contexts are used + // only when explicitly creating pipeline or task handoffs + return HandoffContext; + + // NOTE: PipelineContext and TaskContext are used only in specific creation functions + // like createPipelineHandoff() and createTaskHandoff(), not for general agent handoffs +} + +// Create context filter based on agent configuration +export function createContextFilterForAgent(agentConfig: z.infer) { + return (data: HandoffInputData): HandoffInputData => { + // Use basic passthrough filtering for regular handoffs + // Specific filtering is handled by createPipelineHandoff and createTaskHandoff + return data; + }; +} + +// Helper functions +function logHandoffEvent( + targetAgent: string, + contextType: string, + input: any, + logger?: PrefixLogger +) { + logger?.log(`🔄 SDK HANDOFF: -> ${targetAgent} (${contextType})`, { + targetAgent, + contextType, + hasContext: !!input && Object.keys(input).length > 0 + }); +} + +// Simple storage for pipeline state (in production, use proper state management) +const pipelineStates = new Map>(); + +function storePipelineStateForAgent( + agentName: string, + state: z.infer +) { + pipelineStates.set(agentName, state); +} + +export function getPipelineStateForAgent( + agentName: string +): z.infer | null { + return pipelineStates.get(agentName) || null; +} + +function createPipelineContextMessage(context: any): string { + return `## Pipeline Execution Context +Pipeline: ${context.pipelineName} +Step: ${context.currentStep + 1}/${context.totalSteps} +${context.isLastStep ? '**Final Step**: Provide complete results.' : '**Continue**: Pass results to next step.'} + +${context.stepResults && context.stepResults.length > 0 + ? `Previous Results:\n${JSON.stringify(context.stepResults, null, 2)}` + : 'No previous results.' +} + +${context.pipelineData + ? `Pipeline Data:\n${JSON.stringify(context.pipelineData, null, 2)}` + : '' +}`; +} \ No newline at end of file diff --git a/apps/rowboat/app/lib/agent-tools.ts b/apps/rowboat/app/lib/agent-tools.ts new file mode 100644 index 00000000..d8659937 --- /dev/null +++ b/apps/rowboat/app/lib/agent-tools.ts @@ -0,0 +1,511 @@ +// External dependencies +import { tool, Tool } from "@openai/agents"; +import { createOpenAI } from "@ai-sdk/openai"; +import { embed, generateText } from "ai"; +import { ObjectId } from "mongodb"; +import { z } from "zod"; +import { composio } from "./composio/composio"; +import { SignJWT } from "jose"; +import crypto from "crypto"; + +// Internal dependencies +import { embeddingModel } from '../lib/embedding'; +import { getMcpClient } from "./mcp"; +import { dataSourceDocsCollection, dataSourcesCollection, projectsCollection } from "./mongodb"; +import { qdrantClient } from '../lib/qdrant'; +import { EmbeddingRecord } from "./types/datasource_types"; +import { WorkflowAgent, WorkflowTool } from "./types/workflow_types"; +import { PrefixLogger } from "./utils"; + +// Provider configuration +const PROVIDER_API_KEY = process.env.PROVIDER_API_KEY || process.env.OPENAI_API_KEY || ''; +const PROVIDER_BASE_URL = process.env.PROVIDER_BASE_URL || undefined; +const MODEL = process.env.PROVIDER_DEFAULT_MODEL || 'gpt-4o'; + +const openai = createOpenAI({ + apiKey: PROVIDER_API_KEY, + baseURL: PROVIDER_BASE_URL, +}); + +// Helper to handle mock tool responses +export async function invokeMockTool( + logger: PrefixLogger, + toolName: string, + args: string, + description: string, + mockInstructions: string +): Promise { + logger = logger.child(`invokeMockTool`); + logger.log(`toolName: ${toolName}`); + logger.log(`args: ${args}`); + logger.log(`description: ${description}`); + logger.log(`mockInstructions: ${mockInstructions}`); + + const messages: Parameters[0]['messages'] = [{ + role: "system" as const, + content: `You are simulating the execution of a tool called '${toolName}'. Here is the description of the tool: ${description}. Here are the instructions for the mock tool: ${mockInstructions}. Generate a realistic response as if the tool was actually executed with the given parameters.` + }, { + role: "user" as const, + content: `Generate a realistic response for the tool '${toolName}' with these parameters: ${args}. The response should be concise and focused on what the tool would actually return.` + }]; + + const { text } = await generateText({ + model: openai(MODEL), + messages, + }); + logger.log(`generated text: ${text}`); + + return text; +} + +// Helper to handle RAG tool calls +export async function invokeRagTool( + logger: PrefixLogger, + projectId: string, + query: string, + sourceIds: string[], + returnType: 'chunks' | 'content', + k: number +): Promise<{ + title: string; + name: string; + content: string; + docId: string; + sourceId: string; +}[]> { + logger = logger.child(`invokeRagTool`); + logger.log(`projectId: ${projectId}`); + logger.log(`query: ${query}`); + logger.log(`sourceIds: ${sourceIds.join(', ')}`); + logger.log(`returnType: ${returnType}`); + logger.log(`k: ${k}`); + + // Create embedding for question + const { embedding } = await embed({ + model: embeddingModel, + value: query, + }); + + // Fetch all data sources for this project + const sources = await dataSourcesCollection.find({ + projectId: projectId, + active: true, + }).toArray(); + const validSourceIds = sources + .filter(s => sourceIds.includes(s._id.toString())) // id should be in sourceIds + .filter(s => s.active) // should be active + .map(s => s._id.toString()); + logger.log(`valid source ids: ${validSourceIds.join(', ')}`); + + // if no sources found, return empty response + if (validSourceIds.length === 0) { + logger.log(`no valid source ids found, returning empty response`); + return []; + } + + // Perform vector search + const qdrantResults = await qdrantClient.query("embeddings", { + query: embedding, + filter: { + must: [ + { key: "projectId", match: { value: projectId } }, + { key: "sourceId", match: { any: validSourceIds } }, + ], + }, + limit: k, + with_payload: true, + }); + logger.log(`found ${qdrantResults.points.length} results`); + + // if return type is chunks, return the chunks + let results = qdrantResults.points.map((point) => { + const { title, name, content, docId, sourceId } = point.payload as z.infer['payload']; + return { + title, + name, + content, + docId, + sourceId, + }; + }); + + if (returnType === 'chunks') { + logger.log(`returning chunks`); + return results; + } + + // otherwise, fetch the doc contents from mongodb + const docs = await dataSourceDocsCollection.find({ + _id: { $in: results.map(r => new ObjectId(r.docId)) }, + }).toArray(); + logger.log(`fetched docs: ${docs.length}`); + + // map the results to the docs + results = results.map(r => { + const doc = docs.find(d => d._id.toString() === r.docId); + return { + ...r, + content: doc?.content || '', + }; + }); + + return results; +} + +export async function invokeWebhookTool( + logger: PrefixLogger, + projectId: string, + name: string, + input: any, +): Promise { + logger = logger.child(`invokeWebhookTool`); + logger.log(`projectId: ${projectId}`); + logger.log(`name: ${name}`); + logger.log(`input: ${JSON.stringify(input)}`); + + const project = await projectsCollection.findOne({ + "_id": projectId, + }); + if (!project) { + throw new Error('Project not found'); + } + + if (!project.webhookUrl) { + throw new Error('Webhook URL not found'); + } + + // prepare request body + const toolCall = { + id: crypto.randomUUID(), + type: "function" as const, + function: { + name, + arguments: JSON.stringify(input), + }, + } + const content = JSON.stringify({ + toolCall, + }); + const requestId = crypto.randomUUID(); + const bodyHash = crypto + .createHash('sha256') + .update(content, 'utf8') + .digest('hex'); + + // sign request + const jwt = await new SignJWT({ + requestId, + projectId, + bodyHash, + }) + .setProtectedHeader({ + alg: 'HS256', + typ: 'JWT', + }) + .setIssuer('rowboat') + .setAudience(project.webhookUrl) + .setSubject(`tool-call-${toolCall.id}`) + .setJti(requestId) + .setIssuedAt() + .setExpirationTime("5 minutes") + .sign(new TextEncoder().encode(project.secret)); + + // make request + const request = { + requestId, + content, + }; + const response = await fetch(project.webhookUrl, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'x-signature-jwt': jwt, + }, + body: JSON.stringify(request), + }); + if (!response.ok) { + throw new Error(`Failed to call webhook: ${response.status}: ${response.statusText}`); + } + const responseBody = await response.json(); + return responseBody; +} + +// Helper to handle MCP tool calls +export async function invokeMcpTool( + logger: PrefixLogger, + projectId: string, + name: string, + input: any, + mcpServerName: string +) { + logger = logger.child(`invokeMcpTool`); + logger.log(`projectId: ${projectId}`); + logger.log(`name: ${name}`); + logger.log(`input: ${JSON.stringify(input)}`); + logger.log(`mcpServerName: ${mcpServerName}`); + + // Get project configuration + const project = await projectsCollection.findOne({ _id: projectId }); + if (!project) { + throw new Error(`project ${projectId} not found`); + } + + // get server url from project data + const mcpServerURL = project.customMcpServers?.[mcpServerName]?.serverUrl; + if (!mcpServerURL) { + throw new Error(`mcp server url not found for project ${projectId} and server ${mcpServerName}`); + } + + const client = await getMcpClient(mcpServerURL, mcpServerName); + const result = await client.callTool({ + name, + arguments: input, + }); + logger.log(`mcp tool result: ${JSON.stringify(result)}`); + await client.close(); + return result; +} + +// Helper to handle composio tool calls +export async function invokeComposioTool( + logger: PrefixLogger, + projectId: string, + name: string, + composioData: z.infer['composioData'] & {}, + input: any, +) { + logger = logger.child(`invokeComposioTool`); + logger.log(`projectId: ${projectId}`); + logger.log(`name: ${name}`); + logger.log(`input: ${JSON.stringify(input)}`); + + const { slug, toolkitSlug, noAuth } = composioData; + + let connectedAccountId: string | undefined = undefined; + if (!noAuth) { + const project = await projectsCollection.findOne({ _id: projectId }); + if (!project) { + throw new Error(`project ${projectId} not found`); + } + connectedAccountId = project.composioConnectedAccounts?.[toolkitSlug]?.id; + if (!connectedAccountId) { + throw new Error(`connected account id not found for project ${projectId} and toolkit ${toolkitSlug}`); + } + } + + const result = await composio.tools.execute(slug, { + userId: projectId, + arguments: input, + connectedAccountId: connectedAccountId, + }); + logger.log(`composio tool result: ${JSON.stringify(result)}`); + return result.data; +} + +// Helper to create RAG tool +export function createRagTool( + logger: PrefixLogger, + config: z.infer, + projectId: string +): Tool { + if (!config.ragDataSources?.length) { + throw new Error(`data sources not found for agent ${config.name}`); + } + + return tool({ + name: "rag_search", + description: config.description, + parameters: z.object({ + query: z.string().describe("The query to search for") + }), + async execute(input: { query: string }) { + const results = await invokeRagTool( + logger, + projectId, + input.query, + config.ragDataSources || [], + config.ragReturnType || 'chunks', + config.ragK || 3 + ); + return JSON.stringify({ + results, + }); + } + }); +} + +// Helper to create a mock tool +export function createMockTool( + logger: PrefixLogger, + config: z.infer, +): Tool { + return tool({ + name: config.name, + description: config.description, + strict: false, + parameters: { + type: 'object', + properties: config.parameters.properties, + required: config.parameters.required || [], + additionalProperties: true, + }, + async execute(input: any) { + try { + const result = await invokeMockTool( + logger, + config.name, + JSON.stringify(input), + config.description, + config.mockInstructions || '' + ); + return JSON.stringify({ + result, + }); + } catch (error) { + logger.log(`Error executing mock tool ${config.name}:`, error); + return JSON.stringify({ + error: `Mock tool execution failed: ${error}`, + }); + } + } + }); +} + +// Helper to create a webhook tool +export function createWebhookTool( + logger: PrefixLogger, + config: z.infer, + projectId: string, +): Tool { + const { name, description, parameters } = config; + + return tool({ + name, + description, + strict: false, + parameters: { + type: 'object', + properties: parameters.properties, + required: parameters.required || [], + additionalProperties: true, + }, + async execute(input: any) { + try { + const result = await invokeWebhookTool(logger, projectId, name, input); + return JSON.stringify({ + result, + }); + } catch (error) { + logger.log(`Error executing webhook tool ${config.name}:`, error); + return JSON.stringify({ + error: `Tool execution failed: ${error}`, + }); + } + } + }); +} + +// Helper to create an mcp tool +export function createMcpTool( + logger: PrefixLogger, + config: z.infer, + projectId: string +): Tool { + const { name, description, parameters, mcpServerName } = config; + + return tool({ + name, + description, + strict: false, + parameters: { + type: 'object', + properties: parameters.properties, + required: parameters.required || [], + additionalProperties: true, + }, + async execute(input: any) { + try { + const result = await invokeMcpTool(logger, projectId, name, input, mcpServerName || ''); + return JSON.stringify({ + result, + }); + } catch (error) { + logger.log(`Error executing mcp tool ${name}:`, error); + return JSON.stringify({ + error: `Tool execution failed: ${error}`, + }); + } + } + }); +} + +// Helper to create a composio tool +export function createComposioTool( + logger: PrefixLogger, + config: z.infer, + projectId: string +): Tool { + const { name, description, parameters, composioData } = config; + + if (!composioData) { + throw new Error(`composio data not found for tool ${name}`); + } + + return tool({ + name, + description, + strict: false, + parameters: { + type: 'object', + properties: parameters.properties, + required: parameters.required || [], + additionalProperties: true, + }, + async execute(input: any) { + try { + const result = await invokeComposioTool(logger, projectId, name, composioData, input); + return JSON.stringify({ + result, + }); + } catch (error) { + logger.log(`Error executing composio tool ${name}:`, error); + return JSON.stringify({ + error: `Tool execution failed: ${error}`, + }); + } + } + }); +} + +export function createTools( + logger: PrefixLogger, + projectId: string, + workflow: { tools: z.infer[] }, + toolConfig: Record>, +): Record { + const tools: Record = {}; + const toolLogger = logger.child('createTools'); + + toolLogger.log(`=== CREATING ${Object.keys(toolConfig).length} TOOLS ===`); + + for (const [toolName, config] of Object.entries(toolConfig)) { + toolLogger.log(`creating tool: ${toolName} (type: ${config.mockTool ? 'mock' : config.isMcp ? 'mcp' : config.isComposio ? 'composio' : 'webhook'})`); + + if (config.mockTool) { + tools[toolName] = createMockTool(logger, config); + toolLogger.log(`✓ created mock tool: ${toolName}`); + } else if (config.isMcp) { + tools[toolName] = createMcpTool(logger, config, projectId); + toolLogger.log(`✓ created mcp tool: ${toolName} (server: ${config.mcpServerName || 'unknown'})`); + } else if (config.isComposio) { + tools[toolName] = createComposioTool(logger, config, projectId); + toolLogger.log(`✓ created composio tool: ${toolName}`); + } else { + tools[toolName] = createWebhookTool(logger, config, projectId); + toolLogger.log(`✓ created webhook tool: ${toolName} (fallback)`); + } + } + + toolLogger.log(`=== TOOL CREATION COMPLETE ===`); + return tools; +} \ No newline at end of file diff --git a/apps/rowboat/app/lib/agent_instructions.ts b/apps/rowboat/app/lib/agent_instructions.ts index 6b0d442e..0fd914d8 100644 --- a/apps/rowboat/app/lib/agent_instructions.ts +++ b/apps/rowboat/app/lib/agent_instructions.ts @@ -121,11 +121,9 @@ export const CONVERSATION_TYPE_INSTRUCTIONS = (): string => ` export const TASK_TYPE_INSTRUCTIONS = (): string => ` - You are an agent that is part of a workflow of (one or more) interconnected agents that work together to be an assistant. -- Use the JSON format to convey your responses. The JSON should have 3 keys. -- The first key in the JSON response should be your "thought" - analysizing what has happened till now and what you need to do in this turn. -- The second key should be your "response". While you will put out a message, your response will not be shown directly to the user. Instead, your response will be used by the agent that might have invoked you and (possibly) other agents in the workflow. Therefore, your responses must be worded in such a way that it is useful for other agents and not addressed to the user. -- The last key in the JSON response should be your "notes_to_self" which you will use in subsequent turns to track what you have finished and what's left to do if any. -- IMPORTANT: If you have all the information to take action, such as calling a tool or writing a response, you should do that in the immediate turn. Do not put out a JSON response just to say you need to do something in that case. +- Your response will not be shown directly to the user. Instead, your response will be used by the agent that might have invoked you and (possibly) other agents in the workflow. Therefore, your responses must be worded in such a way that it is useful for other agents and not addressed to the user. +- Provide clear, direct responses that other agents can easily understand and act upon. +- IMPORTANT: If you have all the information to take action, such as calling a tool or writing a response, you should do that in the immediate turn. Do not delay action unnecessarily. - Reading the messages in the chat history will give you context about the conversation. - Seeing the tool calls that transfer / handoff control will help you understand the flow of the conversation and which agent produced each message. - These are high level instructions only. The user will provide more specific instructions which will be below. @@ -138,10 +136,7 @@ export const PIPELINE_TYPE_INSTRUCTIONS = (): string => ` - Your output will be passed to the next step in the pipeline (or returned as the final result if you're the last step). - CRITICAL: You CANNOT transfer to other agents or pipelines. You can only use tools to complete your specific task. - Focus ONLY on your designated role in the pipeline. Process the input, perform your specific task, and provide clear output. -- Use the JSON format to convey your responses. The JSON should have 3 keys: - - "thought": Analyze the input from the previous pipeline step and plan what you need to do - - "response": Your processed output that will be passed to the next pipeline step. Make this clear and actionable. - - "pipeline_context": Brief notes about what you accomplished for the pipeline flow +- Provide clear, actionable output that the next pipeline step can easily understand and work with. - Do NOT attempt to handle tasks outside your specific pipeline role. - Do NOT mention other agents or the pipeline structure to users. - Your response should be self-contained and ready to be consumed by the next pipeline step. diff --git a/apps/rowboat/app/lib/agents.ts b/apps/rowboat/app/lib/agents.ts index a8f617e1..83f06eb6 100644 --- a/apps/rowboat/app/lib/agents.ts +++ b/apps/rowboat/app/lib/agents.ts @@ -1,31 +1,78 @@ // External dependencies -import { Agent, AgentInputItem, run, tool, Tool } from "@openai/agents"; +import { Agent, AgentInputItem, run, Tool } from "@openai/agents"; import { RECOMMENDED_PROMPT_PREFIX } from "@openai/agents-core/extensions"; import { aisdk } from "@openai/agents-extensions"; import { createOpenAI } from "@ai-sdk/openai"; -import { CoreMessage, embed, generateText } from "ai"; -import { ObjectId } from "mongodb"; import { z } from "zod"; -import { composio } from "./composio/composio"; -import { SignJWT } from "jose"; import crypto from "crypto"; // Internal dependencies -import { embeddingModel } from '../lib/embedding'; -import { getMcpClient } from "./mcp"; -import { dataSourceDocsCollection, dataSourcesCollection, projectsCollection } from "./mongodb"; -import { qdrantClient } from '../lib/qdrant'; -import { EmbeddingRecord } from "./types/datasource_types"; +import { createTools, createRagTool } from "./agent-tools"; import { ConnectedEntity, sanitizeTextWithMentions, Workflow, WorkflowAgent, WorkflowPipeline, WorkflowPrompt, WorkflowTool } from "./types/workflow_types"; import { CHILD_TRANSFER_RELATED_INSTRUCTIONS, CONVERSATION_TYPE_INSTRUCTIONS, PIPELINE_TYPE_INSTRUCTIONS, RAG_INSTRUCTIONS, TASK_TYPE_INSTRUCTIONS } from "./agent_instructions"; import { PrefixLogger } from "./utils"; import { Message, AssistantMessage, AssistantMessageWithToolCalls, ToolMessage } from "./types/types"; +// Native handoff support +import { createAgentHandoff, getSchemaForAgent, createContextFilterForAgent } from "./agent-handoffs"; +import { PipelineStateManager } from "./pipeline-state-manager"; -// Make everything available as a promise +// Provider configuration const PROVIDER_API_KEY = process.env.PROVIDER_API_KEY || process.env.OPENAI_API_KEY || ''; const PROVIDER_BASE_URL = process.env.PROVIDER_BASE_URL || undefined; const MODEL = process.env.PROVIDER_DEFAULT_MODEL || 'gpt-4o'; +// Feature flags +const USE_NATIVE_HANDOFFS = process.env.USE_NATIVE_HANDOFFS === 'true'; + +// Agent execution limits +const MAX_AGENT_TURNS = 25; // Configurable limit for agent SDK turns (default was 10) + +// Internal types for agent handoffs and pipeline management +// Context passing schemas for SDK handoffs (OpenAI API compatible) +export const HandoffContext = z.object({ + reason: z.enum(['direct_handoff', 'pipeline_execution', 'task_delegation']).default('direct_handoff'), + parentAgent: z.string().default('unknown'), + transferCount: z.number().default(0), + // Allow metadata to be object, string, or null to handle AI model variations + metadata: z.union([z.record(z.any()), z.string(), z.null()]).default(null) +}); + +export const PipelineContext = HandoffContext.extend({ + pipelineName: z.string().default('unknown_pipeline'), + currentStep: z.number().default(0), + totalSteps: z.number().default(1), + isLastStep: z.boolean().default(false), + // Allow flexible types for AI model compatibility + pipelineData: z.union([z.record(z.any()), z.string(), z.null()]).default(null), + stepResults: z.union([z.array(z.record(z.any())), z.string(), z.null()]).default(null) +}); + +export const TaskContext = HandoffContext.extend({ + taskType: z.string().default('general_task'), + priority: z.enum(['low', 'medium', 'high']).default('medium'), + deadline: z.union([z.string().datetime(), z.string(), z.null()]).default(null), + requirements: z.union([z.array(z.string()), z.string(), z.null()]).default(null), + resources: z.union([z.record(z.any()), z.string(), z.null()]).default(null) +}); + +// Pipeline execution state for state manager +export const PipelineExecutionState = z.object({ + pipelineName: z.string(), + currentStep: z.number(), + totalSteps: z.number(), + callingAgent: z.string(), + pipelineData: z.union([z.record(z.any()), z.string(), z.null()]).default(null), + stepResults: z.union([z.array(z.record(z.any())), z.string(), z.null()]).default(null), + currentStepResult: z.union([z.record(z.any()), z.string(), z.null()]).default(null), + startTime: z.string().datetime(), + metadata: z.union([z.record(z.any()), z.string(), z.null()]).default(null) +}); + +// Agent state tracking for tool call completion +interface AgentState { + pendingToolCalls: number; +} + const openai = createOpenAI({ apiKey: PROVIDER_API_KEY, baseURL: PROVIDER_BASE_URL, @@ -45,456 +92,6 @@ const ZOutMessage = z.union([ ToolMessage, ]); -// Helper to handle mock tool responses -async function invokeMockTool( - logger: PrefixLogger, - toolName: string, - args: string, - description: string, - mockInstructions: string -): Promise { - logger = logger.child(`invokeMockTool`); - logger.log(`toolName: ${toolName}`); - logger.log(`args: ${args}`); - logger.log(`description: ${description}`); - logger.log(`mockInstructions: ${mockInstructions}`); - - const messages: CoreMessage[] = [{ - role: "system" as const, - content: `You are simulating the execution of a tool called '${toolName}'. Here is the description of the tool: ${description}. Here are the instructions for the mock tool: ${mockInstructions}. Generate a realistic response as if the tool was actually executed with the given parameters.` - }, { - role: "user" as const, - content: `Generate a realistic response for the tool '${toolName}' with these parameters: ${args}. The response should be concise and focused on what the tool would actually return.` - }]; - - const { text } = await generateText({ - model: openai(MODEL), - messages, - }); - logger.log(`generated text: ${text}`); - - return text; -} - -// Helper to handle RAG tool calls -async function invokeRagTool( - logger: PrefixLogger, - projectId: string, - query: string, - sourceIds: string[], - returnType: 'chunks' | 'content', - k: number -): Promise<{ - title: string; - name: string; - content: string; - docId: string; - sourceId: string; -}[]> { - logger = logger.child(`invokeRagTool`); - logger.log(`projectId: ${projectId}`); - logger.log(`query: ${query}`); - logger.log(`sourceIds: ${sourceIds.join(', ')}`); - logger.log(`returnType: ${returnType}`); - logger.log(`k: ${k}`); - - // Create embedding for question - const { embedding } = await embed({ - model: embeddingModel, - value: query, - }); - - // Fetch all data sources for this project - const sources = await dataSourcesCollection.find({ - projectId: projectId, - active: true, - }).toArray(); - const validSourceIds = sources - .filter(s => sourceIds.includes(s._id.toString())) // id should be in sourceIds - .filter(s => s.active) // should be active - .map(s => s._id.toString()); - logger.log(`valid source ids: ${validSourceIds.join(', ')}`); - - // if no sources found, return empty response - if (validSourceIds.length === 0) { - logger.log(`no valid source ids found, returning empty response`); - return []; - } - - // Perform vector search - const qdrantResults = await qdrantClient.query("embeddings", { - query: embedding, - filter: { - must: [ - { key: "projectId", match: { value: projectId } }, - { key: "sourceId", match: { any: validSourceIds } }, - ], - }, - limit: k, - with_payload: true, - }); - logger.log(`found ${qdrantResults.points.length} results`); - - // if return type is chunks, return the chunks - let results = qdrantResults.points.map((point) => { - const { title, name, content, docId, sourceId } = point.payload as z.infer['payload']; - return { - title, - name, - content, - docId, - sourceId, - }; - }); - - if (returnType === 'chunks') { - logger.log(`returning chunks`); - return results; - } - - // otherwise, fetch the doc contents from mongodb - const docs = await dataSourceDocsCollection.find({ - _id: { $in: results.map(r => new ObjectId(r.docId)) }, - }).toArray(); - logger.log(`fetched docs: ${docs.length}`); - - // map the results to the docs - results = results.map(r => { - const doc = docs.find(d => d._id.toString() === r.docId); - return { - ...r, - content: doc?.content || '', - }; - }); - - return results; -} - -async function invokeWebhookTool( - logger: PrefixLogger, - projectId: string, - name: string, - input: any, -): Promise { - logger = logger.child(`invokeWebhookTool`); - logger.log(`projectId: ${projectId}`); - logger.log(`name: ${name}`); - logger.log(`input: ${JSON.stringify(input)}`); - - const project = await projectsCollection.findOne({ - "_id": projectId, - }); - if (!project) { - throw new Error('Project not found'); - } - - if (!project.webhookUrl) { - throw new Error('Webhook URL not found'); - } - - // prepare request body - const toolCall: z.infer[number] = { - id: crypto.randomUUID(), - type: "function", - function: { - name, - arguments: JSON.stringify(input), - }, - } - const content = JSON.stringify({ - toolCall, - }); - const requestId = crypto.randomUUID(); - const bodyHash = crypto - .createHash('sha256') - .update(content, 'utf8') - .digest('hex'); - - // sign request - const jwt = await new SignJWT({ - requestId, - projectId, - bodyHash, - }) - .setProtectedHeader({ - alg: 'HS256', - typ: 'JWT', - }) - .setIssuer('rowboat') - .setAudience(project.webhookUrl) - .setSubject(`tool-call-${toolCall.id}`) - .setJti(requestId) - .setIssuedAt() - .setExpirationTime("5 minutes") - .sign(new TextEncoder().encode(project.secret)); - - // make request - const request = { - requestId, - content, - }; - const response = await fetch(project.webhookUrl, { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - 'x-signature-jwt': jwt, - }, - body: JSON.stringify(request), - }); - if (!response.ok) { - throw new Error(`Failed to call webhook: ${response.status}: ${response.statusText}`); - } - const responseBody = await response.json(); - return responseBody; -} - -// Helper to handle MCP tool calls -async function invokeMcpTool( - logger: PrefixLogger, - projectId: string, - name: string, - input: any, - mcpServerName: string -) { - logger = logger.child(`invokeMcpTool`); - logger.log(`projectId: ${projectId}`); - logger.log(`name: ${name}`); - logger.log(`input: ${JSON.stringify(input)}`); - logger.log(`mcpServerName: ${mcpServerName}`); - - // Get project configuration - const project = await projectsCollection.findOne({ _id: projectId }); - if (!project) { - throw new Error(`project ${projectId} not found`); - } - - // get server url from project data - const mcpServerURL = project.customMcpServers?.[mcpServerName]?.serverUrl; - if (!mcpServerURL) { - throw new Error(`mcp server url not found for project ${projectId} and server ${mcpServerName}`); - } - - const client = await getMcpClient(mcpServerURL, mcpServerName); - const result = await client.callTool({ - name, - arguments: input, - }); - logger.log(`mcp tool result: ${JSON.stringify(result)}`); - await client.close(); - return result; -} - -// Helper to handle composio tool calls -async function invokeComposioTool( - logger: PrefixLogger, - projectId: string, - name: string, - composioData: z.infer['composioData'] & {}, - input: any, -) { - logger = logger.child(`invokeComposioTool`); - logger.log(`projectId: ${projectId}`); - logger.log(`name: ${name}`); - logger.log(`input: ${JSON.stringify(input)}`); - - const { slug, toolkitSlug, noAuth } = composioData; - - let connectedAccountId: string | undefined = undefined; - if (!noAuth) { - const project = await projectsCollection.findOne({ _id: projectId }); - if (!project) { - throw new Error(`project ${projectId} not found`); - } - connectedAccountId = project.composioConnectedAccounts?.[toolkitSlug]?.id; - if (!connectedAccountId) { - throw new Error(`connected account id not found for project ${projectId} and toolkit ${toolkitSlug}`); - } - } - - const result = await composio.tools.execute(slug, { - userId: projectId, - arguments: input, - connectedAccountId: connectedAccountId, - }); - logger.log(`composio tool result: ${JSON.stringify(result)}`); - return result.data; -} - -// Helper to create RAG tool -function createRagTool( - logger: PrefixLogger, - config: z.infer, - projectId: string -): Tool { - if (!config.ragDataSources?.length) { - throw new Error(`data sources not found for agent ${config.name}`); - } - - return tool({ - name: "rag_search", - description: config.description, - parameters: z.object({ - query: z.string().describe("The query to search for") - }), - async execute(input: { query: string }) { - const results = await invokeRagTool( - logger, - projectId, - input.query, - config.ragDataSources || [], - config.ragReturnType || 'chunks', - config.ragK || 3 - ); - return JSON.stringify({ - results, - }); - } - }); -} - -// Helper to create a mock tool -function createMockTool( - logger: PrefixLogger, - config: z.infer, -): Tool { - return tool({ - name: config.name, - description: config.description, - strict: false, - parameters: { - type: 'object', - properties: config.parameters.properties, - required: config.parameters.required || [], - additionalProperties: true, - }, - async execute(input: any) { - try { - const result = await invokeMockTool( - logger, - config.name, - JSON.stringify(input), - config.description, - config.mockInstructions || '' - ); - return JSON.stringify({ - result, - }); - } catch (error) { - logger.log(`Error executing mock tool ${config.name}:`, error); - return JSON.stringify({ - error: `Mock tool execution failed: ${error}`, - }); - } - } - }); -} - -// Helper to create a webhook tool -function createWebhookTool( - logger: PrefixLogger, - config: z.infer, - projectId: string, -): Tool { - const { name, description, parameters } = config; - - return tool({ - name, - description, - strict: false, - parameters: { - type: 'object', - properties: parameters.properties, - required: parameters.required || [], - additionalProperties: true, - }, - async execute(input: any) { - try { - const result = await invokeWebhookTool(logger, projectId, name, input); - return JSON.stringify({ - result, - }); - } catch (error) { - logger.log(`Error executing webhook tool ${config.name}:`, error); - return JSON.stringify({ - error: `Tool execution failed: ${error}`, - }); - } - } - }); -} - -// Helper to create an mcp tool -function createMcpTool( - logger: PrefixLogger, - config: z.infer, - projectId: string -): Tool { - const { name, description, parameters, mcpServerName } = config; - - return tool({ - name, - description, - strict: false, - parameters: { - type: 'object', - properties: parameters.properties, - required: parameters.required || [], - additionalProperties: true, - }, - async execute(input: any) { - try { - const result = await invokeMcpTool(logger, projectId, name, input, mcpServerName || ''); - return JSON.stringify({ - result, - }); - } catch (error) { - logger.log(`Error executing mcp tool ${name}:`, error); - return JSON.stringify({ - error: `Tool execution failed: ${error}`, - }); - } - } - }); -} - -// Helper to create a composio tool -function createComposioTool( - logger: PrefixLogger, - config: z.infer, - projectId: string -): Tool { - const { name, description, parameters, composioData } = config; - - if (!composioData) { - throw new Error(`composio data not found for tool ${name}`); - } - - return tool({ - name, - description, - strict: false, - parameters: { - type: 'object', - properties: parameters.properties, - required: parameters.required || [], - additionalProperties: true, - }, - async execute(input: any) { - try { - const result = await invokeComposioTool(logger, projectId, name, composioData, input); - return JSON.stringify({ - result, - }); - } catch (error) { - logger.log(`Error executing composio tool ${name}:`, error); - return JSON.stringify({ - error: `Tool execution failed: ${error}`, - }); - } - } - }); -} - // Helper to create an agent function createAgent( logger: PrefixLogger, @@ -517,10 +114,10 @@ ${config.description} ## About You -${config.outputVisibility === 'user_facing' - ? CONVERSATION_TYPE_INSTRUCTIONS() - : config.type === 'pipeline' - ? PIPELINE_TYPE_INSTRUCTIONS() +${config.outputVisibility === 'user_facing' + ? CONVERSATION_TYPE_INSTRUCTIONS() + : config.type === 'pipeline' + ? PIPELINE_TYPE_INSTRUCTIONS() : TASK_TYPE_INSTRUCTIONS()} ## Instructions @@ -535,12 +132,12 @@ ${CHILD_TRANSFER_RELATED_INSTRUCTIONS} `; let { sanitized, entities } = sanitizeTextWithMentions(instructions, workflow, config); - + // Remove agent transfer instructions for pipeline agents if (config.type === 'pipeline') { sanitized = sanitized.replace(CHILD_TRANSFER_RELATED_INSTRUCTIONS, ''); } - + agentLogger.log(`instructions: ${JSON.stringify(sanitized)}`); agentLogger.log(`mentions: ${JSON.stringify(entities)}`); @@ -610,6 +207,7 @@ function getStartOfTurnAgentName( logger: PrefixLogger, messages: z.infer[], agentConfig: Record>, + pipelineConfig: Record>, workflow: z.infer, ): string { @@ -626,7 +224,7 @@ function getStartOfTurnAgentName( } } return stack; - } + } logger = logger.child(`getStartOfTurnAgentName`); const startAgentStack = createAgentCallStack(messages); @@ -635,12 +233,20 @@ function getStartOfTurnAgentName( // if control type is retain, return last agent const lastAgentName = startAgentStack.pop() || workflow.startAgent; logger.log(`setting last agent name initially to: ${lastAgentName}`); + + // Check if this is a pipeline + const lastPipelineConfig = pipelineConfig[lastAgentName]; + if (lastPipelineConfig) { + logger.log(`last agent ${lastAgentName} is a pipeline, returning pipeline: ${lastAgentName}`); + return lastAgentName; + } + const lastAgentConfig = agentConfig[lastAgentName]; if (!lastAgentConfig) { logger.log(`last agent ${lastAgentName} not found in agent config, returning start agent: ${workflow.startAgent}`); return workflow.startAgent; } - + // For other agents, check control type switch (lastAgentConfig.controlType) { case 'retain': @@ -757,7 +363,7 @@ function ensureSystemMessage(logger: PrefixLogger, messages: z.infer): { ...acc, [prompt.name]: prompt }), {}); - + const pipelineConfig: Record> = (workflow.pipelines || []).reduce((acc, pipeline) => ({ ...acc, [pipeline.name]: pipeline }), {}); - + return { agentConfig, toolConfig, promptConfig, pipelineConfig }; } @@ -807,40 +413,152 @@ async function* emitGreetingTurn(logger: PrefixLogger, workflow: z.infer, - toolConfig: Record>, -): Record { - const tools: Record = {}; - const toolLogger = logger.child('createTools'); - - toolLogger.log(`=== CREATING ${Object.keys(toolConfig).length} TOOLS ===`); + agentConfig: Record>, + tools: Record, + promptConfig: Record>, + pipelineConfig: Record>, +): { agents: Record, mentions: Record[]>, originalInstructions: Record, originalHandoffs: Record } { + const agentsLogger = logger.child('createAgentsWithNativeHandoffs'); + const agents: Record = {}; + const mentions: Record[]> = {}; + const originalInstructions: Record = {}; + const originalHandoffs: Record = {}; - for (const [toolName, config] of Object.entries(toolConfig)) { - toolLogger.log(`creating tool: ${toolName} (type: ${config.mockTool ? 'mock' : config.isMcp ? 'mcp' : config.isComposio ? 'composio' : 'webhook'})`); + agentsLogger.log(`=== CREATING ${Object.keys(agentConfig).length} AGENTS WITH NATIVE HANDOFFS ===`); + + // Create pipeline entities that will be available for @ referencing + const pipelineEntities: z.infer[] = Object.keys(pipelineConfig).map(pipelineName => ({ + type: 'pipeline' as const, + name: pipelineName, + })); + if (pipelineEntities.length > 0) { + agentsLogger.log(`available pipeline entities for @ referencing: ${pipelineEntities.map(p => p.name).join(', ')}`); + } + + // Create agents first + for (const [agentName, config] of Object.entries(agentConfig)) { + agentsLogger.log(`creating agent: ${agentName} (type: ${config.outputVisibility}, control: ${config.controlType})`); - if (config.mockTool) { - tools[toolName] = createMockTool(logger, config); - toolLogger.log(`✓ created mock tool: ${toolName}`); - } else if (config.isMcp) { - tools[toolName] = createMcpTool(logger, config, projectId); - toolLogger.log(`✓ created mcp tool: ${toolName} (server: ${config.mcpServerName || 'unknown'})`); - } else if (config.isComposio) { - tools[toolName] = createComposioTool(logger, config, projectId); - toolLogger.log(`✓ created composio tool: ${toolName}`); + const { agent, entities } = createAgent( + logger, + projectId, + config, + tools, + workflow, + promptConfig, + ); + agents[agentName] = agent; + + // Add pipeline entities to the agent's available mentions (unless it's a pipeline agent itself) + let agentEntities = entities; + if (config.type !== 'pipeline') { + agentEntities = [...entities, ...pipelineEntities]; + agentsLogger.log(`${agentName} can reference: ${entities.length} entities + ${pipelineEntities.length} pipelines`); } else { - tools[toolName] = createWebhookTool(logger, config, projectId); - toolLogger.log(`✓ created webhook tool: ${toolName} (fallback)`); + agentsLogger.log(`${agentName} (pipeline agent) can reference: ${entities.length} entities only`); + } + + mentions[agentName] = agentEntities; + originalInstructions[agentName] = agent.instructions as string; + } + + agentsLogger.log(`=== SETTING UP NATIVE HANDOFFS ===`); + + // Set up SDK native handoffs + for (const [agentName, agent] of Object.entries(agents)) { + const connectedAgentNames = (mentions[agentName] || []).filter(e => e.type === 'agent').map(e => e.name); + const connectedPipelineNames = (mentions[agentName] || []).filter(e => e.type === 'pipeline').map(e => e.name); + + // Pipeline agents have no direct handoffs - they're controlled by the pipeline manager + const agentConfigObj = agentConfig[agentName]; + if (agentConfigObj?.type === 'pipeline') { + agent.handoffs = []; + originalHandoffs[agentName] = []; + agentsLogger.log(`${agentName} is a pipeline agent - no direct handoffs`); + continue; + } + + // Create SDK handoffs for connected agents + const agentHandoffs: any[] = []; + + // Regular agent handoffs + for (const targetAgentName of connectedAgentNames) { + const targetAgent = agents[targetAgentName]; + const targetConfig = agentConfig[targetAgentName]; + + if (!targetAgent || !targetConfig) continue; + + // Skip pipeline agents as direct handoff targets + if (targetConfig.type === 'pipeline') continue; + + const handoffType = targetConfig.outputVisibility === 'internal' ? 'task' : 'direct'; + + const handoff = createAgentHandoff(targetAgent, handoffType, { + inputSchema: getSchemaForAgent(targetConfig), + onHandoff: (context, input) => { + agentsLogger.log(`🔄 SDK Handoff: ${agentName} -> ${targetAgentName} (${handoffType})`); + }, + inputFilter: createContextFilterForAgent(targetConfig), + logger: agentsLogger + }); + + agentHandoffs.push(handoff); + } + + // Pipeline handoffs - create handoff to first agent of each pipeline + for (const pipelineName of connectedPipelineNames) { + const pipeline = pipelineConfig[pipelineName]; + if (pipeline && pipeline.agents.length > 0) { + const firstAgentName = pipeline.agents[0]; + const firstAgent = agents[firstAgentName]; + + if (firstAgent && !agentHandoffs.some(h => h.agent.name === firstAgentName)) { + const pipelineHandoff = createAgentHandoff(firstAgent, 'pipeline', { + onHandoff: (context, input) => { + agentsLogger.log(`🔄 Pipeline Handoff: ${agentName} -> ${pipelineName} (starting with ${firstAgentName})`); + // TODO: Initialize pipeline state here + }, + logger: agentsLogger + }); + + agentHandoffs.push(pipelineHandoff); + agentsLogger.log(`${agentName} pipeline mention ${pipelineName} -> SDK handoff to first agent: ${firstAgentName}`); + } + } + } + + agent.handoffs = agentHandoffs; + originalHandoffs[agentName] = agentHandoffs; + agentsLogger.log(`set ${agentHandoffs.length} SDK handoffs for ${agentName}`); + } + + // Pipeline agents still get their metadata for compatibility + agentsLogger.log(`=== SETTING UP PIPELINE METADATA ===`); + for (const [pipelineName, pipeline] of Object.entries(pipelineConfig)) { + for (let i = 0; i < pipeline.agents.length; i++) { + const currentAgentName = pipeline.agents[i]; + const currentAgent = agents[currentAgentName]; + + if (currentAgent) { + (currentAgent as any).pipelineName = pipelineName; + (currentAgent as any).pipelineIndex = i; + (currentAgent as any).isLastInPipeline = i === pipeline.agents.length - 1; + agentsLogger.log(`pipeline agent ${currentAgentName} metadata: pipeline=${pipelineName}, index=${i}`); + } } } - - toolLogger.log(`=== TOOL CREATION COMPLETE ===`); - return tools; + + return { agents, mentions, originalInstructions, originalHandoffs }; } -function createAgents( +// Legacy agent creation (existing implementation) +function createAgentsLegacy( logger: PrefixLogger, projectId: string, workflow: z.infer, @@ -869,12 +587,12 @@ function createAgents( // create agents for (const [agentName, config] of Object.entries(agentConfig)) { agentsLogger.log(`creating agent: ${agentName} (type: ${config.outputVisibility}, control: ${config.controlType})`); - + // Pipeline agents get special handling: // - Different instruction template (PIPELINE_TYPE_INSTRUCTIONS) // - Filtered mentions (tools only, no agents) // - No agent transfer instructions - + const { agent, entities } = createAgent( logger, projectId, @@ -884,7 +602,7 @@ function createAgents( promptConfig, ); agents[agentName] = agent; - + // Add pipeline entities to the agent's available mentions (unless it's a pipeline agent itself) // Pipeline agents cannot reference other agents or pipelines, only tools let agentEntities = entities; @@ -894,7 +612,7 @@ function createAgents( } else { agentsLogger.log(`${agentName} (pipeline agent) can reference: ${entities.length} entities only`); } - + mentions[agentName] = agentEntities; originalInstructions[agentName] = agent.instructions as string; // handoffs will be set after all agents are created @@ -906,17 +624,17 @@ function createAgents( for (const [agentName, agent] of Object.entries(agents)) { const connectedAgentNames = (mentions[agentName] || []).filter(e => e.type === 'agent').map(e => e.name); const connectedPipelineNames = (mentions[agentName] || []).filter(e => e.type === 'pipeline').map(e => e.name); - + // Pipeline agents have no agent handoffs (filtered out in validatePipelineAgentMentions) // They only have tool connections, no agent transfers allowed - + // Filter out pipeline agents from being handoff targets // Only allow handoffs to non-pipeline agents const validAgentNames = connectedAgentNames.filter(name => { const targetConfig = agentConfig[name]; return targetConfig && targetConfig.type !== 'pipeline'; }); - + // Convert pipeline mentions to handoffs to the first agent in each pipeline const pipelineFirstAgents: string[] = []; for (const pipelineName of connectedPipelineNames) { @@ -929,10 +647,10 @@ function createAgents( } } } - + // Combine regular agent handoffs with pipeline first agents const allHandoffTargets = [...validAgentNames, ...pipelineFirstAgents]; - + // Only store Agent objects in handoffs (filter out Handoff if present) const agentHandoffs = allHandoffTargets.map(e => agents[e]).filter(Boolean) as Agent[]; agent.handoffs = agentHandoffs; @@ -944,30 +662,30 @@ function createAgents( agentsLogger.log(`=== SETTING UP PIPELINE CHAINS ===`); for (const [pipelineName, pipeline] of Object.entries(pipelineConfig)) { agentsLogger.log(`setting up pipeline chain: ${pipelineName} -> [${pipeline.agents.join(' -> ')}]`); - + for (let i = 0; i < pipeline.agents.length; i++) { const currentAgentName = pipeline.agents[i]; const currentAgent = agents[currentAgentName]; - + if (!currentAgent) { agentsLogger.log(`warning: pipeline agent ${currentAgentName} not found in agent config`); continue; } - + // Pipeline agents have NO handoffs - they just execute once currentAgent.handoffs = []; - + // Add pipeline metadata to the agent for easy lookup (currentAgent as any).pipelineName = pipelineName; (currentAgent as any).pipelineIndex = i; (currentAgent as any).isLastInPipeline = i === pipeline.agents.length - 1; - + // Update originalHandoffs to reflect the final pipeline state originalHandoffs[currentAgentName] = []; - + agentsLogger.log(`pipeline agent ${currentAgentName} has no handoffs (will be controlled by pipeline controller)`); agentsLogger.log(`pipeline agent ${currentAgentName} metadata: pipeline=${pipelineName}, index=${i}, isLast=${i === pipeline.agents.length - 1}`); - + // Configure pipeline agents to relinquish control after completing their task const agentConfigObj = agentConfig[currentAgentName]; if (agentConfigObj && agentConfigObj.type === 'pipeline') { @@ -1023,13 +741,13 @@ function maybeInjectGiveUpControlInstructions( injectLogger.log(`isInternal: ${isInternal}`); injectLogger.log(`isPipeline: ${isPipeline}`); injectLogger.log(`isRetain: ${isRetain}`); - + // For pipeline agents, they should continue pipeline execution, so no need to inject give up control if (isPipeline) { injectLogger.log(`Pipeline agent ${childAgentName} continues pipeline execution, no give up control needed`); return; } - + if (!isInternal && isRetain) { // inject give up control instructions agents[childAgentName].instructions = getGiveUpControlInstructions(agents[childAgentName], parentAgentName, injectLogger); @@ -1042,6 +760,418 @@ function maybeInjectGiveUpControlInstructions( } } +// Handle raw model stream events +async function* handleRawModelStreamEvent( + event: any, + agentName: string, + turnMsgs: z.infer[], + usageTracker: UsageTracker, + eventLogger: PrefixLogger, + getAgentState?: (agentName: string) => AgentState +): AsyncIterable | z.infer> { + if (event.data.type === 'response_done') { + // Count tool calls (excluding transfer_to_* calls) + const toolCallCount = event.data.response.output.filter( + (output: any) => output.type === 'function_call' && !output.name.startsWith('transfer_to') + ).length; + + // If we have tool calls, increment pending counter + if (toolCallCount > 0 && getAgentState) { + const state = getAgentState(agentName); + state.pendingToolCalls += toolCallCount; + eventLogger.log(`🔧 Agent ${agentName} has ${toolCallCount} new tool calls (total: ${state.pendingToolCalls})`); + } + + for (const output of event.data.response.output) { + // handle tool call invocation + // except for transfer_to_* tool calls + if (output.type === 'function_call' && !output.name.startsWith('transfer_to')) { + const m: z.infer = { + role: 'assistant', + content: null, + toolCalls: [{ + id: output.callId, + type: 'function', + function: { + name: output.name, + arguments: output.arguments, + }, + }], + agentName: agentName, + }; + + // add message to turn + turnMsgs.push(m); + + // emit event + yield* emitEvent(eventLogger, m); + } + } + + // update usage information + usageTracker.increment( + event.data.response.usage.totalTokens, + event.data.response.usage.inputTokens, + event.data.response.usage.outputTokens + ); + eventLogger.log(`updated usage information: ${JSON.stringify(usageTracker.get())}`); + } +} + +// Handle native SDK handoff events +async function* handleNativeHandoffEvent( + event: any, + agentName: string, + agentConfig: Record>, + agents: Record, + pipelineConfig: Record>, + stack: string[], + turnMsgs: z.infer[], + transferCounter: AgentTransferCounter, + pipelineStateManager: PipelineStateManager, + originalInstructions: Record, + originalHandoffs: Record, + eventLogger: PrefixLogger, + loopLogger: PrefixLogger +): AsyncIterable | z.infer | { newAgentName: string; shouldContinue?: boolean }> { + eventLogger.log(`🔄 NATIVE HANDOFF EVENT: ${agentName} -> ${event.item.targetAgent.name}`); + + // skip if its the same agent + if (agentName === event.item.targetAgent.name) { + eventLogger.log(`⚠️ SKIPPING: handoff to same agent: ${agentName}`); + return; + } + + const targetAgentName = event.item.targetAgent.name; + const targetAgentConfig = agentConfig[targetAgentName]; + + // Check if this is a pipeline-related handoff + const isTargetPipelineAgent = targetAgentConfig?.type === 'pipeline'; + const isSourceStartingPipeline = pipelineStateManager && !pipelineStateManager.isAgentInPipeline(agentName); + + if (isTargetPipelineAgent && isSourceStartingPipeline) { + // Starting a new pipeline execution + eventLogger.log(`🚀 Starting pipeline execution: ${agentName} -> ${targetAgentName}`); + + // Find which pipeline this agent belongs to + let targetPipelineName = ''; + let targetPipeline: z.infer | null = null; + + for (const [pipelineName, pipeline] of Object.entries(pipelineConfig)) { + if (pipeline.agents.includes(targetAgentName)) { + targetPipelineName = pipelineName; + targetPipeline = pipeline; + break; + } + } + + if (targetPipeline) { + // Initialize pipeline state + const pipelineState = pipelineStateManager!.initializePipelineExecution( + targetPipelineName, + agentName, + targetPipeline, + {} // TODO: Extract initial data from handoff input + ); + + eventLogger.log(`📋 Initialized pipeline "${targetPipelineName}" with ${targetPipeline.agents.length} steps`); + } + } + + // Handle pipeline step completion and continuation + if (pipelineStateManager?.isAgentInPipeline(agentName)) { + eventLogger.log(`🔄 Pipeline step handoff from ${agentName} to ${targetAgentName}`); + + // This is handled by the pipeline state manager + // The handoff event will trigger the next pipeline step + const result = await pipelineStateManager.handlePipelineExecution( + agentName, + pipelineConfig, + agents, + {} // TODO: Extract step result from event data + ); + + if (result.action === 'complete') { + eventLogger.log(`✅ Pipeline completed, returning to ${result.returnToAgent}`); + yield { newAgentName: result.returnToAgent || agentName }; + return; + } else if (result.action === 'handoff' && result.nextAgent) { + eventLogger.log(`➡️ Pipeline continuing to ${result.nextAgent}`); + yield { newAgentName: result.nextAgent }; + return; + } + } + + // Regular handoff handling (non-pipeline) + const maxCalls = targetAgentConfig?.maxCallsPerParentAgent || 3; + const currentCalls = transferCounter.get(agentName, targetAgentName); + + if (targetAgentConfig?.outputVisibility === 'internal' && currentCalls >= maxCalls) { + eventLogger.log(`⚠️ SKIPPING: handoff to ${targetAgentName} - max calls ${maxCalls} exceeded from ${agentName}`); + return; + } + + eventLogger.log(`📊 TRANSFER COUNT: ${agentName} -> ${targetAgentName} = ${currentCalls}/${maxCalls}`); + + // Update transfer counter + transferCounter.increment(agentName, targetAgentName); + + loopLogger.log(`🔄 AGENT SWITCH: ${agentName} -> ${targetAgentName} (reason: native SDK handoff)`); + + // Add current agent to stack only if new agent is internal or pipeline + const newAgentConfig = agentConfig[targetAgentName]; + if (newAgentConfig?.outputVisibility === 'internal' || newAgentConfig?.type === 'pipeline') { + stack.push(agentName); + loopLogger.log(`📚 STACK PUSH: ${agentName} (new agent ${targetAgentName} is internal/pipeline)`); + loopLogger.log(`📚 STACK NOW: [${stack.join(' -> ')}]`); + } + + // Return the new agent name for the caller to handle + yield { newAgentName: targetAgentName }; +} + +// Handle handoff events (legacy) +async function* handleHandoffEvent( + event: any, + agentName: string, + agentConfig: Record>, + agents: Record, + stack: string[], + turnMsgs: z.infer[], + transferCounter: AgentTransferCounter, + originalInstructions: Record, + originalHandoffs: Record, + eventLogger: PrefixLogger, + loopLogger: PrefixLogger +): AsyncIterable | z.infer | { newAgentName: string }> { + eventLogger.log(`🔄 HANDOFF EVENT: ${agentName} -> ${event.item.targetAgent.name}`); + + // skip if its the same agent + if (agentName === event.item.targetAgent.name) { + eventLogger.log(`⚠️ SKIPPING: handoff to same agent: ${agentName}`); + return; + } + + // Only apply max calls limit to internal agents (task agents) + const targetAgentConfig = agentConfig[event.item.targetAgent.name]; + if (targetAgentConfig?.outputVisibility === 'internal') { + const maxCalls = targetAgentConfig?.maxCallsPerParentAgent || 3; + const currentCalls = transferCounter.get(agentName, event.item.targetAgent.name); + if (currentCalls >= maxCalls) { + eventLogger.log(`⚠️ SKIPPING: handoff to ${event.item.targetAgent.name} - max calls ${maxCalls} exceeded from ${agentName}`); + return; + } + eventLogger.log(`📊 TRANSFER COUNT: ${agentName} -> ${event.item.targetAgent.name} = ${currentCalls}/${maxCalls}`); + } + + // inject give up control instructions if needed (parent handing off to child) + maybeInjectGiveUpControlInstructions( + agents, + agentConfig, + event.item.targetAgent.name, // child + agentName, // parent + eventLogger, + originalInstructions, + originalHandoffs + ); + + // emit transfer tool call invocation + const [transferStart, transferComplete] = createTransferEvents(agentName, event.item.targetAgent.name); + + // add messages to turn + turnMsgs.push(transferStart); + turnMsgs.push(transferComplete); + + // emit events + yield* emitEvent(eventLogger, transferStart); + yield* emitEvent(eventLogger, transferComplete); + + // update transfer counter + transferCounter.increment(agentName, event.item.targetAgent.name); + + const newAgentName = event.item.targetAgent.name; + + loopLogger.log(`🔄 AGENT SWITCH: ${agentName} -> ${newAgentName} (reason: handoff)`); + + // add current agent to stack only if new agent is internal + const newAgentConfig = agentConfig[newAgentName]; + if (newAgentConfig?.outputVisibility === 'internal' || newAgentConfig?.type === 'pipeline') { + stack.push(agentName); + loopLogger.log(`📚 STACK PUSH: ${agentName} (new agent ${newAgentName} is internal/pipeline)`); + loopLogger.log(`📚 STACK NOW: [${stack.join(' -> ')}]`); + } + + // Return the new agent name for the caller to handle + yield { newAgentName }; +} + +// Handle tool call result events +async function* handleToolCallResult( + event: any, + turnMsgs: z.infer[], + eventLogger: PrefixLogger +): AsyncIterable | z.infer> { + const m: z.infer = { + role: 'tool', + content: event.item.rawItem.output.text, + toolCallId: event.item.rawItem.callId, + toolName: event.item.rawItem.name, + }; + + // add message to turn + turnMsgs.push(m); + + // emit event + yield* emitEvent(eventLogger, m); +} + +// Handle message output events and internal agent switching +async function* handleMessageOutput( + event: any, + agentName: string, + agentConfig: Record>, + agents: Record, + pipelineConfig: Record>, + stack: string[], + turnMsgs: z.infer[], + transferCounter: AgentTransferCounter, + workflow: z.infer, + eventLogger: PrefixLogger, + loopLogger: PrefixLogger, + getAgentState: (agentName: string) => AgentState +): AsyncIterable | z.infer | { newAgentName: string | null; shouldContinue: boolean }> { + // check response visibility - could be an agent or pipeline + const agentConfigObj = agentConfig[agentName]; + const pipelineConfigObj = pipelineConfig[agentName]; + const isInternal = agentConfigObj?.outputVisibility === 'internal' || agentConfigObj?.type === 'pipeline' || !!pipelineConfigObj; + + for (const content of event.item.rawItem.content) { + if (content.type === 'output_text') { + // todo: look into what is causing empty messages + // Skip empty or whitespace-only messages + if (!content.text || content.text.trim() === '') { + eventLogger.log(`Skipping empty message from ${agentName}`); + continue; + } + + // create message + const msg: z.infer = { + role: 'assistant', + content: content.text, + agentName: agentName, + responseType: isInternal ? 'internal' : 'external', + }; + + // add message to turn + turnMsgs.push(msg); + + // emit event + yield* emitEvent(eventLogger, msg); + } + } + + // if this is an internal agent or pipeline agent, switch to previous agent + if (isInternal) { + const current = agentName; + const currentAgentConfig = agentConfig[agentName]; + const currentPipelineConfig = pipelineConfig[agentName]; + const agentState = getAgentState(agentName); + + // Check if tool calls are still pending - if so, don't switch agents yet + if (agentState.pendingToolCalls > 0) { + loopLogger.log(`🔄 Deferring agent switch: ${current} has ${agentState.pendingToolCalls} pending tool calls`); + return; // Exit without switching now + } + + // Check if this is a pipeline or pipeline agent that needs to continue the pipeline + if (currentPipelineConfig || currentAgentConfig?.type === 'pipeline') { + const result = handlePipelineAgentExecution( + agents[current], // Use the correct agent from agents collection + current, + pipelineConfig, + stack, + loopLogger, + turnMsgs, + transferCounter, + createTransferEvents + ); + + // Emit transfer events if they exist + if (result.transferEvents) { + const [transferStart, transferComplete] = result.transferEvents; + yield* emitEvent(eventLogger, transferStart); + yield* emitEvent(eventLogger, transferComplete); + } + + if (result.shouldContinue) { + yield { newAgentName: result.nextAgentName!, shouldContinue: true }; + return; + } else { + // Pipeline completed - set agentName to null to terminate turn + loopLogger.log(`Pipeline execution complete - terminating turn`); + yield { newAgentName: null, shouldContinue: false }; + return; + } + } + + let nextAgentName = agentName; + + // Check control type to determine next action + if (currentPipelineConfig) { + // For standalone pipelines, default behavior is to relinquish to parent + if (stack.length > 0) { + nextAgentName = stack.pop()!; + loopLogger.log(`-- popped agent from stack: ${nextAgentName} || reason: ${current} is a pipeline, returning to parent agent`); + } else { + nextAgentName = workflow.startAgent; + loopLogger.log(`-- using start agent: ${nextAgentName} || reason: ${current} is a pipeline, no parent agent`); + } + } else if (currentAgentConfig?.controlType === 'relinquish_to_parent' || currentAgentConfig?.controlType === 'retain') { + if (stack.length > 0) { + nextAgentName = stack.pop()!; + loopLogger.log(`-- popped agent from stack: ${nextAgentName} || reason: ${current} is an internal agent, it put out a message and it has a control type of ${currentAgentConfig?.controlType}, hence the flow of control needs to return to the previous agent`); + } else { + // Check if current agent IS the start agent - if so, terminate to avoid loop + if (current === workflow.startAgent) { + loopLogger.log(`Task agent ${current} is start agent with no parent - terminating turn`); + yield { newAgentName: null, shouldContinue: false }; + return; + } else { + nextAgentName = workflow.startAgent; + loopLogger.log(`-- using start agent (stack empty): ${nextAgentName}`); + } + } + } else if (currentAgentConfig?.controlType === 'relinquish_to_start') { + nextAgentName = workflow.startAgent; + loopLogger.log(`-- using start agent: ${nextAgentName} || reason: ${current} is an internal agent, it put out a message and it has a control type of ${currentAgentConfig?.controlType}, hence the flow of control needs to return to the start agent`); + } + + // Only emit transfer events if we're actually changing agents + if (nextAgentName !== current) { + loopLogger.log(`-- stack is now: ${JSON.stringify(stack)}`); + + // emit transfer tool call invocation + const [transferStart, transferComplete] = createTransferEvents(current, nextAgentName); + + // add messages to turn + turnMsgs.push(transferStart); + turnMsgs.push(transferComplete); + + // emit events + yield* emitEvent(eventLogger, transferStart); + yield* emitEvent(eventLogger, transferComplete); + + // update transfer counter + transferCounter.increment(current, nextAgentName); + + // set this as the new agent name + loopLogger.log(`switched to agent: ${nextAgentName} || reason: internal agent (${current}) put out a message`); + + yield { newAgentName: nextAgentName, shouldContinue: true }; + } + } +} + // Pipeline controller function to handle pipeline agent execution and transfers function handlePipelineAgentExecution( currentAgent: Agent, @@ -1056,47 +1186,54 @@ function handlePipelineAgentExecution( const pipelineName = (currentAgent as any).pipelineName; const pipelineIndex = (currentAgent as any).pipelineIndex; const isLastInPipeline = (currentAgent as any).isLastInPipeline; - + if (!pipelineName || pipelineIndex === undefined) { logger.log(`warning: pipeline agent ${currentAgentName} missing pipeline metadata`); return { nextAgentName: null, shouldContinue: false }; } - + const pipeline = pipelineConfig[pipelineName]; if (!pipeline) { logger.log(`warning: pipeline ${pipelineName} not found in config`); return { nextAgentName: null, shouldContinue: false }; } - + let nextAgentName: string | null = null; - + if (!isLastInPipeline) { // Not the last agent - continue to next agent in pipeline nextAgentName = pipeline.agents[pipelineIndex + 1]; logger.log(`-- pipeline controller: ${currentAgentName} -> ${nextAgentName} (continuing pipeline ${pipelineName})`); } else { - // Last agent - return to calling agent - nextAgentName = stack.pop()!; - logger.log(`-- pipeline controller: ${currentAgentName} -> ${nextAgentName} (pipeline ${pipelineName} complete, returning to caller)`); + // Last agent in pipeline - check if there's a calling agent to return to + if (stack.length > 0) { + // Normal case: return to calling agent + nextAgentName = stack.pop()!; + logger.log(`-- pipeline controller: ${currentAgentName} -> ${nextAgentName} (pipeline ${pipelineName} complete, returning to caller)`); + } else { + // Pipeline was start agent: no caller to return to, terminate execution + logger.log(`-- pipeline controller: pipeline ${pipelineName} complete, no caller to return to - ending turn`); + return { nextAgentName: null, shouldContinue: false }; + } } - + if (nextAgentName) { // Create transfer events for pipeline continuation const transferEvents = createTransferEvents(currentAgentName, nextAgentName); const [transferStart, transferComplete] = transferEvents; - + // Add messages to turn turnMsgs.push(transferStart); turnMsgs.push(transferComplete); - + // Update transfer counter transferCounter.increment(currentAgentName, nextAgentName); - + logger.log(`switched to agent: ${nextAgentName} || reason: pipeline controller transfer`); - + return { nextAgentName, shouldContinue: true, transferEvents }; } - + return { nextAgentName: null, shouldContinue: false }; } @@ -1133,34 +1270,61 @@ export async function* streamResponse( logger.log(`pipelines: ${Object.keys(pipelineConfig).length} (${Object.keys(pipelineConfig).join(', ')})`); logger.log(`start agent: ${workflow.startAgent}`); logger.log(`=== END CONFIGURATION ===`); - + const stack: string[] = []; logger.log(`initialized stack: ${JSON.stringify(stack)}`); // create tools const tools = createTools(logger, projectId, workflow, toolConfig); - // create agents - const { agents, originalInstructions, originalHandoffs } = createAgents(logger, projectId, workflow, agentConfig, tools, promptConfig, pipelineConfig); + // create agents with feature flag support + const createAgentsFunction = USE_NATIVE_HANDOFFS ? createAgentsWithNativeHandoffs : createAgentsLegacy; + const { agents, originalInstructions, originalHandoffs } = createAgentsFunction(logger, projectId, workflow, agentConfig, tools, promptConfig, pipelineConfig); + + logger.log(`Using ${USE_NATIVE_HANDOFFS ? 'NATIVE SDK' : 'LEGACY'} handoffs`); // track agent to agent calls const transferCounter = new AgentTransferCounter(); + + // initialize pipeline state manager for native handoffs + const pipelineStateManager = USE_NATIVE_HANDOFFS ? new PipelineStateManager(logger) : null; // get the agent that should be starting this turn - const startOfTurnAgentName = getStartOfTurnAgentName(logger, messages, agentConfig, workflow); + const startOfTurnAgentName = getStartOfTurnAgentName(logger, messages, agentConfig, pipelineConfig, workflow); logger.log(`🎯 START AGENT DECISION: ${startOfTurnAgentName}`); - - let agentName = startOfTurnAgentName; + + let agentName: string | null = startOfTurnAgentName; // start the turn loop const usageTracker = new UsageTracker(); const turnMsgs: z.infer[] = [...messages]; - logger.log('🎬 STARTING AGENT TURN'); + // Initialize agent state tracking for tool call completion + const agentStates = new Map(); + // Helper function to get or create agent state + const getAgentState = (agentName: string): AgentState => { + if (!agentStates.has(agentName)) { + agentStates.set(agentName, { pendingToolCalls: 0 }); + } + return agentStates.get(agentName)!; + }; + + // Helper function to check if agent can switch + const canSwitchAgent = (fromAgent: string, reason: string): boolean => { + const state = getAgentState(fromAgent); + if (state.pendingToolCalls > 0) { + console.log(`🚫 Blocking agent switch: ${fromAgent} has ${state.pendingToolCalls} pending tool calls (reason: ${reason})`); + return false; + } + return true; + }; + + logger.log('🎬 STARTING AGENT TURN'); + // stack-based agent execution loop let iter = 0; - const MAXTURNITERATIONS = 10; + const MAXTURNITERATIONS = 25; // loop indefinitely turnLoop: while (true) { @@ -1173,6 +1337,12 @@ export async function* streamResponse( // increment loop counter iter++; + + // Check iteration limit to prevent infinite loops + if (iter >= MAXTURNITERATIONS) { + loopLogger.log(`⚠️ TURN LIMIT REACHED: ${iter}/${MAXTURNITERATIONS} - terminating to prevent infinite loop`); + break turnLoop; + } // set up logging // const loopLogger = logger.child(`iter-${iter}`); @@ -1180,9 +1350,25 @@ export async function* streamResponse( // log agent info // loopLogger.log(`agent name: ${agentName}`); // loopLogger.log(`stack: ${JSON.stringify(stack)}`); - if (!agents[agentName]) { + + // Check if current agent is actually a pipeline + const currentPipelineConfig: z.infer | null = agentName ? pipelineConfig[agentName] : null; + if (currentPipelineConfig) { + // If agentName is a pipeline, switch to the first agent in the pipeline + if (currentPipelineConfig.agents.length === 0) { + throw new Error(`Pipeline '${agentName}' has no agents!`); + } + const firstAgentInPipeline: string = currentPipelineConfig.agents[0]; + logger.log(`🔄 Pipeline '${agentName}' starting with first agent: ${firstAgentInPipeline}`); + agentName = firstAgentInPipeline; + // Continue with the first agent in the pipeline + } + + if (!agentName || !agents[agentName]) { throw new Error(`agent not found in agent config!`); } + + // At this point, agentName is guaranteed to be non-null const agent: Agent = agents[agentName]!; // convert messages to agents sdk compatible input @@ -1194,6 +1380,7 @@ export async function* streamResponse( inputs, { stream: true, + maxTurns: MAX_AGENT_TURNS, } ); @@ -1203,105 +1390,74 @@ export async function* streamResponse( switch (event.type) { case 'raw_model_stream_event': - if (event.data.type === 'response_done') { - for (const output of event.data.response.output) { - // handle tool call invocation - // except for transfer_to_* tool calls - if (output.type === 'function_call' && !output.name.startsWith('transfer_to')) { - const m: z.infer = { - role: 'assistant', - content: null, - toolCalls: [{ - id: output.callId, - type: 'function', - function: { - name: output.name, - arguments: output.arguments, - }, - }], - agentName: agentName, - }; - - // add message to turn - turnMsgs.push(m); - - // emit event - yield* emitEvent(eventLogger, m); - } - } - - // update usage information - usageTracker.increment( - event.data.response.usage.totalTokens, - event.data.response.usage.inputTokens, - event.data.response.usage.outputTokens - ); - eventLogger.log(`updated usage information: ${JSON.stringify(usageTracker.get())}`); - } + yield* handleRawModelStreamEvent(event, agentName!, turnMsgs, usageTracker, eventLogger, getAgentState); break; + case 'run_item_stream_event': - // handle handoff event + // Track tool call completion - decrement counter when tool calls complete + if (event.item.type === 'tool_call_output_item' && + event.item.rawItem.type === 'function_call_result' && + event.item.rawItem.status === 'completed') { + + const state = getAgentState(agentName!); + if (state.pendingToolCalls > 0) { + state.pendingToolCalls--; + eventLogger.log(`✅ Tool call completed: ${agentName!} (${state.pendingToolCalls} remaining)`); + } + } + + // handle handoff event with feature flag support if (event.name === 'handoff_occurred' && event.item.type === 'handoff_output_item') { - eventLogger.log(`🔄 HANDOFF EVENT: ${agentName} -> ${event.item.targetAgent.name}`); - - // skip if its the same agent - if (agentName === event.item.targetAgent.name) { - eventLogger.log(`⚠️ SKIPPING: handoff to same agent: ${agentName}`); - break; - } - - // Only apply max calls limit to internal agents (task agents) - const targetAgentConfig = agentConfig[event.item.targetAgent.name]; - if (targetAgentConfig?.outputVisibility === 'internal') { - const maxCalls = targetAgentConfig?.maxCallsPerParentAgent || 3; - const currentCalls = transferCounter.get(agentName, event.item.targetAgent.name); - if (currentCalls >= maxCalls) { - eventLogger.log(`⚠️ SKIPPING: handoff to ${event.item.targetAgent.name} - max calls ${maxCalls} exceeded from ${agentName}`); - continue; + if (USE_NATIVE_HANDOFFS) { + // Use native SDK handoff handling + const nativeHandoffResults = handleNativeHandoffEvent( + event, + agentName!, + agentConfig, + agents, + pipelineConfig, + stack, + turnMsgs, + transferCounter, + pipelineStateManager!, + originalInstructions, + originalHandoffs, + eventLogger, + loopLogger + ); + for await (const handoffResult of nativeHandoffResults) { + if ('newAgentName' in handoffResult) { + agentName = handoffResult.newAgentName; + if (handoffResult.shouldContinue) { + continue turnLoop; + } + } else { + yield handoffResult; + } + } + } else { + // Use legacy handoff handling + const legacyHandoffResults = handleHandoffEvent( + event, + agentName!, + agentConfig, + agents, + stack, + turnMsgs, + transferCounter, + originalInstructions, + originalHandoffs, + eventLogger, + loopLogger + ); + for await (const legacyResult of legacyHandoffResults) { + if ('newAgentName' in legacyResult) { + agentName = legacyResult.newAgentName; + } else { + yield legacyResult; + } } - eventLogger.log(`📊 TRANSFER COUNT: ${agentName} -> ${event.item.targetAgent.name} = ${currentCalls}/${maxCalls}`); } - - // inject give up control instructions if needed (parent handing off to child) - maybeInjectGiveUpControlInstructions( - agents, - agentConfig, - event.item.targetAgent.name, // child - agentName, // parent - eventLogger, - originalInstructions, - originalHandoffs - ); - - // emit transfer tool call invocation - const [transferStart, transferComplete] = createTransferEvents(agentName, event.item.targetAgent.name); - - // add messages to turn - turnMsgs.push(transferStart); - turnMsgs.push(transferComplete); - - // emit events - yield* emitEvent(eventLogger, transferStart); - yield* emitEvent(eventLogger, transferComplete); - - // update transfer counter - transferCounter.increment(agentName, event.item.targetAgent.name); - - const newAgentName = event.item.targetAgent.name; - - loopLogger.log(`🔄 AGENT SWITCH: ${agentName} -> ${newAgentName} (reason: handoff)`); - - // add current agent to stack only if new agent is internal - const newAgentConfig = agentConfig[newAgentName]; - if (newAgentConfig?.outputVisibility === 'internal' || newAgentConfig?.type === 'pipeline') { - stack.push(agentName); - loopLogger.log(`📚 STACK PUSH: ${agentName} (new agent ${newAgentName} is internal/pipeline)`); - loopLogger.log(`📚 STACK NOW: [${stack.join(' -> ')}]`); - } - - // set this as the new agent name - agentName = newAgentName; - } // handle tool call result @@ -1309,119 +1465,51 @@ export async function* streamResponse( event.item.rawItem.type === 'function_call_result' && event.item.rawItem.status === 'completed' && event.item.rawItem.output.type === 'text') { - const m: z.infer = { - role: 'tool', - content: event.item.rawItem.output.text, - toolCallId: event.item.rawItem.callId, - toolName: event.item.rawItem.name, - }; - - // add message to turn - turnMsgs.push(m); - - // emit event - yield* emitEvent(eventLogger, m); + yield* handleToolCallResult(event, turnMsgs, eventLogger); } // handle model response message output if (event.item.type === 'message_output_item' && event.item.rawItem.type === 'message' && event.item.rawItem.status === 'completed') { - // check response visibility - const agentConfigObj = agentConfig[agentName]; - const isInternal = agentConfigObj?.outputVisibility === 'internal' || agentConfigObj?.type === 'pipeline'; - for (const content of event.item.rawItem.content) { - if (content.type === 'output_text') { - // create message - const msg: z.infer = { - role: 'assistant', - content: content.text, - agentName: agentName, - responseType: isInternal ? 'internal' : 'external', - }; - - // add message to turn - turnMsgs.push(msg); - - // emit event - yield* emitEvent(eventLogger, msg); - } - } - - // if this is an internal agent or pipeline agent, switch to previous agent - if (isInternal) { - const current = agentName; - const currentAgentConfig = agentConfig[agentName]; - - // Check if this is a pipeline agent that needs to continue the pipeline - if (currentAgentConfig?.type === 'pipeline') { - const result = handlePipelineAgentExecution( - agents[current], // Use the correct agent from agents collection - current, - pipelineConfig, - stack, - loopLogger, - turnMsgs, - transferCounter, - createTransferEvents - ); - - // Emit transfer events if they exist - if (result.transferEvents) { - const [transferStart, transferComplete] = result.transferEvents; - yield* emitEvent(eventLogger, transferStart); - yield* emitEvent(eventLogger, transferComplete); - } - - if (result.shouldContinue) { - agentName = result.nextAgentName!; - // Run the turn from the next agent + const messageResults = handleMessageOutput( + event, + agentName!, + agentConfig, + agents, + pipelineConfig, + stack, + turnMsgs, + transferCounter, + workflow, + eventLogger, + loopLogger, + getAgentState + ); + for await (const messageResult of messageResults) { + if ('newAgentName' in messageResult && 'shouldContinue' in messageResult) { + agentName = messageResult.newAgentName; + if (messageResult.shouldContinue) { continue turnLoop; } - } - - // Check control type to determine next action for non-pipeline agents - if (currentAgentConfig?.controlType === 'relinquish_to_parent' || currentAgentConfig?.controlType === 'retain') { - agentName = stack.pop()!; - loopLogger.log(`-- popped agent from stack: ${agentName} || reason: ${current} is an internal agent, it put out a message and it has a control type of ${currentAgentConfig?.controlType}, hence the flow of control needs to return to the previous agent`); - } else if (currentAgentConfig?.controlType === 'relinquish_to_start') { - agentName = workflow.startAgent; - loopLogger.log(`-- using start agent: ${agentName} || reason: ${current} is an internal agent, it put out a message and it has a control type of ${currentAgentConfig?.controlType}, hence the flow of control needs to return to the start agent`); - } - - // Only emit transfer events if we're actually changing agents - if (agentName !== current) { - loopLogger.log(`-- stack is now: ${JSON.stringify(stack)}`); - - // emit transfer tool call invocation - const [transferStart, transferComplete] = createTransferEvents(current, agentName); - - // add messages to turn - turnMsgs.push(transferStart); - turnMsgs.push(transferComplete); - - // emit events - yield* emitEvent(eventLogger, transferStart); - yield* emitEvent(eventLogger, transferComplete); - - // update transfer counter - transferCounter.increment(current, agentName); - - // set this as the new agent name - loopLogger.log(`switched to agent: ${agentName} || reason: internal agent (${current}) put out a message`); - - // run the turn from the previous agent - continue turnLoop; + } else { + yield messageResult; } } - break; } break; + default: break; } } + // Check if we have no next agent (pipeline or other termination) + if (!agentName) { + loopLogger.log(`no next agent available, breaking out of turn loop`); + break turnLoop; + } + // if the last message was a text response by a user-facing agent, complete the turn // loopLogger.log(`iter end, turnMsgs: ${JSON.stringify(turnMsgs)}, agentName: ${agentName}`); const lastMessage = turnMsgs[turnMsgs.length - 1]; @@ -1433,6 +1521,7 @@ export async function* streamResponse( loopLogger.log(`last message was by a user_facing agent, breaking out of parent loop`); break turnLoop; } + } // emit usage information diff --git a/apps/rowboat/app/lib/pipeline-state-manager.ts b/apps/rowboat/app/lib/pipeline-state-manager.ts new file mode 100644 index 00000000..32a474e8 --- /dev/null +++ b/apps/rowboat/app/lib/pipeline-state-manager.ts @@ -0,0 +1,322 @@ +// Pipeline State Manager for handling complex pipeline execution flow +import { Agent } from "@openai/agents"; +import { z } from "zod"; +import { WorkflowPipeline, WorkflowAgent } from "./types/workflow_types"; +import { PipelineExecutionState } from "./agents"; +import { PrefixLogger } from "./utils"; +import { createPipelineHandoff } from "./agent-handoffs"; + +export interface PipelineExecutionResult { + action: 'handoff' | 'complete' | 'error'; + nextAgent?: string; + handoff?: any; // SDK Handoff object + context?: any; + results?: any; + returnToAgent?: string; + error?: string; +} + +export class PipelineStateManager { + private pipelineStates = new Map>(); + private logger: PrefixLogger; + + constructor(logger: PrefixLogger) { + this.logger = logger.child('PipelineStateManager'); + } + + // Initialize a new pipeline execution + initializePipelineExecution( + pipelineName: string, + callingAgent: string, + pipelineConfig: z.infer, + initialData?: Record + ): z.infer { + const state: z.infer = { + pipelineName, + currentStep: 0, + totalSteps: pipelineConfig.agents.length, + callingAgent, + pipelineData: initialData || null, + stepResults: null, + currentStepResult: null, + startTime: new Date().toISOString(), + metadata: { + pipelineDescription: pipelineConfig.description + } + }; + + // Store initial state for the first agent + const firstAgent = pipelineConfig.agents[0]; + this.storePipelineState(firstAgent, state); + + this.logger.log(`🚀 Initialized pipeline "${pipelineName}" with ${state.totalSteps} steps`); + this.logger.log(`First agent: ${firstAgent}, called by: ${callingAgent}`); + + return state; + } + + // Handle pipeline execution step + async handlePipelineExecution( + currentAgentName: string, + pipelineConfig: Record>, + agents: Record, + stepResult?: Record + ): Promise { + const state = this.getPipelineState(currentAgentName); + + if (!state) { + return { + action: 'error', + error: `No pipeline state found for agent ${currentAgentName}` + }; + } + + const pipeline = pipelineConfig[state.pipelineName]; + if (!pipeline) { + return { + action: 'error', + error: `Pipeline ${state.pipelineName} not found in configuration` + }; + } + + // Store current step result + if (stepResult) { + // Safely handle stepResults as flexible union type + const existingResults = Array.isArray(state.stepResults) ? state.stepResults : []; + state.stepResults = [...existingResults, stepResult]; + state.currentStepResult = stepResult; + + // Update pipeline data if result contains data to pass forward + if (stepResult.pipelineData) { + // Safely handle pipelineData as flexible union type + const existingData = (typeof state.pipelineData === 'object' && state.pipelineData !== null) ? state.pipelineData : {}; + const newData = (typeof stepResult.pipelineData === 'object' && stepResult.pipelineData !== null) ? stepResult.pipelineData : {}; + + state.pipelineData = { + ...existingData, + ...newData + }; + } + } + + this.logger.log(`📊 Pipeline "${state.pipelineName}" step ${state.currentStep + 1}/${state.totalSteps} completed by ${currentAgentName}`); + + // Check if this is the last step + if (state.currentStep >= pipeline.agents.length - 1) { + // Pipeline complete - return to calling agent + this.logger.log(`✅ Pipeline "${state.pipelineName}" completed, returning to ${state.callingAgent}`); + + const finalResults = { + pipelineName: state.pipelineName, + totalSteps: state.totalSteps, + stepResults: state.stepResults, + finalData: state.pipelineData, + completionTime: new Date().toISOString(), + duration: Date.now() - new Date(state.startTime).getTime() + }; + + // Clean up state + this.clearPipelineState(currentAgentName); + + return { + action: 'complete', + results: finalResults, + returnToAgent: state.callingAgent + }; + } + + // Continue to next step + const nextStepIndex = state.currentStep + 1; + const nextAgentName = pipeline.agents[nextStepIndex]; + + if (!agents[nextAgentName]) { + return { + action: 'error', + error: `Next agent ${nextAgentName} not found in agents configuration` + }; + } + + // Update state for next step + const nextState: z.infer = { + ...state, + currentStep: nextStepIndex, + currentStepResult: null // Reset for next step + }; + + // Store state for next agent + this.storePipelineState(nextAgentName, nextState); + + // Create SDK handoff with rich context + const handoff = createPipelineHandoff( + agents[nextAgentName], + nextState, + this.logger + ); + + this.logger.log(`➡️ Pipeline "${state.pipelineName}": ${currentAgentName} -> ${nextAgentName} (step ${nextStepIndex + 1}/${state.totalSteps})`); + + return { + action: 'handoff', + nextAgent: nextAgentName, + handoff, + context: { + reason: 'pipeline_execution', + pipelineName: state.pipelineName, + currentStep: nextStepIndex, + totalSteps: state.totalSteps, + isLastStep: nextStepIndex >= state.totalSteps - 1, + pipelineData: nextState.pipelineData, + stepResults: nextState.stepResults + } + }; + } + + // Store pipeline state for an agent + storePipelineState(agentName: string, state: z.infer): void { + this.pipelineStates.set(agentName, state); + this.logger.log(`💾 Stored pipeline state for ${agentName}: step ${state.currentStep + 1}/${state.totalSteps}`); + } + + // Retrieve pipeline state for an agent + getPipelineState(agentName: string): z.infer | null { + return this.pipelineStates.get(agentName) || null; + } + + // Clear pipeline state (cleanup) + clearPipelineState(agentName: string): void { + this.pipelineStates.delete(agentName); + this.logger.log(`🗑️ Cleared pipeline state for ${agentName}`); + } + + // Check if agent is in a pipeline + isAgentInPipeline(agentName: string): boolean { + return this.pipelineStates.has(agentName); + } + + // Get all active pipelines (for debugging) + getActivePipelines(): Array<{agentName: string, state: z.infer}> { + return Array.from(this.pipelineStates.entries()).map(([agentName, state]) => ({ + agentName, + state + })); + } + + // Inject pipeline context into agent instructions + injectPipelineContext( + agent: Agent, + agentName: string, + originalInstructions: string + ): string { + const state = this.getPipelineState(agentName); + if (!state) { + return originalInstructions; + } + + const contextPrompt = this.createPipelineContextPrompt(state); + const enhancedInstructions = `${originalInstructions}\n\n${contextPrompt}`; + + this.logger.log(`📝 Injected pipeline context for ${agentName} in pipeline "${state.pipelineName}"`); + + return enhancedInstructions; + } + + // Create pipeline context prompt + private createPipelineContextPrompt(state: z.infer): string { + const stepInfo = `Step ${state.currentStep + 1} of ${state.totalSteps}`; + const isLast = state.currentStep >= state.totalSteps - 1; + + let contextPrompt = `## 🔄 Pipeline Execution Context + +**Pipeline**: ${state.pipelineName} +**Current Step**: ${stepInfo} +**Status**: ${isLast ? 'FINAL STEP - Provide complete results' : 'Intermediate step - Pass results forward'} + +`; + + if (state.stepResults && Array.isArray(state.stepResults) && state.stepResults.length > 0) { + contextPrompt += `**Previous Step Results**: +\`\`\`json +${JSON.stringify(state.stepResults, null, 2)} +\`\`\` + +`; + } + + if (state.pipelineData && typeof state.pipelineData === 'object' && state.pipelineData !== null && Object.keys(state.pipelineData).length > 0) { + contextPrompt += `**Pipeline Data**: +\`\`\`json +${JSON.stringify(state.pipelineData, null, 2)} +\`\`\` + +`; + } + + if (isLast) { + contextPrompt += `⚠️ **IMPORTANT**: This is the final step in the pipeline. Your response will be returned to the calling agent "${state.callingAgent}". Provide comprehensive results. + +`; + } else { + contextPrompt += `➡️ **NEXT**: After completing your task, results will automatically flow to the next step in the pipeline. + +`; + } + + return contextPrompt; + } + + // Error recovery - handle pipeline failures + handlePipelineError( + agentName: string, + error: string | Error, + shouldReturnToCaller: boolean = true + ): PipelineExecutionResult { + const state = this.getPipelineState(agentName); + const errorMessage = typeof error === 'string' ? error : error.message; + + this.logger.log(`❌ Pipeline error in agent ${agentName}: ${errorMessage}`); + + if (state && shouldReturnToCaller) { + // Clean up and return to caller with error + this.clearPipelineState(agentName); + + return { + action: 'complete', + results: { + pipelineName: state.pipelineName, + error: errorMessage, + completedSteps: state.currentStep, + totalSteps: state.totalSteps, + stepResults: state.stepResults + }, + returnToAgent: state.callingAgent + }; + } + + return { + action: 'error', + error: errorMessage + }; + } + + // Get pipeline statistics (for monitoring) + getPipelineStats(): { + activePipelines: number; + pipelinesByName: Record; + averageStepsCompleted: number; + } { + const pipelines = this.getActivePipelines(); + const pipelinesByName: Record = {}; + let totalSteps = 0; + + pipelines.forEach(({state}) => { + pipelinesByName[state.pipelineName] = (pipelinesByName[state.pipelineName] || 0) + 1; + totalSteps += state.currentStep + 1; + }); + + return { + activePipelines: pipelines.length, + pipelinesByName, + averageStepsCompleted: pipelines.length > 0 ? totalSteps / pipelines.length : 0 + }; + } +} \ No newline at end of file diff --git a/apps/rowboat/app/projects/[projectId]/workflow/entity_list.tsx b/apps/rowboat/app/projects/[projectId]/workflow/entity_list.tsx index b8ce0a16..6416ad63 100644 --- a/apps/rowboat/app/projects/[projectId]/workflow/entity_list.tsx +++ b/apps/rowboat/app/projects/[projectId]/workflow/entity_list.tsx @@ -280,6 +280,7 @@ interface PipelineCardProps { onDeletePipeline: (name: string) => void; onDeleteAgent: (name: string) => void; onAddAgentToPipeline: (pipelineName: string) => void; + onSetMainAgent: (name: string) => void; selectedRef: React.RefObject; startAgentName: string | null; dragHandle?: React.ReactNode; @@ -294,6 +295,7 @@ const PipelineCard = ({ onDeletePipeline, onDeleteAgent, onAddAgentToPipeline, + onSetMainAgent, selectedRef, startAgentName, dragHandle, @@ -347,6 +349,11 @@ const PipelineCard = ({
{pipeline.name} ({pipelineAgents.length} steps) + {startAgentName === pipeline.name && ( + + START + + )}
@@ -362,10 +369,19 @@ const PipelineCard = ({ onAction={(key) => { if (key === 'delete') { onDeletePipeline(pipeline.name); + } else if (key === 'set-main-agent') { + onSetMainAgent(pipeline.name); } }} > - Delete Pipeline + {startAgentName !== pipeline.name ? ( + <> + Set as start agent + Delete Pipeline + + ) : ( + Delete Pipeline + )} @@ -740,6 +756,7 @@ export const EntityList = forwardRef< onDeletePipeline={onDeletePipeline} onDeleteAgent={onDeleteAgent} onAddAgentToPipeline={onAddAgentToPipeline} + onSetMainAgent={onSetMainAgent} selectedRef={selectedRef} startAgentName={startAgentName} /> @@ -1663,6 +1680,7 @@ const SortablePipelineItem = ({ onDeletePipeline, onDeleteAgent, onAddAgentToPipeline, + onSetMainAgent, selectedRef, startAgentName }: { @@ -1677,6 +1695,7 @@ const SortablePipelineItem = ({ onDeletePipeline: (name: string) => void; onDeleteAgent: (name: string) => void; onAddAgentToPipeline: (pipelineName: string) => void; + onSetMainAgent: (name: string) => void; selectedRef: React.RefObject; startAgentName: string | null; }) => { @@ -1706,6 +1725,7 @@ const SortablePipelineItem = ({ onDeletePipeline={onDeletePipeline} onDeleteAgent={onDeleteAgent} onAddAgentToPipeline={onAddAgentToPipeline} + onSetMainAgent={onSetMainAgent} selectedRef={selectedRef} startAgentName={startAgentName} dragHandle={