improve agents-sdk message handling

This commit is contained in:
Ramnique Singh 2025-08-22 17:45:09 +05:30
parent 29a2030abe
commit 2c3016f143

View file

@ -747,29 +747,44 @@ function maybeInjectGiveUpControlInstructions(
async function* handleRawModelStreamEvent( async function* handleRawModelStreamEvent(
event: RunRawModelStreamEvent, event: RunRawModelStreamEvent,
agentConfig: Record<string, z.infer<typeof WorkflowAgent>>, agentConfig: Record<string, z.infer<typeof WorkflowAgent>>,
pipelineConfig: Record<string, z.infer<typeof WorkflowPipeline>>,
agentName: string, agentName: string,
turnMsgs: z.infer<typeof Message>[], turnMsgs: z.infer<typeof Message>[],
usageTracker: UsageTracker, usageTracker: UsageTracker,
eventLogger: PrefixLogger, eventLogger: PrefixLogger,
getAgentState?: (agentName: string) => AgentState getAgentState?: (agentName: string) => AgentState
): AsyncIterable<z.infer<typeof ZOutMessage>> { ): AsyncIterable<z.infer<typeof ZOutMessage>> {
// check response visibility - could be an agent or pipeline
const agentConfigObj = agentConfig[agentName];
const pipelineConfigObj = pipelineConfig[agentName];
const isInternal = agentConfigObj?.outputVisibility === 'internal' || agentConfigObj?.type === 'pipeline' || !!pipelineConfigObj;
if (event.data.type === 'response_done') { if (event.data.type === 'response_done') {
// Count tool calls (excluding transfer_to_* calls)
const toolCallCount = event.data.response.output.filter(
(output: any) => output.type === 'function_call' && !output.name.startsWith('transfer_to')
).length;
// If we have tool calls, increment pending counter
if (toolCallCount > 0 && getAgentState) {
const state = getAgentState(agentName);
state.pendingToolCalls += toolCallCount;
eventLogger.log(`🔧 Agent ${agentName} has ${toolCallCount} new tool calls (total: ${state.pendingToolCalls})`);
}
for (const output of event.data.response.output) { for (const output of event.data.response.output) {
if (output.type === 'message') {
for (const c of output.content) {
if (c.type === 'output_text' && c.text.trim()) {
const m: z.infer<typeof Message> = {
role: 'assistant',
content: c.text,
agentName: agentName,
responseType: isInternal ? 'internal' : 'external',
};
turnMsgs.push(m);
yield* emitEvent(eventLogger, m);
}
}
}
// handle tool call invocation // handle tool call invocation
// except for transfer_to_* tool calls // except for transfer_to_* tool calls
if (output.type === 'function_call' && !output.name.startsWith('transfer_to')) { if (output.type === 'function_call' && !output.name.startsWith('transfer_to')) {
if (getAgentState) {
const state = getAgentState(agentName);
state.pendingToolCalls++;
eventLogger.log(`🔧 Agent ${agentName} has ${state.pendingToolCalls} pending tool calls`);
}
const m: z.infer<typeof Message> = { const m: z.infer<typeof Message> = {
role: 'assistant', role: 'assistant',
content: null, content: null,
@ -1030,6 +1045,7 @@ async function* handleMessageOutput(
const pipelineConfigObj = pipelineConfig[agentName]; const pipelineConfigObj = pipelineConfig[agentName];
const isInternal = agentConfigObj?.outputVisibility === 'internal' || agentConfigObj?.type === 'pipeline' || !!pipelineConfigObj; const isInternal = agentConfigObj?.outputVisibility === 'internal' || agentConfigObj?.type === 'pipeline' || !!pipelineConfigObj;
/* ignore handling text messages here in favor of handling raw events
for (const content of event.item.rawItem.content) { for (const content of event.item.rawItem.content) {
if (content.type === 'output_text') { if (content.type === 'output_text') {
// todo: look into what is causing empty messages // todo: look into what is causing empty messages
@ -1054,6 +1070,7 @@ async function* handleMessageOutput(
yield* emitEvent(eventLogger, msg); yield* emitEvent(eventLogger, msg);
} }
} }
*/
// if this is an internal agent or pipeline agent, switch to previous agent // if this is an internal agent or pipeline agent, switch to previous agent
if (isInternal) { if (isInternal) {
@ -1372,10 +1389,20 @@ export async function* streamResponse(
// handle streaming events // handle streaming events
for await (const event of result) { for await (const event of result) {
const eventLogger = loopLogger.child(event.type); const eventLogger = loopLogger.child(event.type);
eventLogger.log(`*** GOT EVENT ***`, JSON.stringify(event));
switch (event.type) { switch (event.type) {
case 'raw_model_stream_event': case 'raw_model_stream_event':
yield* handleRawModelStreamEvent(event, agentConfig, agentName!, turnMsgs, usageTracker, eventLogger, getAgentState); yield* handleRawModelStreamEvent(
event,
agentConfig,
pipelineConfig,
agentName!,
turnMsgs,
usageTracker,
eventLogger,
getAgentState,
);
break; break;
case 'run_item_stream_event': case 'run_item_stream_event':