From d0ba9fa4a61cc1df6d676658496e982f8f858522 Mon Sep 17 00:00:00 2001 From: Ramnique Singh <30795890+ramnique@users.noreply.github.com> Date: Mon, 15 Jun 2026 10:39:20 +0530 Subject: [PATCH] Rebase onto dev: reconcile code-mode + direct mode on its own runtime MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Rebased new-runtime onto dev (which added code-mode on the old runs-based runtime). Reconciliation: kept the generic event-log + bus (runs/repo.ts, runs/bus.ts, a trimmed runs.ts = createRun + fetchRun only) decoupled from the retired LLM runtime; restored the runsRepo DI registration and the bus -> runs:events forwarder. Step 1 of the code-mode migration — direct mode on its own dedicated runtime: - new SQLite-backed CodeEventStore (migration 0008 + code_session_events) replaces the runs JSONL log for code sessions - dedicated codeEventBus (InMemoryBus) + codeSession:events feed + codeSession:getEvents history channel, replacing the shared bus / runs:events / runs:fetch for code - CodeSessionService mints its own id (drops createRun) and writes to codeEventStore / codeEventBus; status-tracker subscribes to codeEventBus - renderer (use-code-chat) loads history via codeSession:getEvents and streams via codeSession:events Rowboat mode is temporarily disabled in the UI (its old copilot-LLM path is retired); it moves onto the new sessions runtime in step 2. Co-Authored-By: Claude Opus 4.8 (1M context) --- apps/x/apps/main/src/ipc.ts | 28 ++++ apps/x/apps/main/src/main.ts | 5 + .../src/components/code/use-code-chat.ts | 42 +++--- .../src/code-mode/sessions/event-store.ts | 51 +++++++ .../core/src/code-mode/sessions/service.ts | 50 ++++--- .../src/code-mode/sessions/status-tracker.ts | 8 +- apps/x/packages/core/src/di/container.ts | 11 +- apps/x/packages/core/src/runs/bus.ts | 4 + apps/x/packages/core/src/runs/runs.ts | 124 ++---------------- .../x/packages/core/src/storage/migrations.ts | 25 ++++ apps/x/packages/core/src/storage/schema.ts | 13 +- apps/x/packages/shared/src/ipc.ts | 15 ++- 12 files changed, 201 insertions(+), 175 deletions(-) create mode 100644 apps/x/packages/core/src/code-mode/sessions/event-store.ts create mode 100644 apps/x/packages/core/src/runs/bus.ts diff --git a/apps/x/apps/main/src/ipc.ts b/apps/x/apps/main/src/ipc.ts index 583aec5d..1218c81a 100644 --- a/apps/x/apps/main/src/ipc.ts +++ b/apps/x/apps/main/src/ipc.ts @@ -10,6 +10,9 @@ import { import { watcher as watcherCore, workspace } from '@x/core'; import { workspace as workspaceShared } from '@x/shared'; import * as mcpCore from '@x/core/dist/mcp/mcp.js'; +import * as runsCore from '@x/core/dist/runs/runs.js'; +import { bus } from '@x/core/dist/runs/bus.js'; +import { RunEvent } from '@x/shared/dist/runs.js'; import type { AgentRuntime } from '@x/core/dist/agent-runtime/index.js'; import type { SessionBusEvent } from '@x/shared/dist/sessions.js'; import { serviceBus } from '@x/core/dist/services/service_bus.js'; @@ -36,6 +39,8 @@ import type { ICodeProjectsRepo } from '@x/core/dist/code-mode/projects/repo.js' import type { ICodeSessionsRepo } from '@x/core/dist/code-mode/sessions/repo.js'; import { CodeSessionService } from '@x/core/dist/code-mode/sessions/service.js'; import { CodeSessionStatusTracker } from '@x/core/dist/code-mode/sessions/status-tracker.js'; +import { CodeEventStore } from '@x/core/dist/code-mode/sessions/event-store.js'; +import type { IBus } from '@x/core/dist/application/lib/bus.js'; import * as codeGit from '@x/core/dist/code-mode/git/service.js'; import { readProjectDir, readProjectFile } from '@x/core/dist/code-mode/projects/fs.js'; import { ensureTerminal, writeTerminal, resizeTerminal, disposeTerminal } from './terminal.js'; @@ -398,6 +403,20 @@ export async function startCodeSessionStatusWatcher(): Promise { }); } +let codeEventWatcher: (() => void) | null = null; +export async function startCodeEventWatcher(): Promise { + if (codeEventWatcher) return; + const codeEventBus = container.resolve('codeEventBus'); + codeEventWatcher = await codeEventBus.subscribe('*', async (event) => { + const windows = BrowserWindow.getAllWindows(); + for (const win of windows) { + if (!win.isDestroyed() && win.webContents) { + win.webContents.send('codeSession:events', event); + } + } + }); +} + // Forward the generic event bus → renderer (runs:events). Code-mode (direct ACP // sessions) streams its live events (code-run-event, permission, message, …) // through this feed; chat + headless use the sessions:events feed below. @@ -652,6 +671,15 @@ export function setupIpcHandlers(agentRuntime: AgentRuntime) { registry.resolve(args.requestId, args.decision); return { success: true }; }, + // Code-mode reads a session's transcript from the generic run event-log. + 'runs:fetch': async (_event, args) => { + return runsCore.fetchRun(args.runId); + }, + // Code-mode's own transcript history (its dedicated SQLite event store). + 'codeSession:getEvents': async (_event, args) => { + const store = container.resolve('codeEventStore'); + return { events: await store.list(args.sessionId) }; + }, 'models:list': async () => { if (await isSignedIn()) { return await listGatewayModels(); diff --git a/apps/x/apps/main/src/main.ts b/apps/x/apps/main/src/main.ts index ba977cc4..1ec07c02 100644 --- a/apps/x/apps/main/src/main.ts +++ b/apps/x/apps/main/src/main.ts @@ -4,6 +4,7 @@ import { setupIpcHandlers, startRunsWatcher, startCodeSessionStatusWatcher, + startCodeEventWatcher, startSessionsWatcher, startServicesWatcher, startLiveNoteAgentWatcher, @@ -370,6 +371,10 @@ app.whenReady().then(async () => { // start code-session status tracker (derives working/needs-you/idle + notifications) startCodeSessionStatusWatcher(); + // start code-mode event watcher — forwards code-mode's own event bus to the + // renderer (codeSession:events feed for direct ACP sessions). + startCodeEventWatcher(); + // start services watcher startServicesWatcher(); diff --git a/apps/x/apps/renderer/src/components/code/use-code-chat.ts b/apps/x/apps/renderer/src/components/code/use-code-chat.ts index 08468c49..b26344ca 100644 --- a/apps/x/apps/renderer/src/components/code/use-code-chat.ts +++ b/apps/x/apps/renderer/src/components/code/use-code-chat.ts @@ -1,6 +1,6 @@ import { useCallback, useEffect, useRef, useState } from 'react' import type z from 'zod' -import type { RunEvent, ToolPermissionRequestEvent, AskHumanRequestEvent } from '@x/shared/src/runs.js' +import type { ToolPermissionRequestEvent, AskHumanRequestEvent } from '@x/shared/src/runs.js' import type { CodeRunEvent, PermissionAsk, PermissionDecision } from '@x/shared/src/code-mode.js' import type { CodeSession } from '@x/shared/src/code-sessions.js' import { @@ -111,7 +111,7 @@ export function useCodeChat(session: CodeSession | null) { setPendingAskHumans(new Map()) seenMessageIdsRef.current = new Set() - void window.ipc.invoke('runs:fetch', { runId: sessionId }).then((run) => { + void window.ipc.invoke('codeSession:getEvents', { sessionId }).then(({ events }) => { if (cancelled) return const loaded: CodeChatItem[] = [] const toolCallMap = new Map() @@ -121,7 +121,7 @@ export function useCodeChat(session: CodeSession | null) { const toolPerms = new Map>() const askHumans = new Map>() - for (const event of run.log as z.infer[]) { + for (const event of events) { const ts = event.ts ? new Date(event.ts).getTime() : Date.now() switch (event.type) { case 'message': { @@ -209,7 +209,7 @@ export function useCodeChat(session: CodeSession | null) { setPendingToolPermissions(toolPerms) setPendingAskHumans(askHumans) }).catch(() => { - // Run log unreadable — show an empty conversation rather than crashing. + // Transcript unreadable — show an empty conversation rather than crashing. }).finally(() => { if (!cancelled) setLoading(false) }) @@ -217,12 +217,10 @@ export function useCodeChat(session: CodeSession | null) { return () => { cancelled = true } }, [sessionId]) - // Live event stream. + // Live event stream — code-mode's own dedicated feed (direct ACP sessions). useEffect(() => { if (!sessionId) return - // runs:events is schema-less on the wire (req: z.null()) — cast like App.tsx does. - return window.ipc.on('runs:events', ((raw: unknown) => { - const event = raw as z.infer + return window.ipc.on('codeSession:events', (event) => { if (event.runId !== sessionId) return switch (event.type) { case 'run-processing-start': @@ -372,7 +370,7 @@ export function useCodeChat(session: CodeSession | null) { default: break } - }) as unknown as (event: null) => void) + }) }, [sessionId, applyCodeRunEvent]) const send = useCallback(async (text: string): Promise<{ ok: boolean; error?: string }> => { @@ -395,13 +393,11 @@ export function useCodeChat(session: CodeSession | null) { return { ok: false, error: res.error ?? 'The session is busy.' } } } else { - await window.ipc.invoke('runs:createMessage', { - runId: session.id, - message: trimmed, - codeMode: session.agent, - codeCwd: session.cwd, - codePolicy: session.policy, - }) + // Rowboat mode (copilot-orchestrated) is being migrated onto the new + // sessions runtime; the old run-based path is retired. Temporarily + // unavailable until that lands. + setIsProcessing(false) + return { ok: false, error: 'Rowboat mode is temporarily unavailable while it migrates to the new runtime.' } } return { ok: true } } catch (err) { @@ -437,10 +433,10 @@ export function useCodeChat(session: CodeSession | null) { next.delete(toolCallId) return next }) - await window.ipc.invoke('runs:authorizePermission', { - runId: sessionId, - authorization: { subflow, toolCallId, response, scope }, - }) + // Rowboat copilot gates are disabled while rowboat migrates to the new + // sessions runtime (these will use sessions:respondToPermission). + void subflow; void response; void scope + console.warn('Rowboat tool-permission response ignored (rowboat migrating to the new runtime).') }, [sessionId]) const respondToAskHuman = useCallback(async (toolCallId: string, subflow: string[], response: string) => { @@ -450,10 +446,8 @@ export function useCodeChat(session: CodeSession | null) { next.delete(toolCallId) return next }) - await window.ipc.invoke('runs:provideHumanInput', { - runId: sessionId, - reply: { subflow, toolCallId, response }, - }) + void subflow; void response + console.warn('Rowboat ask-human response ignored (rowboat migrating to the new runtime).') }, [sessionId]) return { diff --git a/apps/x/packages/core/src/code-mode/sessions/event-store.ts b/apps/x/packages/core/src/code-mode/sessions/event-store.ts new file mode 100644 index 00000000..c0296475 --- /dev/null +++ b/apps/x/packages/core/src/code-mode/sessions/event-store.ts @@ -0,0 +1,51 @@ +import { z } from "zod"; +import { RunEvent } from "@x/shared/dist/runs.js"; +import { getDb } from "../../storage/database.js"; + +// Code-mode's own append-only event log, backed by the new SQLite storage. +// This is the dedicated replacement for the generic runs/ JSONL store the old +// agent runtime shared: a direct (ACP) code session's transcript lives here, +// keyed by session id and ordered by insertion. Events are RunEvents (the same +// shape the renderer already renders) — only the storage backend changed. +export class CodeEventStore { + private get db() { + return getDb(); + } + + async append(sessionId: string, events: z.infer[]): Promise { + if (events.length === 0) return; + const now = new Date().toISOString(); + await this.db + .insertInto("code_session_events") + .values(events.map((event) => ({ + session_id: sessionId, + event: JSON.stringify(event), + created_at: now, + }))) + .execute(); + } + + async list(sessionId: string): Promise[]> { + const rows = await this.db + .selectFrom("code_session_events") + .select("event") + .where("session_id", "=", sessionId) + .orderBy("id", "asc") + .execute(); + const out: z.infer[] = []; + for (const row of rows) { + const parsed = RunEvent.safeParse(JSON.parse(row.event)); + // Skip rather than throw on a stray/legacy row — a corrupt event must + // not make the whole transcript unloadable. + if (parsed.success) out.push(parsed.data); + } + return out; + } + + async delete(sessionId: string): Promise { + await this.db + .deleteFrom("code_session_events") + .where("session_id", "=", sessionId) + .execute(); + } +} diff --git a/apps/x/packages/core/src/code-mode/sessions/service.ts b/apps/x/packages/core/src/code-mode/sessions/service.ts index 6eaf047b..a7ca1206 100644 --- a/apps/x/packages/core/src/code-mode/sessions/service.ts +++ b/apps/x/packages/core/src/code-mode/sessions/service.ts @@ -1,3 +1,4 @@ +import crypto from 'node:crypto'; import path from 'path'; import fs from 'fs/promises'; import z from 'zod'; @@ -5,7 +6,7 @@ import { WorkDir } from '../../config/config.js'; import type { CodeSession, CodeSessionMode } from '@x/shared/dist/code-sessions.js'; import type { CodingAgent, ApprovalPolicy } from '@x/shared/dist/code-mode.js'; import { RunEvent, MessageEvent } from '@x/shared/dist/runs.js'; -import type { IRunsRepo } from '../../runs/repo.js'; +import type { CodeEventStore } from './event-store.js'; import type { IRunsLock } from '../../runs/lock.js'; import type { IBus } from '../../application/lib/bus.js'; import type { IMonotonicallyIncreasingIdGenerator } from '../../application/lib/id-gen.js'; @@ -59,9 +60,9 @@ async function persistRunWorkDir(runId: string, cwd: string): Promise { // of the app (stop IPC, status tracking, event forwarding) needs no special // casing. export class CodeSessionService { - private readonly runsRepo: IRunsRepo; + private readonly codeEventStore: CodeEventStore; private readonly runsLock: IRunsLock; - private readonly bus: IBus; + private readonly codeEventBus: IBus; private readonly idGenerator: IMonotonicallyIncreasingIdGenerator; private readonly abortRegistry: IAbortRegistry; private readonly codeModeManager: CodeModeManager; @@ -73,9 +74,9 @@ export class CodeSessionService { private readonly inflight = new Set(); constructor({ - runsRepo, + codeEventStore, runsLock, - bus, + codeEventBus, idGenerator, abortRegistry, codeModeManager, @@ -83,9 +84,9 @@ export class CodeSessionService { codeSessionsRepo, codeProjectsRepo, }: { - runsRepo: IRunsRepo; + codeEventStore: CodeEventStore; runsLock: IRunsLock; - bus: IBus; + codeEventBus: IBus; idGenerator: IMonotonicallyIncreasingIdGenerator; abortRegistry: IAbortRegistry; codeModeManager: CodeModeManager; @@ -93,9 +94,9 @@ export class CodeSessionService { codeSessionsRepo: ICodeSessionsRepo; codeProjectsRepo: ICodeProjectsRepo; }) { - this.runsRepo = runsRepo; + this.codeEventStore = codeEventStore; this.runsLock = runsLock; - this.bus = bus; + this.codeEventBus = codeEventBus; this.idGenerator = idGenerator; this.abortRegistry = abortRegistry; this.codeModeManager = codeModeManager; @@ -108,16 +109,11 @@ export class CodeSessionService { const project = await this.codeProjectsRepo.get(args.projectId); if (!project) throw new Error(`Unknown project: ${args.projectId}`); - // The session is a real run so Rowboat mode (agent runtime) works on it - // directly and the existing runs plumbing (fetch/events/stop) applies. - const { createRun } = await import('../../runs/runs.js'); - const run = await createRun({ - agentId: 'copilot', - useCase: 'code_session', - ...(args.model ? { model: args.model } : {}), - ...(args.provider ? { provider: args.provider } : {}), - }); - const sessionId = run.id; + // The session id is its own opaque key — code-mode owns its event log + // (codeEventStore) and metadata (codeSessionsRepo) under this id; no run + // record is minted. Rowboat mode reuses this id as a sessions-runtime + // session id (wired in a later step). + const sessionId = crypto.randomUUID(); let cwd = project.path; let worktree: CodeSession['worktree']; @@ -181,12 +177,12 @@ export class CodeSessionService { const toolCallId = `direct-${turnId}`; const appendAndPublish = async (event: z.infer) => { - await this.runsRepo.appendEvents(sessionId, [event]); - await this.bus.publish(event); + await this.codeEventStore.append(sessionId, [event]); + await this.codeEventBus.publish(event); }; try { - await this.bus.publish({ runId: sessionId, type: 'run-processing-start', subflow: [] }); + await this.codeEventBus.publish({ runId: sessionId, type: 'run-processing-start', subflow: [] }); const userEvent: z.infer = { runId: sessionId, @@ -228,14 +224,14 @@ export class CodeSessionService { event, subflow: [], }; - void this.bus.publish(streamEvent); + void this.codeEventBus.publish(streamEvent); if (event.type === 'tool_call' || event.type === 'tool_call_update' || event.type === 'plan' || event.type === 'permission') { persistQueue.push({ ...streamEvent, ts: new Date().toISOString() }); } }, ask: (permAsk) => this.codePermissionRegistry.request(sessionId, (requestId) => { - void this.bus.publish({ + void this.codeEventBus.publish({ runId: sessionId, type: 'code-run-permission-request', toolCallId, @@ -256,7 +252,7 @@ export class CodeSessionService { } if (persistQueue.length > 0) { - await this.runsRepo.appendEvents(sessionId, persistQueue); + await this.codeEventStore.append(sessionId, persistQueue); } if (finalText.trim()) { await appendAndPublish({ @@ -282,7 +278,7 @@ export class CodeSessionService { this.inflight.delete(sessionId); this.abortRegistry.cleanup(sessionId); await this.runsLock.release(sessionId); - await this.bus.publish({ runId: sessionId, type: 'run-processing-end', subflow: [] }); + await this.codeEventBus.publish({ runId: sessionId, type: 'run-processing-end', subflow: [] }); } } @@ -349,7 +345,7 @@ export class CodeSessionService { } await clearStoredSession(sessionId); await this.codeSessionsRepo.remove(sessionId); - await this.runsRepo.delete(sessionId).catch(() => {}); + await this.codeEventStore.delete(sessionId).catch(() => {}); await fs.rm(path.join(WorkDir, 'config', `workdir-${sessionId}.json`), { force: true }).catch(() => {}); } diff --git a/apps/x/packages/core/src/code-mode/sessions/status-tracker.ts b/apps/x/packages/core/src/code-mode/sessions/status-tracker.ts index 92abdf80..537817bd 100644 --- a/apps/x/packages/core/src/code-mode/sessions/status-tracker.ts +++ b/apps/x/packages/core/src/code-mode/sessions/status-tracker.ts @@ -13,7 +13,7 @@ export type StatusListener = (sessionId: string, status: CodeSessionStatus) => v // direct turns and Rowboat-mode code_agent_run turns publish the same event // types on the bus. The renderer just renders what this pushes. export class CodeSessionStatusTracker { - private readonly bus: IBus; + private readonly codeEventBus: IBus; private readonly codeSessionsRepo: ICodeSessionsRepo; private readonly statuses = new Map(); private readonly busySince = new Map(); @@ -27,15 +27,15 @@ export class CodeSessionStatusTracker { // an id that misses the refresh can never become a session later. private readonly knownNonSessions = new Set(); - constructor({ bus, codeSessionsRepo }: { bus: IBus; codeSessionsRepo: ICodeSessionsRepo }) { - this.bus = bus; + constructor({ codeEventBus, codeSessionsRepo }: { codeEventBus: IBus; codeSessionsRepo: ICodeSessionsRepo }) { + this.codeEventBus = codeEventBus; this.codeSessionsRepo = codeSessionsRepo; } async start(): Promise { if (this.unsubscribe) return; await this.refreshKnownSessions(); - this.unsubscribe = await this.bus.subscribe('*', async (event) => { + this.unsubscribe = await this.codeEventBus.subscribe('*', async (event) => { await this.handle(event); }); } diff --git a/apps/x/packages/core/src/di/container.ts b/apps/x/packages/core/src/di/container.ts index 03648b11..70e73309 100644 --- a/apps/x/packages/core/src/di/container.ts +++ b/apps/x/packages/core/src/di/container.ts @@ -2,6 +2,7 @@ import { asClass, asValue, createContainer, InjectionMode } from "awilix"; import { FSModelConfigRepo, IModelConfigRepo } from "../models/repo.js"; import { FSMcpConfigRepo, IMcpConfigRepo } from "../mcp/repo.js"; import { FSAgentsRepo, IAgentsRepo } from "../agents/repo.js"; +import { FSRunsRepo, IRunsRepo } from "../runs/repo.js"; import { IMonotonicallyIncreasingIdGenerator, IdGen } from "../application/lib/id-gen.js"; import { IBus, InMemoryBus } from "../application/lib/bus.js"; import { IRunsLock, InMemoryRunsLock } from "../runs/lock.js"; @@ -17,6 +18,7 @@ import { CodeModeManager } from "../code-mode/acp/manager.js"; import { CodePermissionRegistry } from "../code-mode/acp/permission-registry.js"; import { FSCodeProjectsRepo, ICodeProjectsRepo } from "../code-mode/projects/repo.js"; import { FSCodeSessionsRepo, ICodeSessionsRepo } from "../code-mode/sessions/repo.js"; +import { CodeEventStore } from "../code-mode/sessions/event-store.js"; import { CodeSessionService } from "../code-mode/sessions/service.js"; import { CodeSessionStatusTracker } from "../code-mode/sessions/status-tracker.js"; import type { IBrowserControlService } from "../application/browser-control/service.js"; @@ -36,6 +38,9 @@ container.register({ mcpConfigRepo: asClass(FSMcpConfigRepo).singleton(), modelConfigRepo: asClass(FSModelConfigRepo).singleton(), agentsRepo: asClass(FSAgentsRepo).singleton(), + // Generic run event-log store (JSONL). The LLM agent runtime that once drove + // it is retired; code-mode now uses it as its session event store. + runsRepo: asClass(FSRunsRepo).singleton(), oauthRepo: asClass(FSOAuthRepo).singleton(), clientRegistrationRepo: asClass(FSClientRegistrationRepo).singleton(), granolaConfigRepo: asClass(FSGranolaConfigRepo).singleton(), @@ -51,9 +56,13 @@ container.register({ codePermissionRegistry: asClass(CodePermissionRegistry).singleton(), // Code section: project registry, session metadata, the direct-drive - // session service, and the live status tracker. + // session service, and the live status tracker. Code-mode owns its own + // event store + event bus (dedicated runtime, decoupled from the retired + // LLM agent runtime's runs/ infra). codeProjectsRepo: asClass(FSCodeProjectsRepo).singleton(), codeSessionsRepo: asClass(FSCodeSessionsRepo).singleton(), + codeEventStore: asClass(CodeEventStore).singleton(), + codeEventBus: asClass(InMemoryBus).singleton(), codeSessionService: asClass(CodeSessionService).singleton(), codeSessionStatusTracker: asClass(CodeSessionStatusTracker).singleton(), }); diff --git a/apps/x/packages/core/src/runs/bus.ts b/apps/x/packages/core/src/runs/bus.ts new file mode 100644 index 00000000..34f826d8 --- /dev/null +++ b/apps/x/packages/core/src/runs/bus.ts @@ -0,0 +1,4 @@ +import container from "../di/container.js"; +import { IBus } from "../application/lib/bus.js"; + +export const bus = container.resolve('bus'); diff --git a/apps/x/packages/core/src/runs/runs.ts b/apps/x/packages/core/src/runs/runs.ts index 80d7f7c9..91104df8 100644 --- a/apps/x/packages/core/src/runs/runs.ts +++ b/apps/x/packages/core/src/runs/runs.ts @@ -1,19 +1,18 @@ import z from "zod"; import container from "../di/container.js"; -import { IMessageQueue, UserMessageContentType, VoiceOutputMode, MiddlePaneContext } from "../application/lib/message-queue.js"; -import { AskHumanResponseEvent, ToolPermissionRequestEvent, ToolPermissionResponseEvent, CreateRunOptions, Run, ListRunsResponse, ToolPermissionAuthorizePayload, AskHumanResponsePayload } from "@x/shared/dist/runs.js"; +import { CreateRunOptions, Run } from "@x/shared/dist/runs.js"; import { IRunsRepo } from "./repo.js"; -import { ICodeSessionsRepo } from "../code-mode/sessions/repo.js"; -import { IAgentRuntime } from "../agents/runtime.js"; import { IBus } from "../application/lib/bus.js"; -import { IAbortRegistry } from "./abort-registry.js"; -import { IRunsLock } from "./lock.js"; -import { forceCloseAllMcpClients } from "../mcp/mcp.js"; -import { extractCommandNames } from "../application/lib/command-executor.js"; -import { addFileAccessGrant, addToSecurityConfig } from "../config/security.js"; import { loadAgent } from "../agents/runtime.js"; import { getDefaultModelAndProvider } from "../models/defaults.js"; +// The generic run event-log helpers that survive the retirement of the old LLM +// agent runtime. The message/permission/stop helpers that drove the LLM loop +// (createMessage → agentRuntime.trigger, authorizePermission, replyToHumanInput, +// stop) are gone with it; chat + headless run on the new sessions/turn runtime. +// What remains is the minimal surface code-mode uses to mint and read a session's +// append-only event log: createRun (id + start event) and fetchRun. + export async function createRun(opts: z.infer): Promise> { const repo = container.resolve('runsRepo'); const bus = container.resolve('bus'); @@ -41,114 +40,7 @@ export async function createRun(opts: z.infer): Promise return run; } -export async function createMessage(runId: string, message: UserMessageContentType, voiceInput?: boolean, voiceOutput?: VoiceOutputMode, searchEnabled?: boolean, middlePaneContext?: MiddlePaneContext, codeMode?: 'claude' | 'codex', codeCwd?: string, codePolicy?: 'ask' | 'auto-approve-reads' | 'yolo'): Promise { - // Code-section sessions carry their coding context in the session meta. - // Pin it here — not in the composer — so EVERY path into the run (assistant - // chat pane, voice, palette) drives the session's agent in its directory, - // and the session header stays the single source of truth. - try { - const sessionMeta = await container.resolve('codeSessionsRepo').get(runId); - if (sessionMeta) { - codeMode = sessionMeta.agent; - codeCwd = sessionMeta.cwd; - codePolicy = sessionMeta.policy; - } - } catch { - // sessions repo unavailable — treat as a regular chat run - } - const queue = container.resolve('messageQueue'); - const id = await queue.enqueue(runId, message, voiceInput, voiceOutput, searchEnabled, middlePaneContext, codeMode, codeCwd, codePolicy); - const runtime = container.resolve('agentRuntime'); - runtime.trigger(runId); - return id; -} - -export async function authorizePermission(runId: string, ev: z.infer): Promise { - const { scope, ...rest } = ev; - - // For "always" scope, derive command from the run log and persist to security config - if (rest.response === "approve" && scope === "always") { - const repo = container.resolve('runsRepo'); - const run = await repo.fetch(runId); - const permReqEvent = run.log.find( - (e): e is z.infer => - e.type === "tool-permission-request" - && e.toolCall.toolCallId === rest.toolCallId - && JSON.stringify(e.subflow) === JSON.stringify(rest.subflow) - ); - if (permReqEvent?.permission?.kind === "file") { - await addFileAccessGrant({ - operation: permReqEvent.permission.operation, - pathPrefix: permReqEvent.permission.pathPrefix, - }); - } else if (permReqEvent && typeof permReqEvent.toolCall.arguments === 'object' && permReqEvent.toolCall.arguments !== null && 'command' in permReqEvent.toolCall.arguments) { - const commandNames = extractCommandNames(String(permReqEvent.toolCall.arguments.command)); - if (commandNames.length > 0) { - await addToSecurityConfig(commandNames); - } - } - } - - const repo = container.resolve('runsRepo'); - const event: z.infer = { - ...rest, - runId, - type: "tool-permission-response", - scope, - }; - await repo.appendEvents(runId, [event]); - const runtime = container.resolve('agentRuntime'); - runtime.trigger(runId); -} - -export async function replyToHumanInputRequest(runId: string, ev: z.infer): Promise { - const repo = container.resolve('runsRepo'); - const event: z.infer = { - ...ev, - runId, - type: "ask-human-response", - }; - await repo.appendEvents(runId, [event]); - const runtime = container.resolve('agentRuntime'); - runtime.trigger(runId); -} - -export async function stop(runId: string, force: boolean = false): Promise { - const abortRegistry = container.resolve('abortRegistry'); - - if (force && abortRegistry.isAborted(runId)) { - // Second click: aggressive cleanup — SIGKILL + force close MCP clients - console.log(`Force stopping run ${runId}`); - abortRegistry.forceAbort(runId); - await forceCloseAllMcpClients(); - } else { - // First click: graceful — fires AbortSignal + SIGTERM - console.log(`Gracefully stopping run ${runId}`); - abortRegistry.abort(runId); - } - // Note: The run-stopped event is emitted by AgentRuntime.trigger() when it detects the abort. - // This avoids duplicate events and ensures proper sequencing. -} - -export async function deleteRun(runId: string): Promise { - const runsLock = container.resolve('runsLock'); - if (!await runsLock.lock(runId)) { - throw new Error(`Cannot delete run ${runId}: run is currently active`); - } - try { - const repo = container.resolve('runsRepo'); - await repo.delete(runId); - } finally { - await runsLock.release(runId); - } -} - export async function fetchRun(runId: string): Promise> { const repo = container.resolve('runsRepo'); return repo.fetch(runId); } - -export async function listRuns(cursor?: string): Promise> { - const repo = container.resolve('runsRepo'); - return repo.list(cursor); -} diff --git a/apps/x/packages/core/src/storage/migrations.ts b/apps/x/packages/core/src/storage/migrations.ts index e88cbaa7..052da721 100644 --- a/apps/x/packages/core/src/storage/migrations.ts +++ b/apps/x/packages/core/src/storage/migrations.ts @@ -156,6 +156,31 @@ const migrations: Record = { await db.schema.alterTable("agent_loop_turns").dropColumn("use_case").execute(); }, }, + "2026-06-15_0008_code_session_events": { + async up(db: MigrationDb): Promise { + // Code-mode's own append-only event log (direct ACP sessions), + // replacing the generic runs/ JSONL store. + await db.schema + .createTable("code_session_events") + .ifNotExists() + .addColumn("id", "integer", (col) => col.primaryKey().autoIncrement()) + .addColumn("session_id", "text", (col) => col.notNull()) + .addColumn("event", "text", (col) => col.notNull()) + .addColumn("created_at", "text", (col) => col.notNull()) + .execute(); + + await db.schema + .createIndex("code_session_events_session_id_idx") + .ifNotExists() + .on("code_session_events") + .column("session_id") + .execute(); + }, + async down(db: MigrationDb): Promise { + await db.schema.dropIndex("code_session_events_session_id_idx").ifExists().execute(); + await db.schema.dropTable("code_session_events").ifExists().execute(); + }, + }, }; class InCodeMigrationProvider implements MigrationProvider { diff --git a/apps/x/packages/core/src/storage/schema.ts b/apps/x/packages/core/src/storage/schema.ts index 1e8879e5..67f5e7d2 100644 --- a/apps/x/packages/core/src/storage/schema.ts +++ b/apps/x/packages/core/src/storage/schema.ts @@ -1,4 +1,4 @@ -import type { ColumnType } from "kysely"; +import type { ColumnType, Generated } from "kysely"; export type TimestampColumn = ColumnType; @@ -40,8 +40,19 @@ export interface SessionsTable { updated_at: TimestampColumn; } +// Append-only event log for code-mode (direct ACP) sessions. This is code-mode's +// own dedicated event store — it replaces the generic runs/ JSONL log the old +// agent runtime shared. One row per RunEvent; ordered by the autoincrement id. +export interface CodeSessionEventsTable { + id: Generated; + session_id: string; + event: string; // JSON: RunEvent + created_at: TimestampColumn; +} + export interface Database { storage_metadata: StorageMetadataTable; agent_loop_turns: AgentLoopTurnsTable; sessions: SessionsTable; + code_session_events: CodeSessionEventsTable; } diff --git a/apps/x/packages/shared/src/ipc.ts b/apps/x/packages/shared/src/ipc.ts index 6487b0de..94beb1a6 100644 --- a/apps/x/packages/shared/src/ipc.ts +++ b/apps/x/packages/shared/src/ipc.ts @@ -19,9 +19,9 @@ import { RowboatApiConfig } from './rowboat-account.js'; import { ZListToolkitsResponse } from './composio.js'; import { BrowserStateSchema } from './browser-control.js'; import { BillingInfoSchema } from './billing.js'; -import { EmailBlockSchema, GmailThreadSchema } from './blocks.js'; +import { GmailThreadSchema } from './blocks.js'; import { PermissionDecision, ApprovalPolicy, CodingAgent } from './code-mode.js'; -import { Run } from './runs.js'; +import { Run, RunEvent } from './runs.js'; import { NotificationSettingsSchema } from './notification-settings.js'; import { CodeProject, CodeSession, CodeSessionMode, CodeSessionStatus, GitRepoInfo, GitStatusFile } from './code-sessions.js'; @@ -572,6 +572,17 @@ const ipcSchemas = { }), res: z.null(), }, + // Code-mode's own transcript history (replaces the generic runs:fetch). + 'codeSession:getEvents': { + req: z.object({ sessionId: z.string() }), + res: z.object({ events: z.array(RunEvent) }), + }, + // main → renderer: code-mode's live event feed (replaces runs:events). Carries + // the session's RunEvents (code-run-event, message, processing, …). + 'codeSession:events': { + req: RunEvent, + res: z.null(), + }, // ========================================================================== // Embedded terminal (Code section): one PTY per coding session // ==========================================================================