feat: add stop execution with hybrid graceful/force abort

Implement a stop execution feature that allows users to abort ongoing LLM
streaming, kill running tool calls, and clear pending permission/human input
requests. Uses a hybrid approach: first click sends graceful SIGTERM, second
click within 2s sends SIGKILL and force-closes MCP clients.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Ramnique Singh 2026-01-30 06:53:50 +05:30
parent 9828ea8301
commit a3e681a7c4
13 changed files with 642 additions and 54 deletions

View file

@ -22,6 +22,7 @@ import { IBus } from "../application/lib/bus.js";
import { IMessageQueue } from "../application/lib/message-queue.js";
import { IRunsRepo } from "../runs/repo.js";
import { IRunsLock } from "../runs/lock.js";
import { IAbortRegistry } from "../runs/abort-registry.js";
import { PrefixLogger } from "@x/shared";
import { parse } from "yaml";
import { raw as noteCreationMediumRaw } from "../knowledge/note_creation_medium.js";
@ -39,6 +40,7 @@ export class AgentRuntime implements IAgentRuntime {
private messageQueue: IMessageQueue;
private modelConfigRepo: IModelConfigRepo;
private runsLock: IRunsLock;
private abortRegistry: IAbortRegistry;
constructor({
runsRepo,
@ -47,6 +49,7 @@ export class AgentRuntime implements IAgentRuntime {
messageQueue,
modelConfigRepo,
runsLock,
abortRegistry,
}: {
runsRepo: IRunsRepo;
idGenerator: IMonotonicallyIncreasingIdGenerator;
@ -54,6 +57,7 @@ export class AgentRuntime implements IAgentRuntime {
messageQueue: IMessageQueue;
modelConfigRepo: IModelConfigRepo;
runsLock: IRunsLock;
abortRegistry: IAbortRegistry;
}) {
this.runsRepo = runsRepo;
this.idGenerator = idGenerator;
@ -61,6 +65,7 @@ export class AgentRuntime implements IAgentRuntime {
this.messageQueue = messageQueue;
this.modelConfigRepo = modelConfigRepo;
this.runsLock = runsLock;
this.abortRegistry = abortRegistry;
}
async trigger(runId: string): Promise<void> {
@ -68,6 +73,7 @@ export class AgentRuntime implements IAgentRuntime {
console.log(`unable to acquire lock on run ${runId}`);
return;
}
const signal = this.abortRegistry.createForRun(runId);
try {
await this.bus.publish({
runId,
@ -75,6 +81,11 @@ export class AgentRuntime implements IAgentRuntime {
subflow: [],
});
while (true) {
// Check for abort before each iteration
if (signal.aborted) {
break;
}
let eventCount = 0;
const run = await this.runsRepo.fetch(runId);
if (!run) {
@ -84,18 +95,28 @@ export class AgentRuntime implements IAgentRuntime {
for (const event of run.log) {
state.ingest(event);
}
for await (const event of streamAgent({
state,
idGenerator: this.idGenerator,
runId,
messageQueue: this.messageQueue,
modelConfigRepo: this.modelConfigRepo,
})) {
eventCount++;
if (event.type !== "llm-stream-event") {
await this.runsRepo.appendEvents(runId, [event]);
try {
for await (const event of streamAgent({
state,
idGenerator: this.idGenerator,
runId,
messageQueue: this.messageQueue,
modelConfigRepo: this.modelConfigRepo,
signal,
abortRegistry: this.abortRegistry,
})) {
eventCount++;
if (event.type !== "llm-stream-event") {
await this.runsRepo.appendEvents(runId, [event]);
}
await this.bus.publish(event);
}
await this.bus.publish(event);
} catch (error) {
if (error instanceof Error && error.name === "AbortError") {
// Abort detected — exit cleanly
break;
}
throw error;
}
// if no events, break
@ -103,7 +124,20 @@ export class AgentRuntime implements IAgentRuntime {
break;
}
}
// Emit run-stopped event if aborted
if (signal.aborted) {
const stoppedEvent: z.infer<typeof RunEvent> = {
runId,
type: "run-stopped",
reason: "user-requested",
subflow: [],
};
await this.runsRepo.appendEvents(runId, [stoppedEvent]);
await this.bus.publish(stoppedEvent);
}
} finally {
this.abortRegistry.cleanup(runId);
await this.runsLock.release(runId);
await this.bus.publish({
runId,
@ -428,6 +462,39 @@ export class AgentState {
return response;
}
/**
* Returns tool-result messages for all pending tool calls, marking them as aborted.
* This is called when a run is stopped so the LLM knows what happened to its tool requests.
*/
getAbortedToolResults(): z.infer<typeof ToolMessage>[] {
const results: z.infer<typeof ToolMessage>[] = [];
for (const toolCallId of Object.keys(this.pendingToolCalls)) {
const toolCall = this.toolCallIdMap[toolCallId];
if (toolCall) {
results.push({
role: "tool",
content: JSON.stringify({ error: "Tool execution aborted" }),
toolCallId,
toolName: toolCall.toolName,
});
}
}
return results;
}
/**
* Clear all pending state (permissions, ask-human, tool calls).
* Used when a run is stopped.
*/
clearAllPending(): void {
this.pendingToolPermissionRequests = {};
this.pendingAskHumanRequests = {};
// Recursively clear subflows
for (const subflow of Object.values(this.subflowStates)) {
subflow.clearAllPending();
}
}
finalResponse(): string {
if (!this.lastAssistantMsg) {
return '';
@ -526,12 +593,16 @@ export async function* streamAgent({
runId,
messageQueue,
modelConfigRepo,
signal,
abortRegistry,
}: {
state: AgentState,
idGenerator: IMonotonicallyIncreasingIdGenerator;
runId: string;
messageQueue: IMessageQueue;
modelConfigRepo: IModelConfigRepo;
signal: AbortSignal;
abortRegistry: IAbortRegistry;
}): AsyncGenerator<z.infer<typeof RunEvent>, void, unknown> {
const logger = new PrefixLogger(`run-${runId}-${state.agentName}`);
@ -557,6 +628,9 @@ export async function* streamAgent({
let loopCounter = 0;
while (true) {
// Check abort at the top of each iteration
signal.throwIfAborted();
loopCounter++;
const loopLogger = logger.child(`iter-${loopCounter}`);
loopLogger.log('starting loop iteration');
@ -598,6 +672,11 @@ export async function* streamAgent({
}
// execute approved tool
// Check abort before starting tool execution
if (signal.aborted) {
_logger.log('skipping, reason: aborted');
break;
}
_logger.log('executing tool');
yield* processEvent({
runId,
@ -616,6 +695,8 @@ export async function* streamAgent({
runId,
messageQueue,
modelConfigRepo,
signal,
abortRegistry,
})) {
yield* processEvent({
...event,
@ -626,7 +707,7 @@ export async function* streamAgent({
result = subflowState.finalResponse();
}
} else {
result = await execTool(agent.tools![toolCall.toolName], toolCall.arguments);
result = await execTool(agent.tools![toolCall.toolName], toolCall.arguments, { runId, signal, abortRegistry });
}
const resultPayload = result === undefined ? null : result;
const resultMsg: z.infer<typeof ToolMessage> = {
@ -709,6 +790,7 @@ export async function* streamAgent({
state.messages,
instructionsWithDateTime,
tools,
signal,
)) {
// Only log significant events (not text-delta to reduce noise)
if (event.type !== 'text-delta') {
@ -791,6 +873,7 @@ async function* streamLlm(
messages: z.infer<typeof MessageList>,
instructions: string,
tools: ToolSet,
signal?: AbortSignal,
): AsyncGenerator<z.infer<typeof LlmStepStreamEvent>, void, unknown> {
const { fullStream } = streamText({
model,
@ -798,8 +881,11 @@ async function* streamLlm(
system: instructions,
tools,
stopWhen: stepCountIs(1),
abortSignal: signal,
});
for await (const event of fullStream) {
// Check abort on every chunk for responsiveness
signal?.throwIfAborted();
// console.log("\n\n\t>>>>\t\tstream event", JSON.stringify(event));
switch (event.type) {
case "reasoning-start":

View file

@ -2,7 +2,7 @@ import { z, ZodType } from "zod";
import * as path from "path";
import { execSync } from "child_process";
import { glob } from "glob";
import { executeCommand } from "./command-executor.js";
import { executeCommand, executeCommandAbortable } from "./command-executor.js";
import { resolveSkill, availableSkills } from "../assistant/skills/index.js";
import { executeTool, listServers, listTools } from "../../mcp/mcp.js";
import container from "../../di/container.js";
@ -11,13 +11,14 @@ import { McpServerDefinition } from "@x/shared/dist/mcp.js";
import * as workspace from "../../workspace/workspace.js";
import { IAgentsRepo } from "../../agents/repo.js";
import { WorkDir } from "../../config/config.js";
import type { ToolContext } from "./exec-tool.js";
// eslint-disable-next-line @typescript-eslint/no-unused-vars
const BuiltinToolsSchema = z.record(z.string(), z.object({
description: z.string(),
inputSchema: z.custom<ZodType>(),
execute: z.function({
input: z.any(),
input: z.any(), // (input, ctx?) => Promise<any>
output: z.promise(z.any()),
}),
}));
@ -611,15 +612,15 @@ export const BuiltinTools: z.infer<typeof BuiltinToolsSchema> = {
command: z.string().describe('The shell command to execute (e.g., "ls -la", "cat file.txt")'),
cwd: z.string().optional().describe('Working directory to execute the command in (defaults to workspace root)'),
}),
execute: async ({ command, cwd }: { command: string, cwd?: string }) => {
execute: async ({ command, cwd }: { command: string, cwd?: string }, ctx?: ToolContext) => {
try {
const rootDir = path.resolve(WorkDir);
const workingDir = cwd ? path.resolve(rootDir, cwd) : rootDir;
const rootPrefix = rootDir.endsWith(path.sep)
? rootDir
: `${rootDir}${path.sep}`;
// TODO: Re-enable this check
// const rootPrefix = rootDir.endsWith(path.sep)
// ? rootDir
// : `${rootDir}${path.sep}`;
// if (workingDir !== rootDir && !workingDir.startsWith(rootPrefix)) {
// return {
// success: false,
@ -629,8 +630,32 @@ export const BuiltinTools: z.infer<typeof BuiltinToolsSchema> = {
// };
// }
// Use abortable version when we have a signal
if (ctx?.signal) {
const { promise, process: proc } = executeCommandAbortable(command, {
cwd: workingDir,
signal: ctx.signal,
});
// Register process with abort registry for force-kill
ctx.abortRegistry.registerProcess(ctx.runId, proc);
const result = await promise;
return {
success: result.exitCode === 0 && !result.wasAborted,
stdout: result.stdout,
stderr: result.stderr,
exitCode: result.exitCode,
wasAborted: result.wasAborted,
command,
workingDir,
};
}
// Fallback to original for backward compatibility
const result = await executeCommand(command, { cwd: workingDir });
return {
success: result.exitCode === 0,
stdout: result.stdout,

View file

@ -1,4 +1,4 @@
import { exec, execSync } from 'child_process';
import { exec, execSync, spawn, ChildProcess } from 'child_process';
import { promisify } from 'util';
import { getSecurityAllowList } from '../../config/security.js';
@ -110,6 +110,159 @@ export async function executeCommand(
}
}
export interface AbortableCommandResult extends CommandResult {
wasAborted: boolean;
}
const SIGKILL_GRACE_MS = 200;
/**
* Kill a process tree using negative PID (process group kill on Unix).
* Falls back to direct kill if group kill fails.
*/
function killProcessTree(proc: ChildProcess, signal: NodeJS.Signals): void {
if (!proc.pid || proc.killed) return;
try {
// Negative PID kills the entire process group (Unix)
process.kill(-proc.pid, signal);
} catch {
try {
proc.kill(signal);
} catch {
// Process may already be dead
}
}
}
/**
* Executes a shell command with abort support.
* Uses spawn with detached=true to create a process group for proper tree killing.
* Returns both the promise and the child process handle.
*/
export function executeCommandAbortable(
command: string,
options?: {
cwd?: string;
timeout?: number;
maxBuffer?: number;
signal?: AbortSignal;
}
): { promise: Promise<AbortableCommandResult>; process: ChildProcess } {
// Check if already aborted before spawning
if (options?.signal?.aborted) {
// Return a dummy process and a resolved result
const dummyProc = spawn('true', { shell: true });
dummyProc.kill();
return {
process: dummyProc,
promise: Promise.resolve({
stdout: '',
stderr: '',
exitCode: 130,
wasAborted: true,
}),
};
}
const proc = spawn(command, [], {
shell: '/bin/sh',
cwd: options?.cwd,
detached: process.platform !== 'win32', // Create process group on Unix
stdio: ['ignore', 'pipe', 'pipe'],
});
const promise = new Promise<AbortableCommandResult>((resolve) => {
let stdout = '';
let stderr = '';
let wasAborted = false;
let exited = false;
// Collect output
proc.stdout?.on('data', (chunk: Buffer) => {
const maxBuffer = options?.maxBuffer || 1024 * 1024;
if (stdout.length < maxBuffer) {
stdout += chunk.toString();
}
});
proc.stderr?.on('data', (chunk: Buffer) => {
const maxBuffer = options?.maxBuffer || 1024 * 1024;
if (stderr.length < maxBuffer) {
stderr += chunk.toString();
}
});
// Abort handler
const abortHandler = () => {
wasAborted = true;
killProcessTree(proc, 'SIGTERM');
// Force kill after grace period
setTimeout(() => {
if (!exited) {
killProcessTree(proc, 'SIGKILL');
}
}, SIGKILL_GRACE_MS);
};
if (options?.signal) {
options.signal.addEventListener('abort', abortHandler, { once: true });
}
// Timeout handler
let timeoutId: ReturnType<typeof setTimeout> | undefined;
if (options?.timeout) {
timeoutId = setTimeout(() => {
wasAborted = true;
killProcessTree(proc, 'SIGTERM');
setTimeout(() => {
if (!exited) {
killProcessTree(proc, 'SIGKILL');
}
}, SIGKILL_GRACE_MS);
}, options.timeout);
}
proc.once('exit', (code) => {
exited = true;
// Cleanup listeners
if (options?.signal) {
options.signal.removeEventListener('abort', abortHandler);
}
if (timeoutId) {
clearTimeout(timeoutId);
}
if (wasAborted) {
stdout += '\n\n(Command was aborted)';
}
resolve({
stdout: stdout.trim(),
stderr: stderr.trim(),
exitCode: code ?? 1,
wasAborted,
});
});
proc.once('error', (err) => {
exited = true;
if (options?.signal) {
options.signal.removeEventListener('abort', abortHandler);
}
if (timeoutId) {
clearTimeout(timeoutId);
}
resolve({
stdout: '',
stderr: err.message,
exitCode: 1,
wasAborted,
});
});
});
return { promise, process: proc };
}
/**
* Executes a command synchronously (blocking)
* Use with caution - prefer executeCommand for async execution

View file

@ -2,22 +2,36 @@ import { ToolAttachment } from "@x/shared/dist/agent.js";
import { z } from "zod";
import { BuiltinTools } from "./builtin-tools.js";
import { executeTool } from "../../mcp/mcp.js";
import { IAbortRegistry } from "../../runs/abort-registry.js";
/**
* Context passed to every tool execution, providing abort signal and run metadata.
*/
export interface ToolContext {
runId: string;
signal: AbortSignal;
abortRegistry: IAbortRegistry;
}
async function execMcpTool(agentTool: z.infer<typeof ToolAttachment> & { type: "mcp" }, input: Record<string, unknown>): Promise<unknown> {
const result = await executeTool(agentTool.mcpServerName, agentTool.name, input);
return result;
}
export async function execTool(agentTool: z.infer<typeof ToolAttachment>, input: Record<string, unknown>): Promise<unknown> {
export async function execTool(agentTool: z.infer<typeof ToolAttachment>, input: Record<string, unknown>, ctx?: ToolContext): Promise<unknown> {
// Check abort before starting any tool
ctx?.signal.throwIfAborted();
switch (agentTool.type) {
case "mcp":
// MCP tools: let complete on graceful stop (most are fast)
return execMcpTool(agentTool, input);
case "builtin": {
const builtinTool = BuiltinTools[agentTool.name];
if (!builtinTool || !builtinTool.execute) {
throw new Error(`Unsupported builtin tool: ${agentTool.name}`);
}
return builtinTool.execute(input);
return builtinTool.execute(input, ctx);
}
}
}

View file

@ -11,6 +11,7 @@ import { IAgentRuntime, AgentRuntime } from "../agents/runtime.js";
import { FSOAuthRepo, IOAuthRepo } from "../auth/repo.js";
import { FSClientRegistrationRepo, IClientRegistrationRepo } from "../auth/client-repo.js";
import { FSGranolaConfigRepo, IGranolaConfigRepo } from "../knowledge/granola/repo.js";
import { IAbortRegistry, InMemoryAbortRegistry } from "../runs/abort-registry.js";
const container = createContainer({
injectionMode: InjectionMode.PROXY,
@ -22,6 +23,7 @@ container.register({
messageQueue: asClass<IMessageQueue>(InMemoryMessageQueue).singleton(),
bus: asClass<IBus>(InMemoryBus).singleton(),
runsLock: asClass<IRunsLock>(InMemoryRunsLock).singleton(),
abortRegistry: asClass<IAbortRegistry>(InMemoryAbortRegistry).singleton(),
agentRuntime: asClass<IAgentRuntime>(AgentRuntime).singleton(),
mcpConfigRepo: asClass<IMcpConfigRepo>(FSMcpConfigRepo).singleton(),

View file

@ -84,6 +84,22 @@ export async function cleanup() {
}
}
/**
* Force-close all MCP client connections.
* Used during force abort to immediately reject any pending MCP tool calls.
* Clients will be lazily reconnected on next use.
*/
export async function forceCloseAllMcpClients(): Promise<void> {
for (const [serverName, { client }] of Object.entries(clients)) {
try {
await client?.close();
} catch {
// Ignore errors during force close
}
delete clients[serverName];
}
}
export async function listServers(): Promise<z.infer<typeof McpServerList>> {
const repo = container.resolve<IMcpConfigRepo>('mcpConfigRepo');
const { mcpServers } = await repo.getConfig();

View file

@ -0,0 +1,170 @@
import { ChildProcess } from "child_process";
export interface IAbortRegistry {
/**
* Create and track an AbortController for a run.
* Returns the AbortSignal to thread through all operations.
*/
createForRun(runId: string): AbortSignal;
/**
* Track a child process for a run (so we can kill it on abort).
*/
registerProcess(runId: string, process: ChildProcess): void;
/**
* Untrack a child process after it exits.
*/
unregisterProcess(runId: string, process: ChildProcess): void;
/**
* Graceful abort:
* 1. Fires the AbortSignal (cancels LLM streaming, etc.)
* 2. Sends SIGTERM to all tracked process groups
* 3. Schedules SIGKILL fallback after grace period
*/
abort(runId: string): void;
/**
* Force abort:
* 1. Fires AbortSignal if not already fired
* 2. Sends SIGKILL to all tracked process groups immediately
*/
forceAbort(runId: string): void;
/**
* Check if a run has been aborted.
*/
isAborted(runId: string): boolean;
/**
* Clean up tracking state after a run completes or is fully stopped.
*/
cleanup(runId: string): void;
}
interface RunAbortState {
controller: AbortController;
processes: Set<ChildProcess>;
killTimers: Set<ReturnType<typeof setTimeout>>;
}
const SIGKILL_GRACE_MS = 200;
export class InMemoryAbortRegistry implements IAbortRegistry {
private runs: Map<string, RunAbortState> = new Map();
createForRun(runId: string): AbortSignal {
// If a previous run state exists, clean it up first
this.cleanup(runId);
const state: RunAbortState = {
controller: new AbortController(),
processes: new Set(),
killTimers: new Set(),
};
this.runs.set(runId, state);
return state.controller.signal;
}
registerProcess(runId: string, process: ChildProcess): void {
const state = this.runs.get(runId);
if (!state) return;
state.processes.add(process);
// Auto-unregister when process exits
const onExit = () => {
state.processes.delete(process);
};
process.once("exit", onExit);
process.once("error", onExit);
}
unregisterProcess(runId: string, process: ChildProcess): void {
const state = this.runs.get(runId);
if (!state) return;
state.processes.delete(process);
}
abort(runId: string): void {
const state = this.runs.get(runId);
if (!state) return;
// 1. Fire the abort signal
if (!state.controller.signal.aborted) {
state.controller.abort();
}
// 2. SIGTERM all tracked process groups
for (const proc of state.processes) {
this.killProcessTree(proc, "SIGTERM");
// 3. Schedule SIGKILL fallback
const timer = setTimeout(() => {
if (!proc.killed) {
this.killProcessTree(proc, "SIGKILL");
}
state.killTimers.delete(timer);
}, SIGKILL_GRACE_MS);
state.killTimers.add(timer);
}
}
forceAbort(runId: string): void {
const state = this.runs.get(runId);
if (!state) return;
// 1. Fire abort signal if not already
if (!state.controller.signal.aborted) {
state.controller.abort();
}
// 2. Clear any pending graceful kill timers
for (const timer of state.killTimers) {
clearTimeout(timer);
}
state.killTimers.clear();
// 3. SIGKILL all tracked process groups immediately
for (const proc of state.processes) {
this.killProcessTree(proc, "SIGKILL");
}
}
isAborted(runId: string): boolean {
const state = this.runs.get(runId);
return state?.controller.signal.aborted ?? false;
}
cleanup(runId: string): void {
const state = this.runs.get(runId);
if (!state) return;
// Clear any pending kill timers
for (const timer of state.killTimers) {
clearTimeout(timer);
}
this.runs.delete(runId);
}
/**
* Kill a process tree using negative PID (process group kill on Unix).
* Falls back to direct kill if group kill fails.
*/
private killProcessTree(proc: ChildProcess, signal: NodeJS.Signals): void {
if (!proc.pid || proc.killed) return;
try {
// Negative PID kills the entire process group (Unix)
process.kill(-proc.pid, signal);
} catch {
// Fallback: kill just the process directly
try {
proc.kill(signal);
} catch {
// Process may already be dead
}
}
}
}

View file

@ -5,6 +5,8 @@ import { AskHumanResponseEvent, ToolPermissionResponseEvent, CreateRunOptions, R
import { IRunsRepo } from "./repo.js";
import { IAgentRuntime } from "../agents/runtime.js";
import { IBus } from "../application/lib/bus.js";
import { IAbortRegistry } from "./abort-registry.js";
import { forceCloseAllMcpClients } from "../mcp/mcp.js";
export async function createRun(opts: z.infer<typeof CreateRunOptions>): Promise<z.infer<typeof Run>> {
const repo = container.resolve<IRunsRepo>('runsRepo');
@ -46,9 +48,21 @@ export async function replyToHumanInputRequest(runId: string, ev: z.infer<typeof
runtime.trigger(runId);
}
export async function stop(runId: string): Promise<void> {
console.log(`Stopping run ${runId}`);
throw new Error('Not implemented');
export async function stop(runId: string, force: boolean = false): Promise<void> {
const abortRegistry = container.resolve<IAbortRegistry>('abortRegistry');
if (force && abortRegistry.isAborted(runId)) {
// Second click: aggressive cleanup — SIGKILL + force close MCP clients
console.log(`Force stopping run ${runId}`);
abortRegistry.forceAbort(runId);
await forceCloseAllMcpClients();
} else {
// First click: graceful — fires AbortSignal + SIGTERM
console.log(`Gracefully stopping run ${runId}`);
abortRegistry.abort(runId);
}
// Note: The run-stopped event is emitted by AgentRuntime.trigger() when it detects the abort.
// This avoids duplicate events and ensures proper sequencing.
}
export async function fetchRun(runId: string): Promise<z.infer<typeof Run>> {