diff --git a/apps/x/packages/core/src/agent-loop/agent-loop.test.ts b/apps/x/packages/core/src/agent-loop/agent-loop.test.ts index 708d4acb..37503dd2 100644 --- a/apps/x/packages/core/src/agent-loop/agent-loop.test.ts +++ b/apps/x/packages/core/src/agent-loop/agent-loop.test.ts @@ -154,6 +154,14 @@ class SnapshottingStore implements TurnStore { this.snapshots.push(structuredClone(turn)); await this.inner.update(turn); } + + async latestForSession(sessionId: string) { + return this.inner.latestForSession(sessionId); + } + + async listBySession(sessionId: string) { + return this.inner.listBySession(sessionId); + } } function makeLoop(opts: { @@ -188,6 +196,8 @@ function emptyTurn( provider: null, model: null, permissionMode: "manual", + sessionId: null, + sessionSeq: null, messages: [], permissionRequests: [], permissionDecisions: [], @@ -211,12 +221,12 @@ describe("AgentLoopImpl", () => { it("completes a simple text turn and persists input metadata", async () => { const { loop } = makeLoop({ steps: [{ kind: "message", message: assistantText("hi!") }] }); - const turn = await loop.createTurn({ + const turn = await (await loop.createTurn({ agentId: "a1", provider: "openai", model: "gpt-x", messages: [userMsg("hello")], - }).result; + })).result; expect(turn.agentId).toBe("a1"); expect(turn.provider).toBe("openai"); @@ -229,7 +239,8 @@ describe("AgentLoopImpl", () => { it("rejects a turn with no input messages", async () => { const { loop } = makeLoop(); - await expect(loop.createTurn({ messages: [] }).result).rejects.toThrow(); + // invalid input now fails at the call itself — nothing is persisted + await expect(loop.createTurn({ messages: [] })).rejects.toThrow(); }); it("runs tool calls: result and error both append ToolMessages and continue", async () => { @@ -245,7 +256,7 @@ describe("AgentLoopImpl", () => { }), }); - const turn = await loop.createTurn({ messages: [userMsg("go")] }).result; + const turn = await (await loop.createTurn({ messages: [userMsg("go")] 
})).result; expect(runner.ran).toEqual(["tc1", "tc2"]); expect(toolMessages(turn)).toEqual([ @@ -267,7 +278,7 @@ describe("AgentLoopImpl", () => { runner: new FakeToolRunner({ "ask-human": () => ({ type: "pending" }) }), }); - const waiting = await loop.createTurn({ messages: [userMsg("go")] }).result; + const waiting = await (await loop.createTurn({ messages: [userMsg("go")] })).result; expect(deriveTurnStatus(waiting)).toBe("waiting"); expect(waiting.dispatchedTools.map((t) => t.toolCallId)).toEqual(["tc1"]); expect(adapter.calls).toBe(1); // no model call while waiting @@ -291,7 +302,7 @@ describe("AgentLoopImpl", () => { runner: new FakeToolRunner({ job: () => ({ type: "pending" }) }), }); - const waiting = await loop.createTurn({ messages: [userMsg("go")] }).result; + const waiting = await (await loop.createTurn({ messages: [userMsg("go")] })).result; expect(waiting.dispatchedTools).toHaveLength(2); const stillWaiting = await loop.setToolResult(waiting.id, { toolCallId: "tc1", result: "r1" }).result; @@ -309,7 +320,7 @@ describe("AgentLoopImpl", () => { runner: new FakeToolRunner({ job: () => ({ type: "pending" }) }), }); - const waiting = await loop.createTurn({ messages: [userMsg("go")] }).result; + const waiting = await (await loop.createTurn({ messages: [userMsg("go")] })).result; await expect( loop.setToolResult(waiting.id, { toolCallId: "bogus", result: "x" }).result, ).rejects.toThrow("not awaiting an external result"); @@ -327,7 +338,7 @@ describe("AgentLoopImpl", () => { gate: new FakePermissionGate({ required: () => true }), }); - const turn = await loop.createTurn({ messages: [userMsg("go")] }).result; + const turn = await (await loop.createTurn({ messages: [userMsg("go")] })).result; expect(deriveTurnStatus(turn)).toBe("waiting"); expect(turn.permissionRequests.map((r) => r.toolCallId)).toEqual(["tc1", "tc2"]); @@ -351,7 +362,7 @@ describe("AgentLoopImpl", () => { gate: new FakePermissionGate({ required: () => true }), }); - const waiting = await 
loop.createTurn({ messages: [userMsg("go")] }).result; + const waiting = await (await loop.createTurn({ messages: [userMsg("go")] })).result; const afterDeny = await loop .respondToPermission(waiting.id, "tc2", "denied", "nope").result; @@ -376,7 +387,7 @@ describe("AgentLoopImpl", () => { steps: [{ kind: "message", message: assistantToolCalls(toolCall("tc1", "write")) }], gate: new FakePermissionGate({ required: () => true }), }); - const waiting = await loop.createTurn({ messages: [userMsg("go")] }).result; + const waiting = await (await loop.createTurn({ messages: [userMsg("go")] })).result; await expect( loop.respondToPermission(waiting.id, "bogus", "granted").result, ).rejects.toThrow("No open permission request"); @@ -413,7 +424,7 @@ describe("AgentLoopImpl", () => { gate, steps: [{ kind: "message", message: assistantToolCalls(toolCall("tc1", "write")) }], }); - const turn = await loop.createTurn({ messages: [userMsg("go")] }).result; + const turn = await (await loop.createTurn({ messages: [userMsg("go")] })).result; expect(deriveTurnStatus(turn)).toBe("waiting"); expect(gate.classifyCalls).toBe(0); }); @@ -437,10 +448,10 @@ describe("AgentLoopImpl", () => { }), }); - const turn = await loop.createTurn({ + const turn = await (await loop.createTurn({ messages: [userMsg("go")], permissionMode: "auto", - }).result; + })).result; expect(deriveTurnStatus(turn)).toBe("completed"); expect(runner.ran).toEqual(["tc1"]); @@ -465,10 +476,10 @@ describe("AgentLoopImpl", () => { ], }); - const waiting = await loop.createTurn({ + const waiting = await (await loop.createTurn({ messages: [userMsg("go")], permissionMode: "auto", - }).result; + })).result; expect(deriveTurnStatus(waiting)).toBe("waiting"); expect(gate.classifyCalls).toBe(1); @@ -520,15 +531,10 @@ describe("AgentLoopImpl", () => { }); }); - it("stopTurn mid-stream persists no partial message; turn is idle and resumable", async () => { - const { loop } = makeLoop({ - steps: [ - { kind: "hang" }, - { kind: 
"message", message: assistantText("second try") }, - ], - }); + it("stopTurn mid-stream persists no partial message; the stop is a terminal error", async () => { + const { loop } = makeLoop({ steps: [{ kind: "hang" }] }); - const handle = loop.createTurn({ messages: [userMsg("go")] }); + const handle = await loop.createTurn({ messages: [userMsg("go")] }); // wait for the first streamed delta, then stop const iterator = handle.events[Symbol.asyncIterator](); const first = await iterator.next(); @@ -536,13 +542,12 @@ describe("AgentLoopImpl", () => { const stopped = await loop.stopTurn(handle.id); expect(stopped.messages).toEqual([userMsg("go")]); // no partial persisted - expect(stopped.error).toBeNull(); - expect(deriveTurnStatus(stopped)).toBe("idle"); + expect(stopped.error?.code).toBe("stopped"); + expect(deriveTurnStatus(stopped)).toBe("error"); await expect(handle.result).resolves.toBeTruthy(); // original handle also rests - const resumed = await loop.resumeTurn(handle.id).result; - expect(resumed.messages).toEqual([userMsg("go"), assistantText("second try")]); - expect(deriveTurnStatus(resumed)).toBe("completed"); + // stop is terminal: the turn can never be resumed or mutated + await expect(loop.resumeTurn(handle.id).result).rejects.toThrow("terminal error"); }); it("stopTurn immediately after createTurn aborts the queued advance", async () => { @@ -550,17 +555,71 @@ describe("AgentLoopImpl", () => { steps: [{ kind: "message", message: assistantText("should not run") }], }); - // No await between create and stop — the advance is still queued + // Stop as soon as the handle exists — the advance is still queued // behind the mutex and its controller must already be abortable. 
- const handle = loop.createTurn({ messages: [userMsg("go")] }); + const handle = await loop.createTurn({ messages: [userMsg("go")] }); const stopped = await loop.stopTurn(handle.id); expect(adapter.calls).toBe(0); // model never called expect(stopped.messages).toEqual([userMsg("go")]); // input fact persisted - expect(deriveTurnStatus(stopped)).toBe("idle"); + expect(stopped.error?.code).toBe("stopped"); + expect(deriveTurnStatus(stopped)).toBe("error"); + }); - const resumed = await loop.resumeTurn(handle.id).result; - expect(deriveTurnStatus(resumed)).toBe("completed"); + it("stopTurn never overwrites a finished turn's outcome", async () => { + const { loop } = makeLoop({ steps: [{ kind: "message", message: assistantText("hi") }] }); + const turn = await (await loop.createTurn({ messages: [userMsg("go")] })).result; + + const stopped = await loop.stopTurn(turn.id); + expect(stopped.error).toBeNull(); + expect(deriveTurnStatus(stopped)).toBe("completed"); + // completed is just as terminal as stopped: no resume either + await expect(loop.resumeTurn(turn.id).result).rejects.toThrow("already completed"); + }); + + it("a stopped turn rejects every mutation: tool results, permission responses, resume", async () => { + const { loop } = makeLoop({ + steps: [{ + kind: "message", + message: assistantToolCalls(toolCall("tc1", "job"), toolCall("tc2", "write")), + }], + runner: new FakeToolRunner({ job: () => ({ type: "pending" }) }), + gate: new FakePermissionGate({ required: (name) => name === "write" }), + }); + + // turn waits on tc2's permission; tc1 is deferred behind the batch + const waiting = await (await loop.createTurn({ messages: [userMsg("go")] })).result; + expect(deriveTurnStatus(waiting)).toBe("waiting"); + + const stopped = await loop.stopTurn(waiting.id); + expect(stopped.error?.code).toBe("stopped"); + + await expect( + loop.setToolResult(waiting.id, { toolCallId: "tc1", result: "late" }).result, + ).rejects.toThrow("terminal error"); + await expect( + 
loop.respondToPermission(waiting.id, "tc2", "granted").result, + ).rejects.toThrow("terminal error"); + await expect(loop.resumeTurn(waiting.id).result).rejects.toThrow("terminal error"); + + // and none of it mutated the stopped turn + const after = await loop.getTurn(waiting.id); + expect(after).toEqual(stopped); + }); + + it("stopTurn terminates a waiting turn so it can be abandoned", async () => { + const { loop } = makeLoop({ + steps: [{ kind: "message", message: assistantToolCalls(toolCall("tc1", "write")) }], + gate: new FakePermissionGate({ required: () => true }), + }); + const waiting = await (await loop.createTurn({ messages: [userMsg("go")] })).result; + expect(deriveTurnStatus(waiting)).toBe("waiting"); + + const stopped = await loop.stopTurn(waiting.id); + expect(stopped.error?.code).toBe("stopped"); + expect(deriveTurnStatus(stopped)).toBe("error"); + await expect(loop.respondToPermission(waiting.id, "tc1", "granted").result) + .rejects.toThrow("terminal error"); }); it("a throwing tool runner becomes an error ToolMessage, not a turn error", async () => { @@ -574,7 +633,7 @@ describe("AgentLoopImpl", () => { }), }); - const turn = await loop.createTurn({ messages: [userMsg("go")] }).result; + const turn = await (await loop.createTurn({ messages: [userMsg("go")] })).result; expect(turn.error).toBeNull(); expect(toolMessages(turn)[0]?.content).toBe("Tool execution failed: ECONNRESET"); expect(deriveTurnStatus(turn)).toBe("completed"); @@ -602,7 +661,7 @@ describe("AgentLoopImpl", () => { it("model failure is a terminal turn error; further mutations reject", async () => { const { loop } = makeLoop({ steps: [{ kind: "error", error: new Error("rate limited") }] }); - const turn = await loop.createTurn({ messages: [userMsg("go")] }).result; + const turn = await (await loop.createTurn({ messages: [userMsg("go")] })).result; expect(deriveTurnStatus(turn)).toBe("error"); expect(turn.error?.message).toBe("rate limited"); @@ -623,7 +682,7 @@ 
describe("AgentLoopImpl", () => { ], }); - const turn = await loop.createTurn({ messages: [userMsg("go")] }).result; + const turn = await (await loop.createTurn({ messages: [userMsg("go")] })).result; expect(deriveTurnStatus(turn)).toBe("error"); expect(turn.error?.message).toContain("exceeded 3 iterations"); }); @@ -640,7 +699,7 @@ describe("AgentLoopImpl", () => { runner: new FakeToolRunner({ job: () => ({ type: "pending" }) }), }); - const waiting = await loop.createTurn({ messages: [userMsg("go")] }).result; + const waiting = await (await loop.createTurn({ messages: [userMsg("go")] })).result; const [a, b] = await Promise.all([ loop.setToolResult(waiting.id, { toolCallId: "tc1", result: "r1" }).result, @@ -664,7 +723,7 @@ describe("AgentLoopImpl", () => { ], }); - const handle = loop.createTurn({ messages: [userMsg("go")] }); + const handle = await loop.createTurn({ messages: [userMsg("go")] }); const types: string[] = []; for await (const event of handle.events) types.push(event.type); @@ -683,7 +742,7 @@ describe("AgentLoopImpl", () => { it("getTurn returns the persisted turn; unknown ids reject", async () => { const { loop } = makeLoop({ steps: [{ kind: "message", message: assistantText("hi") }] }); - const created = await loop.createTurn({ messages: [userMsg("hello")] }).result; + const created = await (await loop.createTurn({ messages: [userMsg("hello")] })).result; const fetched = await loop.getTurn(created.id); expect(fetched).toEqual(created); diff --git a/apps/x/packages/core/src/agent-loop/agent-loop.ts b/apps/x/packages/core/src/agent-loop/agent-loop.ts index 5eb05b7c..68579efc 100644 --- a/apps/x/packages/core/src/agent-loop/agent-loop.ts +++ b/apps/x/packages/core/src/agent-loop/agent-loop.ts @@ -3,6 +3,7 @@ import { z } from "zod"; import { ToolCallPart, ToolMessage } from "@x/shared/dist/message.js"; import { EventStream } from "./event-stream.js"; import type { ModelAdapter } from "./model-adapter.js"; +import { KeyedMutex } from "./mutex.js"; 
import type { PermissionGate } from "./permission-gate.js"; import type { ToolRunner, ToolRunResult } from "./tool-runner.js"; import type { TurnStore } from "./turn-store.js"; @@ -23,7 +24,10 @@ export type TurnHandle = { }; export interface AgentLoop { - createTurn(input: z.infer): TurnHandle; + // Async: the turn row is persisted before the handle is returned, so the + // turn (and its session seq, if any) is visible to other readers the + // moment the caller has the handle. The advance itself runs in background. + createTurn(input: z.infer): Promise; respondToPermission( turnId: string, toolCallId: string, @@ -31,30 +35,16 @@ export interface AgentLoop { reason?: string, ): TurnHandle; setToolResult(turnId: string, r: { toolCallId: string; result: unknown }): TurnHandle; + // Re-enters the reducer on an idle (crashed) or waiting turn. Rejects for + // terminal turns (completed, errored, or stopped) — like all mutations. resumeTurn(turnId: string): TurnHandle; getTurn(turnId: string): Promise>; + // Aborts in-flight work and records the stop as a terminal turn error + // (code "stopped"). A stopped turn is immutable: it cannot be resumed, + // and pending permissions / dispatched results are abandoned. stopTurn(turnId: string): Promise>; } -// Serializes async work per key. Unlike the try-lock in runs/lock.ts, callers -// queue instead of failing — public mutations on the same turn run in order. -class TurnMutex { - private chains = new Map>(); - - run(key: string, fn: () => Promise): Promise { - const prev = this.chains.get(key) ?? Promise.resolve(); - const next = prev.then(fn, fn); - const tail: Promise = next - .catch(() => undefined) - .then(() => { - // Drop the entry once the chain is fully drained. 
- if (this.chains.get(key) === tail) this.chains.delete(key); - }); - this.chains.set(key, tail); - return next; - } -} - function nowIso(): string { return new Date().toISOString(); } @@ -83,7 +73,7 @@ export class AgentLoopImpl implements AgentLoop { private toolRunner: ToolRunner; private permissionGate: PermissionGate; private maxIterations: number; - private mutex = new TurnMutex(); + private mutex = new KeyedMutex(); // All not-yet-finished entries per turn (running AND queued behind the // mutex) — registered synchronously so stopTurn can never race past one. private active = new Map>(); @@ -102,28 +92,35 @@ export class AgentLoopImpl implements AgentLoop { this.maxIterations = deps.maxIterations ?? DEFAULT_MAX_ITERATIONS; } - createTurn(input: z.infer): TurnHandle { + async createTurn(input: z.infer): Promise { + const parsed = AgentLoopInput.parse(input); const turnId = crypto.randomUUID(); - return this.enter(turnId, async () => { - const parsed = AgentLoopInput.parse(input); - const now = nowIso(); - await this.store.create({ - id: turnId, - agentId: parsed.agentId ?? null, - provider: parsed.provider ?? null, - model: parsed.model ?? null, - permissionMode: parsed.permissionMode ?? "manual", - messages: parsed.messages, - permissionRequests: [], - permissionDecisions: [], - startedTools: [], - dispatchedTools: [], - error: null, - completedAt: null, - createdAt: now, - updatedAt: now, - }); + const now = nowIso(); + // Persist before returning: the id is a fresh UUID so there is no + // contention, and callers (the sessions layer) rely on the row — and + // its claimed session seq — being visible once they hold the handle. + // Between this write and enter() below the turn is store-visible but + // not yet stoppable; acceptable while turn ids only reach callers via + // the returned handle, not via store polling. + await this.store.create({ + id: turnId, + agentId: parsed.agentId ?? null, + provider: parsed.provider ?? null, + model: parsed.model ?? 
null, + permissionMode: parsed.permissionMode ?? "manual", + sessionId: parsed.sessionId ?? null, + sessionSeq: parsed.sessionSeq ?? null, + messages: parsed.messages, + permissionRequests: [], + permissionDecisions: [], + startedTools: [], + dispatchedTools: [], + error: null, + completedAt: null, + createdAt: now, + updatedAt: now, }); + return this.enter(turnId, async () => {}); } respondToPermission( @@ -174,7 +171,9 @@ export class AgentLoopImpl implements AgentLoop { resumeTurn(turnId: string): TurnHandle { return this.enter(turnId, async () => { - await this.mustGet(turnId); + // Resuming a terminal turn is a caller error — reject loudly + // instead of quietly resolving, like every other mutation. + this.assertMutable(await this.mustGet(turnId)); }); } @@ -186,8 +185,21 @@ export class AgentLoopImpl implements AgentLoop { for (const controller of this.active.get(turnId) ?? []) { controller.abort(); } - // Queue behind the in-flight advance so it has fully wound down. - return this.mutex.run(turnId, () => this.mustGet(turnId)); + // Queue behind the in-flight advance so it has fully wound down, then + // make the stop itself a persisted fact: the turn lands in a terminal + // error and can never be resumed or mutated. A turn that already + // finished keeps its outcome — stop never overwrites it. 
+ return this.mutex.run(turnId, async () => { + const turn = await this.mustGet(turnId); + if (turn.error !== null || turn.completedAt !== null) return turn; + turn.error = { + message: "Stopped by the user", + code: "stopped", + at: nowIso(), + }; + await this.persist(turn); + return turn; + }); } // ── internals ─────────────────────────────────────────────────────────── @@ -390,7 +402,9 @@ export class AgentLoopImpl implements AgentLoop { turn.messages.push(assistantMessage); await this.persist(turn); } catch (error) { - if (signal.aborted) return; // stopped: turn stays as persisted (idle) + // stopped: facts stay as persisted; stopTurn's queued job + // records the terminal "stopped" error after this winds down. + if (signal.aborted) return; await this.setTurnError(turnId, error); return; } diff --git a/apps/x/packages/core/src/agent-loop/in-memory-turn-store.ts b/apps/x/packages/core/src/agent-loop/in-memory-turn-store.ts index 61e0802e..a6898259 100644 --- a/apps/x/packages/core/src/agent-loop/in-memory-turn-store.ts +++ b/apps/x/packages/core/src/agent-loop/in-memory-turn-store.ts @@ -9,6 +9,17 @@ export class InMemoryTurnStore implements TurnStore { if (this.turns.has(turn.id)) { throw new Error(`Turn already exists: ${turn.id}`); } + // Mirror the SQLite UNIQUE(session_id, session_seq) tripwire — NULL + // seqs never conflict, matching SQLite's distinct-NULLs semantics. 
+ if (turn.sessionId !== null && turn.sessionSeq !== null) { + for (const existing of this.turns.values()) { + if (existing.sessionId === turn.sessionId && existing.sessionSeq === turn.sessionSeq) { + throw new Error( + `Turn with session seq already exists: ${turn.sessionId}#${turn.sessionSeq}`, + ); + } + } + } this.turns.set(turn.id, structuredClone(turn)); } @@ -23,4 +34,16 @@ export class InMemoryTurnStore implements TurnStore { } this.turns.set(turn.id, structuredClone(turn)); } + + async latestForSession(sessionId: string): Promise | null> { + const turns = await this.listBySession(sessionId); + return turns.length > 0 ? turns[turns.length - 1] : null; + } + + async listBySession(sessionId: string): Promise[]> { + return [...this.turns.values()] + .filter((turn) => turn.sessionId === sessionId) + .sort((a, b) => (a.sessionSeq ?? 0) - (b.sessionSeq ?? 0)) + .map((turn) => structuredClone(turn)); + } } diff --git a/apps/x/packages/core/src/agent-loop/mutex.ts b/apps/x/packages/core/src/agent-loop/mutex.ts new file mode 100644 index 00000000..31256681 --- /dev/null +++ b/apps/x/packages/core/src/agent-loop/mutex.ts @@ -0,0 +1,19 @@ +// Serializes async work per key. Unlike the try-lock in runs/lock.ts, callers +// queue instead of failing — operations on the same key run in order. Used +// per-turn by the agent loop and per-session by the sessions layer. +export class KeyedMutex { + private chains = new Map>(); + + run(key: string, fn: () => Promise): Promise { + const prev = this.chains.get(key) ?? Promise.resolve(); + const next = prev.then(fn, fn); + const tail: Promise = next + .catch(() => undefined) + .then(() => { + // Drop the entry once the chain is fully drained. 
+ if (this.chains.get(key) === tail) this.chains.delete(key); + }); + this.chains.set(key, tail); + return next; + } +} diff --git a/apps/x/packages/core/src/agent-loop/sqlite-turn-store.test.ts b/apps/x/packages/core/src/agent-loop/sqlite-turn-store.test.ts index 455b2e54..b56276d6 100644 --- a/apps/x/packages/core/src/agent-loop/sqlite-turn-store.test.ts +++ b/apps/x/packages/core/src/agent-loop/sqlite-turn-store.test.ts @@ -44,13 +44,18 @@ async function loadStore() { return { store: new SqliteTurnStore(storageModule.getDb()), db: storageModule.getDb() }; } -function sampleTurn(id: string): z.infer { +function sampleTurn( + id: string, + overrides: Partial> = {}, +): z.infer { return { id, agentId: "agent-1", provider: "openai", model: "gpt-x", permissionMode: "auto", + sessionId: null, + sessionSeq: null, messages: [ { role: "user", content: "hello" }, { @@ -80,6 +85,7 @@ function sampleTurn(id: string): z.infer { completedAt: null, createdAt: "2026-06-12T00:00:00Z", updatedAt: "2026-06-12T00:00:03Z", + ...overrides, }; } @@ -124,6 +130,35 @@ describe("SqliteTurnStore", () => { await expect(store.update(sampleTurn("missing"))).rejects.toThrow("Turn not found"); }); + it("round-trips session linkage and queries turns by session in seq order", async () => { + const { store } = await loadStore(); + const t1 = sampleTurn("t1", { sessionId: "s1", sessionSeq: 1 }); + const t2 = sampleTurn("t2", { sessionId: "s1", sessionSeq: 2 }); + const other = sampleTurn("t3", { sessionId: "s2", sessionSeq: 1 }); + // insert out of order to prove ordering comes from seq + await store.create(t2); + await store.create(t1); + await store.create(other); + await store.create(sampleTurn("standalone")); + + expect(await store.get("t1")).toEqual(t1); + expect(await store.listBySession("s1")).toEqual([t1, t2]); + expect(await store.latestForSession("s1")).toEqual(t2); + expect(await store.latestForSession("missing")).toBeNull(); + expect(await store.listBySession("missing")).toEqual([]); + 
}); + + it("rejects a duplicate session seq via the unique index", async () => { + const { store } = await loadStore(); + await store.create(sampleTurn("t1", { sessionId: "s1", sessionSeq: 1 })); + await expect( + store.create(sampleTurn("t2", { sessionId: "s1", sessionSeq: 1 })), + ).rejects.toThrow(); + // standalone turns never conflict (NULL session_id) + await store.create(sampleTurn("t3")); + await store.create(sampleTurn("t4")); + }); + it("fails loudly on a corrupted JSON column", async () => { const { store, db } = await loadStore(); await store.create(sampleTurn("t1")); diff --git a/apps/x/packages/core/src/agent-loop/sqlite-turn-store.ts b/apps/x/packages/core/src/agent-loop/sqlite-turn-store.ts index 9ef201bd..2ff90bb1 100644 --- a/apps/x/packages/core/src/agent-loop/sqlite-turn-store.ts +++ b/apps/x/packages/core/src/agent-loop/sqlite-turn-store.ts @@ -46,6 +46,27 @@ export class SqliteTurnStore implements TurnStore { throw new Error(`Turn not found: ${id}`); } } + + async latestForSession(sessionId: string): Promise | null> { + const row = await this.db + .selectFrom("agent_loop_turns") + .selectAll() + .where("session_id", "=", sessionId) + .orderBy("session_seq", "desc") + .limit(1) + .executeTakeFirst(); + return row ? fromRow(row) : null; + } + + async listBySession(sessionId: string): Promise[]> { + const rows = await this.db + .selectFrom("agent_loop_turns") + .selectAll() + .where("session_id", "=", sessionId) + .orderBy("session_seq", "asc") + .execute(); + return rows.map(fromRow); + } } function toRow(turn: z.infer): Insertable { @@ -55,6 +76,8 @@ function toRow(turn: z.infer): Insertable): z.infer): Promise; get(id: string): Promise | null>; update(turn: z.infer): Promise; + // Session linkage queries (used by the sessions layer); ordered by sessionSeq. 
+ latestForSession(sessionId: string): Promise | null>; + listBySession(sessionId: string): Promise[]>; } diff --git a/apps/x/packages/core/src/agent-loop/types.ts b/apps/x/packages/core/src/agent-loop/types.ts index 73634790..5fdaeed1 100644 --- a/apps/x/packages/core/src/agent-loop/types.ts +++ b/apps/x/packages/core/src/agent-loop/types.ts @@ -63,6 +63,11 @@ export const AgentLoopTurn = z.object({ model: z.string().nullable(), permissionMode: PermissionMode, + // Session linkage — opaque to the loop (the sessions layer owns the + // meaning). seq is the turn's 1-based position within its session. + sessionId: z.string().nullable(), + sessionSeq: z.number().int().positive().nullable(), + // append-only fact logs messages: MessageList, permissionRequests: z.array(PermissionRequest), @@ -83,9 +88,14 @@ export const AgentLoopInput = z.object({ provider: z.string().nullable().optional(), model: z.string().nullable().optional(), permissionMode: PermissionMode.optional(), + sessionId: z.string().nullable().optional(), + sessionSeq: z.number().int().positive().nullable().optional(), // May include prior-conversation history; turns are self-contained by design. 
messages: MessageList.min(1), -}); +}).refine( + (input) => (input.sessionId == null) === (input.sessionSeq == null), + { message: "sessionId and sessionSeq must be set together" }, +); // ─── Tool definitions (environment, not turn state) ──────────────────────── diff --git a/apps/x/packages/core/src/sessions/in-memory-session-store.ts b/apps/x/packages/core/src/sessions/in-memory-session-store.ts new file mode 100644 index 00000000..bfc3df55 --- /dev/null +++ b/apps/x/packages/core/src/sessions/in-memory-session-store.ts @@ -0,0 +1,33 @@ +import { z } from "zod"; +import type { SessionStore } from "./session-store.js"; +import { Session } from "./types.js"; + +export class InMemorySessionStore implements SessionStore { + private sessions = new Map>(); + + async create(session: z.infer): Promise { + if (this.sessions.has(session.id)) { + throw new Error(`Session already exists: ${session.id}`); + } + this.sessions.set(session.id, structuredClone(session)); + } + + async get(id: string): Promise | null> { + const session = this.sessions.get(id); + return session ? 
structuredClone(session) : null; + } + + async list(filter?: { agentId?: string }): Promise[]> { + return [...this.sessions.values()] + .filter((s) => filter?.agentId === undefined || s.agentId === filter.agentId) + .sort((a, b) => b.updatedAt.localeCompare(a.updatedAt)) + .map((s) => structuredClone(s)); + } + + async update(session: z.infer): Promise { + if (!this.sessions.has(session.id)) { + throw new Error(`Session not found: ${session.id}`); + } + this.sessions.set(session.id, structuredClone(session)); + } +} diff --git a/apps/x/packages/core/src/sessions/index.ts b/apps/x/packages/core/src/sessions/index.ts new file mode 100644 index 00000000..60d30aa1 --- /dev/null +++ b/apps/x/packages/core/src/sessions/index.ts @@ -0,0 +1,5 @@ +export * from "./types.js"; +export * from "./session-store.js"; +export * from "./in-memory-session-store.js"; +export * from "./sqlite-session-store.js"; +export * from "./sessions.js"; diff --git a/apps/x/packages/core/src/sessions/session-store.ts b/apps/x/packages/core/src/sessions/session-store.ts new file mode 100644 index 00000000..cf127e8f --- /dev/null +++ b/apps/x/packages/core/src/sessions/session-store.ts @@ -0,0 +1,12 @@ +import { z } from "zod"; +import { Session } from "./types.js"; + +// Durable storage for session rows. The per-session mutex lives ABOVE the +// store (in SessionsImpl), not in it. +export interface SessionStore { + create(session: z.infer): Promise; + get(id: string): Promise | null>; + // Most recently active first (updatedAt descending). 
+ list(filter?: { agentId?: string }): Promise[]>; + update(session: z.infer): Promise; +} diff --git a/apps/x/packages/core/src/sessions/sessions.test.ts b/apps/x/packages/core/src/sessions/sessions.test.ts new file mode 100644 index 00000000..8d3cc85c --- /dev/null +++ b/apps/x/packages/core/src/sessions/sessions.test.ts @@ -0,0 +1,399 @@ +import { describe, expect, it } from "vitest"; +import { z } from "zod"; +import { + AssistantMessage, + Message, + ToolCallPart, +} from "@x/shared/dist/message.js"; +import { AgentLoopImpl } from "../agent-loop/agent-loop.js"; +import { EventStream } from "../agent-loop/event-stream.js"; +import { InMemoryTurnStore } from "../agent-loop/in-memory-turn-store.js"; +import type { ModelAdapter, ModelStreamRequest } from "../agent-loop/model-adapter.js"; +import type { PermissionGate } from "../agent-loop/permission-gate.js"; +import type { ToolRunner, ToolRunResult } from "../agent-loop/tool-runner.js"; +import { + AgentLoopTurn, + deriveTurnStatus, + type ModelStreamEvent, +} from "../agent-loop/types.js"; +import { InMemorySessionStore } from "./in-memory-session-store.js"; +import { SessionsImpl } from "./sessions.js"; + +// ─── helpers ──────────────────────────────────────────────────────────────── + +function userMsg(text: string): z.infer { + return { role: "user", content: text }; +} + +function assistantText(text: string): z.infer { + return { role: "assistant", content: text }; +} + +function toolCall(toolCallId: string, toolName: string): z.infer { + return { type: "tool-call", toolCallId, toolName, arguments: {} }; +} + +function assistantToolCalls( + ...calls: z.infer[] +): z.infer { + return { role: "assistant", content: calls }; +} + +type ModelStep = + | { kind: "message"; message: z.infer } + | { kind: "hang" }; + +class FakeModelAdapter implements ModelAdapter { + calls = 0; + + constructor(private steps: ModelStep[]) {} + + stream(req: ModelStreamRequest): EventStream> { + this.calls++; + const out = new 
EventStream>(); + const step = this.steps.shift(); + void (async () => { + await Promise.resolve(); + if (!step) { + const error = new Error("FakeModelAdapter: no scripted step left"); + out.fail(error); + return; + } + if (step.kind === "hang") { + req.signal.addEventListener("abort", () => { + out.fail(new Error("aborted")); + }, { once: true }); + return; + } + out.push({ type: "finish", message: step.message }); + out.end(step.message); + })(); + return out; + } +} + +class FakeToolRunner implements ToolRunner { + ran: string[] = []; + + definitions() { + return []; + } + + async run(call: z.infer): Promise { + this.ran.push(call.toolCallId); + return { type: "result", value: `ok:${call.toolName}` }; + } +} + +class FakePermissionGate implements PermissionGate { + constructor(private required: (toolName: string) => boolean = () => false) {} + + async check(call: z.infer) { + return this.required(call.toolName) + ? { required: true as const, request: { tool: call.toolName } } + : { required: false as const }; + } + + async classify(): Promise<{ decision: "abstained"; reason: string }> { + return { decision: "abstained", reason: "unsure" }; + } +} + +function makeSessions(opts: { steps?: ModelStep[]; gate?: FakePermissionGate } = {}) { + const turnStore = new InMemoryTurnStore(); + const sessionStore = new InMemorySessionStore(); + const adapter = new FakeModelAdapter(opts.steps ?? []); + const runner = new FakeToolRunner(); + const loop = new AgentLoopImpl({ + store: turnStore, + modelAdapter: adapter, + toolRunner: runner, + permissionGate: opts.gate ?? 
new FakePermissionGate(), + }); + const sessions = new SessionsImpl({ sessionStore, turnStore, agentLoop: loop }); + return { sessions, loop, turnStore, sessionStore, adapter, runner }; +} + +function turnFixture( + id: string, + overrides: Partial> = {}, +): z.infer { + const now = new Date().toISOString(); + return { + id, + agentId: null, + provider: null, + model: null, + permissionMode: "manual", + sessionId: null, + sessionSeq: null, + messages: [], + permissionRequests: [], + permissionDecisions: [], + startedTools: [], + dispatchedTools: [], + error: null, + completedAt: null, + createdAt: now, + updatedAt: now, + ...overrides, + }; +} + +function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +// ─── tests ────────────────────────────────────────────────────────────────── + +describe("SessionsImpl", () => { + it("creates, fetches, and lists sessions with an agent filter", async () => { + const { sessions } = makeSessions(); + + const s1 = await sessions.createSession({ agentId: "a1", title: "first" }); + const s2 = await sessions.createSession({ agentId: "a2" }); + + expect(await sessions.getSession(s1.id)).toEqual(s1); + expect(s2.title).toBeNull(); + expect((await sessions.listSessions()).map((s) => s.id).sort()) + .toEqual([s1.id, s2.id].sort()); + expect(await sessions.listSessions({ agentId: "a1" })).toEqual([s1]); + await expect(sessions.getSession("missing")).rejects.toThrow("Session not found"); + }); + + it("first sendMessage creates turn 1 with session linkage and per-call config", async () => { + const { sessions } = makeSessions({ + steps: [{ kind: "message", message: assistantText("hi!") }], + }); + const session = await sessions.createSession({ agentId: "a1" }); + + const handle = await sessions.sendMessage( + session.id, + [userMsg("hello")], + { provider: "openai", model: "gpt-x" }, + ); + const turn = await handle.result; + + expect(turn.sessionId).toBe(session.id); + 
expect(turn.sessionSeq).toBe(1); + expect(turn.agentId).toBe("a1"); // stamped from the session + expect(turn.provider).toBe("openai"); + expect(turn.model).toBe("gpt-x"); + expect(turn.messages).toEqual([userMsg("hello"), assistantText("hi!")]); + expect(deriveTurnStatus(turn)).toBe("completed"); + }); + + it("copies the previous transcript forward into the next turn", async () => { + const { sessions } = makeSessions({ + steps: [ + { kind: "message", message: assistantText("answer one") }, + { kind: "message", message: assistantText("answer two") }, + ], + }); + const session = await sessions.createSession(); + + await (await sessions.sendMessage(session.id, [userMsg("one")])).result; + const turn2 = await (await sessions.sendMessage(session.id, [userMsg("two")])).result; + + expect(turn2.sessionSeq).toBe(2); + expect(turn2.messages).toEqual([ + userMsg("one"), + assistantText("answer one"), + userMsg("two"), + assistantText("answer two"), + ]); + expect(await sessions.getHistory(session.id)).toEqual(turn2.messages); + expect((await sessions.listTurns(session.id)).map((t) => t.sessionSeq)).toEqual([1, 2]); + }); + + it("rejects empty messages and unknown sessions", async () => { + const { sessions } = makeSessions(); + const session = await sessions.createSession(); + await expect(sessions.sendMessage(session.id, [])).rejects.toThrow(); + await expect(sessions.sendMessage("missing", [userMsg("hi")])) + .rejects.toThrow("Session not found"); + }); + + it("rejects while a turn is in flight; a stopped turn can be superseded", async () => { + const { sessions, loop } = makeSessions({ + steps: [ + { kind: "hang" }, + { kind: "message", message: assistantText("fresh start") }, + ], + }); + const session = await sessions.createSession(); + + const first = await sessions.sendMessage(session.id, [userMsg("one")]); + await expect(sessions.sendMessage(session.id, [userMsg("two")])) + .rejects.toThrow("not finished"); + + await loop.stopTurn(first.id); // terminal "stopped" 
error, nothing partial persisted + const turn2 = await (await sessions.sendMessage(session.id, [userMsg("two")])).result; + + expect(turn2.sessionSeq).toBe(2); + expect(turn2.messages).toEqual([ + userMsg("one"), // stopped turn's input is still part of the transcript + userMsg("two"), + assistantText("fresh start"), + ]); + }); + + it("rejects while the latest turn waits on a permission", async () => { + const { sessions, loop } = makeSessions({ + steps: [ + { kind: "message", message: assistantToolCalls(toolCall("tc1", "write")) }, + { kind: "message", message: assistantText("done") }, + { kind: "message", message: assistantText("next answer") }, + ], + gate: new FakePermissionGate(() => true), + }); + const session = await sessions.createSession(); + + const first = await sessions.sendMessage(session.id, [userMsg("one")]); + const waiting = await first.result; + expect(deriveTurnStatus(waiting)).toBe("waiting"); + + await expect(sessions.sendMessage(session.id, [userMsg("two")])) + .rejects.toThrow("not finished"); + + const done = await loop.respondToPermission(waiting.id, "tc1", "granted").result; + expect(deriveTurnStatus(done)).toBe("completed"); + + const turn2 = await (await sessions.sendMessage(session.id, [userMsg("two")])).result; + expect(turn2.sessionSeq).toBe(2); + }); + + it("a crashed turn blocks the session until stopped; its calls are closed out", async () => { + const { sessions, loop, turnStore, runner } = makeSessions({ + steps: [{ kind: "message", message: assistantText("moving on") }], + }); + const session = await sessions.createSession(); + + // crafted idle turn (crash): tc1 never evaluated, tc2 started then crashed + await turnStore.create(turnFixture("t1", { + sessionId: session.id, + sessionSeq: 1, + messages: [userMsg("go"), assistantToolCalls(toolCall("tc1", "calc"), toolCall("tc2", "send-email"))], + startedTools: [{ toolCallId: "tc2", startedAt: "2026-06-12T00:00:00Z" }], + })); + + // an idle (crashed) turn must be explicitly 
resumed or stopped first + await expect(sessions.sendMessage(session.id, [userMsg("never mind")])) + .rejects.toThrow("not finished"); + await loop.stopTurn("t1"); + + const closures = [ + { + role: "tool" as const, + content: "Tool was not executed: the turn was stopped before this call ran.", + toolCallId: "tc1", + toolName: "calc", + }, + { + role: "tool" as const, + content: "Tool execution was interrupted before completing. It may or may not have taken effect; do not assume it ran.", + toolCallId: "tc2", + toolName: "send-email", + }, + ]; + // getHistory shows the same closed-out transcript the next turn will use + expect(await sessions.getHistory(session.id)).toEqual([ + userMsg("go"), + assistantToolCalls(toolCall("tc1", "calc"), toolCall("tc2", "send-email")), + ...closures, + ]); + + const turn2 = await (await sessions.sendMessage(session.id, [userMsg("never mind")])).result; + + expect(runner.ran).toEqual([]); // stale calls are closed out, never executed + expect(turn2.messages).toEqual([ + userMsg("go"), + assistantToolCalls(toolCall("tc1", "calc"), toolCall("tc2", "send-email")), + ...closures, + userMsg("never mind"), + assistantText("moving on"), + ]); + expect(deriveTurnStatus(turn2)).toBe("completed"); + }); + + it("a stopped waiting turn can be superseded; the abandoned call never runs", async () => { + const { sessions, loop, runner } = makeSessions({ + steps: [ + { kind: "message", message: assistantToolCalls(toolCall("tc1", "write")) }, + { kind: "message", message: assistantText("skipped it") }, + ], + gate: new FakePermissionGate(() => true), + }); + const session = await sessions.createSession(); + + const waiting = await (await sessions.sendMessage(session.id, [userMsg("one")])).result; + expect(deriveTurnStatus(waiting)).toBe("waiting"); + + await loop.stopTurn(waiting.id); // abandon instead of answering the permission + const turn2 = await (await sessions.sendMessage(session.id, [userMsg("skip it")])).result; + + 
expect(runner.ran).toEqual([]); + expect(turn2.sessionSeq).toBe(2); + expect(turn2.messages).toEqual([ + userMsg("one"), + assistantToolCalls(toolCall("tc1", "write")), + { + role: "tool", + content: "Tool was not executed: the turn was stopped before this call ran.", + toolCallId: "tc1", + toolName: "write", + }, + userMsg("skip it"), + assistantText("skipped it"), + ]); + }); + + it("builds on an errored turn's persisted transcript", async () => { + const { sessions, turnStore } = makeSessions({ + steps: [{ kind: "message", message: assistantText("recovered") }], + }); + const session = await sessions.createSession(); + + await turnStore.create(turnFixture("t1", { + sessionId: session.id, + sessionSeq: 1, + messages: [userMsg("go")], + error: { message: "rate limited", at: "2026-06-12T00:00:00Z" }, + })); + + const turn2 = await (await sessions.sendMessage(session.id, [userMsg("retry")])).result; + expect(turn2.sessionSeq).toBe(2); + expect(turn2.messages).toEqual([userMsg("go"), userMsg("retry"), assistantText("recovered")]); + // the errored turn is untouched — error is terminal per turn, not per session + expect((await turnStore.get("t1"))?.error?.message).toBe("rate limited"); + }); + + it("serializes concurrent sends: exactly one wins while a turn is in flight", async () => { + const { sessions } = makeSessions({ steps: [{ kind: "hang" }] }); + const session = await sessions.createSession(); + + const [a, b] = await Promise.allSettled([ + sessions.sendMessage(session.id, [userMsg("one")]), + sessions.sendMessage(session.id, [userMsg("two")]), + ]); + + expect(a.status).toBe("fulfilled"); + expect(b.status).toBe("rejected"); + const latest = await sessions.getHistory(session.id); + expect(latest).toEqual([userMsg("one")]); + }); + + it("sendMessage bumps the session's recency for listSessions ordering", async () => { + const { sessions } = makeSessions({ + steps: [{ kind: "message", message: assistantText("hi") }], + }); + const s1 = await 
sessions.createSession(); + await sleep(10); + const s2 = await sessions.createSession(); + expect((await sessions.listSessions()).map((s) => s.id)).toEqual([s2.id, s1.id]); + + await sleep(10); + await (await sessions.sendMessage(s1.id, [userMsg("hello")])).result; + expect((await sessions.listSessions()).map((s) => s.id)).toEqual([s1.id, s2.id]); + }); +}); diff --git a/apps/x/packages/core/src/sessions/sessions.ts b/apps/x/packages/core/src/sessions/sessions.ts new file mode 100644 index 00000000..c5eae7f2 --- /dev/null +++ b/apps/x/packages/core/src/sessions/sessions.ts @@ -0,0 +1,162 @@ +import crypto from "node:crypto"; +import { z } from "zod"; +import { Message, MessageList } from "@x/shared/dist/message.js"; +import type { AgentLoop, TurnHandle } from "../agent-loop/agent-loop.js"; +import { KeyedMutex } from "../agent-loop/mutex.js"; +import type { TurnStore } from "../agent-loop/turn-store.js"; +import { + AgentLoopTurn, + deriveToolCallState, + deriveTurnStatus, + unresolvedToolCalls, +} from "../agent-loop/types.js"; +import type { SessionStore } from "./session-store.js"; +import { CreateSessionInput, SendMessageOptions, Session } from "./types.js"; + +// A thin layer above the agent loop: a session is an ordered chain of +// self-contained turns. sendMessage builds the next turn's input from the +// previous turn's full transcript (copy-forward history) — the loop itself +// never learns that sessions exist. 
+/**
+ * Public surface of the sessions layer: session bookkeeping, sending a
+ * message into a session, and reading back its chained transcript.
+ */
+export interface Sessions {
+    /** Creates and persists a new session. */
+    createSession(input?: z.infer<typeof CreateSessionInput>): Promise<z.infer<typeof Session>>;
+    /** Returns the session with this id. */
+    getSession(sessionId: string): Promise<z.infer<typeof Session>>;
+    /** Lists sessions, optionally restricted to one agent. */
+    listSessions(filter?: { agentId?: string }): Promise<z.infer<typeof Session>[]>;
+    /** Starts the session's next turn from `messages` and returns its handle. */
+    sendMessage(
+        sessionId: string,
+        messages: z.infer<typeof MessageList>,
+        options?: z.infer<typeof SendMessageOptions>,
+    ): Promise<TurnHandle>;
+    /** Returns the transcript the session's next turn would start from. */
+    getHistory(sessionId: string): Promise<z.infer<typeof MessageList>>;
+    /** Returns the turns that belong to the session. */
+    listTurns(sessionId: string): Promise<z.infer<typeof AgentLoopTurn>[]>;
+}
+
+/** Current time as an ISO-8601 string (Date#toISOString). */
+function nowIso(): string {
+    return new Date().toISOString();
+}
+
+/**
+ * Sessions implementation over a SessionStore, a TurnStore and the agent loop.
+ *
+ * getSession, sendMessage, getHistory and listTurns all look the session up
+ * first and reject with "Session not found: <id>" when it does not exist.
+ */
+export class SessionsImpl implements Sessions {
+    private readonly sessionStore: SessionStore;
+    private readonly turnStore: TurnStore;
+    private readonly agentLoop: AgentLoop;
+    // Keyed by sessionId: sendMessage runs its check-then-create sequence under it.
+    private readonly mutex = new KeyedMutex();
+
+    constructor(deps: {
+        sessionStore: SessionStore;
+        turnStore: TurnStore;
+        agentLoop: AgentLoop;
+    }) {
+        this.sessionStore = deps.sessionStore;
+        this.turnStore = deps.turnStore;
+        this.agentLoop = deps.agentLoop;
+    }
+
+    /**
+     * Validates `input`, then persists and returns a new session.
+     *
+     * The id is a random UUID; createdAt and updatedAt share one timestamp;
+     * an omitted agentId or title is stored as null.
+     *
+     * @throws if `input` does not satisfy CreateSessionInput.
+     */
+    async createSession(
+        input: z.infer<typeof CreateSessionInput> = {},
+    ): Promise<z.infer<typeof Session>> {
+        const parsed = CreateSessionInput.parse(input);
+        const now = nowIso();
+        const session: z.infer<typeof Session> = {
+            id: crypto.randomUUID(),
+            agentId: parsed.agentId ?? null,
+            title: parsed.title ?? null,
+            createdAt: now,
+            updatedAt: now,
+        };
+        await this.sessionStore.create(session);
+        return session;
+    }
+
+    /** Returns the stored session, rejecting when the id is unknown. */
+    async getSession(sessionId: string): Promise<z.infer<typeof Session>> {
+        return this.mustGetSession(sessionId);
+    }
+
+    /** Delegates to the session store's list(), passing the filter through unchanged. */
+    async listSessions(filter?: { agentId?: string }): Promise<z.infer<typeof Session>[]> {
+        return this.sessionStore.list(filter);
+    }
+
+    /**
+     * Creates the session's next turn and returns the loop's handle for it.
+     *
+     * The new turn's input is the latest turn's transcript (see historyFrom)
+     * followed by `messages`; its sessionSeq is the latest turn's plus one,
+     * starting at 1. The session's agentId is stamped onto the turn; provider
+     * and model come from `options` (null when omitted); permissionMode is
+     * forwarded only when the caller supplied one.
+     *
+     * @throws if `messages` is empty or `options` is invalid (checked before
+     *   the mutex is taken), if the session does not exist, or if the latest
+     *   turn's status is neither "completed" nor "error".
+     */
+    async sendMessage(
+        sessionId: string,
+        messages: z.infer<typeof MessageList>,
+        options: z.infer<typeof SendMessageOptions> = {},
+    ): Promise<TurnHandle> {
+        // Validate the NEW messages alone — combined with history they would
+        // pass the loop's min(1) even when empty.
+        const newMessages = MessageList.min(1).parse(messages);
+        const parsedOptions = SendMessageOptions.parse(options);
+        return this.mutex.run(sessionId, async () => {
+            const session = await this.mustGetSession(sessionId);
+            const latest = await this.turnStore.latestForSession(sessionId);
+            if (latest) {
+                // A session only ever chains on TERMINAL turns. Anything else
+                // (running, waiting on a permission/tool result, or idle after
+                // a crash) must be resolved first: wait for it, respond to
+                // it, resume it, or stopTurn it — a stop is itself terminal.
+                // Terminal turns are immutable, so the snapshot we copy
+                // forward below can never go stale or be re-activated.
+                const status = deriveTurnStatus(latest);
+                if (status !== "completed" && status !== "error") {
+                    throw new Error(
+                        `Session's latest turn is not finished: ${latest.id} (status: ${status})`,
+                    );
+                }
+            }
+            // Bump recency BEFORE creating the turn: if this write fails, no
+            // orphan turn is left running with its handle lost to the caller.
+            // The bump goes through a copy so the object handed back by the
+            // store is never edited in place ahead of a successful update.
+            const bumped: z.infer<typeof Session> = { ...session, updatedAt: nowIso() };
+            await this.sessionStore.update(bumped);
+            return this.agentLoop.createTurn({
+                agentId: bumped.agentId,
+                provider: parsedOptions.provider ?? null,
+                model: parsedOptions.model ?? null,
+                ...(parsedOptions.permissionMode !== undefined
+                    ? { permissionMode: parsedOptions.permissionMode }
+                    : {}),
+                sessionId,
+                sessionSeq: (latest?.sessionSeq ?? 0) + 1,
+                messages: [...(latest ? historyFrom(latest) : []), ...newMessages],
+            });
+        });
+    }
+
+    // The transcript as the next turn will see it: a stopped turn's dangling
+    // tool calls appear closed out, matching what sendMessage actually sends.
+    // Returns [] for a session that has no turns yet.
+    // NOTE(review): historyFrom is applied to whatever the latest turn is,
+    // terminal or not — presumably a turn still waiting on a permission also
+    // shows its open calls as closed out here; confirm against
+    // unresolvedToolCalls.
+    async getHistory(sessionId: string): Promise<z.infer<typeof MessageList>> {
+        await this.mustGetSession(sessionId);
+        const latest = await this.turnStore.latestForSession(sessionId);
+        return latest ? historyFrom(latest) : [];
+    }
+
+    /** Returns the turn store's listBySession() result after checking the session exists. */
+    async listTurns(sessionId: string): Promise<z.infer<typeof AgentLoopTurn>[]> {
+        await this.mustGetSession(sessionId);
+        return this.turnStore.listBySession(sessionId);
+    }
+
+    /** Loads a session or throws "Session not found: <id>" when the store returns nothing. */
+    private async mustGetSession(sessionId: string): Promise<z.infer<typeof Session>> {
+        const session = await this.sessionStore.get(sessionId);
+        if (!session) throw new Error(`Session not found: ${sessionId}`);
+        return session;
+    }
+}
+
+// Copy-forward history: the next turn's input is the previous turn's full
+// transcript.
A stopped turn can carry unresolved tool calls; they are closed
+// out with synthetic ToolMessages so the new turn never re-executes — or
+// hangs on — stale calls. This is the sessions-layer analogue of the
+// reducer's interrupted-call handling.
+//
+// Returns a new array: the turn's own messages followed by one synthetic
+// tool message per call reported by unresolvedToolCalls(turn). A call whose
+// derived state is "interrupted" gets the "was interrupted" wording; every
+// other unresolved call gets the "was not executed" wording. turn.messages
+// is copied, never modified.
+function historyFrom(
+    turn: z.infer<typeof AgentLoopTurn>,
+): z.infer<typeof Message>[] {
+    const messages = [...turn.messages];
+    for (const call of unresolvedToolCalls(turn)) {
+        const interrupted = deriveToolCallState(turn, call.toolCallId) === "interrupted";
+        messages.push({
+            role: "tool",
+            content: interrupted
+                ? "Tool execution was interrupted before completing. It may or may not have taken effect; do not assume it ran."
+                : "Tool was not executed: the turn was stopped before this call ran.",
+            toolCallId: call.toolCallId,
+            toolName: call.toolName,
+        });
+    }
+    return messages;
+}
diff --git a/apps/x/packages/core/src/sessions/sqlite-session-store.test.ts b/apps/x/packages/core/src/sessions/sqlite-session-store.test.ts
new file mode 100644
index 00000000..4efd57a5
--- /dev/null
+++ b/apps/x/packages/core/src/sessions/sqlite-session-store.test.ts
@@ -0,0 +1,115 @@
+import fs from "node:fs/promises";
+import os from "node:os";
+import path from "node:path";
+import { sql } from "kysely";
+import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
+import { z } from "zod";
+import type { Session } from "./types.js";
+
+// Per-test scratch state: a fresh temp dir, the workspace path inside it, and
+// the storage module once loadStore() has imported it (null until then).
+let tmpDir: string;
+let workspaceDir: string;
+let storageModule: typeof import("../storage/index.js") | null = null;
+
+// Point ROWBOAT_WORKDIR at a new temp workspace and reset the module registry
+// so each test imports the storage modules afresh.
+beforeEach(async () => {
+    tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), "rowboat-sessions-test-"));
+    workspaceDir = path.join(tmpDir, "workspace");
+    process.env.ROWBOAT_WORKDIR = workspaceDir;
+    vi.resetModules();
+    // config.ts kicks off knowledge-repo init as an import side effect; mock it
+    // out so tests don't touch git (same pattern as storage/storage.test.ts).
+    vi.doMock("../knowledge/version_history.js", () => ({
+        initRepo: vi.fn(async () => undefined),
+    }));
+    vi.doMock("../knowledge/deprecate_today_note.js", () => ({
+        deprecateTodayNote: vi.fn(async () => undefined),
+    }));
+});
+
+// Shut storage down (ignoring shutdown errors), undo the env var and the
+// mocks, and remove the temp dir.
+afterEach(async () => {
+    if (storageModule) {
+        await storageModule.shutdownStorage().catch(() => undefined);
+        storageModule = null;
+    }
+    delete process.env.ROWBOAT_WORKDIR;
+    vi.doUnmock("../knowledge/version_history.js");
+    vi.doUnmock("../knowledge/deprecate_today_note.js");
+    vi.resetModules();
+    await fs.rm(tmpDir, { recursive: true, force: true });
+});
+
+// Imports and initialises storage, then returns a SqliteSessionStore built on
+// getDb() together with that db handle for raw SQL assertions.
+async function loadStore() {
+    storageModule = await import("../storage/index.js");
+    await storageModule.initStorage();
+    const { SqliteSessionStore } = await import("./sqlite-session-store.js");
+    return { store: new SqliteSessionStore(storageModule.getDb()), db: storageModule.getDb() };
+}
+
+// Session fixture with fixed agent, title and timestamps; `overrides` wins
+// over the defaults field by field.
+function sampleSession(
+    id: string,
+    overrides: Partial<z.infer<typeof Session>> = {},
+): z.infer<typeof Session> {
+    return {
+        id,
+        agentId: "agent-1",
+        title: "a chat",
+        createdAt: "2026-06-12T00:00:00Z",
+        updatedAt: "2026-06-12T00:00:00Z",
+        ...overrides,
+    };
+}
+
+describe("SqliteSessionStore", () => {
+    it("migration creates the sessions table and index", async () => {
+        const { db } = await loadStore();
+
+        const tables = await sql<{ name: string }>`
+            select name from sqlite_master
+            where type = 'table' and name = 'sessions'
+        `.execute(db);
+        expect(tables.rows).toHaveLength(1);
+
+        const indexes = await sql<{ name: string }>`
+            select name from sqlite_master
+            where type = 'index' and name = 'sessions_updated_at_idx'
+        `.execute(db);
+        expect(indexes.rows).toHaveLength(1);
+    });
+
+    it("round-trips every column", async () => {
+        const { store } = await loadStore();
+        const session = sampleSession("s1");
+
+        await store.create(session);
+        expect(await store.get("s1")).toEqual(session);
+
+        const updated = { ...session, title: "renamed", updatedAt: "2026-06-12T01:00:00Z" };
+        await store.update(updated);
+        expect(await store.get("s1")).toEqual(updated);
+
+        // nullable columns must survive the round trip as null
+        const nulls = sampleSession("s2", { agentId: null, title: null });
+        await store.create(nulls);
+        expect(await store.get("s2")).toEqual(nulls);
+    });
+
+    it("lists most recently updated first, with an optional agent filter", async () => {
+        const { store } = await loadStore();
+        const older = sampleSession("s1", { updatedAt: "2026-06-12T00:00:01Z" });
+        const newer = sampleSession("s2", { updatedAt: "2026-06-12T00:00:02Z" });
+        const otherAgent = sampleSession("s3", {
+            agentId: "agent-2",
+            updatedAt: "2026-06-12T00:00:03Z",
+        });
+        await store.create(older);
+        await store.create(newer);
+        await store.create(otherAgent);
+
+        expect((await store.list()).map((s) => s.id)).toEqual(["s3", "s2", "s1"]);
+        expect((await store.list({ agentId: "agent-1" })).map((s) => s.id)).toEqual(["s2", "s1"]);
+    });
+
+    it("returns null for unknown ids and rejects updates to missing sessions", async () => {
+        const { store } = await loadStore();
+        expect(await store.get("missing")).toBeNull();
+        await expect(store.update(sampleSession("missing"))).rejects.toThrow("Session not found");
+    });
+});
diff --git a/apps/x/packages/core/src/sessions/sqlite-session-store.ts b/apps/x/packages/core/src/sessions/sqlite-session-store.ts
new file mode 100644
index 00000000..e8857eae
--- /dev/null
+++ b/apps/x/packages/core/src/sessions/sqlite-session-store.ts
@@ -0,0 +1,71 @@
+import type { Insertable, Kysely, Selectable } from "kysely";
+import { z } from "zod";
+import type { Database, SessionsTable } from "../storage/schema.js";
+import type { SessionStore } from "./session-store.js";
+import { Session } from "./types.js";
+
+// Accepts a Kysely<Database> from the existing getDb(); it does not own the
+// storage lifecycle (never calls initStorage()).
+/** SessionStore implementation over the `sessions` table of the given Kysely database. */
+export class SqliteSessionStore implements SessionStore {
+    constructor(private readonly db: Kysely<Database>) {}
+
+    /** Inserts the session as a new row. */
+    async create(session: z.infer<typeof Session>): Promise<void> {
+        await this.db
+            .insertInto("sessions")
+            .values(toRow(session))
+            .execute();
+    }
+
+    /** Returns the session with this id, or null when no row matches. */
+    async get(id: string): Promise<z.infer<typeof Session> | null> {
+        const row = await this.db
+            .selectFrom("sessions")
+            .selectAll()
+            .where("id", "=", id)
+            .executeTakeFirst();
+        return row ? fromRow(row) : null;
+    }
+
+    /** Lists sessions ordered by updated_at descending, optionally for one agent only. */
+    async list(filter?: { agentId?: string }): Promise<z.infer<typeof Session>[]> {
+        let query = this.db
+            .selectFrom("sessions")
+            .selectAll()
+            .orderBy("updated_at", "desc");
+        // Only an explicitly supplied agentId narrows the query.
+        if (filter?.agentId !== undefined) {
+            query = query.where("agent_id", "=", filter.agentId);
+        }
+        const rows = await query.execute();
+        return rows.map(fromRow);
+    }
+
+    /**
+     * Overwrites every non-id column of the row whose id matches.
+     *
+     * @throws Error("Session not found: <id>") when no row was updated.
+     */
+    async update(session: z.infer<typeof Session>): Promise<void> {
+        const { id, ...rest } = toRow(session);
+        const result = await this.db
+            .updateTable("sessions")
+            .set(rest)
+            .where("id", "=", id)
+            .executeTakeFirst();
+        // numUpdatedRows is compared as a bigint; zero means the id matched nothing.
+        if (result.numUpdatedRows === 0n) {
+            throw new Error(`Session not found: ${id}`);
+        }
+    }
+}
+
+/** Maps a Session to its snake_case row shape. */
+function toRow(session: z.infer<typeof Session>): Insertable<SessionsTable> {
+    return {
+        id: session.id,
+        agent_id: session.agentId,
+        title: session.title,
+        created_at: session.createdAt,
+        updated_at: session.updatedAt,
+    };
+}
+
+/** Maps a selected row back to the camelCase Session shape. */
+function fromRow(row: Selectable<SessionsTable>): z.infer<typeof Session> {
+    return {
+        id: row.id,
+        agentId: row.agent_id,
+        title: row.title,
+        createdAt: row.created_at,
+        updatedAt: row.updated_at,
+    };
+}
diff --git a/apps/x/packages/core/src/sessions/types.ts b/apps/x/packages/core/src/sessions/types.ts
new file mode 100644
index 00000000..96b4c7f9
--- /dev/null
+++ b/apps/x/packages/core/src/sessions/types.ts
@@ -0,0 +1,27 @@
+import { z } from "zod";
+import { PermissionMode } from "../agent-loop/types.js";
+
+// A session is a grouping label plus a title — an ordered chain of turns,
+// linked via the turn's sessionId/sessionSeq.
All configuration (provider,
+// model, permission mode) flows through sendMessage at the moment it is used
+// and lands on the turn row as the durable record; the session deliberately
+// stores none of it. agentId is the exception: a session is a conversation
+// WITH an agent, and "list sessions for agent X" is a session-level query.
+export const Session = z.object({
+    id: z.string(),
+    agentId: z.string().nullable(),
+    title: z.string().nullable(),
+    createdAt: z.string(),
+    updatedAt: z.string(), // rewritten by sendMessage; the SQLite store lists newest first on it
+});
+
+export const CreateSessionInput = z.object({ // both fields optional; createSession stores omitted ones as null
+    agentId: z.string().nullable().optional(),
+    title: z.string().nullable().optional(),
+});
+
+export const SendMessageOptions = z.object({ // per-call turn config; omitted provider/model reach the loop as null
+    provider: z.string().nullable().optional(),
+    model: z.string().nullable().optional(),
+    permissionMode: PermissionMode.optional(), // forwarded to the loop only when supplied
+});
diff --git a/apps/x/packages/core/src/storage/migrations.ts b/apps/x/packages/core/src/storage/migrations.ts
index 0953df6b..d298536b 100644
--- a/apps/x/packages/core/src/storage/migrations.ts
+++ b/apps/x/packages/core/src/storage/migrations.ts
@@ -53,6 +53,53 @@ const migrations: Record = {
             await db.schema.dropTable("agent_loop_turns").ifExists().execute();
         },
     },
+    "2026-06-12_0003_sessions": {
+        async up(db: MigrationDb): Promise<void> {
+            await db.schema
+                .createTable("sessions")
+                .ifNotExists()
+                .addColumn("id", "text", (col) => col.primaryKey())
+                .addColumn("agent_id", "text")
+                .addColumn("title", "text")
+                .addColumn("created_at", "text", (col) => col.notNull())
+                .addColumn("updated_at", "text", (col) => col.notNull())
+                .execute();
+
+            await db.schema
+                .createIndex("sessions_updated_at_idx")
+                .ifNotExists()
+                .on("sessions")
+                .column("updated_at")
+                .execute();
+
+            await db.schema // NOTE(review): unlike the create* calls, the two addColumn steps have no if-not-exists guard
+                .alterTable("agent_loop_turns")
+                .addColumn("session_id", "text")
+                .execute();
+            await db.schema
+                .alterTable("agent_loop_turns")
+                .addColumn("session_seq", "integer")
+                .execute();
+
+            // Tripwire: a second writer racing past the per-session mutex must
+            // fail loudly instead of silently forking the turn chain. NULL
+            // session_ids never conflict (standalone turns).
+            await db.schema
+                .createIndex("agent_loop_turns_session_seq_uniq")
+                .ifNotExists()
+                .unique()
+                .on("agent_loop_turns")
+                .columns(["session_id", "session_seq"])
+                .execute();
+        },
+        async down(db: MigrationDb): Promise<void> { // reverse order of up(): unique index, its two columns, then the sessions index and table
+            await db.schema.dropIndex("agent_loop_turns_session_seq_uniq").ifExists().execute();
+            await db.schema.alterTable("agent_loop_turns").dropColumn("session_seq").execute();
+            await db.schema.alterTable("agent_loop_turns").dropColumn("session_id").execute();
+            await db.schema.dropIndex("sessions_updated_at_idx").ifExists().execute();
+            await db.schema.dropTable("sessions").ifExists().execute();
+        },
+    },
 };
 
 class InCodeMigrationProvider implements MigrationProvider {
diff --git a/apps/x/packages/core/src/storage/schema.ts b/apps/x/packages/core/src/storage/schema.ts
index 781e4e14..5fd9ec52 100644
--- a/apps/x/packages/core/src/storage/schema.ts
+++ b/apps/x/packages/core/src/storage/schema.ts
@@ -14,6 +14,8 @@ export interface AgentLoopTurnsTable {
     provider: string | null;
     model: string | null;
     permission_mode: string;
+    session_id: string | null; // null for standalone turns (no session)
+    session_seq: number | null; // position in the session's chain; sendMessage assigns 1, 2, ...
     messages: string; // JSON: MessageList
     permission_requests: string; // JSON: PermissionRequest[]
     permission_decisions: string; // JSON: PermissionDecision[]
@@ -25,7 +27,16 @@ export interface AgentLoopTurnsTable {
     completed_at: string | null;
 }
 
+export interface SessionsTable { // row shape of the `sessions` table created by 2026-06-12_0003_sessions
+    id: string;
+    agent_id: string | null;
+    title: string | null;
+    created_at: TimestampColumn;
+    updated_at: TimestampColumn;
+}
+
 export interface Database {
     storage_metadata: StorageMetadataTable;
     agent_loop_turns: AgentLoopTurnsTable;
+    sessions: SessionsTable;
 }