Add sessions layer above the agent loop

A session is a grouping label (id, agentId, title) over an ordered chain
of self-contained turns; the loop itself never learns sessions exist.

- sendMessage builds each turn's input by copying the previous turn's
  full transcript forward (copy-forward history) and only ever chains on
  TERMINAL turns: anything running, waiting, or crashed-idle must be
  resolved or stopped first. Terminal turns are immutable, so the copied
  snapshot can never go stale or be re-activated.
- stopTurn now persists the stop as a terminal turn error (code
  "stopped") instead of leaving the turn idle/resumable; resumeTurn,
  setToolResult, and respondToPermission all reject terminal turns.
  Unresolved tool calls on a stopped turn are closed out with synthetic
  ToolMessages when history is copied forward — never re-executed.
- Turn rows gain sessionId/sessionSeq (opaque to the reducer) with a
  UNIQUE(session_id, session_seq) index as a fork tripwire; createTurn
  is now async and persists the row before returning so a claimed seq is
  visible the moment the caller holds the handle.
- New sessions/ module: Session types, SessionStore (in-memory +
  SQLite), SessionsImpl facade with a per-session KeyedMutex (extracted
  from the loop's TurnMutex). getHistory returns the same closed-out
  transcript the next turn will actually send.
- Migration 2026-06-12_0003_sessions: sessions table + additive turn
  columns. Provider/model/permissionMode deliberately flow per
  sendMessage call and land on the turn row, not the session.
- 21 new vitest tests (79 total): copy-forward, supersede-after-stop,
  crashed-turn blocking, terminal-mutation guards, concurrent-send
  serialization, SQLite round-trips and unique-seq enforcement.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
Ramnique Singh 2026-06-12 19:26:18 +05:30
parent e29701fb5d
commit 8a5427e841
18 changed files with 1156 additions and 86 deletions

View file

@ -154,6 +154,14 @@ class SnapshottingStore implements TurnStore {
this.snapshots.push(structuredClone(turn));
await this.inner.update(turn);
}
async latestForSession(sessionId: string) {
return this.inner.latestForSession(sessionId);
}
async listBySession(sessionId: string) {
return this.inner.listBySession(sessionId);
}
}
function makeLoop(opts: {
@ -188,6 +196,8 @@ function emptyTurn(
provider: null,
model: null,
permissionMode: "manual",
sessionId: null,
sessionSeq: null,
messages: [],
permissionRequests: [],
permissionDecisions: [],
@ -211,12 +221,12 @@ describe("AgentLoopImpl", () => {
it("completes a simple text turn and persists input metadata", async () => {
const { loop } = makeLoop({ steps: [{ kind: "message", message: assistantText("hi!") }] });
const turn = await loop.createTurn({
const turn = await (await loop.createTurn({
agentId: "a1",
provider: "openai",
model: "gpt-x",
messages: [userMsg("hello")],
}).result;
})).result;
expect(turn.agentId).toBe("a1");
expect(turn.provider).toBe("openai");
@ -229,7 +239,8 @@ describe("AgentLoopImpl", () => {
it("rejects a turn with no input messages", async () => {
const { loop } = makeLoop();
await expect(loop.createTurn({ messages: [] }).result).rejects.toThrow();
// invalid input now fails at the call itself — nothing is persisted
await expect(loop.createTurn({ messages: [] })).rejects.toThrow();
});
it("runs tool calls: result and error both append ToolMessages and continue", async () => {
@ -245,7 +256,7 @@ describe("AgentLoopImpl", () => {
}),
});
const turn = await loop.createTurn({ messages: [userMsg("go")] }).result;
const turn = await (await loop.createTurn({ messages: [userMsg("go")] })).result;
expect(runner.ran).toEqual(["tc1", "tc2"]);
expect(toolMessages(turn)).toEqual([
@ -267,7 +278,7 @@ describe("AgentLoopImpl", () => {
runner: new FakeToolRunner({ "ask-human": () => ({ type: "pending" }) }),
});
const waiting = await loop.createTurn({ messages: [userMsg("go")] }).result;
const waiting = await (await loop.createTurn({ messages: [userMsg("go")] })).result;
expect(deriveTurnStatus(waiting)).toBe("waiting");
expect(waiting.dispatchedTools.map((t) => t.toolCallId)).toEqual(["tc1"]);
expect(adapter.calls).toBe(1); // no model call while waiting
@ -291,7 +302,7 @@ describe("AgentLoopImpl", () => {
runner: new FakeToolRunner({ job: () => ({ type: "pending" }) }),
});
const waiting = await loop.createTurn({ messages: [userMsg("go")] }).result;
const waiting = await (await loop.createTurn({ messages: [userMsg("go")] })).result;
expect(waiting.dispatchedTools).toHaveLength(2);
const stillWaiting = await loop.setToolResult(waiting.id, { toolCallId: "tc1", result: "r1" }).result;
@ -309,7 +320,7 @@ describe("AgentLoopImpl", () => {
runner: new FakeToolRunner({ job: () => ({ type: "pending" }) }),
});
const waiting = await loop.createTurn({ messages: [userMsg("go")] }).result;
const waiting = await (await loop.createTurn({ messages: [userMsg("go")] })).result;
await expect(
loop.setToolResult(waiting.id, { toolCallId: "bogus", result: "x" }).result,
).rejects.toThrow("not awaiting an external result");
@ -327,7 +338,7 @@ describe("AgentLoopImpl", () => {
gate: new FakePermissionGate({ required: () => true }),
});
const turn = await loop.createTurn({ messages: [userMsg("go")] }).result;
const turn = await (await loop.createTurn({ messages: [userMsg("go")] })).result;
expect(deriveTurnStatus(turn)).toBe("waiting");
expect(turn.permissionRequests.map((r) => r.toolCallId)).toEqual(["tc1", "tc2"]);
@ -351,7 +362,7 @@ describe("AgentLoopImpl", () => {
gate: new FakePermissionGate({ required: () => true }),
});
const waiting = await loop.createTurn({ messages: [userMsg("go")] }).result;
const waiting = await (await loop.createTurn({ messages: [userMsg("go")] })).result;
const afterDeny = await loop
.respondToPermission(waiting.id, "tc2", "denied", "nope").result;
@ -376,7 +387,7 @@ describe("AgentLoopImpl", () => {
steps: [{ kind: "message", message: assistantToolCalls(toolCall("tc1", "write")) }],
gate: new FakePermissionGate({ required: () => true }),
});
const waiting = await loop.createTurn({ messages: [userMsg("go")] }).result;
const waiting = await (await loop.createTurn({ messages: [userMsg("go")] })).result;
await expect(
loop.respondToPermission(waiting.id, "bogus", "granted").result,
).rejects.toThrow("No open permission request");
@ -413,7 +424,7 @@ describe("AgentLoopImpl", () => {
gate,
steps: [{ kind: "message", message: assistantToolCalls(toolCall("tc1", "write")) }],
});
const turn = await loop.createTurn({ messages: [userMsg("go")] }).result;
const turn = await (await loop.createTurn({ messages: [userMsg("go")] })).result;
expect(deriveTurnStatus(turn)).toBe("waiting");
expect(gate.classifyCalls).toBe(0);
});
@ -437,10 +448,10 @@ describe("AgentLoopImpl", () => {
}),
});
const turn = await loop.createTurn({
const turn = await (await loop.createTurn({
messages: [userMsg("go")],
permissionMode: "auto",
}).result;
})).result;
expect(deriveTurnStatus(turn)).toBe("completed");
expect(runner.ran).toEqual(["tc1"]);
@ -465,10 +476,10 @@ describe("AgentLoopImpl", () => {
],
});
const waiting = await loop.createTurn({
const waiting = await (await loop.createTurn({
messages: [userMsg("go")],
permissionMode: "auto",
}).result;
})).result;
expect(deriveTurnStatus(waiting)).toBe("waiting");
expect(gate.classifyCalls).toBe(1);
@ -520,15 +531,10 @@ describe("AgentLoopImpl", () => {
});
});
it("stopTurn mid-stream persists no partial message; turn is idle and resumable", async () => {
const { loop } = makeLoop({
steps: [
{ kind: "hang" },
{ kind: "message", message: assistantText("second try") },
],
});
it("stopTurn mid-stream persists no partial message; the stop is a terminal error", async () => {
const { loop } = makeLoop({ steps: [{ kind: "hang" }] });
const handle = loop.createTurn({ messages: [userMsg("go")] });
const handle = await loop.createTurn({ messages: [userMsg("go")] });
// wait for the first streamed delta, then stop
const iterator = handle.events[Symbol.asyncIterator]();
const first = await iterator.next();
@ -536,13 +542,12 @@ describe("AgentLoopImpl", () => {
const stopped = await loop.stopTurn(handle.id);
expect(stopped.messages).toEqual([userMsg("go")]); // no partial persisted
expect(stopped.error).toBeNull();
expect(deriveTurnStatus(stopped)).toBe("idle");
expect(stopped.error?.code).toBe("stopped");
expect(deriveTurnStatus(stopped)).toBe("error");
await expect(handle.result).resolves.toBeTruthy(); // original handle also rests
const resumed = await loop.resumeTurn(handle.id).result;
expect(resumed.messages).toEqual([userMsg("go"), assistantText("second try")]);
expect(deriveTurnStatus(resumed)).toBe("completed");
// stop is terminal: the turn can never be resumed or mutated
await expect(loop.resumeTurn(handle.id).result).rejects.toThrow("terminal error");
});
it("stopTurn immediately after createTurn aborts the queued advance", async () => {
@ -550,17 +555,71 @@ describe("AgentLoopImpl", () => {
steps: [{ kind: "message", message: assistantText("should not run") }],
});
// No await between create and stop — the advance is still queued
// Stop as soon as the handle exists — the advance is still queued
// behind the mutex and its controller must already be abortable.
const handle = loop.createTurn({ messages: [userMsg("go")] });
const handle = await loop.createTurn({ messages: [userMsg("go")] });
const stopped = await loop.stopTurn(handle.id);
expect(adapter.calls).toBe(0); // model never called
expect(stopped.messages).toEqual([userMsg("go")]); // input fact persisted
expect(deriveTurnStatus(stopped)).toBe("idle");
expect(stopped.error?.code).toBe("stopped");
expect(deriveTurnStatus(stopped)).toBe("error");
});
const resumed = await loop.resumeTurn(handle.id).result;
expect(deriveTurnStatus(resumed)).toBe("completed");
it("stopTurn never overwrites a finished turn's outcome", async () => {
const { loop } = makeLoop({ steps: [{ kind: "message", message: assistantText("hi") }] });
const turn = await (await loop.createTurn({ messages: [userMsg("go")] })).result;
const stopped = await loop.stopTurn(turn.id);
expect(stopped.error).toBeNull();
expect(deriveTurnStatus(stopped)).toBe("completed");
// completed is just as terminal as stopped: no resume either
await expect(loop.resumeTurn(turn.id).result).rejects.toThrow("already completed");
});
it("a stopped turn rejects every mutation: tool results, permission responses, resume", async () => {
const { loop } = makeLoop({
steps: [{
kind: "message",
message: assistantToolCalls(toolCall("tc1", "job"), toolCall("tc2", "write")),
}],
runner: new FakeToolRunner({ job: () => ({ type: "pending" }) }),
gate: new FakePermissionGate({ required: (name) => name === "write" }),
});
// turn waits on tc2's permission; tc1 is deferred behind the batch
const waiting = await (await loop.createTurn({ messages: [userMsg("go")] })).result;
expect(deriveTurnStatus(waiting)).toBe("waiting");
const stopped = await loop.stopTurn(waiting.id);
expect(stopped.error?.code).toBe("stopped");
await expect(
loop.setToolResult(waiting.id, { toolCallId: "tc1", result: "late" }).result,
).rejects.toThrow("terminal error");
await expect(
loop.respondToPermission(waiting.id, "tc2", "granted").result,
).rejects.toThrow("terminal error");
await expect(loop.resumeTurn(waiting.id).result).rejects.toThrow("terminal error");
// and none of it mutated the stopped turn
const after = await loop.getTurn(waiting.id);
expect(after).toEqual(stopped);
});
it("stopTurn terminates a waiting turn so it can be abandoned", async () => {
const { loop } = makeLoop({
steps: [{ kind: "message", message: assistantToolCalls(toolCall("tc1", "write")) }],
gate: new FakePermissionGate({ required: () => true }),
});
const waiting = await (await loop.createTurn({ messages: [userMsg("go")] })).result;
expect(deriveTurnStatus(waiting)).toBe("waiting");
const stopped = await loop.stopTurn(waiting.id);
expect(stopped.error?.code).toBe("stopped");
expect(deriveTurnStatus(stopped)).toBe("error");
await expect(loop.respondToPermission(waiting.id, "tc1", "granted").result)
.rejects.toThrow("terminal error");
});
it("a throwing tool runner becomes an error ToolMessage, not a turn error", async () => {
@ -574,7 +633,7 @@ describe("AgentLoopImpl", () => {
}),
});
const turn = await loop.createTurn({ messages: [userMsg("go")] }).result;
const turn = await (await loop.createTurn({ messages: [userMsg("go")] })).result;
expect(turn.error).toBeNull();
expect(toolMessages(turn)[0]?.content).toBe("Tool execution failed: ECONNRESET");
expect(deriveTurnStatus(turn)).toBe("completed");
@ -602,7 +661,7 @@ describe("AgentLoopImpl", () => {
it("model failure is a terminal turn error; further mutations reject", async () => {
const { loop } = makeLoop({ steps: [{ kind: "error", error: new Error("rate limited") }] });
const turn = await loop.createTurn({ messages: [userMsg("go")] }).result;
const turn = await (await loop.createTurn({ messages: [userMsg("go")] })).result;
expect(deriveTurnStatus(turn)).toBe("error");
expect(turn.error?.message).toBe("rate limited");
@ -623,7 +682,7 @@ describe("AgentLoopImpl", () => {
],
});
const turn = await loop.createTurn({ messages: [userMsg("go")] }).result;
const turn = await (await loop.createTurn({ messages: [userMsg("go")] })).result;
expect(deriveTurnStatus(turn)).toBe("error");
expect(turn.error?.message).toContain("exceeded 3 iterations");
});
@ -640,7 +699,7 @@ describe("AgentLoopImpl", () => {
runner: new FakeToolRunner({ job: () => ({ type: "pending" }) }),
});
const waiting = await loop.createTurn({ messages: [userMsg("go")] }).result;
const waiting = await (await loop.createTurn({ messages: [userMsg("go")] })).result;
const [a, b] = await Promise.all([
loop.setToolResult(waiting.id, { toolCallId: "tc1", result: "r1" }).result,
@ -664,7 +723,7 @@ describe("AgentLoopImpl", () => {
],
});
const handle = loop.createTurn({ messages: [userMsg("go")] });
const handle = await loop.createTurn({ messages: [userMsg("go")] });
const types: string[] = [];
for await (const event of handle.events) types.push(event.type);
@ -683,7 +742,7 @@ describe("AgentLoopImpl", () => {
it("getTurn returns the persisted turn; unknown ids reject", async () => {
const { loop } = makeLoop({ steps: [{ kind: "message", message: assistantText("hi") }] });
const created = await loop.createTurn({ messages: [userMsg("hello")] }).result;
const created = await (await loop.createTurn({ messages: [userMsg("hello")] })).result;
const fetched = await loop.getTurn(created.id);
expect(fetched).toEqual(created);

View file

@ -3,6 +3,7 @@ import { z } from "zod";
import { ToolCallPart, ToolMessage } from "@x/shared/dist/message.js";
import { EventStream } from "./event-stream.js";
import type { ModelAdapter } from "./model-adapter.js";
import { KeyedMutex } from "./mutex.js";
import type { PermissionGate } from "./permission-gate.js";
import type { ToolRunner, ToolRunResult } from "./tool-runner.js";
import type { TurnStore } from "./turn-store.js";
@ -23,7 +24,10 @@ export type TurnHandle = {
};
export interface AgentLoop {
createTurn(input: z.infer<typeof AgentLoopInput>): TurnHandle;
// Async: the turn row is persisted before the handle is returned, so the
// turn (and its session seq, if any) is visible to other readers the
// moment the caller has the handle. The advance itself runs in background.
createTurn(input: z.infer<typeof AgentLoopInput>): Promise<TurnHandle>;
respondToPermission(
turnId: string,
toolCallId: string,
@ -31,30 +35,16 @@ export interface AgentLoop {
reason?: string,
): TurnHandle;
setToolResult(turnId: string, r: { toolCallId: string; result: unknown }): TurnHandle;
// Re-enters the reducer on an idle (crashed) or waiting turn. Rejects for
// terminal turns (completed, errored, or stopped) — like all mutations.
resumeTurn(turnId: string): TurnHandle;
getTurn(turnId: string): Promise<z.infer<typeof AgentLoopTurn>>;
// Aborts in-flight work and records the stop as a terminal turn error
// (code "stopped"). A stopped turn is immutable: it cannot be resumed,
// and pending permissions / dispatched results are abandoned.
stopTurn(turnId: string): Promise<z.infer<typeof AgentLoopTurn>>;
}
// Serializes async work per key. Unlike the try-lock in runs/lock.ts, callers
// queue instead of failing — public mutations on the same turn run in order.
class TurnMutex {
private chains = new Map<string, Promise<unknown>>();
run<T>(key: string, fn: () => Promise<T>): Promise<T> {
const prev = this.chains.get(key) ?? Promise.resolve();
const next = prev.then(fn, fn);
const tail: Promise<void> = next
.catch(() => undefined)
.then(() => {
// Drop the entry once the chain is fully drained.
if (this.chains.get(key) === tail) this.chains.delete(key);
});
this.chains.set(key, tail);
return next;
}
}
function nowIso(): string {
return new Date().toISOString();
}
@ -83,7 +73,7 @@ export class AgentLoopImpl implements AgentLoop {
private toolRunner: ToolRunner;
private permissionGate: PermissionGate;
private maxIterations: number;
private mutex = new TurnMutex();
private mutex = new KeyedMutex();
// All not-yet-finished entries per turn (running AND queued behind the
// mutex) — registered synchronously so stopTurn can never race past one.
private active = new Map<string, Set<AbortController>>();
@ -102,28 +92,35 @@ export class AgentLoopImpl implements AgentLoop {
this.maxIterations = deps.maxIterations ?? DEFAULT_MAX_ITERATIONS;
}
createTurn(input: z.infer<typeof AgentLoopInput>): TurnHandle {
async createTurn(input: z.infer<typeof AgentLoopInput>): Promise<TurnHandle> {
const parsed = AgentLoopInput.parse(input);
const turnId = crypto.randomUUID();
return this.enter(turnId, async () => {
const parsed = AgentLoopInput.parse(input);
const now = nowIso();
await this.store.create({
id: turnId,
agentId: parsed.agentId ?? null,
provider: parsed.provider ?? null,
model: parsed.model ?? null,
permissionMode: parsed.permissionMode ?? "manual",
messages: parsed.messages,
permissionRequests: [],
permissionDecisions: [],
startedTools: [],
dispatchedTools: [],
error: null,
completedAt: null,
createdAt: now,
updatedAt: now,
});
const now = nowIso();
// Persist before returning: the id is a fresh UUID so there is no
// contention, and callers (the sessions layer) rely on the row — and
// its claimed session seq — being visible once they hold the handle.
// Between this write and enter() below the turn is store-visible but
// not yet stoppable; acceptable while turn ids only reach callers via
// the returned handle, not via store polling.
await this.store.create({
id: turnId,
agentId: parsed.agentId ?? null,
provider: parsed.provider ?? null,
model: parsed.model ?? null,
permissionMode: parsed.permissionMode ?? "manual",
sessionId: parsed.sessionId ?? null,
sessionSeq: parsed.sessionSeq ?? null,
messages: parsed.messages,
permissionRequests: [],
permissionDecisions: [],
startedTools: [],
dispatchedTools: [],
error: null,
completedAt: null,
createdAt: now,
updatedAt: now,
});
return this.enter(turnId, async () => {});
}
respondToPermission(
@ -174,7 +171,9 @@ export class AgentLoopImpl implements AgentLoop {
resumeTurn(turnId: string): TurnHandle {
return this.enter(turnId, async () => {
await this.mustGet(turnId);
// Resuming a terminal turn is a caller error — reject loudly
// instead of quietly resolving, like every other mutation.
this.assertMutable(await this.mustGet(turnId));
});
}
@ -186,8 +185,21 @@ export class AgentLoopImpl implements AgentLoop {
for (const controller of this.active.get(turnId) ?? []) {
controller.abort();
}
// Queue behind the in-flight advance so it has fully wound down.
return this.mutex.run(turnId, () => this.mustGet(turnId));
// Queue behind the in-flight advance so it has fully wound down, then
// make the stop itself a persisted fact: the turn lands in a terminal
// error and can never be resumed or mutated. A turn that already
// finished keeps its outcome — stop never overwrites it.
return this.mutex.run(turnId, async () => {
const turn = await this.mustGet(turnId);
if (turn.error !== null || turn.completedAt !== null) return turn;
turn.error = {
message: "Stopped by the user",
code: "stopped",
at: nowIso(),
};
await this.persist(turn);
return turn;
});
}
// ── internals ───────────────────────────────────────────────────────────
@ -390,7 +402,9 @@ export class AgentLoopImpl implements AgentLoop {
turn.messages.push(assistantMessage);
await this.persist(turn);
} catch (error) {
if (signal.aborted) return; // stopped: turn stays as persisted (idle)
// stopped: facts stay as persisted; stopTurn's queued job
// records the terminal "stopped" error after this winds down.
if (signal.aborted) return;
await this.setTurnError(turnId, error);
return;
}

View file

@ -9,6 +9,17 @@ export class InMemoryTurnStore implements TurnStore {
if (this.turns.has(turn.id)) {
throw new Error(`Turn already exists: ${turn.id}`);
}
// Mirror the SQLite UNIQUE(session_id, session_seq) tripwire — NULL
// seqs never conflict, matching SQLite's distinct-NULLs semantics.
if (turn.sessionId !== null && turn.sessionSeq !== null) {
for (const existing of this.turns.values()) {
if (existing.sessionId === turn.sessionId && existing.sessionSeq === turn.sessionSeq) {
throw new Error(
`Turn with session seq already exists: ${turn.sessionId}#${turn.sessionSeq}`,
);
}
}
}
this.turns.set(turn.id, structuredClone(turn));
}
@ -23,4 +34,16 @@ export class InMemoryTurnStore implements TurnStore {
}
this.turns.set(turn.id, structuredClone(turn));
}
async latestForSession(sessionId: string): Promise<z.infer<typeof AgentLoopTurn> | null> {
const turns = await this.listBySession(sessionId);
return turns.length > 0 ? turns[turns.length - 1] : null;
}
async listBySession(sessionId: string): Promise<z.infer<typeof AgentLoopTurn>[]> {
return [...this.turns.values()]
.filter((turn) => turn.sessionId === sessionId)
.sort((a, b) => (a.sessionSeq ?? 0) - (b.sessionSeq ?? 0))
.map((turn) => structuredClone(turn));
}
}

View file

@ -0,0 +1,19 @@
// Serializes async work per key. Unlike the try-lock in runs/lock.ts, callers
// queue instead of failing — operations on the same key run in order. Used
// per-turn by the agent loop and per-session by the sessions layer.
export class KeyedMutex {
private chains = new Map<string, Promise<unknown>>();
run<T>(key: string, fn: () => Promise<T>): Promise<T> {
const prev = this.chains.get(key) ?? Promise.resolve();
const next = prev.then(fn, fn);
const tail: Promise<void> = next
.catch(() => undefined)
.then(() => {
// Drop the entry once the chain is fully drained.
if (this.chains.get(key) === tail) this.chains.delete(key);
});
this.chains.set(key, tail);
return next;
}
}

View file

@ -44,13 +44,18 @@ async function loadStore() {
return { store: new SqliteTurnStore(storageModule.getDb()), db: storageModule.getDb() };
}
function sampleTurn(id: string): z.infer<typeof AgentLoopTurn> {
function sampleTurn(
id: string,
overrides: Partial<z.infer<typeof AgentLoopTurn>> = {},
): z.infer<typeof AgentLoopTurn> {
return {
id,
agentId: "agent-1",
provider: "openai",
model: "gpt-x",
permissionMode: "auto",
sessionId: null,
sessionSeq: null,
messages: [
{ role: "user", content: "hello" },
{
@ -80,6 +85,7 @@ function sampleTurn(id: string): z.infer<typeof AgentLoopTurn> {
completedAt: null,
createdAt: "2026-06-12T00:00:00Z",
updatedAt: "2026-06-12T00:00:03Z",
...overrides,
};
}
@ -124,6 +130,35 @@ describe("SqliteTurnStore", () => {
await expect(store.update(sampleTurn("missing"))).rejects.toThrow("Turn not found");
});
it("round-trips session linkage and queries turns by session in seq order", async () => {
const { store } = await loadStore();
const t1 = sampleTurn("t1", { sessionId: "s1", sessionSeq: 1 });
const t2 = sampleTurn("t2", { sessionId: "s1", sessionSeq: 2 });
const other = sampleTurn("t3", { sessionId: "s2", sessionSeq: 1 });
// insert out of order to prove ordering comes from seq
await store.create(t2);
await store.create(t1);
await store.create(other);
await store.create(sampleTurn("standalone"));
expect(await store.get("t1")).toEqual(t1);
expect(await store.listBySession("s1")).toEqual([t1, t2]);
expect(await store.latestForSession("s1")).toEqual(t2);
expect(await store.latestForSession("missing")).toBeNull();
expect(await store.listBySession("missing")).toEqual([]);
});
it("rejects a duplicate session seq via the unique index", async () => {
const { store } = await loadStore();
await store.create(sampleTurn("t1", { sessionId: "s1", sessionSeq: 1 }));
await expect(
store.create(sampleTurn("t2", { sessionId: "s1", sessionSeq: 1 })),
).rejects.toThrow();
// standalone turns never conflict (NULL session_id)
await store.create(sampleTurn("t3"));
await store.create(sampleTurn("t4"));
});
it("fails loudly on a corrupted JSON column", async () => {
const { store, db } = await loadStore();
await store.create(sampleTurn("t1"));

View file

@ -46,6 +46,27 @@ export class SqliteTurnStore implements TurnStore {
throw new Error(`Turn not found: ${id}`);
}
}
async latestForSession(sessionId: string): Promise<z.infer<typeof AgentLoopTurn> | null> {
const row = await this.db
.selectFrom("agent_loop_turns")
.selectAll()
.where("session_id", "=", sessionId)
.orderBy("session_seq", "desc")
.limit(1)
.executeTakeFirst();
return row ? fromRow(row) : null;
}
async listBySession(sessionId: string): Promise<z.infer<typeof AgentLoopTurn>[]> {
const rows = await this.db
.selectFrom("agent_loop_turns")
.selectAll()
.where("session_id", "=", sessionId)
.orderBy("session_seq", "asc")
.execute();
return rows.map(fromRow);
}
}
function toRow(turn: z.infer<typeof AgentLoopTurn>): Insertable<AgentLoopTurnsTable> {
@ -55,6 +76,8 @@ function toRow(turn: z.infer<typeof AgentLoopTurn>): Insertable<AgentLoopTurnsTa
provider: turn.provider,
model: turn.model,
permission_mode: turn.permissionMode,
session_id: turn.sessionId,
session_seq: turn.sessionSeq,
messages: JSON.stringify(turn.messages),
permission_requests: JSON.stringify(turn.permissionRequests),
permission_decisions: JSON.stringify(turn.permissionDecisions),
@ -74,6 +97,8 @@ function fromRow(row: Selectable<AgentLoopTurnsTable>): z.infer<typeof AgentLoop
provider: row.provider,
model: row.model,
permissionMode: PermissionMode.parse(row.permission_mode),
sessionId: row.session_id,
sessionSeq: row.session_seq,
messages: MessageList.parse(JSON.parse(row.messages)),
permissionRequests: z.array(PermissionRequest).parse(JSON.parse(row.permission_requests)),
permissionDecisions: z.array(PermissionDecision).parse(JSON.parse(row.permission_decisions)),

View file

@ -7,4 +7,7 @@ export interface TurnStore {
create(turn: z.infer<typeof AgentLoopTurn>): Promise<void>;
get(id: string): Promise<z.infer<typeof AgentLoopTurn> | null>;
update(turn: z.infer<typeof AgentLoopTurn>): Promise<void>;
// Session linkage queries (used by the sessions layer); ordered by sessionSeq.
latestForSession(sessionId: string): Promise<z.infer<typeof AgentLoopTurn> | null>;
listBySession(sessionId: string): Promise<z.infer<typeof AgentLoopTurn>[]>;
}

View file

@ -63,6 +63,11 @@ export const AgentLoopTurn = z.object({
model: z.string().nullable(),
permissionMode: PermissionMode,
// Session linkage — opaque to the loop (the sessions layer owns the
// meaning). seq is the turn's 1-based position within its session.
sessionId: z.string().nullable(),
sessionSeq: z.number().int().positive().nullable(),
// append-only fact logs
messages: MessageList,
permissionRequests: z.array(PermissionRequest),
@ -83,9 +88,14 @@ export const AgentLoopInput = z.object({
provider: z.string().nullable().optional(),
model: z.string().nullable().optional(),
permissionMode: PermissionMode.optional(),
sessionId: z.string().nullable().optional(),
sessionSeq: z.number().int().positive().nullable().optional(),
// May include prior-conversation history; turns are self-contained by design.
messages: MessageList.min(1),
});
}).refine(
(input) => (input.sessionId == null) === (input.sessionSeq == null),
{ message: "sessionId and sessionSeq must be set together" },
);
// ─── Tool definitions (environment, not turn state) ────────────────────────

View file

@ -0,0 +1,33 @@
import { z } from "zod";
import type { SessionStore } from "./session-store.js";
import { Session } from "./types.js";
export class InMemorySessionStore implements SessionStore {
private sessions = new Map<string, z.infer<typeof Session>>();
async create(session: z.infer<typeof Session>): Promise<void> {
if (this.sessions.has(session.id)) {
throw new Error(`Session already exists: ${session.id}`);
}
this.sessions.set(session.id, structuredClone(session));
}
async get(id: string): Promise<z.infer<typeof Session> | null> {
const session = this.sessions.get(id);
return session ? structuredClone(session) : null;
}
async list(filter?: { agentId?: string }): Promise<z.infer<typeof Session>[]> {
return [...this.sessions.values()]
.filter((s) => filter?.agentId === undefined || s.agentId === filter.agentId)
.sort((a, b) => b.updatedAt.localeCompare(a.updatedAt))
.map((s) => structuredClone(s));
}
async update(session: z.infer<typeof Session>): Promise<void> {
if (!this.sessions.has(session.id)) {
throw new Error(`Session not found: ${session.id}`);
}
this.sessions.set(session.id, structuredClone(session));
}
}

View file

@ -0,0 +1,5 @@
export * from "./types.js";
export * from "./session-store.js";
export * from "./in-memory-session-store.js";
export * from "./sqlite-session-store.js";
export * from "./sessions.js";

View file

@ -0,0 +1,12 @@
import { z } from "zod";
import { Session } from "./types.js";
// Durable storage for session rows. The per-session mutex lives ABOVE the
// store (in SessionsImpl), not in it.
export interface SessionStore {
create(session: z.infer<typeof Session>): Promise<void>;
get(id: string): Promise<z.infer<typeof Session> | null>;
// Most recently active first (updatedAt descending).
list(filter?: { agentId?: string }): Promise<z.infer<typeof Session>[]>;
update(session: z.infer<typeof Session>): Promise<void>;
}

View file

@ -0,0 +1,399 @@
import { describe, expect, it } from "vitest";
import { z } from "zod";
import {
AssistantMessage,
Message,
ToolCallPart,
} from "@x/shared/dist/message.js";
import { AgentLoopImpl } from "../agent-loop/agent-loop.js";
import { EventStream } from "../agent-loop/event-stream.js";
import { InMemoryTurnStore } from "../agent-loop/in-memory-turn-store.js";
import type { ModelAdapter, ModelStreamRequest } from "../agent-loop/model-adapter.js";
import type { PermissionGate } from "../agent-loop/permission-gate.js";
import type { ToolRunner, ToolRunResult } from "../agent-loop/tool-runner.js";
import {
AgentLoopTurn,
deriveTurnStatus,
type ModelStreamEvent,
} from "../agent-loop/types.js";
import { InMemorySessionStore } from "./in-memory-session-store.js";
import { SessionsImpl } from "./sessions.js";
// ─── helpers ────────────────────────────────────────────────────────────────
function userMsg(text: string): z.infer<typeof Message> {
return { role: "user", content: text };
}
function assistantText(text: string): z.infer<typeof AssistantMessage> {
return { role: "assistant", content: text };
}
function toolCall(toolCallId: string, toolName: string): z.infer<typeof ToolCallPart> {
return { type: "tool-call", toolCallId, toolName, arguments: {} };
}
function assistantToolCalls(
...calls: z.infer<typeof ToolCallPart>[]
): z.infer<typeof AssistantMessage> {
return { role: "assistant", content: calls };
}
type ModelStep =
| { kind: "message"; message: z.infer<typeof AssistantMessage> }
| { kind: "hang" };
class FakeModelAdapter implements ModelAdapter {
calls = 0;
constructor(private steps: ModelStep[]) {}
stream(req: ModelStreamRequest): EventStream<ModelStreamEvent, z.infer<typeof AssistantMessage>> {
this.calls++;
const out = new EventStream<ModelStreamEvent, z.infer<typeof AssistantMessage>>();
const step = this.steps.shift();
void (async () => {
await Promise.resolve();
if (!step) {
const error = new Error("FakeModelAdapter: no scripted step left");
out.fail(error);
return;
}
if (step.kind === "hang") {
req.signal.addEventListener("abort", () => {
out.fail(new Error("aborted"));
}, { once: true });
return;
}
out.push({ type: "finish", message: step.message });
out.end(step.message);
})();
return out;
}
}
class FakeToolRunner implements ToolRunner {
ran: string[] = [];
definitions() {
return [];
}
async run(call: z.infer<typeof ToolCallPart>): Promise<ToolRunResult> {
this.ran.push(call.toolCallId);
return { type: "result", value: `ok:${call.toolName}` };
}
}
class FakePermissionGate implements PermissionGate {
constructor(private required: (toolName: string) => boolean = () => false) {}
async check(call: z.infer<typeof ToolCallPart>) {
return this.required(call.toolName)
? { required: true as const, request: { tool: call.toolName } }
: { required: false as const };
}
async classify(): Promise<{ decision: "abstained"; reason: string }> {
return { decision: "abstained", reason: "unsure" };
}
}
function makeSessions(opts: { steps?: ModelStep[]; gate?: FakePermissionGate } = {}) {
const turnStore = new InMemoryTurnStore();
const sessionStore = new InMemorySessionStore();
const adapter = new FakeModelAdapter(opts.steps ?? []);
const runner = new FakeToolRunner();
const loop = new AgentLoopImpl({
store: turnStore,
modelAdapter: adapter,
toolRunner: runner,
permissionGate: opts.gate ?? new FakePermissionGate(),
});
const sessions = new SessionsImpl({ sessionStore, turnStore, agentLoop: loop });
return { sessions, loop, turnStore, sessionStore, adapter, runner };
}
function turnFixture(
id: string,
overrides: Partial<z.infer<typeof AgentLoopTurn>> = {},
): z.infer<typeof AgentLoopTurn> {
const now = new Date().toISOString();
return {
id,
agentId: null,
provider: null,
model: null,
permissionMode: "manual",
sessionId: null,
sessionSeq: null,
messages: [],
permissionRequests: [],
permissionDecisions: [],
startedTools: [],
dispatchedTools: [],
error: null,
completedAt: null,
createdAt: now,
updatedAt: now,
...overrides,
};
}
function sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
// ─── tests ──────────────────────────────────────────────────────────────────
describe("SessionsImpl", () => {
it("creates, fetches, and lists sessions with an agent filter", async () => {
const { sessions } = makeSessions();
const s1 = await sessions.createSession({ agentId: "a1", title: "first" });
const s2 = await sessions.createSession({ agentId: "a2" });
expect(await sessions.getSession(s1.id)).toEqual(s1);
expect(s2.title).toBeNull();
expect((await sessions.listSessions()).map((s) => s.id).sort())
.toEqual([s1.id, s2.id].sort());
expect(await sessions.listSessions({ agentId: "a1" })).toEqual([s1]);
await expect(sessions.getSession("missing")).rejects.toThrow("Session not found");
});
it("first sendMessage creates turn 1 with session linkage and per-call config", async () => {
const { sessions } = makeSessions({
steps: [{ kind: "message", message: assistantText("hi!") }],
});
const session = await sessions.createSession({ agentId: "a1" });
const handle = await sessions.sendMessage(
session.id,
[userMsg("hello")],
{ provider: "openai", model: "gpt-x" },
);
const turn = await handle.result;
expect(turn.sessionId).toBe(session.id);
expect(turn.sessionSeq).toBe(1);
expect(turn.agentId).toBe("a1"); // stamped from the session
expect(turn.provider).toBe("openai");
expect(turn.model).toBe("gpt-x");
expect(turn.messages).toEqual([userMsg("hello"), assistantText("hi!")]);
expect(deriveTurnStatus(turn)).toBe("completed");
});
it("copies the previous transcript forward into the next turn", async () => {
const { sessions } = makeSessions({
steps: [
{ kind: "message", message: assistantText("answer one") },
{ kind: "message", message: assistantText("answer two") },
],
});
const session = await sessions.createSession();
await (await sessions.sendMessage(session.id, [userMsg("one")])).result;
const turn2 = await (await sessions.sendMessage(session.id, [userMsg("two")])).result;
expect(turn2.sessionSeq).toBe(2);
expect(turn2.messages).toEqual([
userMsg("one"),
assistantText("answer one"),
userMsg("two"),
assistantText("answer two"),
]);
expect(await sessions.getHistory(session.id)).toEqual(turn2.messages);
expect((await sessions.listTurns(session.id)).map((t) => t.sessionSeq)).toEqual([1, 2]);
});
it("rejects empty messages and unknown sessions", async () => {
const { sessions } = makeSessions();
const session = await sessions.createSession();
await expect(sessions.sendMessage(session.id, [])).rejects.toThrow();
await expect(sessions.sendMessage("missing", [userMsg("hi")]))
.rejects.toThrow("Session not found");
});
it("rejects while a turn is in flight; a stopped turn can be superseded", async () => {
const { sessions, loop } = makeSessions({
steps: [
{ kind: "hang" },
{ kind: "message", message: assistantText("fresh start") },
],
});
const session = await sessions.createSession();
const first = await sessions.sendMessage(session.id, [userMsg("one")]);
await expect(sessions.sendMessage(session.id, [userMsg("two")]))
.rejects.toThrow("not finished");
await loop.stopTurn(first.id); // terminal "stopped" error, nothing partial persisted
const turn2 = await (await sessions.sendMessage(session.id, [userMsg("two")])).result;
expect(turn2.sessionSeq).toBe(2);
expect(turn2.messages).toEqual([
userMsg("one"), // stopped turn's input is still part of the transcript
userMsg("two"),
assistantText("fresh start"),
]);
});
it("rejects while the latest turn waits on a permission", async () => {
const { sessions, loop } = makeSessions({
steps: [
{ kind: "message", message: assistantToolCalls(toolCall("tc1", "write")) },
{ kind: "message", message: assistantText("done") },
{ kind: "message", message: assistantText("next answer") },
],
gate: new FakePermissionGate(() => true),
});
const session = await sessions.createSession();
const first = await sessions.sendMessage(session.id, [userMsg("one")]);
const waiting = await first.result;
expect(deriveTurnStatus(waiting)).toBe("waiting");
await expect(sessions.sendMessage(session.id, [userMsg("two")]))
.rejects.toThrow("not finished");
const done = await loop.respondToPermission(waiting.id, "tc1", "granted").result;
expect(deriveTurnStatus(done)).toBe("completed");
const turn2 = await (await sessions.sendMessage(session.id, [userMsg("two")])).result;
expect(turn2.sessionSeq).toBe(2);
});
it("a crashed turn blocks the session until stopped; its calls are closed out", async () => {
const { sessions, loop, turnStore, runner } = makeSessions({
steps: [{ kind: "message", message: assistantText("moving on") }],
});
const session = await sessions.createSession();
// crafted idle turn (crash): tc1 never evaluated, tc2 started then crashed
await turnStore.create(turnFixture("t1", {
sessionId: session.id,
sessionSeq: 1,
messages: [userMsg("go"), assistantToolCalls(toolCall("tc1", "calc"), toolCall("tc2", "send-email"))],
startedTools: [{ toolCallId: "tc2", startedAt: "2026-06-12T00:00:00Z" }],
}));
// an idle (crashed) turn must be explicitly resumed or stopped first
await expect(sessions.sendMessage(session.id, [userMsg("never mind")]))
.rejects.toThrow("not finished");
await loop.stopTurn("t1");
const closures = [
{
role: "tool" as const,
content: "Tool was not executed: the turn was stopped before this call ran.",
toolCallId: "tc1",
toolName: "calc",
},
{
role: "tool" as const,
content: "Tool execution was interrupted before completing. It may or may not have taken effect; do not assume it ran.",
toolCallId: "tc2",
toolName: "send-email",
},
];
// getHistory shows the same closed-out transcript the next turn will use
expect(await sessions.getHistory(session.id)).toEqual([
userMsg("go"),
assistantToolCalls(toolCall("tc1", "calc"), toolCall("tc2", "send-email")),
...closures,
]);
const turn2 = await (await sessions.sendMessage(session.id, [userMsg("never mind")])).result;
expect(runner.ran).toEqual([]); // stale calls are closed out, never executed
expect(turn2.messages).toEqual([
userMsg("go"),
assistantToolCalls(toolCall("tc1", "calc"), toolCall("tc2", "send-email")),
...closures,
userMsg("never mind"),
assistantText("moving on"),
]);
expect(deriveTurnStatus(turn2)).toBe("completed");
});
it("a stopped waiting turn can be superseded; the abandoned call never runs", async () => {
const { sessions, loop, runner } = makeSessions({
steps: [
{ kind: "message", message: assistantToolCalls(toolCall("tc1", "write")) },
{ kind: "message", message: assistantText("skipped it") },
],
gate: new FakePermissionGate(() => true),
});
const session = await sessions.createSession();
const waiting = await (await sessions.sendMessage(session.id, [userMsg("one")])).result;
expect(deriveTurnStatus(waiting)).toBe("waiting");
await loop.stopTurn(waiting.id); // abandon instead of answering the permission
const turn2 = await (await sessions.sendMessage(session.id, [userMsg("skip it")])).result;
expect(runner.ran).toEqual([]);
expect(turn2.sessionSeq).toBe(2);
expect(turn2.messages).toEqual([
userMsg("one"),
assistantToolCalls(toolCall("tc1", "write")),
{
role: "tool",
content: "Tool was not executed: the turn was stopped before this call ran.",
toolCallId: "tc1",
toolName: "write",
},
userMsg("skip it"),
assistantText("skipped it"),
]);
});
it("builds on an errored turn's persisted transcript", async () => {
const { sessions, turnStore } = makeSessions({
steps: [{ kind: "message", message: assistantText("recovered") }],
});
const session = await sessions.createSession();
await turnStore.create(turnFixture("t1", {
sessionId: session.id,
sessionSeq: 1,
messages: [userMsg("go")],
error: { message: "rate limited", at: "2026-06-12T00:00:00Z" },
}));
const turn2 = await (await sessions.sendMessage(session.id, [userMsg("retry")])).result;
expect(turn2.sessionSeq).toBe(2);
expect(turn2.messages).toEqual([userMsg("go"), userMsg("retry"), assistantText("recovered")]);
// the errored turn is untouched — error is terminal per turn, not per session
expect((await turnStore.get("t1"))?.error?.message).toBe("rate limited");
});
it("serializes concurrent sends: exactly one wins while a turn is in flight", async () => {
const { sessions } = makeSessions({ steps: [{ kind: "hang" }] });
const session = await sessions.createSession();
const [a, b] = await Promise.allSettled([
sessions.sendMessage(session.id, [userMsg("one")]),
sessions.sendMessage(session.id, [userMsg("two")]),
]);
expect(a.status).toBe("fulfilled");
expect(b.status).toBe("rejected");
const latest = await sessions.getHistory(session.id);
expect(latest).toEqual([userMsg("one")]);
});
it("sendMessage bumps the session's recency for listSessions ordering", async () => {
const { sessions } = makeSessions({
steps: [{ kind: "message", message: assistantText("hi") }],
});
const s1 = await sessions.createSession();
await sleep(10);
const s2 = await sessions.createSession();
expect((await sessions.listSessions()).map((s) => s.id)).toEqual([s2.id, s1.id]);
await sleep(10);
await (await sessions.sendMessage(s1.id, [userMsg("hello")])).result;
expect((await sessions.listSessions()).map((s) => s.id)).toEqual([s1.id, s2.id]);
});
});

View file

@ -0,0 +1,162 @@
import crypto from "node:crypto";
import { z } from "zod";
import { Message, MessageList } from "@x/shared/dist/message.js";
import type { AgentLoop, TurnHandle } from "../agent-loop/agent-loop.js";
import { KeyedMutex } from "../agent-loop/mutex.js";
import type { TurnStore } from "../agent-loop/turn-store.js";
import {
AgentLoopTurn,
deriveToolCallState,
deriveTurnStatus,
unresolvedToolCalls,
} from "../agent-loop/types.js";
import type { SessionStore } from "./session-store.js";
import { CreateSessionInput, SendMessageOptions, Session } from "./types.js";
// A thin layer above the agent loop: a session is an ordered chain of
// self-contained turns. sendMessage builds the next turn's input from the
// previous turn's full transcript (copy-forward history) — the loop itself
// never learns that sessions exist.
export interface Sessions {
createSession(input?: z.infer<typeof CreateSessionInput>): Promise<z.infer<typeof Session>>;
getSession(sessionId: string): Promise<z.infer<typeof Session>>;
listSessions(filter?: { agentId?: string }): Promise<z.infer<typeof Session>[]>;
sendMessage(
sessionId: string,
messages: z.infer<typeof MessageList>,
options?: z.infer<typeof SendMessageOptions>,
): Promise<TurnHandle>;
getHistory(sessionId: string): Promise<z.infer<typeof MessageList>>;
listTurns(sessionId: string): Promise<z.infer<typeof AgentLoopTurn>[]>;
}
function nowIso(): string {
return new Date().toISOString();
}
export class SessionsImpl implements Sessions {
private sessionStore: SessionStore;
private turnStore: TurnStore;
private agentLoop: AgentLoop;
private mutex = new KeyedMutex();
constructor(deps: {
sessionStore: SessionStore;
turnStore: TurnStore;
agentLoop: AgentLoop;
}) {
this.sessionStore = deps.sessionStore;
this.turnStore = deps.turnStore;
this.agentLoop = deps.agentLoop;
}
async createSession(
input: z.infer<typeof CreateSessionInput> = {},
): Promise<z.infer<typeof Session>> {
const parsed = CreateSessionInput.parse(input);
const now = nowIso();
const session: z.infer<typeof Session> = {
id: crypto.randomUUID(),
agentId: parsed.agentId ?? null,
title: parsed.title ?? null,
createdAt: now,
updatedAt: now,
};
await this.sessionStore.create(session);
return session;
}
async getSession(sessionId: string): Promise<z.infer<typeof Session>> {
return this.mustGetSession(sessionId);
}
async listSessions(filter?: { agentId?: string }): Promise<z.infer<typeof Session>[]> {
return this.sessionStore.list(filter);
}
async sendMessage(
sessionId: string,
messages: z.infer<typeof MessageList>,
options: z.infer<typeof SendMessageOptions> = {},
): Promise<TurnHandle> {
// Validate the NEW messages alone — combined with history they would
// pass the loop's min(1) even when empty.
const newMessages = MessageList.min(1).parse(messages);
const parsedOptions = SendMessageOptions.parse(options);
return this.mutex.run(sessionId, async () => {
const session = await this.mustGetSession(sessionId);
const latest = await this.turnStore.latestForSession(sessionId);
if (latest) {
// A session only ever chains on TERMINAL turns. Anything else
// (running, waiting on a permission/tool result, or idle after
// a crash) must be resolved first: wait for it, respond to
// it, resume it, or stopTurn it — a stop is itself terminal.
// Terminal turns are immutable, so the snapshot we copy
// forward below can never go stale or be re-activated.
const status = deriveTurnStatus(latest);
if (status !== "completed" && status !== "error") {
throw new Error(
`Session's latest turn is not finished: ${latest.id} (status: ${status})`,
);
}
}
// Bump recency BEFORE creating the turn: if this write fails, no
// orphan turn is left running with its handle lost to the caller.
session.updatedAt = nowIso();
await this.sessionStore.update(session);
return this.agentLoop.createTurn({
agentId: session.agentId,
provider: parsedOptions.provider ?? null,
model: parsedOptions.model ?? null,
...(parsedOptions.permissionMode !== undefined
? { permissionMode: parsedOptions.permissionMode }
: {}),
sessionId,
sessionSeq: (latest?.sessionSeq ?? 0) + 1,
messages: [...(latest ? historyFrom(latest) : []), ...newMessages],
});
});
}
// The transcript as the next turn will see it: a stopped turn's dangling
// tool calls appear closed out, matching what sendMessage actually sends.
async getHistory(sessionId: string): Promise<z.infer<typeof MessageList>> {
await this.mustGetSession(sessionId);
const latest = await this.turnStore.latestForSession(sessionId);
return latest ? historyFrom(latest) : [];
}
async listTurns(sessionId: string): Promise<z.infer<typeof AgentLoopTurn>[]> {
await this.mustGetSession(sessionId);
return this.turnStore.listBySession(sessionId);
}
private async mustGetSession(sessionId: string): Promise<z.infer<typeof Session>> {
const session = await this.sessionStore.get(sessionId);
if (!session) throw new Error(`Session not found: ${sessionId}`);
return session;
}
}
// Copy-forward history: the next turn's input is the previous turn's full
// transcript. A stopped turn can carry unresolved tool calls; they are closed
// out with synthetic ToolMessages so the new turn never re-executes — or
// hangs on — stale calls. This is the sessions-layer analogue of the
// reducer's interrupted-call handling.
function historyFrom(
turn: z.infer<typeof AgentLoopTurn>,
): z.infer<typeof Message>[] {
const messages = [...turn.messages];
for (const call of unresolvedToolCalls(turn)) {
const interrupted = deriveToolCallState(turn, call.toolCallId) === "interrupted";
messages.push({
role: "tool",
content: interrupted
? "Tool execution was interrupted before completing. It may or may not have taken effect; do not assume it ran."
: "Tool was not executed: the turn was stopped before this call ran.",
toolCallId: call.toolCallId,
toolName: call.toolName,
});
}
return messages;
}

View file

@ -0,0 +1,115 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { sql } from "kysely";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { z } from "zod";
import type { Session } from "./types.js";
let tmpDir: string;
let workspaceDir: string;
let storageModule: typeof import("../storage/index.js") | null = null;
beforeEach(async () => {
tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), "rowboat-sessions-test-"));
workspaceDir = path.join(tmpDir, "workspace");
process.env.ROWBOAT_WORKDIR = workspaceDir;
vi.resetModules();
// config.ts kicks off knowledge-repo init as an import side effect; mock it
// out so tests don't touch git (same pattern as storage/storage.test.ts).
vi.doMock("../knowledge/version_history.js", () => ({
initRepo: vi.fn(async () => undefined),
}));
vi.doMock("../knowledge/deprecate_today_note.js", () => ({
deprecateTodayNote: vi.fn(async () => undefined),
}));
});
afterEach(async () => {
if (storageModule) {
await storageModule.shutdownStorage().catch(() => undefined);
storageModule = null;
}
delete process.env.ROWBOAT_WORKDIR;
vi.doUnmock("../knowledge/version_history.js");
vi.doUnmock("../knowledge/deprecate_today_note.js");
vi.resetModules();
await fs.rm(tmpDir, { recursive: true, force: true });
});
async function loadStore() {
storageModule = await import("../storage/index.js");
await storageModule.initStorage();
const { SqliteSessionStore } = await import("./sqlite-session-store.js");
return { store: new SqliteSessionStore(storageModule.getDb()), db: storageModule.getDb() };
}
function sampleSession(
id: string,
overrides: Partial<z.infer<typeof Session>> = {},
): z.infer<typeof Session> {
return {
id,
agentId: "agent-1",
title: "a chat",
createdAt: "2026-06-12T00:00:00Z",
updatedAt: "2026-06-12T00:00:00Z",
...overrides,
};
}
describe("SqliteSessionStore", () => {
it("migration creates the sessions table and index", async () => {
const { db } = await loadStore();
const tables = await sql<{ name: string }>`
select name from sqlite_master
where type = 'table' and name = 'sessions'
`.execute(db);
expect(tables.rows).toHaveLength(1);
const indexes = await sql<{ name: string }>`
select name from sqlite_master
where type = 'index' and name = 'sessions_updated_at_idx'
`.execute(db);
expect(indexes.rows).toHaveLength(1);
});
it("round-trips every column", async () => {
const { store } = await loadStore();
const session = sampleSession("s1");
await store.create(session);
expect(await store.get("s1")).toEqual(session);
const updated = { ...session, title: "renamed", updatedAt: "2026-06-12T01:00:00Z" };
await store.update(updated);
expect(await store.get("s1")).toEqual(updated);
const nulls = sampleSession("s2", { agentId: null, title: null });
await store.create(nulls);
expect(await store.get("s2")).toEqual(nulls);
});
it("lists most recently updated first, with an optional agent filter", async () => {
const { store } = await loadStore();
const older = sampleSession("s1", { updatedAt: "2026-06-12T00:00:01Z" });
const newer = sampleSession("s2", { updatedAt: "2026-06-12T00:00:02Z" });
const otherAgent = sampleSession("s3", {
agentId: "agent-2",
updatedAt: "2026-06-12T00:00:03Z",
});
await store.create(older);
await store.create(newer);
await store.create(otherAgent);
expect((await store.list()).map((s) => s.id)).toEqual(["s3", "s2", "s1"]);
expect((await store.list({ agentId: "agent-1" })).map((s) => s.id)).toEqual(["s2", "s1"]);
});
it("returns null for unknown ids and rejects updates to missing sessions", async () => {
const { store } = await loadStore();
expect(await store.get("missing")).toBeNull();
await expect(store.update(sampleSession("missing"))).rejects.toThrow("Session not found");
});
});

View file

@ -0,0 +1,71 @@
import type { Insertable, Kysely, Selectable } from "kysely";
import { z } from "zod";
import type { Database, SessionsTable } from "../storage/schema.js";
import type { SessionStore } from "./session-store.js";
import { Session } from "./types.js";
// Accepts a Kysely<Database> from the existing getDb(); it does not own the
// storage lifecycle (never calls initStorage()).
export class SqliteSessionStore implements SessionStore {
constructor(private db: Kysely<Database>) {}
async create(session: z.infer<typeof Session>): Promise<void> {
await this.db
.insertInto("sessions")
.values(toRow(session))
.execute();
}
async get(id: string): Promise<z.infer<typeof Session> | null> {
const row = await this.db
.selectFrom("sessions")
.selectAll()
.where("id", "=", id)
.executeTakeFirst();
return row ? fromRow(row) : null;
}
async list(filter?: { agentId?: string }): Promise<z.infer<typeof Session>[]> {
let query = this.db
.selectFrom("sessions")
.selectAll()
.orderBy("updated_at", "desc");
if (filter?.agentId !== undefined) {
query = query.where("agent_id", "=", filter.agentId);
}
const rows = await query.execute();
return rows.map(fromRow);
}
async update(session: z.infer<typeof Session>): Promise<void> {
const { id, ...rest } = toRow(session);
const result = await this.db
.updateTable("sessions")
.set(rest)
.where("id", "=", id)
.executeTakeFirst();
if (result.numUpdatedRows === 0n) {
throw new Error(`Session not found: ${id}`);
}
}
}
function toRow(session: z.infer<typeof Session>): Insertable<SessionsTable> {
return {
id: session.id,
agent_id: session.agentId,
title: session.title,
created_at: session.createdAt,
updated_at: session.updatedAt,
};
}
function fromRow(row: Selectable<SessionsTable>): z.infer<typeof Session> {
return {
id: row.id,
agentId: row.agent_id,
title: row.title,
createdAt: row.created_at,
updatedAt: row.updated_at,
};
}

View file

@ -0,0 +1,27 @@
import { z } from "zod";
import { PermissionMode } from "../agent-loop/types.js";
// A session is a grouping label plus a title — an ordered chain of turns,
// linked via the turn's sessionId/sessionSeq. All configuration (provider,
// model, permission mode) flows through sendMessage at the moment it is used
// and lands on the turn row as the durable record; the session deliberately
// stores none of it. agentId is the exception: a session is a conversation
// WITH an agent, and "list sessions for agent X" is a session-level query.
export const Session = z.object({
id: z.string(),
agentId: z.string().nullable(),
title: z.string().nullable(),
createdAt: z.string(),
updatedAt: z.string(),
});
export const CreateSessionInput = z.object({
agentId: z.string().nullable().optional(),
title: z.string().nullable().optional(),
});
export const SendMessageOptions = z.object({
provider: z.string().nullable().optional(),
model: z.string().nullable().optional(),
permissionMode: PermissionMode.optional(),
});

View file

@ -53,6 +53,53 @@ const migrations: Record<string, Migration> = {
await db.schema.dropTable("agent_loop_turns").ifExists().execute();
},
},
"2026-06-12_0003_sessions": {
async up(db: MigrationDb): Promise<void> {
await db.schema
.createTable("sessions")
.ifNotExists()
.addColumn("id", "text", (col) => col.primaryKey())
.addColumn("agent_id", "text")
.addColumn("title", "text")
.addColumn("created_at", "text", (col) => col.notNull())
.addColumn("updated_at", "text", (col) => col.notNull())
.execute();
await db.schema
.createIndex("sessions_updated_at_idx")
.ifNotExists()
.on("sessions")
.column("updated_at")
.execute();
await db.schema
.alterTable("agent_loop_turns")
.addColumn("session_id", "text")
.execute();
await db.schema
.alterTable("agent_loop_turns")
.addColumn("session_seq", "integer")
.execute();
// Tripwire: a second writer racing past the per-session mutex must
// fail loudly instead of silently forking the turn chain. NULL
// session_ids never conflict (standalone turns).
await db.schema
.createIndex("agent_loop_turns_session_seq_uniq")
.ifNotExists()
.unique()
.on("agent_loop_turns")
.columns(["session_id", "session_seq"])
.execute();
},
async down(db: MigrationDb): Promise<void> {
await db.schema.dropIndex("agent_loop_turns_session_seq_uniq").ifExists().execute();
await db.schema.alterTable("agent_loop_turns").dropColumn("session_seq").execute();
await db.schema.alterTable("agent_loop_turns").dropColumn("session_id").execute();
await db.schema.dropIndex("sessions_updated_at_idx").ifExists().execute();
await db.schema.dropTable("sessions").ifExists().execute();
},
},
};
class InCodeMigrationProvider implements MigrationProvider {

View file

@ -14,6 +14,8 @@ export interface AgentLoopTurnsTable {
provider: string | null;
model: string | null;
permission_mode: string;
session_id: string | null;
session_seq: number | null;
messages: string; // JSON: MessageList
permission_requests: string; // JSON: PermissionRequest[]
permission_decisions: string; // JSON: PermissionDecision[]
@ -25,7 +27,16 @@ export interface AgentLoopTurnsTable {
completed_at: string | null;
}
export interface SessionsTable {
id: string;
agent_id: string | null;
title: string | null;
created_at: TimestampColumn;
updated_at: TimestampColumn;
}
export interface Database {
storage_metadata: StorageMetadataTable;
agent_loop_turns: AgentLoopTurnsTable;
sessions: SessionsTable;
}