Rebase onto dev: reconcile code-mode + direct mode on its own runtime

Rebased new-runtime onto dev (which added code-mode on the old runs-based
runtime). Reconciliation: kept the generic event-log + bus (runs/repo.ts,
runs/bus.ts, a trimmed runs.ts = createRun + fetchRun only) decoupled from the
retired LLM runtime; restored the runsRepo DI registration and the
bus -> runs:events forwarder.

Step 1 of the code-mode migration — direct mode on its own dedicated runtime:
- new SQLite-backed CodeEventStore (migration 0008 + code_session_events) replaces
  the runs JSONL log for code sessions
- dedicated codeEventBus (InMemoryBus) + codeSession:events feed +
  codeSession:getEvents history channel, replacing the shared bus / runs:events /
  runs:fetch for code
- CodeSessionService mints its own id (drops createRun) and writes to
  codeEventStore / codeEventBus; status-tracker subscribes to codeEventBus
- renderer (use-code-chat) loads history via codeSession:getEvents and streams via
  codeSession:events

Rowboat mode is temporarily disabled in the UI (its old copilot-LLM path is
retired); it moves onto the new sessions runtime in step 2.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
Ramnique Singh 2026-06-15 10:39:20 +05:30
parent 251a462686
commit d0ba9fa4a6
12 changed files with 201 additions and 175 deletions

View file

@ -10,6 +10,9 @@ import {
import { watcher as watcherCore, workspace } from '@x/core';
import { workspace as workspaceShared } from '@x/shared';
import * as mcpCore from '@x/core/dist/mcp/mcp.js';
import * as runsCore from '@x/core/dist/runs/runs.js';
import { bus } from '@x/core/dist/runs/bus.js';
import { RunEvent } from '@x/shared/dist/runs.js';
import type { AgentRuntime } from '@x/core/dist/agent-runtime/index.js';
import type { SessionBusEvent } from '@x/shared/dist/sessions.js';
import { serviceBus } from '@x/core/dist/services/service_bus.js';
@ -36,6 +39,8 @@ import type { ICodeProjectsRepo } from '@x/core/dist/code-mode/projects/repo.js'
import type { ICodeSessionsRepo } from '@x/core/dist/code-mode/sessions/repo.js';
import { CodeSessionService } from '@x/core/dist/code-mode/sessions/service.js';
import { CodeSessionStatusTracker } from '@x/core/dist/code-mode/sessions/status-tracker.js';
import { CodeEventStore } from '@x/core/dist/code-mode/sessions/event-store.js';
import type { IBus } from '@x/core/dist/application/lib/bus.js';
import * as codeGit from '@x/core/dist/code-mode/git/service.js';
import { readProjectDir, readProjectFile } from '@x/core/dist/code-mode/projects/fs.js';
import { ensureTerminal, writeTerminal, resizeTerminal, disposeTerminal } from './terminal.js';
@ -398,6 +403,20 @@ export async function startCodeSessionStatusWatcher(): Promise<void> {
});
}
let codeEventWatcher: (() => void) | null = null;
export async function startCodeEventWatcher(): Promise<void> {
if (codeEventWatcher) return;
const codeEventBus = container.resolve<IBus>('codeEventBus');
codeEventWatcher = await codeEventBus.subscribe('*', async (event) => {
const windows = BrowserWindow.getAllWindows();
for (const win of windows) {
if (!win.isDestroyed() && win.webContents) {
win.webContents.send('codeSession:events', event);
}
}
});
}
// Forward the generic event bus → renderer (runs:events). Code-mode (direct ACP
// sessions) streams its live events (code-run-event, permission, message, …)
// through this feed; chat + headless use the sessions:events feed below.
@ -652,6 +671,15 @@ export function setupIpcHandlers(agentRuntime: AgentRuntime) {
registry.resolve(args.requestId, args.decision);
return { success: true };
},
// Code-mode reads a session's transcript from the generic run event-log.
'runs:fetch': async (_event, args) => {
return runsCore.fetchRun(args.runId);
},
// Code-mode's own transcript history (its dedicated SQLite event store).
'codeSession:getEvents': async (_event, args) => {
const store = container.resolve<CodeEventStore>('codeEventStore');
return { events: await store.list(args.sessionId) };
},
'models:list': async () => {
if (await isSignedIn()) {
return await listGatewayModels();

View file

@ -4,6 +4,7 @@ import {
setupIpcHandlers,
startRunsWatcher,
startCodeSessionStatusWatcher,
startCodeEventWatcher,
startSessionsWatcher,
startServicesWatcher,
startLiveNoteAgentWatcher,
@ -370,6 +371,10 @@ app.whenReady().then(async () => {
// start code-session status tracker (derives working/needs-you/idle + notifications)
startCodeSessionStatusWatcher();
// start code-mode event watcher — forwards code-mode's own event bus to the
// renderer (codeSession:events feed for direct ACP sessions).
startCodeEventWatcher();
// start services watcher
startServicesWatcher();

View file

@ -1,6 +1,6 @@
import { useCallback, useEffect, useRef, useState } from 'react'
import type z from 'zod'
import type { RunEvent, ToolPermissionRequestEvent, AskHumanRequestEvent } from '@x/shared/src/runs.js'
import type { ToolPermissionRequestEvent, AskHumanRequestEvent } from '@x/shared/src/runs.js'
import type { CodeRunEvent, PermissionAsk, PermissionDecision } from '@x/shared/src/code-mode.js'
import type { CodeSession } from '@x/shared/src/code-sessions.js'
import {
@ -111,7 +111,7 @@ export function useCodeChat(session: CodeSession | null) {
setPendingAskHumans(new Map())
seenMessageIdsRef.current = new Set()
void window.ipc.invoke('runs:fetch', { runId: sessionId }).then((run) => {
void window.ipc.invoke('codeSession:getEvents', { sessionId }).then(({ events }) => {
if (cancelled) return
const loaded: CodeChatItem[] = []
const toolCallMap = new Map<string, ToolCall>()
@ -121,7 +121,7 @@ export function useCodeChat(session: CodeSession | null) {
const toolPerms = new Map<string, z.infer<typeof ToolPermissionRequestEvent>>()
const askHumans = new Map<string, z.infer<typeof AskHumanRequestEvent>>()
for (const event of run.log as z.infer<typeof RunEvent>[]) {
for (const event of events) {
const ts = event.ts ? new Date(event.ts).getTime() : Date.now()
switch (event.type) {
case 'message': {
@ -209,7 +209,7 @@ export function useCodeChat(session: CodeSession | null) {
setPendingToolPermissions(toolPerms)
setPendingAskHumans(askHumans)
}).catch(() => {
// Run log unreadable — show an empty conversation rather than crashing.
// Transcript unreadable — show an empty conversation rather than crashing.
}).finally(() => {
if (!cancelled) setLoading(false)
})
@ -217,12 +217,10 @@ export function useCodeChat(session: CodeSession | null) {
return () => { cancelled = true }
}, [sessionId])
// Live event stream.
// Live event stream — code-mode's own dedicated feed (direct ACP sessions).
useEffect(() => {
if (!sessionId) return
// runs:events is schema-less on the wire (req: z.null()) — cast like App.tsx does.
return window.ipc.on('runs:events', ((raw: unknown) => {
const event = raw as z.infer<typeof RunEvent>
return window.ipc.on('codeSession:events', (event) => {
if (event.runId !== sessionId) return
switch (event.type) {
case 'run-processing-start':
@ -372,7 +370,7 @@ export function useCodeChat(session: CodeSession | null) {
default:
break
}
}) as unknown as (event: null) => void)
})
}, [sessionId, applyCodeRunEvent])
const send = useCallback(async (text: string): Promise<{ ok: boolean; error?: string }> => {
@ -395,13 +393,11 @@ export function useCodeChat(session: CodeSession | null) {
return { ok: false, error: res.error ?? 'The session is busy.' }
}
} else {
await window.ipc.invoke('runs:createMessage', {
runId: session.id,
message: trimmed,
codeMode: session.agent,
codeCwd: session.cwd,
codePolicy: session.policy,
})
// Rowboat mode (copilot-orchestrated) is being migrated onto the new
// sessions runtime; the old run-based path is retired. Temporarily
// unavailable until that lands.
setIsProcessing(false)
return { ok: false, error: 'Rowboat mode is temporarily unavailable while it migrates to the new runtime.' }
}
return { ok: true }
} catch (err) {
@ -437,10 +433,10 @@ export function useCodeChat(session: CodeSession | null) {
next.delete(toolCallId)
return next
})
await window.ipc.invoke('runs:authorizePermission', {
runId: sessionId,
authorization: { subflow, toolCallId, response, scope },
})
// Rowboat copilot gates are disabled while rowboat migrates to the new
// sessions runtime (these will use sessions:respondToPermission).
void subflow; void response; void scope
console.warn('Rowboat tool-permission response ignored (rowboat migrating to the new runtime).')
}, [sessionId])
const respondToAskHuman = useCallback(async (toolCallId: string, subflow: string[], response: string) => {
@ -450,10 +446,8 @@ export function useCodeChat(session: CodeSession | null) {
next.delete(toolCallId)
return next
})
await window.ipc.invoke('runs:provideHumanInput', {
runId: sessionId,
reply: { subflow, toolCallId, response },
})
void subflow; void response
console.warn('Rowboat ask-human response ignored (rowboat migrating to the new runtime).')
}, [sessionId])
return {

View file

@ -0,0 +1,51 @@
import { z } from "zod";
import { RunEvent } from "@x/shared/dist/runs.js";
import { getDb } from "../../storage/database.js";
// Code-mode's own append-only event log, backed by the new SQLite storage.
// This is the dedicated replacement for the generic runs/ JSONL store the old
// agent runtime shared: a direct (ACP) code session's transcript lives here,
// keyed by session id and ordered by insertion. Events are RunEvents (the same
// shape the renderer already renders) — only the storage backend changed.
export class CodeEventStore {
private get db() {
return getDb();
}
async append(sessionId: string, events: z.infer<typeof RunEvent>[]): Promise<void> {
if (events.length === 0) return;
const now = new Date().toISOString();
await this.db
.insertInto("code_session_events")
.values(events.map((event) => ({
session_id: sessionId,
event: JSON.stringify(event),
created_at: now,
})))
.execute();
}
async list(sessionId: string): Promise<z.infer<typeof RunEvent>[]> {
const rows = await this.db
.selectFrom("code_session_events")
.select("event")
.where("session_id", "=", sessionId)
.orderBy("id", "asc")
.execute();
const out: z.infer<typeof RunEvent>[] = [];
for (const row of rows) {
const parsed = RunEvent.safeParse(JSON.parse(row.event));
// Skip rather than throw on a stray/legacy row — a corrupt event must
// not make the whole transcript unloadable.
if (parsed.success) out.push(parsed.data);
}
return out;
}
async delete(sessionId: string): Promise<void> {
await this.db
.deleteFrom("code_session_events")
.where("session_id", "=", sessionId)
.execute();
}
}

View file

@ -1,3 +1,4 @@
import crypto from 'node:crypto';
import path from 'path';
import fs from 'fs/promises';
import z from 'zod';
@ -5,7 +6,7 @@ import { WorkDir } from '../../config/config.js';
import type { CodeSession, CodeSessionMode } from '@x/shared/dist/code-sessions.js';
import type { CodingAgent, ApprovalPolicy } from '@x/shared/dist/code-mode.js';
import { RunEvent, MessageEvent } from '@x/shared/dist/runs.js';
import type { IRunsRepo } from '../../runs/repo.js';
import type { CodeEventStore } from './event-store.js';
import type { IRunsLock } from '../../runs/lock.js';
import type { IBus } from '../../application/lib/bus.js';
import type { IMonotonicallyIncreasingIdGenerator } from '../../application/lib/id-gen.js';
@ -59,9 +60,9 @@ async function persistRunWorkDir(runId: string, cwd: string): Promise<void> {
// of the app (stop IPC, status tracking, event forwarding) needs no special
// casing.
export class CodeSessionService {
private readonly runsRepo: IRunsRepo;
private readonly codeEventStore: CodeEventStore;
private readonly runsLock: IRunsLock;
private readonly bus: IBus;
private readonly codeEventBus: IBus;
private readonly idGenerator: IMonotonicallyIncreasingIdGenerator;
private readonly abortRegistry: IAbortRegistry;
private readonly codeModeManager: CodeModeManager;
@ -73,9 +74,9 @@ export class CodeSessionService {
private readonly inflight = new Set<string>();
constructor({
runsRepo,
codeEventStore,
runsLock,
bus,
codeEventBus,
idGenerator,
abortRegistry,
codeModeManager,
@ -83,9 +84,9 @@ export class CodeSessionService {
codeSessionsRepo,
codeProjectsRepo,
}: {
runsRepo: IRunsRepo;
codeEventStore: CodeEventStore;
runsLock: IRunsLock;
bus: IBus;
codeEventBus: IBus;
idGenerator: IMonotonicallyIncreasingIdGenerator;
abortRegistry: IAbortRegistry;
codeModeManager: CodeModeManager;
@ -93,9 +94,9 @@ export class CodeSessionService {
codeSessionsRepo: ICodeSessionsRepo;
codeProjectsRepo: ICodeProjectsRepo;
}) {
this.runsRepo = runsRepo;
this.codeEventStore = codeEventStore;
this.runsLock = runsLock;
this.bus = bus;
this.codeEventBus = codeEventBus;
this.idGenerator = idGenerator;
this.abortRegistry = abortRegistry;
this.codeModeManager = codeModeManager;
@ -108,16 +109,11 @@ export class CodeSessionService {
const project = await this.codeProjectsRepo.get(args.projectId);
if (!project) throw new Error(`Unknown project: ${args.projectId}`);
// The session is a real run so Rowboat mode (agent runtime) works on it
// directly and the existing runs plumbing (fetch/events/stop) applies.
const { createRun } = await import('../../runs/runs.js');
const run = await createRun({
agentId: 'copilot',
useCase: 'code_session',
...(args.model ? { model: args.model } : {}),
...(args.provider ? { provider: args.provider } : {}),
});
const sessionId = run.id;
// The session id is its own opaque key — code-mode owns its event log
// (codeEventStore) and metadata (codeSessionsRepo) under this id; no run
// record is minted. Rowboat mode reuses this id as a sessions-runtime
// session id (wired in a later step).
const sessionId = crypto.randomUUID();
let cwd = project.path;
let worktree: CodeSession['worktree'];
@ -181,12 +177,12 @@ export class CodeSessionService {
const toolCallId = `direct-${turnId}`;
const appendAndPublish = async (event: z.infer<typeof RunEvent>) => {
await this.runsRepo.appendEvents(sessionId, [event]);
await this.bus.publish(event);
await this.codeEventStore.append(sessionId, [event]);
await this.codeEventBus.publish(event);
};
try {
await this.bus.publish({ runId: sessionId, type: 'run-processing-start', subflow: [] });
await this.codeEventBus.publish({ runId: sessionId, type: 'run-processing-start', subflow: [] });
const userEvent: z.infer<typeof MessageEvent> = {
runId: sessionId,
@ -228,14 +224,14 @@ export class CodeSessionService {
event,
subflow: [],
};
void this.bus.publish(streamEvent);
void this.codeEventBus.publish(streamEvent);
if (event.type === 'tool_call' || event.type === 'tool_call_update'
|| event.type === 'plan' || event.type === 'permission') {
persistQueue.push({ ...streamEvent, ts: new Date().toISOString() });
}
},
ask: (permAsk) => this.codePermissionRegistry.request(sessionId, (requestId) => {
void this.bus.publish({
void this.codeEventBus.publish({
runId: sessionId,
type: 'code-run-permission-request',
toolCallId,
@ -256,7 +252,7 @@ export class CodeSessionService {
}
if (persistQueue.length > 0) {
await this.runsRepo.appendEvents(sessionId, persistQueue);
await this.codeEventStore.append(sessionId, persistQueue);
}
if (finalText.trim()) {
await appendAndPublish({
@ -282,7 +278,7 @@ export class CodeSessionService {
this.inflight.delete(sessionId);
this.abortRegistry.cleanup(sessionId);
await this.runsLock.release(sessionId);
await this.bus.publish({ runId: sessionId, type: 'run-processing-end', subflow: [] });
await this.codeEventBus.publish({ runId: sessionId, type: 'run-processing-end', subflow: [] });
}
}
@ -349,7 +345,7 @@ export class CodeSessionService {
}
await clearStoredSession(sessionId);
await this.codeSessionsRepo.remove(sessionId);
await this.runsRepo.delete(sessionId).catch(() => {});
await this.codeEventStore.delete(sessionId).catch(() => {});
await fs.rm(path.join(WorkDir, 'config', `workdir-${sessionId}.json`), { force: true }).catch(() => {});
}

View file

@ -13,7 +13,7 @@ export type StatusListener = (sessionId: string, status: CodeSessionStatus) => v
// direct turns and Rowboat-mode code_agent_run turns publish the same event
// types on the bus. The renderer just renders what this pushes.
export class CodeSessionStatusTracker {
private readonly bus: IBus;
private readonly codeEventBus: IBus;
private readonly codeSessionsRepo: ICodeSessionsRepo;
private readonly statuses = new Map<string, CodeSessionStatus>();
private readonly busySince = new Map<string, number>();
@ -27,15 +27,15 @@ export class CodeSessionStatusTracker {
// an id that misses the refresh can never become a session later.
private readonly knownNonSessions = new Set<string>();
constructor({ bus, codeSessionsRepo }: { bus: IBus; codeSessionsRepo: ICodeSessionsRepo }) {
this.bus = bus;
constructor({ codeEventBus, codeSessionsRepo }: { codeEventBus: IBus; codeSessionsRepo: ICodeSessionsRepo }) {
this.codeEventBus = codeEventBus;
this.codeSessionsRepo = codeSessionsRepo;
}
async start(): Promise<void> {
if (this.unsubscribe) return;
await this.refreshKnownSessions();
this.unsubscribe = await this.bus.subscribe('*', async (event) => {
this.unsubscribe = await this.codeEventBus.subscribe('*', async (event) => {
await this.handle(event);
});
}

View file

@ -2,6 +2,7 @@ import { asClass, asValue, createContainer, InjectionMode } from "awilix";
import { FSModelConfigRepo, IModelConfigRepo } from "../models/repo.js";
import { FSMcpConfigRepo, IMcpConfigRepo } from "../mcp/repo.js";
import { FSAgentsRepo, IAgentsRepo } from "../agents/repo.js";
import { FSRunsRepo, IRunsRepo } from "../runs/repo.js";
import { IMonotonicallyIncreasingIdGenerator, IdGen } from "../application/lib/id-gen.js";
import { IBus, InMemoryBus } from "../application/lib/bus.js";
import { IRunsLock, InMemoryRunsLock } from "../runs/lock.js";
@ -17,6 +18,7 @@ import { CodeModeManager } from "../code-mode/acp/manager.js";
import { CodePermissionRegistry } from "../code-mode/acp/permission-registry.js";
import { FSCodeProjectsRepo, ICodeProjectsRepo } from "../code-mode/projects/repo.js";
import { FSCodeSessionsRepo, ICodeSessionsRepo } from "../code-mode/sessions/repo.js";
import { CodeEventStore } from "../code-mode/sessions/event-store.js";
import { CodeSessionService } from "../code-mode/sessions/service.js";
import { CodeSessionStatusTracker } from "../code-mode/sessions/status-tracker.js";
import type { IBrowserControlService } from "../application/browser-control/service.js";
@ -36,6 +38,9 @@ container.register({
mcpConfigRepo: asClass<IMcpConfigRepo>(FSMcpConfigRepo).singleton(),
modelConfigRepo: asClass<IModelConfigRepo>(FSModelConfigRepo).singleton(),
agentsRepo: asClass<IAgentsRepo>(FSAgentsRepo).singleton(),
// Generic run event-log store (JSONL). The LLM agent runtime that once drove
// it is retired; code-mode now uses it as its session event store.
runsRepo: asClass<IRunsRepo>(FSRunsRepo).singleton(),
oauthRepo: asClass<IOAuthRepo>(FSOAuthRepo).singleton(),
clientRegistrationRepo: asClass<IClientRegistrationRepo>(FSClientRegistrationRepo).singleton(),
granolaConfigRepo: asClass<IGranolaConfigRepo>(FSGranolaConfigRepo).singleton(),
@ -51,9 +56,13 @@ container.register({
codePermissionRegistry: asClass(CodePermissionRegistry).singleton(),
// Code section: project registry, session metadata, the direct-drive
// session service, and the live status tracker.
// session service, and the live status tracker. Code-mode owns its own
// event store + event bus (dedicated runtime, decoupled from the retired
// LLM agent runtime's runs/ infra).
codeProjectsRepo: asClass<ICodeProjectsRepo>(FSCodeProjectsRepo).singleton(),
codeSessionsRepo: asClass<ICodeSessionsRepo>(FSCodeSessionsRepo).singleton(),
codeEventStore: asClass(CodeEventStore).singleton(),
codeEventBus: asClass<IBus>(InMemoryBus).singleton(),
codeSessionService: asClass(CodeSessionService).singleton(),
codeSessionStatusTracker: asClass(CodeSessionStatusTracker).singleton(),
});

View file

@ -0,0 +1,4 @@
import container from "../di/container.js";
import { IBus } from "../application/lib/bus.js";
export const bus = container.resolve<IBus>('bus');

View file

@ -1,19 +1,18 @@
import z from "zod";
import container from "../di/container.js";
import { IMessageQueue, UserMessageContentType, VoiceOutputMode, MiddlePaneContext } from "../application/lib/message-queue.js";
import { AskHumanResponseEvent, ToolPermissionRequestEvent, ToolPermissionResponseEvent, CreateRunOptions, Run, ListRunsResponse, ToolPermissionAuthorizePayload, AskHumanResponsePayload } from "@x/shared/dist/runs.js";
import { CreateRunOptions, Run } from "@x/shared/dist/runs.js";
import { IRunsRepo } from "./repo.js";
import { ICodeSessionsRepo } from "../code-mode/sessions/repo.js";
import { IAgentRuntime } from "../agents/runtime.js";
import { IBus } from "../application/lib/bus.js";
import { IAbortRegistry } from "./abort-registry.js";
import { IRunsLock } from "./lock.js";
import { forceCloseAllMcpClients } from "../mcp/mcp.js";
import { extractCommandNames } from "../application/lib/command-executor.js";
import { addFileAccessGrant, addToSecurityConfig } from "../config/security.js";
import { loadAgent } from "../agents/runtime.js";
import { getDefaultModelAndProvider } from "../models/defaults.js";
// The generic run event-log helpers that survive the retirement of the old LLM
// agent runtime. The message/permission/stop helpers that drove the LLM loop
// (createMessage → agentRuntime.trigger, authorizePermission, replyToHumanInput,
// stop) are gone with it; chat + headless run on the new sessions/turn runtime.
// What remains is the minimal surface code-mode uses to mint and read a session's
// append-only event log: createRun (id + start event) and fetchRun.
export async function createRun(opts: z.infer<typeof CreateRunOptions>): Promise<z.infer<typeof Run>> {
const repo = container.resolve<IRunsRepo>('runsRepo');
const bus = container.resolve<IBus>('bus');
@ -41,114 +40,7 @@ export async function createRun(opts: z.infer<typeof CreateRunOptions>): Promise
return run;
}
export async function createMessage(runId: string, message: UserMessageContentType, voiceInput?: boolean, voiceOutput?: VoiceOutputMode, searchEnabled?: boolean, middlePaneContext?: MiddlePaneContext, codeMode?: 'claude' | 'codex', codeCwd?: string, codePolicy?: 'ask' | 'auto-approve-reads' | 'yolo'): Promise<string> {
// Code-section sessions carry their coding context in the session meta.
// Pin it here — not in the composer — so EVERY path into the run (assistant
// chat pane, voice, palette) drives the session's agent in its directory,
// and the session header stays the single source of truth.
try {
const sessionMeta = await container.resolve<ICodeSessionsRepo>('codeSessionsRepo').get(runId);
if (sessionMeta) {
codeMode = sessionMeta.agent;
codeCwd = sessionMeta.cwd;
codePolicy = sessionMeta.policy;
}
} catch {
// sessions repo unavailable — treat as a regular chat run
}
const queue = container.resolve<IMessageQueue>('messageQueue');
const id = await queue.enqueue(runId, message, voiceInput, voiceOutput, searchEnabled, middlePaneContext, codeMode, codeCwd, codePolicy);
const runtime = container.resolve<IAgentRuntime>('agentRuntime');
runtime.trigger(runId);
return id;
}
export async function authorizePermission(runId: string, ev: z.infer<typeof ToolPermissionAuthorizePayload>): Promise<void> {
const { scope, ...rest } = ev;
// For "always" scope, derive command from the run log and persist to security config
if (rest.response === "approve" && scope === "always") {
const repo = container.resolve<IRunsRepo>('runsRepo');
const run = await repo.fetch(runId);
const permReqEvent = run.log.find(
(e): e is z.infer<typeof ToolPermissionRequestEvent> =>
e.type === "tool-permission-request"
&& e.toolCall.toolCallId === rest.toolCallId
&& JSON.stringify(e.subflow) === JSON.stringify(rest.subflow)
);
if (permReqEvent?.permission?.kind === "file") {
await addFileAccessGrant({
operation: permReqEvent.permission.operation,
pathPrefix: permReqEvent.permission.pathPrefix,
});
} else if (permReqEvent && typeof permReqEvent.toolCall.arguments === 'object' && permReqEvent.toolCall.arguments !== null && 'command' in permReqEvent.toolCall.arguments) {
const commandNames = extractCommandNames(String(permReqEvent.toolCall.arguments.command));
if (commandNames.length > 0) {
await addToSecurityConfig(commandNames);
}
}
}
const repo = container.resolve<IRunsRepo>('runsRepo');
const event: z.infer<typeof ToolPermissionResponseEvent> = {
...rest,
runId,
type: "tool-permission-response",
scope,
};
await repo.appendEvents(runId, [event]);
const runtime = container.resolve<IAgentRuntime>('agentRuntime');
runtime.trigger(runId);
}
export async function replyToHumanInputRequest(runId: string, ev: z.infer<typeof AskHumanResponsePayload>): Promise<void> {
const repo = container.resolve<IRunsRepo>('runsRepo');
const event: z.infer<typeof AskHumanResponseEvent> = {
...ev,
runId,
type: "ask-human-response",
};
await repo.appendEvents(runId, [event]);
const runtime = container.resolve<IAgentRuntime>('agentRuntime');
runtime.trigger(runId);
}
export async function stop(runId: string, force: boolean = false): Promise<void> {
const abortRegistry = container.resolve<IAbortRegistry>('abortRegistry');
if (force && abortRegistry.isAborted(runId)) {
// Second click: aggressive cleanup — SIGKILL + force close MCP clients
console.log(`Force stopping run ${runId}`);
abortRegistry.forceAbort(runId);
await forceCloseAllMcpClients();
} else {
// First click: graceful — fires AbortSignal + SIGTERM
console.log(`Gracefully stopping run ${runId}`);
abortRegistry.abort(runId);
}
// Note: The run-stopped event is emitted by AgentRuntime.trigger() when it detects the abort.
// This avoids duplicate events and ensures proper sequencing.
}
export async function deleteRun(runId: string): Promise<void> {
const runsLock = container.resolve<IRunsLock>('runsLock');
if (!await runsLock.lock(runId)) {
throw new Error(`Cannot delete run ${runId}: run is currently active`);
}
try {
const repo = container.resolve<IRunsRepo>('runsRepo');
await repo.delete(runId);
} finally {
await runsLock.release(runId);
}
}
export async function fetchRun(runId: string): Promise<z.infer<typeof Run>> {
const repo = container.resolve<IRunsRepo>('runsRepo');
return repo.fetch(runId);
}
export async function listRuns(cursor?: string): Promise<z.infer<typeof ListRunsResponse>> {
const repo = container.resolve<IRunsRepo>('runsRepo');
return repo.list(cursor);
}

View file

@ -156,6 +156,31 @@ const migrations: Record<string, Migration> = {
await db.schema.alterTable("agent_loop_turns").dropColumn("use_case").execute();
},
},
"2026-06-15_0008_code_session_events": {
async up(db: MigrationDb): Promise<void> {
// Code-mode's own append-only event log (direct ACP sessions),
// replacing the generic runs/ JSONL store.
await db.schema
.createTable("code_session_events")
.ifNotExists()
.addColumn("id", "integer", (col) => col.primaryKey().autoIncrement())
.addColumn("session_id", "text", (col) => col.notNull())
.addColumn("event", "text", (col) => col.notNull())
.addColumn("created_at", "text", (col) => col.notNull())
.execute();
await db.schema
.createIndex("code_session_events_session_id_idx")
.ifNotExists()
.on("code_session_events")
.column("session_id")
.execute();
},
async down(db: MigrationDb): Promise<void> {
await db.schema.dropIndex("code_session_events_session_id_idx").ifExists().execute();
await db.schema.dropTable("code_session_events").ifExists().execute();
},
},
};
class InCodeMigrationProvider implements MigrationProvider {

View file

@ -1,4 +1,4 @@
import type { ColumnType } from "kysely";
import type { ColumnType, Generated } from "kysely";
export type TimestampColumn = ColumnType<string, string, string>;
@ -40,8 +40,19 @@ export interface SessionsTable {
updated_at: TimestampColumn;
}
// Append-only event log for code-mode (direct ACP) sessions. This is code-mode's
// own dedicated event store — it replaces the generic runs/ JSONL log the old
// agent runtime shared. One row per RunEvent; ordered by the autoincrement id.
export interface CodeSessionEventsTable {
id: Generated<number>;
session_id: string;
event: string; // JSON: RunEvent
created_at: TimestampColumn;
}
export interface Database {
storage_metadata: StorageMetadataTable;
agent_loop_turns: AgentLoopTurnsTable;
sessions: SessionsTable;
code_session_events: CodeSessionEventsTable;
}

View file

@ -19,9 +19,9 @@ import { RowboatApiConfig } from './rowboat-account.js';
import { ZListToolkitsResponse } from './composio.js';
import { BrowserStateSchema } from './browser-control.js';
import { BillingInfoSchema } from './billing.js';
import { EmailBlockSchema, GmailThreadSchema } from './blocks.js';
import { GmailThreadSchema } from './blocks.js';
import { PermissionDecision, ApprovalPolicy, CodingAgent } from './code-mode.js';
import { Run } from './runs.js';
import { Run, RunEvent } from './runs.js';
import { NotificationSettingsSchema } from './notification-settings.js';
import { CodeProject, CodeSession, CodeSessionMode, CodeSessionStatus, GitRepoInfo, GitStatusFile } from './code-sessions.js';
@ -572,6 +572,17 @@ const ipcSchemas = {
}),
res: z.null(),
},
// Code-mode's own transcript history (replaces the generic runs:fetch).
'codeSession:getEvents': {
req: z.object({ sessionId: z.string() }),
res: z.object({ events: z.array(RunEvent) }),
},
// main → renderer: code-mode's live event feed (replaces runs:events). Carries
// the session's RunEvents (code-run-event, message, processing, …).
'codeSession:events': {
req: RunEvent,
res: z.null(),
},
// ==========================================================================
// Embedded terminal (Code section): one PTY per coding session
// ==========================================================================