Step 3: delete the old runs/ runtime infrastructure

With direct code-mode on its own event store/bus (step 1) and rowboat on the
new sessions runtime (step 2), nothing uses the generic runs/ infra anymore.

- delete runs/runs.ts, runs/repo.ts, runs/bus.ts (keep runs/lock.ts +
  runs/abort-registry.ts — the new runtime uses them)
- remove the runs:fetch + runs:events IPC channels, their handlers, the
  bus -> runs:events forwarder (emitRunEvent / startRunsWatcher / stopRunsWatcher)
- drop the now-unused runsRepo + bus DI registrations

The old LLM agent runtime AND its run event-log/bus are now fully gone; chat,
headless, and both code-mode modes run on the new runtime (code-mode direct on
its own dedicated event store + bus).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
Ramnique Singh 2026-06-15 11:24:44 +05:30
parent 15a08da783
commit deda770e6e
8 changed files with 5 additions and 442 deletions

View file

@ -10,9 +10,6 @@ import {
import { watcher as watcherCore, workspace } from '@x/core';
import { workspace as workspaceShared } from '@x/shared';
import * as mcpCore from '@x/core/dist/mcp/mcp.js';
import * as runsCore from '@x/core/dist/runs/runs.js';
import { bus } from '@x/core/dist/runs/bus.js';
import { RunEvent } from '@x/shared/dist/runs.js';
import type { AgentRuntime } from '@x/core/dist/agent-runtime/index.js';
import type { SessionBusEvent } from '@x/shared/dist/sessions.js';
import { serviceBus } from '@x/core/dist/services/service_bus.js';
@ -417,35 +414,6 @@ export async function startCodeEventWatcher(): Promise<void> {
});
}
// Forward the generic event bus → renderer (runs:events). Code-mode (direct ACP
// sessions) streams its live events (code-run-event, permission, message, …)
// through this feed; chat + headless use the sessions:events feed below.
function emitRunEvent(event: z.infer<typeof RunEvent>): void {
const windows = BrowserWindow.getAllWindows();
for (const win of windows) {
if (!win.isDestroyed() && win.webContents) {
win.webContents.send('runs:events', event);
}
}
}
let runsWatcher: (() => void) | null = null;
export async function startRunsWatcher(): Promise<void> {
if (runsWatcher) {
return;
}
runsWatcher = await bus.subscribe('*', async (event) => {
emitRunEvent(event);
});
}
export function stopRunsWatcher(): void {
if (runsWatcher) {
runsWatcher();
runsWatcher = null;
}
}
function emitSessionEvent(event: SessionBusEvent): void {
const windows = BrowserWindow.getAllWindows();
for (const win of windows) {
@ -687,10 +655,6 @@ export function setupIpcHandlers(agentRuntime: AgentRuntime) {
registry.resolve(args.requestId, args.decision);
return { success: true };
},
// Code-mode reads a session's transcript from the generic run event-log.
'runs:fetch': async (_event, args) => {
return runsCore.fetchRun(args.runId);
},
// Code-mode's own transcript history (its dedicated SQLite event store).
'codeSession:getEvents': async (_event, args) => {
const store = container.resolve<CodeEventStore>('codeEventStore');

View file

@ -2,7 +2,6 @@ import { app, BrowserWindow, desktopCapturer, protocol, net, shell, session, typ
import path from "node:path";
import {
setupIpcHandlers,
startRunsWatcher,
startCodeSessionStatusWatcher,
startCodeEventWatcher,
startSessionsWatcher,
@ -364,10 +363,6 @@ app.whenReady().then(async () => {
// start sessions watcher (new runtime event feed → renderer)
startSessionsWatcher(agentRuntime);
// start runs watcher — forwards the generic event bus → renderer (runs:events).
// Code-mode (direct ACP sessions) streams its live events through this feed.
startRunsWatcher();
// start code-session status tracker (derives working/needs-you/idle + notifications)
startCodeSessionStatusWatcher();

View file

@ -53,10 +53,10 @@ function messageText(content: unknown): string {
return ''
}
// Conversation state for one coding session, fed by the run JSONL (history)
// and the live runs:events stream. Handles both modes: direct turns arrive as
// code-run-events with a `direct-` toolCallId; Rowboat turns arrive as the
// usual LLM message/tool events (incl. code_agent_run blocks).
// Conversation state for a DIRECT coding session, fed by code-mode's own event
// store (codeSession:getEvents) and live feed (codeSession:events). Rowboat
// sessions render in the main chat on the new sessions runtime instead, so this
// hook is only mounted for direct mode.
export function useCodeChat(session: CodeSession | null) {
const sessionId = session?.id ?? null
const [items, setItems] = useState<CodeChatItem[]>([])

View file

@ -2,7 +2,6 @@ import { asClass, asValue, createContainer, InjectionMode } from "awilix";
import { FSModelConfigRepo, IModelConfigRepo } from "../models/repo.js";
import { FSMcpConfigRepo, IMcpConfigRepo } from "../mcp/repo.js";
import { FSAgentsRepo, IAgentsRepo } from "../agents/repo.js";
import { FSRunsRepo, IRunsRepo } from "../runs/repo.js";
import { IMonotonicallyIncreasingIdGenerator, IdGen } from "../application/lib/id-gen.js";
import { IBus, InMemoryBus } from "../application/lib/bus.js";
import { IRunsLock, InMemoryRunsLock } from "../runs/lock.js";
@ -31,16 +30,12 @@ const container = createContainer({
container.register({
idGenerator: asClass<IMonotonicallyIncreasingIdGenerator>(IdGen).singleton(),
bus: asClass<IBus>(InMemoryBus).singleton(),
runsLock: asClass<IRunsLock>(InMemoryRunsLock).singleton(),
abortRegistry: asClass<IAbortRegistry>(InMemoryAbortRegistry).singleton(),
mcpConfigRepo: asClass<IMcpConfigRepo>(FSMcpConfigRepo).singleton(),
modelConfigRepo: asClass<IModelConfigRepo>(FSModelConfigRepo).singleton(),
agentsRepo: asClass<IAgentsRepo>(FSAgentsRepo).singleton(),
// Generic run event-log store (JSONL). The LLM agent runtime that once drove
// it is retired; code-mode now uses it as its session event store.
runsRepo: asClass<IRunsRepo>(FSRunsRepo).singleton(),
oauthRepo: asClass<IOAuthRepo>(FSOAuthRepo).singleton(),
clientRegistrationRepo: asClass<IClientRegistrationRepo>(FSClientRegistrationRepo).singleton(),
granolaConfigRepo: asClass<IGranolaConfigRepo>(FSGranolaConfigRepo).singleton(),

View file

@ -1,4 +0,0 @@
import container from "../di/container.js";
import { IBus } from "../application/lib/bus.js";
export const bus = container.resolve<IBus>('bus');

View file

@ -1,328 +0,0 @@
import z from "zod";
import { IMonotonicallyIncreasingIdGenerator } from "../application/lib/id-gen.js";
import { WorkDir } from "../config/config.js";
import path from "path";
import fsp from "fs/promises";
import fs from "fs";
import readline from "readline";
import { Run, RunEvent, StartEvent, ListRunsResponse, MessageEvent, UseCase } from "@x/shared/dist/runs.js";
import { getDefaultModelAndProvider } from "../models/defaults.js";
/**
* Reading-only schemas: extend the canonical `StartEvent` / `RunEvent` to
* accept legacy run files written before `model`/`provider` were required.
*
* `RunEvent.or(LegacyStartEvent)` works because zod unions try left-to-right:
* for any non-start event RunEvent matches first; for a strict start event
* RunEvent still matches; only a legacy start event falls through and parses
* as LegacyStartEvent. New event types stay maintained in one place
* (`@x/shared/dist/runs.js`) the lenient form just adds one fallback variant.
*/
const LegacyStartEvent = StartEvent.extend({
model: z.string().optional(),
provider: z.string().optional(),
// Pre-rename run files carry `useCase: "track_block"`. Map it to its
// canonical successor on read so the strict downstream types never see
// the old value. Read-only — writes always use the current enum.
useCase: z.preprocess(
(v) => (v === 'track_block' ? 'live_note_agent' : v),
StartEvent.shape.useCase,
),
});
const ReadRunEvent = RunEvent.or(LegacyStartEvent);
export type CreateRunRepoOptions = {
agentId: string;
model: string;
provider: string;
permissionMode: "manual" | "auto";
useCase: z.infer<typeof UseCase>;
subUseCase?: string;
};
function runLogPath(runId: string): string {
return path.join(WorkDir, 'runs', `${runId}.jsonl`);
}
export interface IRunsRepo {
create(options: CreateRunRepoOptions): Promise<z.infer<typeof Run>>;
fetch(id: string): Promise<z.infer<typeof Run>>;
list(cursor?: string): Promise<z.infer<typeof ListRunsResponse>>;
appendEvents(runId: string, events: z.infer<typeof RunEvent>[]): Promise<void>;
delete(id: string): Promise<void>;
}
/**
* Strip attached-files XML from message content for title display (keeps @mentions)
*/
function cleanContentForTitle(content: string): string {
// Remove the entire attached-files block
let cleaned = content.replace(/<attached-files>\s*[\s\S]*?\s*<\/attached-files>/g, '');
// Clean up extra whitespace
cleaned = cleaned.replace(/\s+/g, ' ').trim();
return cleaned;
}
export class FSRunsRepo implements IRunsRepo {
private idGenerator: IMonotonicallyIncreasingIdGenerator;
constructor({
idGenerator,
}: {
idGenerator: IMonotonicallyIncreasingIdGenerator;
}) {
this.idGenerator = idGenerator;
// ensure default runs directory exists
fsp.mkdir(path.join(WorkDir, 'runs'), { recursive: true });
}
private extractTitle(events: z.infer<typeof RunEvent>[]): string | undefined {
for (const event of events) {
if (event.type === 'message') {
const messageEvent = event as z.infer<typeof MessageEvent>;
if (messageEvent.message.role === 'user') {
const content = messageEvent.message.content;
let textContent: string | undefined;
if (typeof content === 'string') {
textContent = content;
} else {
textContent = content
.filter(p => p.type === 'text')
.map(p => p.text)
.join('');
}
if (textContent && textContent.trim()) {
const cleaned = cleanContentForTitle(textContent);
if (!cleaned) continue;
return cleaned.length > 100 ? cleaned.substring(0, 100) : cleaned;
}
}
}
}
return undefined;
}
/**
* Read file line-by-line using streams, stopping early once we have
* the start event and title (or determine there's no title).
*
* Parses the start event with `LegacyStartEvent` so runs written before
* `model`/`provider` were required still surface in the list view.
*/
private async readRunMetadata(filePath: string): Promise<{
start: z.infer<typeof LegacyStartEvent>;
title: string | undefined;
} | null> {
return new Promise((resolve) => {
const stream = fs.createReadStream(filePath, { encoding: 'utf8' });
const rl = readline.createInterface({ input: stream, crlfDelay: Infinity });
let start: z.infer<typeof LegacyStartEvent> | null = null;
let title: string | undefined;
let lineIndex = 0;
rl.on('line', (line) => {
const trimmed = line.trim();
if (!trimmed) return;
try {
if (lineIndex === 0) {
start = LegacyStartEvent.parse(JSON.parse(trimmed));
} else {
// Subsequent lines - look for first user message or assistant response
const event = ReadRunEvent.parse(JSON.parse(trimmed));
if (event.type === 'message') {
const msg = event.message;
if (msg.role === 'user') {
// Found first user message - use as title
const content = msg.content;
let textContent: string | undefined;
if (typeof content === 'string') {
textContent = content;
} else {
textContent = content
.filter(p => p.type === 'text')
.map(p => p.text)
.join('');
}
if (textContent && textContent.trim()) {
const cleaned = cleanContentForTitle(textContent);
if (cleaned) {
title = cleaned.length > 100 ? cleaned.substring(0, 100) : cleaned;
}
}
// Stop reading
rl.close();
stream.destroy();
return;
} else if (msg.role === 'assistant') {
// Assistant responded before any user message - no title
rl.close();
stream.destroy();
return;
}
}
}
lineIndex++;
} catch {
// Skip malformed lines
}
});
rl.on('close', () => {
if (start) {
resolve({ start, title });
} else {
resolve(null);
}
});
rl.on('error', () => {
resolve(null);
});
stream.on('error', () => {
rl.close();
resolve(null);
});
});
}
async appendEvents(runId: string, events: z.infer<typeof RunEvent>[]): Promise<void> {
await fsp.appendFile(
runLogPath(runId),
events.map(event => JSON.stringify(event)).join("\n") + "\n"
);
}
async create(options: CreateRunRepoOptions): Promise<z.infer<typeof Run>> {
const runId = await this.idGenerator.next();
const ts = new Date().toISOString();
const start: z.infer<typeof StartEvent> = {
type: "start",
runId,
agentName: options.agentId,
model: options.model,
provider: options.provider,
permissionMode: options.permissionMode,
useCase: options.useCase,
...(options.subUseCase ? { subUseCase: options.subUseCase } : {}),
subflow: [],
ts,
};
await this.appendEvents(runId, [start]);
return {
id: runId,
createdAt: ts,
agentId: options.agentId,
model: options.model,
provider: options.provider,
permissionMode: options.permissionMode,
useCase: options.useCase,
...(options.subUseCase ? { subUseCase: options.subUseCase } : {}),
log: [start],
};
}
async fetch(id: string): Promise<z.infer<typeof Run>> {
const contents = await fsp.readFile(runLogPath(id), 'utf8');
// Parse with the lenient schema so legacy start events (no model/provider) load.
const rawEvents = contents.split('\n')
.filter(line => line.trim() !== '')
.map(line => ReadRunEvent.parse(JSON.parse(line)));
if (rawEvents.length === 0 || rawEvents[0].type !== 'start') {
throw new Error('Corrupt run data');
}
// Backfill model/provider on the start event from current defaults if missing,
// then promote to the canonical strict types for callers.
const rawStart = rawEvents[0];
const defaults = (!rawStart.model || !rawStart.provider)
? await getDefaultModelAndProvider()
: null;
const start: z.infer<typeof StartEvent> = {
...rawStart,
model: rawStart.model ?? defaults!.model,
provider: rawStart.provider ?? defaults!.provider,
};
const events: z.infer<typeof RunEvent>[] = [start, ...rawEvents.slice(1) as z.infer<typeof RunEvent>[]];
const title = this.extractTitle(events);
return {
id,
title,
createdAt: start.ts!,
agentId: start.agentName,
model: start.model,
provider: start.provider,
permissionMode: start.permissionMode ?? "manual",
...(start.useCase ? { useCase: start.useCase } : {}),
...(start.subUseCase ? { subUseCase: start.subUseCase } : {}),
log: events,
};
}
async list(cursor?: string): Promise<z.infer<typeof ListRunsResponse>> {
const runsDir = path.join(WorkDir, 'runs');
const PAGE_SIZE = 20;
let files: string[] = [];
try {
const entries = await fsp.readdir(runsDir, { withFileTypes: true });
files = entries
.filter(e => e.isFile() && e.name.endsWith('.jsonl'))
.map(e => e.name);
} catch (err: unknown) {
const e = err as { code?: string };
if (e.code === 'ENOENT') {
return { runs: [] };
}
throw err;
}
files.sort((a, b) => b.localeCompare(a));
const cursorFile = cursor;
let startIndex = 0;
if (cursorFile) {
const exact = files.indexOf(cursorFile);
if (exact >= 0) {
startIndex = exact + 1;
} else {
const firstOlder = files.findIndex(name => name.localeCompare(cursorFile) < 0);
startIndex = firstOlder === -1 ? files.length : firstOlder;
}
}
const selected = files.slice(startIndex, startIndex + PAGE_SIZE);
const runs: z.infer<typeof ListRunsResponse>['runs'] = [];
for (const name of selected) {
const runId = name.slice(0, -'.jsonl'.length);
const metadata = await this.readRunMetadata(path.join(runsDir, name));
if (!metadata) {
continue;
}
runs.push({
id: runId,
title: metadata.title,
createdAt: metadata.start.ts!,
agentId: metadata.start.agentName,
...(metadata.start.useCase ? { useCase: metadata.start.useCase } : {}),
});
}
const hasMore = startIndex + PAGE_SIZE < files.length;
const nextCursor = hasMore && selected.length > 0
? selected[selected.length - 1]
: undefined;
return {
runs,
...(nextCursor ? { nextCursor } : {}),
};
}
async delete(id: string): Promise<void> {
await fsp.unlink(runLogPath(id));
}
}

View file

@ -1,46 +0,0 @@
import z from "zod";
import container from "../di/container.js";
import { CreateRunOptions, Run } from "@x/shared/dist/runs.js";
import { IRunsRepo } from "./repo.js";
import { IBus } from "../application/lib/bus.js";
import { loadAgent } from "../agents/runtime.js";
import { getDefaultModelAndProvider } from "../models/defaults.js";
// The generic run event-log helpers that survive the retirement of the old LLM
// agent runtime. The message/permission/stop helpers that drove the LLM loop
// (createMessage → agentRuntime.trigger, authorizePermission, replyToHumanInput,
// stop) are gone with it; chat + headless run on the new sessions/turn runtime.
// What remains is the minimal surface code-mode uses to mint and read a session's
// append-only event log: createRun (id + start event) and fetchRun.
export async function createRun(opts: z.infer<typeof CreateRunOptions>): Promise<z.infer<typeof Run>> {
const repo = container.resolve<IRunsRepo>('runsRepo');
const bus = container.resolve<IBus>('bus');
// Resolve model+provider once at creation: opts > agent declaration > defaults.
// Both fields are plain strings (provider is a name, looked up at runtime).
// Use `||` (not `??`) so an empty-string override — what an LLM tool call
// sometimes synthesizes for "I'm not setting this" — falls through to the
// next link in the chain instead of being treated as a real value.
const agent = await loadAgent(opts.agentId);
const defaults = await getDefaultModelAndProvider();
const model = opts.model || agent.model || defaults.model;
const provider = opts.provider || agent.provider || defaults.provider;
const useCase = opts.useCase ?? "copilot_chat";
const run = await repo.create({
agentId: opts.agentId,
model,
provider,
permissionMode: opts.permissionMode ?? "manual",
useCase,
...(opts.subUseCase ? { subUseCase: opts.subUseCase } : {}),
});
await bus.publish(run.log[0]);
return run;
}
export async function fetchRun(runId: string): Promise<z.infer<typeof Run>> {
const repo = container.resolve<IRunsRepo>('runsRepo');
return repo.fetch(runId);
}

View file

@ -21,7 +21,7 @@ import { BrowserStateSchema } from './browser-control.js';
import { BillingInfoSchema } from './billing.js';
import { GmailThreadSchema } from './blocks.js';
import { PermissionDecision, ApprovalPolicy, CodingAgent } from './code-mode.js';
import { Run, RunEvent } from './runs.js';
import { RunEvent } from './runs.js';
import { NotificationSettingsSchema } from './notification-settings.js';
import { CodeProject, CodeSession, CodeSessionMode, CodeSessionStatus, GitRepoInfo, GitStatusFile } from './code-sessions.js';
@ -238,19 +238,6 @@ const ipcSchemas = {
result: z.unknown(),
}),
},
// Code-mode reuses the generic runs event-log + bus (decoupled from the
// retired LLM agent runtime): fetch a session's transcript and stream its
// live events. Chat + headless use the sessions:* channels instead.
'runs:fetch': {
req: z.object({
runId: z.string(),
}),
res: Run,
},
'runs:events': {
req: z.null(),
res: z.null(),
},
'services:events': {
req: ServiceEvent,
res: z.null(),