diff --git a/apps/x/apps/renderer/src/App.tsx b/apps/x/apps/renderer/src/App.tsx index 8292a196..56bdab3f 100644 --- a/apps/x/apps/renderer/src/App.tsx +++ b/apps/x/apps/renderer/src/App.tsx @@ -1972,6 +1972,21 @@ function App() { break } + case 'tool-output-stream': + { + if (!isActiveRun) return + setConversation(prev => prev.map(item => { + if ( + isToolCall(item) + && item.id === event.toolCallId + ) { + return { ...item, streamingOutput: (item.streamingOutput ?? '') + event.output } + } + return item + })) + break + } + case 'tool-permission-request': { if (!isActiveRun) return const key = event.toolCall.toolCallId @@ -3842,7 +3857,16 @@ function App() { /> - {output !== null ? ( + {item.streamingOutput && item.status === 'running' ? ( + + + Live Output + + + {item.streamingOutput} + + + ) : output !== null ? ( ) : null} diff --git a/apps/x/apps/renderer/src/lib/chat-conversation.ts b/apps/x/apps/renderer/src/lib/chat-conversation.ts index d92c124d..89cbc2ff 100644 --- a/apps/x/apps/renderer/src/lib/chat-conversation.ts +++ b/apps/x/apps/renderer/src/lib/chat-conversation.ts @@ -23,6 +23,7 @@ export interface ToolCall { name: string input: ToolUIPart['input'] result?: ToolUIPart['output'] + streamingOutput?: string status: 'pending' | 'running' | 'completed' | 'error' timestamp: number } diff --git a/apps/x/packages/core/src/agents/runtime.ts b/apps/x/packages/core/src/agents/runtime.ts index b597a8d6..f58f658e 100644 --- a/apps/x/packages/core/src/agents/runtime.ts +++ b/apps/x/packages/core/src/agents/runtime.ts @@ -161,6 +161,7 @@ export class AgentRuntime implements IAgentRuntime { modelConfigRepo: this.modelConfigRepo, signal, abortRegistry: this.abortRegistry, + bus: this.bus, })) { eventCount++; if (event.type !== "llm-stream-event") { @@ -822,6 +823,7 @@ export async function* streamAgent({ modelConfigRepo, signal, abortRegistry, + bus, }: { state: AgentState, idGenerator: IMonotonicallyIncreasingIdGenerator; @@ -830,6 +832,7 @@ export async function* streamAgent({ modelConfigRepo: IModelConfigRepo; signal: AbortSignal; abortRegistry: IAbortRegistry; + bus: IBus; }): AsyncGenerator, void, unknown> { const logger = new PrefixLogger(`run-${runId}-${state.agentName}`); @@ -942,6 +945,7 @@ export async function* streamAgent({ modelConfigRepo, signal, abortRegistry, + bus, })) { yield* processEvent({ ...event, @@ -952,7 +956,7 @@ export async function* streamAgent({ result = subflowState.finalResponse(); } } else { - result = await execTool(agent.tools![toolCall.toolName], toolCall.arguments, { runId, signal, abortRegistry }); + result = await execTool(agent.tools![toolCall.toolName], toolCall.arguments, { runId, toolCallId, signal, abortRegistry, publish: (event) => bus.publish(event) }); } const resultPayload = result === undefined ? null : result; const resultMsg: z.infer = { diff --git a/apps/x/packages/core/src/application/lib/builtin-tools.ts b/apps/x/packages/core/src/application/lib/builtin-tools.ts index 069cf7ef..d3b5cb19 100644 --- a/apps/x/packages/core/src/application/lib/builtin-tools.ts +++ b/apps/x/packages/core/src/application/lib/builtin-tools.ts @@ -847,6 +847,16 @@ export const BuiltinTools: z.infer = { const { promise, process: proc } = executeCommandAbortable(command, { cwd: workingDir, signal: ctx.signal, + onData: ctx.publish ? (chunk: string) => { + ctx.publish({ + runId: ctx.runId, + type: "tool-output-stream", + toolCallId: ctx.toolCallId, + toolName: "executeCommand", + output: chunk, + subflow: [], + }); + } : undefined, }); // Register process with abort registry for force-kill diff --git a/apps/x/packages/core/src/application/lib/command-executor.ts b/apps/x/packages/core/src/application/lib/command-executor.ts index 611bde45..0bfe0cb5 100644 --- a/apps/x/packages/core/src/application/lib/command-executor.ts +++ b/apps/x/packages/core/src/application/lib/command-executor.ts @@ -143,6 +143,7 @@ export function executeCommandAbortable( timeout?: number; maxBuffer?: number; signal?: AbortSignal; + onData?: (chunk: string) => void; } ): { promise: Promise; process: ChildProcess } { // Check if already aborted before spawning @@ -176,16 +177,20 @@ export function executeCommandAbortable( // Collect output proc.stdout?.on('data', (chunk: Buffer) => { + const text = chunk.toString(); const maxBuffer = options?.maxBuffer || 1024 * 1024; if (stdout.length < maxBuffer) { - stdout += chunk.toString(); + stdout += text; } + options?.onData?.(text); }); proc.stderr?.on('data', (chunk: Buffer) => { + const text = chunk.toString(); const maxBuffer = options?.maxBuffer || 1024 * 1024; if (stderr.length < maxBuffer) { - stderr += chunk.toString(); + stderr += text; } + options?.onData?.(text); }); // Abort handler diff --git a/apps/x/packages/core/src/application/lib/exec-tool.ts b/apps/x/packages/core/src/application/lib/exec-tool.ts index 09983402..909cae65 100644 --- a/apps/x/packages/core/src/application/lib/exec-tool.ts +++ b/apps/x/packages/core/src/application/lib/exec-tool.ts @@ -1,4 +1,5 @@ import { ToolAttachment } from "@x/shared/dist/agent.js"; +import { RunEvent } from "@x/shared/dist/runs.js"; import { z } from "zod"; import { BuiltinTools } from "./builtin-tools.js"; import { executeTool } from "../../mcp/mcp.js"; @@ -9,8 +10,10 @@ import { IAbortRegistry } from "../../runs/abort-registry.js"; */ export interface ToolContext { runId: string; + toolCallId: string; signal: AbortSignal; abortRegistry: IAbortRegistry; + publish: (event: z.infer) => Promise; } async function execMcpTool(agentTool: z.infer & { type: "mcp" }, input: Record): Promise { diff --git a/apps/x/packages/shared/src/runs.ts b/apps/x/packages/shared/src/runs.ts index 5f52f611..d8f4c3b6 100644 --- a/apps/x/packages/shared/src/runs.ts +++ b/apps/x/packages/shared/src/runs.ts @@ -81,6 +81,13 @@ export const RunErrorEvent = BaseRunEvent.extend({ error: z.string(), }); +export const ToolOutputStreamEvent = BaseRunEvent.extend({ + type: z.literal("tool-output-stream"), + toolCallId: z.string(), + toolName: z.string(), + output: z.string(), +}); + export const RunStoppedEvent = BaseRunEvent.extend({ type: z.literal("run-stopped"), reason: z.enum(["user-requested", "force-stopped"]).optional(), @@ -95,6 +102,7 @@ export const RunEvent = z.union([ MessageEvent, ToolInvocationEvent, ToolResultEvent, + ToolOutputStreamEvent, AskHumanRequestEvent, AskHumanResponseEvent, ToolPermissionRequestEvent,
+ {item.streamingOutput} +