diff --git a/apps/x/apps/main/src/ipc.ts b/apps/x/apps/main/src/ipc.ts index 01644e90..be662f29 100644 --- a/apps/x/apps/main/src/ipc.ts +++ b/apps/x/apps/main/src/ipc.ts @@ -295,7 +295,7 @@ export function setupIpcHandlers() { return { success: true }; }, 'runs:stop': async (_event, args) => { - await runsCore.stop(args.runId); + await runsCore.stop(args.runId, args.force); return { success: true }; }, 'runs:fetch': async (_event, args) => { diff --git a/apps/x/apps/renderer/src/App.tsx b/apps/x/apps/renderer/src/App.tsx index d1a6f1fe..69576938 100644 --- a/apps/x/apps/renderer/src/App.tsx +++ b/apps/x/apps/renderer/src/App.tsx @@ -6,7 +6,7 @@ import type { LanguageModelUsage, ToolUIPart } from 'ai'; import './App.css' import z from 'zod'; import { Button } from './components/ui/button'; -import { CheckIcon, LoaderIcon, ArrowUp, PanelRightIcon, SquarePen } from 'lucide-react'; +import { CheckIcon, LoaderIcon, ArrowUp, PanelRightIcon, SquarePen, Square } from 'lucide-react'; import { cn } from '@/lib/utils'; import { MarkdownEditor } from './components/markdown-editor'; import { ChatInputBar } from './components/chat-button'; @@ -279,7 +279,9 @@ const collectFilePaths = (nodes: TreeNode[]): string[] => // Inner component that uses the controller to access mentions interface ChatInputInnerProps { onSubmit: (message: PromptInputMessage, mentions?: FileMention[]) => void + onStop?: () => void isProcessing: boolean + isStopping?: boolean presetMessage?: string onPresetMessageConsumed?: () => void runId?: string | null @@ -287,7 +289,9 @@ interface ChatInputInnerProps { function ChatInputInner({ onSubmit, + onStop, isProcessing, + isStopping, presetMessage, onPresetMessageConsumed, runId, @@ -327,19 +331,39 @@ function ChatInputInner({ focusTrigger={runId} className="min-h-6 py-0 border-0 shadow-none focus-visible:ring-0 rounded-none" /> - + {isProcessing ? ( + + ) : ( + + )} ) } @@ -350,7 +374,9 @@ interface ChatInputWithMentionsProps { recentFiles: string[] visibleFiles: string[] onSubmit: (message: PromptInputMessage, mentions?: FileMention[]) => void + onStop?: () => void isProcessing: boolean + isStopping?: boolean presetMessage?: string onPresetMessageConsumed?: () => void runId?: string | null @@ -361,7 +387,9 @@ function ChatInputWithMentions({ recentFiles, visibleFiles, onSubmit, + onStop, isProcessing, + isStopping, presetMessage, onPresetMessageConsumed, runId, @@ -370,7 +398,9 @@ function ChatInputWithMentions({ (null) const runIdRef = useRef(null) const [isProcessing, setIsProcessing] = useState(false) + const [isStopping, setIsStopping] = useState(false) + const [stopClickedAt, setStopClickedAt] = useState(null) const [agentId] = useState('copilot') const [presetMessage, setPresetMessage] = useState(undefined) @@ -758,6 +790,8 @@ function App() { case 'run-processing-end': setIsProcessing(false) + setIsStopping(false) + setStopClickedAt(null) break case 'start': @@ -936,8 +970,32 @@ function App() { break } + case 'run-stopped': + setIsProcessing(false) + setIsStopping(false) + setStopClickedAt(null) + // Clear pending requests since they've been aborted + setPendingPermissionRequests(new Map()) + setPendingAskHumanRequests(new Map()) + // Flush any streaming content as a message + setCurrentAssistantMessage(currentMsg => { + if (currentMsg) { + setConversation(prev => [...prev, { + id: `assistant-stopped-${Date.now()}`, + role: 'assistant', + content: currentMsg, + timestamp: Date.now(), + }]) + } + return '' + }) + setCurrentReasoning('') + break + case 'error': setIsProcessing(false) + setIsStopping(false) + setStopClickedAt(null) console.error('Run error:', event.error) break } @@ -1009,6 +1067,21 @@ function App() { } } + const handleStop = useCallback(async () => { + if (!runId) return + const now = Date.now() + const isForce = isStopping && stopClickedAt !== null && (now - stopClickedAt) < 2000 + + setStopClickedAt(now) + setIsStopping(true) + + try { + await window.ipc.invoke('runs:stop', { runId, force: isForce }) + } catch (error) { + console.error('Failed to stop run:', error) + } + }, [runId, isStopping, stopClickedAt]) + const handlePermissionResponse = useCallback(async (toolCallId: string, subflow: string[], response: 'approve' | 'deny') => { if (!runId) return @@ -1779,7 +1852,9 @@ function App() { recentFiles={recentWikiFiles} visibleFiles={visibleKnowledgeFiles} onSubmit={handlePromptSubmit} + onStop={handleStop} isProcessing={isProcessing} + isStopping={isStopping} presetMessage={presetMessage} onPresetMessageConsumed={() => setPresetMessage(undefined)} runId={runId} @@ -1801,6 +1876,8 @@ function App() { currentAssistantMessage={currentAssistantMessage} currentReasoning={currentReasoning} isProcessing={isProcessing} + isStopping={isStopping} + onStop={handleStop} message={message} onMessageChange={setMessage} onSubmit={handlePromptSubmit} diff --git a/apps/x/apps/renderer/src/components/chat-sidebar.tsx b/apps/x/apps/renderer/src/components/chat-sidebar.tsx index 4cddfdf0..4c97d581 100644 --- a/apps/x/apps/renderer/src/components/chat-sidebar.tsx +++ b/apps/x/apps/renderer/src/components/chat-sidebar.tsx @@ -1,5 +1,5 @@ import { useCallback, useEffect, useMemo, useRef, useState } from 'react' -import { ArrowUp, Expand, Plus } from 'lucide-react' +import { ArrowUp, Expand, LoaderIcon, Plus, Square } from 'lucide-react' import type { ToolUIPart } from 'ai' import { Button } from '@/components/ui/button' import { cn } from '@/lib/utils' @@ -115,6 +115,8 @@ interface ChatSidebarProps { currentAssistantMessage: string currentReasoning: string isProcessing: boolean + isStopping?: boolean + onStop?: () => void message: string onMessageChange: (message: string) => void onSubmit: (message: PromptInputMessage, mentions?: FileMention[]) => void @@ -139,6 +141,8 @@ export function ChatSidebar({ currentAssistantMessage, currentReasoning, isProcessing, + isStopping, + onStop, message, onMessageChange, onSubmit, @@ -595,19 +599,39 @@ export function ChatSidebar({ style={{ fieldSizing: 'content' } as React.CSSProperties} /> - + {isProcessing ? ( + + ) : ( + + )} {knowledgeFiles.length > 0 && ( { @@ -68,6 +73,7 @@ export class AgentRuntime implements IAgentRuntime { console.log(`unable to acquire lock on run ${runId}`); return; } + const signal = this.abortRegistry.createForRun(runId); try { await this.bus.publish({ runId, @@ -75,6 +81,11 @@ export class AgentRuntime implements IAgentRuntime { subflow: [], }); while (true) { + // Check for abort before each iteration + if (signal.aborted) { + break; + } + let eventCount = 0; const run = await this.runsRepo.fetch(runId); if (!run) { @@ -84,18 +95,28 @@ export class AgentRuntime implements IAgentRuntime { for (const event of run.log) { state.ingest(event); } - for await (const event of streamAgent({ - state, - idGenerator: this.idGenerator, - runId, - messageQueue: this.messageQueue, - modelConfigRepo: this.modelConfigRepo, - })) { - eventCount++; - if (event.type !== "llm-stream-event") { - await this.runsRepo.appendEvents(runId, [event]); + try { + for await (const event of streamAgent({ + state, + idGenerator: this.idGenerator, + runId, + messageQueue: this.messageQueue, + modelConfigRepo: this.modelConfigRepo, + signal, + abortRegistry: this.abortRegistry, + })) { + eventCount++; + if (event.type !== "llm-stream-event") { + await this.runsRepo.appendEvents(runId, [event]); + } + await this.bus.publish(event); } - await this.bus.publish(event); + } catch (error) { + if (error instanceof Error && error.name === "AbortError") { + // Abort detected — exit cleanly + break; + } + throw error; } // if no events, break @@ -103,7 +124,20 @@ export class AgentRuntime implements IAgentRuntime { break; } } + + // Emit run-stopped event if aborted + if (signal.aborted) { + const stoppedEvent: z.infer = { + runId, + type: "run-stopped", + reason: "user-requested", + subflow: [], + }; + await this.runsRepo.appendEvents(runId, [stoppedEvent]); + await this.bus.publish(stoppedEvent); + } } finally { + this.abortRegistry.cleanup(runId); await this.runsLock.release(runId); await this.bus.publish({ runId, @@ -428,6 +462,39 @@ export class AgentState { return response; } + /** + * Returns tool-result messages for all pending tool calls, marking them as aborted. + * This is called when a run is stopped so the LLM knows what happened to its tool requests. + */ + getAbortedToolResults(): z.infer[] { + const results: z.infer[] = []; + for (const toolCallId of Object.keys(this.pendingToolCalls)) { + const toolCall = this.toolCallIdMap[toolCallId]; + if (toolCall) { + results.push({ + role: "tool", + content: JSON.stringify({ error: "Tool execution aborted" }), + toolCallId, + toolName: toolCall.toolName, + }); + } + } + return results; + } + + /** + * Clear all pending state (permissions, ask-human, tool calls). + * Used when a run is stopped. + */ + clearAllPending(): void { + this.pendingToolPermissionRequests = {}; + this.pendingAskHumanRequests = {}; + // Recursively clear subflows + for (const subflow of Object.values(this.subflowStates)) { + subflow.clearAllPending(); + } + } + finalResponse(): string { if (!this.lastAssistantMsg) { return ''; @@ -526,12 +593,16 @@ export async function* streamAgent({ runId, messageQueue, modelConfigRepo, + signal, + abortRegistry, }: { state: AgentState, idGenerator: IMonotonicallyIncreasingIdGenerator; runId: string; messageQueue: IMessageQueue; modelConfigRepo: IModelConfigRepo; + signal: AbortSignal; + abortRegistry: IAbortRegistry; }): AsyncGenerator, void, unknown> { const logger = new PrefixLogger(`run-${runId}-${state.agentName}`); @@ -557,6 +628,9 @@ export async function* streamAgent({ let loopCounter = 0; while (true) { + // Check abort at the top of each iteration + signal.throwIfAborted(); + loopCounter++; const loopLogger = logger.child(`iter-${loopCounter}`); loopLogger.log('starting loop iteration'); @@ -598,6 +672,11 @@ export async function* streamAgent({ } // execute approved tool + // Check abort before starting tool execution + if (signal.aborted) { + _logger.log('skipping, reason: aborted'); + break; + } _logger.log('executing tool'); yield* processEvent({ runId, @@ -616,6 +695,8 @@ export async function* streamAgent({ runId, messageQueue, modelConfigRepo, + signal, + abortRegistry, })) { yield* processEvent({ ...event, @@ -626,7 +707,7 @@ export async function* streamAgent({ result = subflowState.finalResponse(); } } else { - result = await execTool(agent.tools![toolCall.toolName], toolCall.arguments); + result = await execTool(agent.tools![toolCall.toolName], toolCall.arguments, { runId, signal, abortRegistry }); } const resultPayload = result === undefined ? null : result; const resultMsg: z.infer = { @@ -709,6 +790,7 @@ export async function* streamAgent({ state.messages, instructionsWithDateTime, tools, + signal, )) { // Only log significant events (not text-delta to reduce noise) if (event.type !== 'text-delta') { @@ -791,6 +873,7 @@ async function* streamLlm( messages: z.infer, instructions: string, tools: ToolSet, + signal?: AbortSignal, ): AsyncGenerator, void, unknown> { const { fullStream } = streamText({ model, @@ -798,8 +881,11 @@ async function* streamLlm( system: instructions, tools, stopWhen: stepCountIs(1), + abortSignal: signal, }); for await (const event of fullStream) { + // Check abort on every chunk for responsiveness + signal?.throwIfAborted(); // console.log("\n\n\t>>>>\t\tstream event", JSON.stringify(event)); switch (event.type) { case "reasoning-start": diff --git a/apps/x/packages/core/src/application/lib/builtin-tools.ts b/apps/x/packages/core/src/application/lib/builtin-tools.ts index 2fbf0c10..c6ac1475 100644 --- a/apps/x/packages/core/src/application/lib/builtin-tools.ts +++ b/apps/x/packages/core/src/application/lib/builtin-tools.ts @@ -2,7 +2,7 @@ import { z, ZodType } from "zod"; import * as path from "path"; import { execSync } from "child_process"; import { glob } from "glob"; -import { executeCommand } from "./command-executor.js"; +import { executeCommand, executeCommandAbortable } from "./command-executor.js"; import { resolveSkill, availableSkills } from "../assistant/skills/index.js"; import { executeTool, listServers, listTools } from "../../mcp/mcp.js"; import container from "../../di/container.js"; @@ -11,13 +11,14 @@ import { McpServerDefinition } from "@x/shared/dist/mcp.js"; import * as workspace from "../../workspace/workspace.js"; import { IAgentsRepo } from "../../agents/repo.js"; import { WorkDir } from "../../config/config.js"; +import type { ToolContext } from "./exec-tool.js"; // eslint-disable-next-line @typescript-eslint/no-unused-vars const BuiltinToolsSchema = z.record(z.string(), z.object({ description: z.string(), inputSchema: z.custom(), execute: z.function({ - input: z.any(), + input: z.any(), // (input, ctx?) => Promise output: z.promise(z.any()), }), })); @@ -611,15 +612,15 @@ export const BuiltinTools: z.infer = { command: z.string().describe('The shell command to execute (e.g., "ls -la", "cat file.txt")'), cwd: z.string().optional().describe('Working directory to execute the command in (defaults to workspace root)'), }), - execute: async ({ command, cwd }: { command: string, cwd?: string }) => { + execute: async ({ command, cwd }: { command: string, cwd?: string }, ctx?: ToolContext) => { try { const rootDir = path.resolve(WorkDir); const workingDir = cwd ? path.resolve(rootDir, cwd) : rootDir; - const rootPrefix = rootDir.endsWith(path.sep) - ? rootDir - : `${rootDir}${path.sep}`; // TODO: Re-enable this check + // const rootPrefix = rootDir.endsWith(path.sep) + // ? rootDir + // : `${rootDir}${path.sep}`; // if (workingDir !== rootDir && !workingDir.startsWith(rootPrefix)) { // return { // success: false, @@ -629,8 +630,32 @@ export const BuiltinTools: z.infer = { // }; // } + // Use abortable version when we have a signal + if (ctx?.signal) { + const { promise, process: proc } = executeCommandAbortable(command, { + cwd: workingDir, + signal: ctx.signal, + }); + + // Register process with abort registry for force-kill + ctx.abortRegistry.registerProcess(ctx.runId, proc); + + const result = await promise; + + return { + success: result.exitCode === 0 && !result.wasAborted, + stdout: result.stdout, + stderr: result.stderr, + exitCode: result.exitCode, + wasAborted: result.wasAborted, + command, + workingDir, + }; + } + + // Fallback to original for backward compatibility const result = await executeCommand(command, { cwd: workingDir }); - + return { success: result.exitCode === 0, stdout: result.stdout, diff --git a/apps/x/packages/core/src/application/lib/command-executor.ts b/apps/x/packages/core/src/application/lib/command-executor.ts index 01be2ee7..e865e98d 100644 --- a/apps/x/packages/core/src/application/lib/command-executor.ts +++ b/apps/x/packages/core/src/application/lib/command-executor.ts @@ -1,4 +1,4 @@ -import { exec, execSync } from 'child_process'; +import { exec, execSync, spawn, ChildProcess } from 'child_process'; import { promisify } from 'util'; import { getSecurityAllowList } from '../../config/security.js'; @@ -110,6 +110,159 @@ export async function executeCommand( } } +export interface AbortableCommandResult extends CommandResult { + wasAborted: boolean; +} + +const SIGKILL_GRACE_MS = 200; + +/** + * Kill a process tree using negative PID (process group kill on Unix). + * Falls back to direct kill if group kill fails. + */ +function killProcessTree(proc: ChildProcess, signal: NodeJS.Signals): void { + if (!proc.pid || proc.killed) return; + try { + // Negative PID kills the entire process group (Unix) + process.kill(-proc.pid, signal); + } catch { + try { + proc.kill(signal); + } catch { + // Process may already be dead + } + } +} + +/** + * Executes a shell command with abort support. + * Uses spawn with detached=true to create a process group for proper tree killing. + * Returns both the promise and the child process handle. + */ +export function executeCommandAbortable( + command: string, + options?: { + cwd?: string; + timeout?: number; + maxBuffer?: number; + signal?: AbortSignal; + } +): { promise: Promise; process: ChildProcess } { + // Check if already aborted before spawning + if (options?.signal?.aborted) { + // Return a dummy process and a resolved result + const dummyProc = spawn('true', { shell: true }); + dummyProc.kill(); + return { + process: dummyProc, + promise: Promise.resolve({ + stdout: '', + stderr: '', + exitCode: 130, + wasAborted: true, + }), + }; + } + + const proc = spawn(command, [], { + shell: '/bin/sh', + cwd: options?.cwd, + detached: process.platform !== 'win32', // Create process group on Unix + stdio: ['ignore', 'pipe', 'pipe'], + }); + + const promise = new Promise((resolve) => { + let stdout = ''; + let stderr = ''; + let wasAborted = false; + let exited = false; + + // Collect output + proc.stdout?.on('data', (chunk: Buffer) => { + const maxBuffer = options?.maxBuffer || 1024 * 1024; + if (stdout.length < maxBuffer) { + stdout += chunk.toString(); + } + }); + proc.stderr?.on('data', (chunk: Buffer) => { + const maxBuffer = options?.maxBuffer || 1024 * 1024; + if (stderr.length < maxBuffer) { + stderr += chunk.toString(); + } + }); + + // Abort handler + const abortHandler = () => { + wasAborted = true; + killProcessTree(proc, 'SIGTERM'); + // Force kill after grace period + setTimeout(() => { + if (!exited) { + killProcessTree(proc, 'SIGKILL'); + } + }, SIGKILL_GRACE_MS); + }; + + if (options?.signal) { + options.signal.addEventListener('abort', abortHandler, { once: true }); + } + + // Timeout handler + let timeoutId: ReturnType | undefined; + if (options?.timeout) { + timeoutId = setTimeout(() => { + wasAborted = true; + killProcessTree(proc, 'SIGTERM'); + setTimeout(() => { + if (!exited) { + killProcessTree(proc, 'SIGKILL'); + } + }, SIGKILL_GRACE_MS); + }, options.timeout); + } + + proc.once('exit', (code) => { + exited = true; + // Cleanup listeners + if (options?.signal) { + options.signal.removeEventListener('abort', abortHandler); + } + if (timeoutId) { + clearTimeout(timeoutId); + } + + if (wasAborted) { + stdout += '\n\n(Command was aborted)'; + } + + resolve({ + stdout: stdout.trim(), + stderr: stderr.trim(), + exitCode: code ?? 1, + wasAborted, + }); + }); + + proc.once('error', (err) => { + exited = true; + if (options?.signal) { + options.signal.removeEventListener('abort', abortHandler); + } + if (timeoutId) { + clearTimeout(timeoutId); + } + resolve({ + stdout: '', + stderr: err.message, + exitCode: 1, + wasAborted, + }); + }); + }); + + return { promise, process: proc }; +} + /** * Executes a command synchronously (blocking) * Use with caution - prefer executeCommand for async execution diff --git a/apps/x/packages/core/src/application/lib/exec-tool.ts b/apps/x/packages/core/src/application/lib/exec-tool.ts index fbe46f57..09983402 100644 --- a/apps/x/packages/core/src/application/lib/exec-tool.ts +++ b/apps/x/packages/core/src/application/lib/exec-tool.ts @@ -2,22 +2,36 @@ import { ToolAttachment } from "@x/shared/dist/agent.js"; import { z } from "zod"; import { BuiltinTools } from "./builtin-tools.js"; import { executeTool } from "../../mcp/mcp.js"; +import { IAbortRegistry } from "../../runs/abort-registry.js"; + +/** + * Context passed to every tool execution, providing abort signal and run metadata. + */ +export interface ToolContext { + runId: string; + signal: AbortSignal; + abortRegistry: IAbortRegistry; +} async function execMcpTool(agentTool: z.infer & { type: "mcp" }, input: Record): Promise { const result = await executeTool(agentTool.mcpServerName, agentTool.name, input); return result; } -export async function execTool(agentTool: z.infer, input: Record): Promise { +export async function execTool(agentTool: z.infer, input: Record, ctx?: ToolContext): Promise { + // Check abort before starting any tool + ctx?.signal.throwIfAborted(); + switch (agentTool.type) { case "mcp": + // MCP tools: let complete on graceful stop (most are fast) return execMcpTool(agentTool, input); case "builtin": { const builtinTool = BuiltinTools[agentTool.name]; if (!builtinTool || !builtinTool.execute) { throw new Error(`Unsupported builtin tool: ${agentTool.name}`); } - return builtinTool.execute(input); + return builtinTool.execute(input, ctx); } } } \ No newline at end of file diff --git a/apps/x/packages/core/src/di/container.ts b/apps/x/packages/core/src/di/container.ts index 2e85d34a..2b3fd2d7 100644 --- a/apps/x/packages/core/src/di/container.ts +++ b/apps/x/packages/core/src/di/container.ts @@ -11,6 +11,7 @@ import { IAgentRuntime, AgentRuntime } from "../agents/runtime.js"; import { FSOAuthRepo, IOAuthRepo } from "../auth/repo.js"; import { FSClientRegistrationRepo, IClientRegistrationRepo } from "../auth/client-repo.js"; import { FSGranolaConfigRepo, IGranolaConfigRepo } from "../knowledge/granola/repo.js"; +import { IAbortRegistry, InMemoryAbortRegistry } from "../runs/abort-registry.js"; const container = createContainer({ injectionMode: InjectionMode.PROXY, @@ -22,6 +23,7 @@ container.register({ messageQueue: asClass(InMemoryMessageQueue).singleton(), bus: asClass(InMemoryBus).singleton(), runsLock: asClass(InMemoryRunsLock).singleton(), + abortRegistry: asClass(InMemoryAbortRegistry).singleton(), agentRuntime: asClass(AgentRuntime).singleton(), mcpConfigRepo: asClass(FSMcpConfigRepo).singleton(), diff --git a/apps/x/packages/core/src/mcp/mcp.ts b/apps/x/packages/core/src/mcp/mcp.ts index 4cf664d9..22c8740b 100644 --- a/apps/x/packages/core/src/mcp/mcp.ts +++ b/apps/x/packages/core/src/mcp/mcp.ts @@ -84,6 +84,22 @@ export async function cleanup() { } } +/** + * Force-close all MCP client connections. + * Used during force abort to immediately reject any pending MCP tool calls. + * Clients will be lazily reconnected on next use. + */ +export async function forceCloseAllMcpClients(): Promise { + for (const [serverName, { client }] of Object.entries(clients)) { + try { + await client?.close(); + } catch { + // Ignore errors during force close + } + delete clients[serverName]; + } +} + export async function listServers(): Promise> { const repo = container.resolve('mcpConfigRepo'); const { mcpServers } = await repo.getConfig(); diff --git a/apps/x/packages/core/src/runs/abort-registry.ts b/apps/x/packages/core/src/runs/abort-registry.ts new file mode 100644 index 00000000..7d3ca334 --- /dev/null +++ b/apps/x/packages/core/src/runs/abort-registry.ts @@ -0,0 +1,170 @@ +import { ChildProcess } from "child_process"; + +export interface IAbortRegistry { + /** + * Create and track an AbortController for a run. + * Returns the AbortSignal to thread through all operations. + */ + createForRun(runId: string): AbortSignal; + + /** + * Track a child process for a run (so we can kill it on abort). + */ + registerProcess(runId: string, process: ChildProcess): void; + + /** + * Untrack a child process after it exits. + */ + unregisterProcess(runId: string, process: ChildProcess): void; + + /** + * Graceful abort: + * 1. Fires the AbortSignal (cancels LLM streaming, etc.) + * 2. Sends SIGTERM to all tracked process groups + * 3. Schedules SIGKILL fallback after grace period + */ + abort(runId: string): void; + + /** + * Force abort: + * 1. Fires AbortSignal if not already fired + * 2. Sends SIGKILL to all tracked process groups immediately + */ + forceAbort(runId: string): void; + + /** + * Check if a run has been aborted. + */ + isAborted(runId: string): boolean; + + /** + * Clean up tracking state after a run completes or is fully stopped. + */ + cleanup(runId: string): void; +} + +interface RunAbortState { + controller: AbortController; + processes: Set; + killTimers: Set>; +} + +const SIGKILL_GRACE_MS = 200; + +export class InMemoryAbortRegistry implements IAbortRegistry { + private runs: Map = new Map(); + + createForRun(runId: string): AbortSignal { + // If a previous run state exists, clean it up first + this.cleanup(runId); + + const state: RunAbortState = { + controller: new AbortController(), + processes: new Set(), + killTimers: new Set(), + }; + this.runs.set(runId, state); + return state.controller.signal; + } + + registerProcess(runId: string, process: ChildProcess): void { + const state = this.runs.get(runId); + if (!state) return; + state.processes.add(process); + + // Auto-unregister when process exits + const onExit = () => { + state.processes.delete(process); + }; + process.once("exit", onExit); + process.once("error", onExit); + } + + unregisterProcess(runId: string, process: ChildProcess): void { + const state = this.runs.get(runId); + if (!state) return; + state.processes.delete(process); + } + + abort(runId: string): void { + const state = this.runs.get(runId); + if (!state) return; + + // 1. Fire the abort signal + if (!state.controller.signal.aborted) { + state.controller.abort(); + } + + // 2. SIGTERM all tracked process groups + for (const proc of state.processes) { + this.killProcessTree(proc, "SIGTERM"); + + // 3. Schedule SIGKILL fallback + const timer = setTimeout(() => { + if (!proc.killed) { + this.killProcessTree(proc, "SIGKILL"); + } + state.killTimers.delete(timer); + }, SIGKILL_GRACE_MS); + state.killTimers.add(timer); + } + } + + forceAbort(runId: string): void { + const state = this.runs.get(runId); + if (!state) return; + + // 1. Fire abort signal if not already + if (!state.controller.signal.aborted) { + state.controller.abort(); + } + + // 2. Clear any pending graceful kill timers + for (const timer of state.killTimers) { + clearTimeout(timer); + } + state.killTimers.clear(); + + // 3. SIGKILL all tracked process groups immediately + for (const proc of state.processes) { + this.killProcessTree(proc, "SIGKILL"); + } + } + + isAborted(runId: string): boolean { + const state = this.runs.get(runId); + return state?.controller.signal.aborted ?? false; + } + + cleanup(runId: string): void { + const state = this.runs.get(runId); + if (!state) return; + + // Clear any pending kill timers + for (const timer of state.killTimers) { + clearTimeout(timer); + } + + this.runs.delete(runId); + } + + /** + * Kill a process tree using negative PID (process group kill on Unix). + * Falls back to direct kill if group kill fails. + */ + private killProcessTree(proc: ChildProcess, signal: NodeJS.Signals): void { + if (!proc.pid || proc.killed) return; + + try { + // Negative PID kills the entire process group (Unix) + process.kill(-proc.pid, signal); + } catch { + // Fallback: kill just the process directly + try { + proc.kill(signal); + } catch { + // Process may already be dead + } + } + } +} diff --git a/apps/x/packages/core/src/runs/runs.ts b/apps/x/packages/core/src/runs/runs.ts index 7d7a8ebb..80ffc80f 100644 --- a/apps/x/packages/core/src/runs/runs.ts +++ b/apps/x/packages/core/src/runs/runs.ts @@ -5,6 +5,8 @@ import { AskHumanResponseEvent, ToolPermissionResponseEvent, CreateRunOptions, R import { IRunsRepo } from "./repo.js"; import { IAgentRuntime } from "../agents/runtime.js"; import { IBus } from "../application/lib/bus.js"; +import { IAbortRegistry } from "./abort-registry.js"; +import { forceCloseAllMcpClients } from "../mcp/mcp.js"; export async function createRun(opts: z.infer): Promise> { const repo = container.resolve('runsRepo'); @@ -46,9 +48,21 @@ export async function replyToHumanInputRequest(runId: string, ev: z.infer { - console.log(`Stopping run ${runId}`); - throw new Error('Not implemented'); +export async function stop(runId: string, force: boolean = false): Promise { + const abortRegistry = container.resolve('abortRegistry'); + + if (force && abortRegistry.isAborted(runId)) { + // Second click: aggressive cleanup — SIGKILL + force close MCP clients + console.log(`Force stopping run ${runId}`); + abortRegistry.forceAbort(runId); + await forceCloseAllMcpClients(); + } else { + // First click: graceful — fires AbortSignal + SIGTERM + console.log(`Gracefully stopping run ${runId}`); + abortRegistry.abort(runId); + } + // Note: The run-stopped event is emitted by AgentRuntime.trigger() when it detects the abort. + // This avoids duplicate events and ensures proper sequencing. } export async function fetchRun(runId: string): Promise> { diff --git a/apps/x/packages/shared/src/ipc.ts b/apps/x/packages/shared/src/ipc.ts index 93b797a9..ca69646a 100644 --- a/apps/x/packages/shared/src/ipc.ts +++ b/apps/x/packages/shared/src/ipc.ts @@ -151,6 +151,7 @@ const ipcSchemas = { 'runs:stop': { req: z.object({ runId: z.string(), + force: z.boolean().optional().default(false), }), res: z.object({ success: z.literal(true), diff --git a/apps/x/packages/shared/src/runs.ts b/apps/x/packages/shared/src/runs.ts index 429d827b..eccfb6a7 100644 --- a/apps/x/packages/shared/src/runs.ts +++ b/apps/x/packages/shared/src/runs.ts @@ -80,6 +80,11 @@ export const RunErrorEvent = BaseRunEvent.extend({ error: z.string(), }); +export const RunStoppedEvent = BaseRunEvent.extend({ + type: z.literal("run-stopped"), + reason: z.enum(["user-requested", "force-stopped"]).optional(), +}); + export const RunEvent = z.union([ RunProcessingStartEvent, RunProcessingEndEvent, @@ -94,6 +99,7 @@ export const RunEvent = z.union([ ToolPermissionRequestEvent, ToolPermissionResponseEvent, RunErrorEvent, + RunStoppedEvent, ]); export const ToolPermissionAuthorizePayload = ToolPermissionResponseEvent.pick({