diff --git a/apps/rowboat/src/application/lib/agents-runtime/agents.ts b/apps/rowboat/src/application/lib/agents-runtime/agents.ts index 0c632c6b..4d2a7e84 100644 --- a/apps/rowboat/src/application/lib/agents-runtime/agents.ts +++ b/apps/rowboat/src/application/lib/agents-runtime/agents.ts @@ -747,29 +747,44 @@ function maybeInjectGiveUpControlInstructions( async function* handleRawModelStreamEvent( event: RunRawModelStreamEvent, agentConfig: Record>, + pipelineConfig: Record>, agentName: string, turnMsgs: z.infer[], usageTracker: UsageTracker, eventLogger: PrefixLogger, getAgentState?: (agentName: string) => AgentState ): AsyncIterable> { + // check response visibility - could be an agent or pipeline + const agentConfigObj = agentConfig[agentName]; + const pipelineConfigObj = pipelineConfig[agentName]; + const isInternal = agentConfigObj?.outputVisibility === 'internal' || agentConfigObj?.type === 'pipeline' || !!pipelineConfigObj; + if (event.data.type === 'response_done') { - // Count tool calls (excluding transfer_to_* calls) - const toolCallCount = event.data.response.output.filter( - (output: any) => output.type === 'function_call' && !output.name.startsWith('transfer_to') - ).length; - - // If we have tool calls, increment pending counter - if (toolCallCount > 0 && getAgentState) { - const state = getAgentState(agentName); - state.pendingToolCalls += toolCallCount; - eventLogger.log(`🔧 Agent ${agentName} has ${toolCallCount} new tool calls (total: ${state.pendingToolCalls})`); - } - for (const output of event.data.response.output) { + if (output.type === 'message') { + for (const c of output.content) { + if (c.type === 'output_text' && c.text.trim()) { + const m: z.infer = { + role: 'assistant', + content: c.text, + agentName: agentName, + responseType: isInternal ? 'internal' : 'external', + }; + turnMsgs.push(m); + yield* emitEvent(eventLogger, m); + } + } + } + // handle tool call invocation // except for transfer_to_* tool calls if (output.type === 'function_call' && !output.name.startsWith('transfer_to')) { + if (getAgentState) { + const state = getAgentState(agentName); + state.pendingToolCalls++; + eventLogger.log(`🔧 Agent ${agentName} has ${state.pendingToolCalls} pending tool calls`); + } + const m: z.infer = { role: 'assistant', content: null, @@ -1030,6 +1045,7 @@ async function* handleMessageOutput( const pipelineConfigObj = pipelineConfig[agentName]; const isInternal = agentConfigObj?.outputVisibility === 'internal' || agentConfigObj?.type === 'pipeline' || !!pipelineConfigObj; + /* ignore handling text messages here in favor of handling raw events for (const content of event.item.rawItem.content) { if (content.type === 'output_text') { // todo: look into what is causing empty messages @@ -1054,6 +1070,7 @@ async function* handleMessageOutput( yield* emitEvent(eventLogger, msg); } } + */ // if this is an internal agent or pipeline agent, switch to previous agent if (isInternal) { @@ -1372,10 +1389,20 @@ export async function* streamResponse( // handle streaming events for await (const event of result) { const eventLogger = loopLogger.child(event.type); + eventLogger.log(`*** GOT EVENT ***`, JSON.stringify(event)); switch (event.type) { case 'raw_model_stream_event': - yield* handleRawModelStreamEvent(event, agentConfig, agentName!, turnMsgs, usageTracker, eventLogger, getAgentState); + yield* handleRawModelStreamEvent( + event, + agentConfig, + pipelineConfig, + agentName!, + turnMsgs, + usageTracker, + eventLogger, + getAgentState, + ); break; case 'run_item_stream_event':