Dedup turn transcripts, record model usage, make event delivery live

Three follow-ups from the runtime design review:

Transcript prefix dedup (storage-level, transparent):
- A session turn's input starts with the previous turn's closed transcript
  (copy-forward); storing it again made session storage quadratic and every
  fact append rewrote the whole transcript. Stores now keep only the suffix
  past prefix_length and recompute the prefix on read from the immutable
  previous turn (closedTranscript moved to agent-loop/types.ts; shared
  split/join helpers in prefix-dedup.ts with loud tripwires).
- Opportunistic: input that doesn't extend the previous transcript (e.g.
  future compaction summaries) falls back to whole-row storage. Nothing
  above the stores changed; InMemoryTurnStore mirrors SQLite exactly.
- Migration 0005 adds prefix_length (default 0 = stored whole).

Per-model-call usage recording:
- New modelUsage fact log on the turn: one entry per model call, committed
  in the same write as the assistant message it paid for. ModelAdapter
  results now carry {message, usage}; VercelModelAdapter treats usage
  reporting failures as null rather than failing a completed step.
- totalUsage() derives turn aggregates; null means never reported.
- Migration 0004 adds the model_usage column.

EventStream: replay removed, bus-style live delivery:
- Events go to consumers attached at push time and are dropped otherwise,
  matching the runtime's IBus philosophy (facts persisted, deltas cosmetic,
  renderer reconciles from snapshots). No buffering without consumers;
  per-iterator queues bounded by consumer lag. Iterators attach
  synchronously at creation so the loop's own for-await never misses events.

Also: dispatched-call closures get honest wording ("may have completed
externally") distinct from interrupted and never-ran; SqliteTurnStore.update
keeps the no-op write guard alongside the prefix_length read.

8 new tests (87 total).

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
Ramnique Singh 2026-06-12 21:44:53 +05:30
parent 8a5427e841
commit 91bb3cc8cd
14 changed files with 608 additions and 94 deletions

View file

@ -8,13 +8,19 @@ import {
import { AgentLoopImpl } from "./agent-loop.js";
import { EventStream } from "./event-stream.js";
import { InMemoryTurnStore } from "./in-memory-turn-store.js";
import type { ModelAdapter, ModelStreamRequest } from "./model-adapter.js";
import type {
ModelAdapter,
ModelStepResult,
ModelStepUsage,
ModelStreamRequest,
} from "./model-adapter.js";
import type { PermissionClassification, PermissionGate } from "./permission-gate.js";
import type { ToolRunner, ToolRunResult } from "./tool-runner.js";
import type { TurnStore } from "./turn-store.js";
import {
AgentLoopTurn,
deriveTurnStatus,
totalUsage,
type ModelStreamEvent,
} from "./types.js";
@ -39,7 +45,12 @@ function assistantToolCalls(
}
type ModelStep =
| { kind: "message"; message: z.infer<typeof AssistantMessage>; deltas?: string[] }
| {
kind: "message";
message: z.infer<typeof AssistantMessage>;
deltas?: string[];
usage?: ModelStepUsage;
}
| { kind: "error"; error: unknown }
| { kind: "hang" };
@ -48,9 +59,9 @@ class FakeModelAdapter implements ModelAdapter {
constructor(private steps: ModelStep[]) {}
stream(req: ModelStreamRequest): EventStream<ModelStreamEvent, z.infer<typeof AssistantMessage>> {
stream(req: ModelStreamRequest): EventStream<ModelStreamEvent, ModelStepResult> {
this.calls++;
const out = new EventStream<ModelStreamEvent, z.infer<typeof AssistantMessage>>();
const out = new EventStream<ModelStreamEvent, ModelStepResult>();
const step = this.steps.shift();
void (async () => {
await Promise.resolve();
@ -86,7 +97,7 @@ class FakeModelAdapter implements ModelAdapter {
}
}
out.push({ type: "finish", message: step.message });
out.end(step.message);
out.end({ message: step.message, usage: step.usage ?? null });
})();
return out;
}
@ -203,6 +214,7 @@ function emptyTurn(
permissionDecisions: [],
startedTools: [],
dispatchedTools: [],
modelUsage: [],
error: null,
completedAt: null,
createdAt: now,
@ -740,6 +752,49 @@ describe("AgentLoopImpl", () => {
expect(deriveTurnStatus(turn)).toBe("completed");
});
it("records one usage fact per model call and derives the turn total", async () => {
const usage = (inputTokens: number, outputTokens: number): ModelStepUsage => ({
inputTokens,
outputTokens,
totalTokens: inputTokens + outputTokens,
reasoningTokens: null,
cachedInputTokens: null,
});
const { loop } = makeLoop({
steps: [
{
kind: "message",
message: assistantToolCalls(toolCall("tc1", "calc")),
usage: usage(100, 20),
},
{ kind: "message", message: assistantText("done"), usage: usage(150, 30) },
],
});
const turn = await (await loop.createTurn({ messages: [userMsg("go")] })).result;
expect(turn.modelUsage).toHaveLength(2);
expect(turn.modelUsage[0]).toMatchObject({ inputTokens: 100, outputTokens: 20 });
expect(turn.modelUsage[1]).toMatchObject({ inputTokens: 150, outputTokens: 30 });
expect(turn.modelUsage.every((u) => typeof u.at === "string")).toBe(true);
expect(totalUsage(turn)).toEqual({
inputTokens: 250,
outputTokens: 50,
totalTokens: 300,
reasoningTokens: null,
cachedInputTokens: null,
});
});
it("a model step without reported usage records no usage fact", async () => {
const { loop } = makeLoop({
steps: [{ kind: "message", message: assistantText("hi") }],
});
const turn = await (await loop.createTurn({ messages: [userMsg("go")] })).result;
expect(turn.modelUsage).toEqual([]);
expect(totalUsage(turn).totalTokens).toBeNull();
});
it("getTurn returns the persisted turn; unknown ids reject", async () => {
const { loop } = makeLoop({ steps: [{ kind: "message", message: assistantText("hi") }] });
const created = await (await loop.createTurn({ messages: [userMsg("hello")] })).result;

View file

@ -115,6 +115,7 @@ export class AgentLoopImpl implements AgentLoop {
permissionDecisions: [],
startedTools: [],
dispatchedTools: [],
modelUsage: [],
error: null,
completedAt: null,
createdAt: now,
@ -398,8 +399,13 @@ export class AgentLoopImpl implements AgentLoop {
for await (const event of modelStream) {
stream.push(event);
}
const assistantMessage = await modelStream.result;
turn.messages.push(assistantMessage);
const step = await modelStream.result;
turn.messages.push(step.message);
// The step's usage is a fact like any other — committed in the
// same write as the message it paid for.
if (step.usage !== null) {
turn.modelUsage.push({ ...step.usage, at: nowIso() });
}
await this.persist(turn);
} catch (error) {
// stopped: facts stay as persisted; stopTurn's queued job

View file

@ -20,13 +20,27 @@ describe("EventStream", () => {
expect(await stream.result).toBe("done");
});
it("delivers events buffered before iteration starts", async () => {
it("is live: events pushed before a consumer attaches are dropped", async () => {
const stream = new EventStream<number, string>();
stream.push(1);
stream.push(1); // nobody listening — dropped, not buffered
const collecting = collect(stream);
stream.push(2);
stream.end("done");
expect(await collect(stream)).toEqual([1, 2]);
expect(await collecting).toEqual([2]);
});
it("delivers to every consumer attached at push time", async () => {
const stream = new EventStream<number, string>();
const a = collect(stream);
stream.push(1);
const b = collect(stream); // late subscriber: gets only what follows
stream.push(2);
stream.end("done");
expect(await a).toEqual([1, 2]);
expect(await b).toEqual([2]);
});
it("resolves result without any event consumer", async () => {

View file

@ -1,9 +1,17 @@
// Tiny EventStream<TEvent, TResult>: push events, complete with a result.
// Consumers can iterate events (streaming) or just await the result
// (await-to-rest); consuming events is never required for correctness.
//
// Live, bus-style delivery (same philosophy as the runtime's IBus): an event
// is delivered to the consumers attached at the moment it is pushed, and
// dropped otherwise — there is NO replay for late subscribers and no buffering
// when nobody listens. Events are cosmetic; every fact is persisted, so a
// consumer that attaches late (or misses events entirely) reconciles from the
// stored turn. Awaiting `result` never requires consuming events.
//
// Memory: a push with no consumers costs nothing; an attached consumer buffers
// only its own lag, freed as it iterates.
export class EventStream<TEvent, TResult> implements AsyncIterable<TEvent> {
private buffer: TEvent[] = [];
private listeners = new Set<TEvent[]>();
private waiters: Array<() => void> = [];
private done = false;
@ -24,7 +32,7 @@ export class EventStream<TEvent, TResult> implements AsyncIterable<TEvent> {
push(event: TEvent): void {
if (this.done) return;
this.buffer.push(event);
for (const queue of this.listeners) queue.push(event);
this.wake();
}
@ -48,14 +56,29 @@ export class EventStream<TEvent, TResult> implements AsyncIterable<TEvent> {
for (const waiter of waiters) waiter();
}
async *[Symbol.asyncIterator](): AsyncIterator<TEvent> {
let index = 0;
for (;;) {
while (index < this.buffer.length) {
yield this.buffer[index++];
}
if (this.done) return;
await new Promise<void>((resolve) => this.waiters.push(resolve));
}
// Hand-rolled (not an async generator) so the consumer attaches
// SYNCHRONOUSLY when the iterator is created — `for await` does this at
// loop entry. A generator body would only run on the first next(), one
// microtask later, silently losing the events pushed in between.
[Symbol.asyncIterator](): AsyncIterator<TEvent> {
const queue: TEvent[] = [];
this.listeners.add(queue);
const detach = async (): Promise<IteratorResult<TEvent>> => {
this.listeners.delete(queue);
return { value: undefined, done: true };
};
return {
next: async (): Promise<IteratorResult<TEvent>> => {
for (;;) {
if (queue.length > 0) {
return { value: queue.shift()!, done: false };
}
if (this.done) return detach();
await new Promise<void>((resolve) => this.waiters.push(resolve));
}
},
// for-await calls this on break/throw — drop the queue eagerly
return: detach,
};
}
}

View file

@ -1,18 +1,27 @@
import { z } from "zod";
import { AgentLoopTurn } from "./types.js";
import { joinTranscript, splitTranscript } from "./prefix-dedup.js";
import type { TurnStore } from "./turn-store.js";
type StoredTurn = {
// turn with `messages` holding only the delta past prefixLength
turn: z.infer<typeof AgentLoopTurn>;
prefixLength: number;
};
// Mirrors SqliteTurnStore's behavior — including transcript prefix dedup —
// so unit tests exercise the same storage semantics as production.
export class InMemoryTurnStore implements TurnStore {
private turns = new Map<string, z.infer<typeof AgentLoopTurn>>();
private rows = new Map<string, StoredTurn>();
async create(turn: z.infer<typeof AgentLoopTurn>): Promise<void> {
if (this.turns.has(turn.id)) {
if (this.rows.has(turn.id)) {
throw new Error(`Turn already exists: ${turn.id}`);
}
// Mirror the SQLite UNIQUE(session_id, session_seq) tripwire — NULL
// seqs never conflict, matching SQLite's distinct-NULLs semantics.
if (turn.sessionId !== null && turn.sessionSeq !== null) {
for (const existing of this.turns.values()) {
for (const { turn: existing } of this.rows.values()) {
if (existing.sessionId === turn.sessionId && existing.sessionSeq === turn.sessionSeq) {
throw new Error(
`Turn with session seq already exists: ${turn.sessionId}#${turn.sessionSeq}`,
@ -20,19 +29,31 @@ export class InMemoryTurnStore implements TurnStore {
}
}
}
this.turns.set(turn.id, structuredClone(turn));
const prev = this.previousTurn(turn.sessionId, turn.sessionSeq);
const { prefixLength, delta } = splitTranscript(turn, prev);
this.rows.set(turn.id, {
turn: structuredClone({ ...turn, messages: delta }),
prefixLength,
});
}
async get(id: string): Promise<z.infer<typeof AgentLoopTurn> | null> {
const turn = this.turns.get(id);
return turn ? structuredClone(turn) : null;
const row = this.rows.get(id);
return row ? this.materialize(row) : null;
}
async update(turn: z.infer<typeof AgentLoopTurn>): Promise<void> {
if (!this.turns.has(turn.id)) {
const row = this.rows.get(turn.id);
if (!row) {
throw new Error(`Turn not found: ${turn.id}`);
}
this.turns.set(turn.id, structuredClone(turn));
if (turn.messages.length < row.prefixLength) {
throw new Error(`Turn ${turn.id} shrank below its stored transcript prefix`);
}
this.rows.set(turn.id, {
turn: structuredClone({ ...turn, messages: turn.messages.slice(row.prefixLength) }),
prefixLength: row.prefixLength,
});
}
async latestForSession(sessionId: string): Promise<z.infer<typeof AgentLoopTurn> | null> {
@ -41,9 +62,43 @@ export class InMemoryTurnStore implements TurnStore {
}
async listBySession(sessionId: string): Promise<z.infer<typeof AgentLoopTurn>[]> {
return [...this.turns.values()]
.filter((turn) => turn.sessionId === sessionId)
.sort((a, b) => (a.sessionSeq ?? 0) - (b.sessionSeq ?? 0))
.map((turn) => structuredClone(turn));
const rows = [...this.rows.values()]
.filter(({ turn }) => turn.sessionId === sessionId)
.sort((a, b) => (a.turn.sessionSeq ?? 0) - (b.turn.sessionSeq ?? 0));
// Fold forward: each materialized turn is the prefix source for the next.
const out: z.infer<typeof AgentLoopTurn>[] = [];
let prev: z.infer<typeof AgentLoopTurn> | null = null;
for (const row of rows) {
const turn = structuredClone(row.turn);
const prefixSource =
prev !== null && prev.sessionSeq === (turn.sessionSeq ?? 0) - 1 ? prev : null;
turn.messages = joinTranscript(turn.id, prefixSource, row.prefixLength, turn.messages);
out.push(turn);
prev = turn;
}
return out;
}
private materialize(row: StoredTurn): z.infer<typeof AgentLoopTurn> {
const turn = structuredClone(row.turn);
if (row.prefixLength > 0) {
// recursion bottoms out at seq 1 (or the first non-deduped turn)
const prev = this.previousTurn(turn.sessionId, turn.sessionSeq);
turn.messages = joinTranscript(turn.id, prev, row.prefixLength, turn.messages);
}
return turn;
}
private previousTurn(
sessionId: string | null,
sessionSeq: number | null,
): z.infer<typeof AgentLoopTurn> | null {
if (sessionId === null || sessionSeq === null || sessionSeq <= 1) return null;
for (const row of this.rows.values()) {
if (row.turn.sessionId === sessionId && row.turn.sessionSeq === sessionSeq - 1) {
return this.materialize(row);
}
}
return null;
}
}

View file

@ -10,7 +10,7 @@ import { convertFromMessages } from "../agents/runtime.js";
import { createProvider } from "../models/models.js";
import { resolveProviderConfig } from "../models/defaults.js";
import { EventStream } from "./event-stream.js";
import type { ModelStreamEvent, ToolDefinition } from "./types.js";
import type { ModelStreamEvent, ModelUsage, ToolDefinition } from "./types.js";
export type ModelStreamRequest = {
provider: string | null;
@ -20,24 +20,33 @@ export type ModelStreamRequest = {
signal: AbortSignal;
};
// Usage as reported by the provider for one model step; null when the
// provider reported nothing (the loop then records no usage fact).
export type ModelStepUsage = Omit<z.infer<typeof ModelUsage>, "at">;
export type ModelStepResult = {
message: z.infer<typeof AssistantMessage>;
usage: ModelStepUsage | null;
};
// Streams one model step. Iterate for deltas, or just await `.result` for the
// final complete AssistantMessage. The loop commits only the complete message;
// deltas are never persisted.
// final complete AssistantMessage + usage. The loop commits only the complete
// message; deltas are never persisted.
//
// Contract: `.result` is authoritative — it MUST resolve with the complete
// message or reject on failure/abort (the loop distinguishes the two via its
// own AbortSignal). `error` events are observational only; the loop ignores
// them.
export interface ModelAdapter {
stream(req: ModelStreamRequest): EventStream<ModelStreamEvent, z.infer<typeof AssistantMessage>>;
stream(req: ModelStreamRequest): EventStream<ModelStreamEvent, ModelStepResult>;
}
// Thin adapter over the existing provider factory + Vercel AI SDK streamText.
// All retry/failover policy stays out of the agent loop; if a step fails, the
// stream emits an `error` event and the loop records a turn-level error.
export class VercelModelAdapter implements ModelAdapter {
stream(req: ModelStreamRequest): EventStream<ModelStreamEvent, z.infer<typeof AssistantMessage>> {
const out = new EventStream<ModelStreamEvent, z.infer<typeof AssistantMessage>>();
stream(req: ModelStreamRequest): EventStream<ModelStreamEvent, ModelStepResult> {
const out = new EventStream<ModelStreamEvent, ModelStepResult>();
void this.run(req, out).catch((error: unknown) => {
out.push({ type: "error", error });
out.fail(error);
@ -47,7 +56,7 @@ export class VercelModelAdapter implements ModelAdapter {
private async run(
req: ModelStreamRequest,
out: EventStream<ModelStreamEvent, z.infer<typeof AssistantMessage>>,
out: EventStream<ModelStreamEvent, ModelStepResult>,
): Promise<void> {
if (!req.provider || !req.model) {
throw new Error("Agent loop turn has no provider/model configured");
@ -126,8 +135,20 @@ export class VercelModelAdapter implements ModelAdapter {
role: "assistant",
content: parts.length > 0 ? parts : "",
};
// Usage is best-effort: a provider that fails to report it must not
// fail the step the model itself completed.
const usage = await result.usage.then(
(u) => ({
inputTokens: u.inputTokens ?? null,
outputTokens: u.outputTokens ?? null,
totalTokens: u.totalTokens ?? null,
reasoningTokens: u.reasoningTokens ?? null,
cachedInputTokens: u.cachedInputTokens ?? null,
}),
() => null,
);
out.push({ type: "finish", message });
out.end(message);
out.end({ message, usage });
}
}

View file

@ -0,0 +1,63 @@
import { z } from "zod";
import { Message } from "@x/shared/dist/message.js";
import { AgentLoopTurn, closedTranscript } from "./types.js";
// Transcript prefix dedup, shared by the turn stores.
//
// A session turn's input begins with the previous turn's closed transcript
// (copy-forward history). Storing that prefix again in every turn makes
// session storage quadratic and rewrites the whole transcript on every fact
// append. Instead, stores keep only the suffix the turn adds, plus the prefix
// LENGTH; on read, the prefix is recomputed from the (immutable, terminal)
// previous turn via closedTranscript and re-attached.
//
// The dedup is OPPORTUNISTIC: if a turn's messages do not start with exactly
// the previous turn's closed transcript (e.g. a future compaction feature
// sends a summary instead), the full messages are stored and nothing breaks.
// Callers above the store never see any of this — turns read back whole.
export type SplitTranscript = {
prefixLength: number;
delta: z.infer<typeof Message>[];
};
// prev is the MATERIALIZED previous session turn (seq - 1), or null if there
// is none. Returns the storable suffix; prefixLength 0 means "stored whole".
export function splitTranscript(
turn: z.infer<typeof AgentLoopTurn>,
prev: z.infer<typeof AgentLoopTurn> | null,
): SplitTranscript {
if (prev === null) return { prefixLength: 0, delta: turn.messages };
const closed = closedTranscript(prev);
if (closed.length === 0 || closed.length > turn.messages.length) {
return { prefixLength: 0, delta: turn.messages };
}
const head = turn.messages.slice(0, closed.length);
if (JSON.stringify(head) !== JSON.stringify(closed)) {
return { prefixLength: 0, delta: turn.messages };
}
return { prefixLength: closed.length, delta: turn.messages.slice(closed.length) };
}
// Inverse of splitTranscript. prev must be the materialized previous session
// turn whenever prefixLength > 0 — its absence means the chain is broken
// (a deleted or missing predecessor), which must fail loudly, never return a
// transcript with a silently missing prefix.
export function joinTranscript(
turnId: string,
prev: z.infer<typeof AgentLoopTurn> | null,
prefixLength: number,
delta: z.infer<typeof Message>[],
): z.infer<typeof Message>[] {
if (prefixLength === 0) return delta;
if (prev === null) {
throw new Error(`Turn ${turnId} requires its previous session turn to materialize`);
}
const closed = closedTranscript(prev);
if (closed.length !== prefixLength) {
throw new Error(
`Transcript prefix mismatch for turn ${turnId}: stored ${prefixLength}, derived ${closed.length}`,
);
}
return [...closed, ...delta];
}

View file

@ -81,6 +81,16 @@ function sampleTurn(
],
startedTools: [{ toolCallId: "tc1", startedAt: "2026-06-12T00:00:02Z" }],
dispatchedTools: [],
modelUsage: [
{
inputTokens: 120,
outputTokens: 45,
totalTokens: 165,
reasoningTokens: null,
cachedInputTokens: 80,
at: "2026-06-12T00:00:03Z",
},
],
error: null,
completedAt: null,
createdAt: "2026-06-12T00:00:00Z",
@ -159,6 +169,98 @@ describe("SqliteTurnStore", () => {
await store.create(sampleTurn("t4"));
});
describe("transcript prefix dedup", () => {
// t1's tool call is resolved, so its closed transcript IS its messages
// (3 of them) — t2 extends it with a new exchange.
function chainTurns() {
const t1 = sampleTurn("t1", { sessionId: "s1", sessionSeq: 1 });
const t2 = sampleTurn("t2", {
sessionId: "s1",
sessionSeq: 2,
messages: [
...t1.messages,
{ role: "user", content: "second question" },
{ role: "assistant", content: "second answer" },
],
});
return { t1, t2 };
}
it("stores only the delta at rest; reads materialize transparently", async () => {
const { store, db } = await loadStore();
const { t1, t2 } = chainTurns();
await store.create(t1);
await store.create(t2);
expect(await store.get("t2")).toEqual(t2);
expect(await store.listBySession("s1")).toEqual([t1, t2]);
expect(await store.latestForSession("s1")).toEqual(t2);
const raw = await db
.selectFrom("agent_loop_turns")
.select(["messages", "prefix_length"])
.where("id", "=", "t2")
.executeTakeFirstOrThrow();
expect(raw.prefix_length).toBe(3);
expect(JSON.parse(raw.messages)).toHaveLength(2); // only the new exchange
expect(raw.messages).not.toContain("let me check"); // t1 content not duplicated
});
it("updates rewrite only the delta; the prefix stays deduped", async () => {
const { store, db } = await loadStore();
const { t1, t2 } = chainTurns();
await store.create(t1);
await store.create(t2);
const updated = {
...t2,
messages: [...t2.messages, { role: "user" as const, content: "follow-up" }],
updatedAt: "2026-06-12T00:01:00Z",
};
await store.update(updated);
expect(await store.get("t2")).toEqual(updated);
const raw = await db
.selectFrom("agent_loop_turns")
.select(["messages", "prefix_length"])
.where("id", "=", "t2")
.executeTakeFirstOrThrow();
expect(raw.prefix_length).toBe(3);
expect(JSON.parse(raw.messages)).toHaveLength(3);
});
it("stores the whole transcript when the input does not extend the previous turn", async () => {
const { store, db } = await loadStore();
const { t1 } = chainTurns();
await store.create(t1);
// compaction-style input: a summary instead of the prior transcript
const t2 = sampleTurn("t2", {
sessionId: "s1",
sessionSeq: 2,
messages: [{ role: "user", content: "summary of the conversation so far" }],
});
await store.create(t2);
expect(await store.get("t2")).toEqual(t2);
const raw = await db
.selectFrom("agent_loop_turns")
.select("prefix_length")
.where("id", "=", "t2")
.executeTakeFirstOrThrow();
expect(raw.prefix_length).toBe(0);
});
it("fails loudly when a deduped turn's predecessor is missing", async () => {
const { store, db } = await loadStore();
const { t1, t2 } = chainTurns();
await store.create(t1);
await store.create(t2);
await db.deleteFrom("agent_loop_turns").where("id", "=", "t1").execute();
await expect(store.get("t2")).rejects.toThrow("previous session turn");
});
});
it("fails loudly on a corrupted JSON column", async () => {
const { store, db } = await loadStore();
await store.create(sampleTurn("t1"));

View file

@ -1,12 +1,14 @@
import type { Insertable, Kysely, Selectable } from "kysely";
import { z } from "zod";
import { MessageList } from "@x/shared/dist/message.js";
import { Message, MessageList } from "@x/shared/dist/message.js";
import type { AgentLoopTurnsTable, Database } from "../storage/schema.js";
import { joinTranscript, splitTranscript } from "./prefix-dedup.js";
import type { TurnStore } from "./turn-store.js";
import {
AgentLoopError,
AgentLoopTurn,
DispatchedTool,
ModelUsage,
PermissionDecision,
PermissionMode,
PermissionRequest,
@ -16,13 +18,19 @@ import {
// Accepts a Kysely<Database> from the existing getDb(); it does not own the
// storage lifecycle (never calls initStorage()). Every JSON column is
// zod-parsed on read so schema drift fails loudly at the boundary.
//
// Session turns are stored with their copy-forward prefix deduplicated (see
// prefix-dedup.ts): the messages column holds only the suffix past
// prefix_length, and reads re-attach the prefix from the previous turn.
export class SqliteTurnStore implements TurnStore {
constructor(private db: Kysely<Database>) {}
async create(turn: z.infer<typeof AgentLoopTurn>): Promise<void> {
const prev = await this.previousTurn(turn.sessionId, turn.sessionSeq);
const { prefixLength, delta } = splitTranscript(turn, prev);
await this.db
.insertInto("agent_loop_turns")
.values(toRow(turn))
.values(toRow(turn, delta, prefixLength))
.execute();
}
@ -32,44 +40,100 @@ export class SqliteTurnStore implements TurnStore {
.selectAll()
.where("id", "=", id)
.executeTakeFirst();
return row ? fromRow(row) : null;
if (!row) return null;
if (row.prefix_length === 0) return fromRow(row);
if (row.session_id === null || row.session_seq === null) {
// only session turns are ever stored deduped
throw new Error(`Turn ${id} has a transcript prefix but no session linkage`);
}
// Materializing requires the chain up to this turn; fold forward.
const chain = await this.foldSession(row.session_id, row.session_seq);
const turn = chain[chain.length - 1];
if (!turn || turn.id !== id) {
throw new Error(`Turn ${id} requires its previous session turn to materialize`);
}
return turn;
}
async update(turn: z.infer<typeof AgentLoopTurn>): Promise<void> {
const { id, ...rest } = toRow(turn);
const existing = await this.db
.selectFrom("agent_loop_turns")
.select("prefix_length")
.where("id", "=", turn.id)
.executeTakeFirst();
if (!existing) {
throw new Error(`Turn not found: ${turn.id}`);
}
if (turn.messages.length < existing.prefix_length) {
throw new Error(`Turn ${turn.id} shrank below its stored transcript prefix`);
}
const delta = turn.messages.slice(existing.prefix_length);
const { id, ...rest } = toRow(turn, delta, existing.prefix_length);
const result = await this.db
.updateTable("agent_loop_turns")
.set(rest)
.where("id", "=", id)
.executeTakeFirst();
// The SELECT above proved existence, but keep the write itself honest:
// a row vanishing in between must never silently no-op.
if (result.numUpdatedRows === 0n) {
throw new Error(`Turn not found: ${id}`);
}
}
async latestForSession(sessionId: string): Promise<z.infer<typeof AgentLoopTurn> | null> {
const row = await this.db
.selectFrom("agent_loop_turns")
.selectAll()
.where("session_id", "=", sessionId)
.orderBy("session_seq", "desc")
.limit(1)
.executeTakeFirst();
return row ? fromRow(row) : null;
const turns = await this.foldSession(sessionId, null);
return turns.length > 0 ? turns[turns.length - 1] : null;
}
async listBySession(sessionId: string): Promise<z.infer<typeof AgentLoopTurn>[]> {
const rows = await this.db
return this.foldSession(sessionId, null);
}
// Loads a session's turns in seq order (up to and including uptoSeq, or
// all) and materializes each transcript from the previous turn's.
private async foldSession(
sessionId: string,
uptoSeq: number | null,
): Promise<z.infer<typeof AgentLoopTurn>[]> {
let query = this.db
.selectFrom("agent_loop_turns")
.selectAll()
.where("session_id", "=", sessionId)
.orderBy("session_seq", "asc")
.execute();
return rows.map(fromRow);
.orderBy("session_seq", "asc");
if (uptoSeq !== null) {
query = query.where("session_seq", "<=", uptoSeq);
}
const rows = await query.execute();
const out: z.infer<typeof AgentLoopTurn>[] = [];
let prev: z.infer<typeof AgentLoopTurn> | null = null;
for (const row of rows) {
const turn = fromRow(row);
const prefixSource =
prev !== null && prev.sessionSeq === (turn.sessionSeq ?? 0) - 1 ? prev : null;
turn.messages = joinTranscript(turn.id, prefixSource, row.prefix_length, turn.messages);
out.push(turn);
prev = turn;
}
return out;
}
private async previousTurn(
sessionId: string | null,
sessionSeq: number | null,
): Promise<z.infer<typeof AgentLoopTurn> | null> {
if (sessionId === null || sessionSeq === null || sessionSeq <= 1) return null;
const chain = await this.foldSession(sessionId, sessionSeq - 1);
const prev = chain[chain.length - 1];
return prev && prev.sessionSeq === sessionSeq - 1 ? prev : null;
}
}
function toRow(turn: z.infer<typeof AgentLoopTurn>): Insertable<AgentLoopTurnsTable> {
function toRow(
turn: z.infer<typeof AgentLoopTurn>,
delta: z.infer<typeof Message>[],
prefixLength: number,
): Insertable<AgentLoopTurnsTable> {
return {
id: turn.id,
agent_id: turn.agentId,
@ -78,11 +142,13 @@ function toRow(turn: z.infer<typeof AgentLoopTurn>): Insertable<AgentLoopTurnsTa
permission_mode: turn.permissionMode,
session_id: turn.sessionId,
session_seq: turn.sessionSeq,
messages: JSON.stringify(turn.messages),
messages: JSON.stringify(delta),
prefix_length: prefixLength,
permission_requests: JSON.stringify(turn.permissionRequests),
permission_decisions: JSON.stringify(turn.permissionDecisions),
started_tools: JSON.stringify(turn.startedTools),
dispatched_tools: JSON.stringify(turn.dispatchedTools),
model_usage: JSON.stringify(turn.modelUsage),
error: turn.error === null ? null : JSON.stringify(turn.error),
created_at: turn.createdAt,
updated_at: turn.updatedAt,
@ -90,6 +156,8 @@ function toRow(turn: z.infer<typeof AgentLoopTurn>): Insertable<AgentLoopTurnsTa
};
}
// Note: `messages` holds only the stored delta — callers must re-attach the
// prefix via joinTranscript before handing the turn out.
function fromRow(row: Selectable<AgentLoopTurnsTable>): z.infer<typeof AgentLoopTurn> {
return {
id: row.id,
@ -104,6 +172,7 @@ function fromRow(row: Selectable<AgentLoopTurnsTable>): z.infer<typeof AgentLoop
permissionDecisions: z.array(PermissionDecision).parse(JSON.parse(row.permission_decisions)),
startedTools: z.array(StartedTool).parse(JSON.parse(row.started_tools)),
dispatchedTools: z.array(DispatchedTool).parse(JSON.parse(row.dispatched_tools)),
modelUsage: z.array(ModelUsage).parse(JSON.parse(row.model_usage)),
error: row.error === null ? null : AgentLoopError.parse(JSON.parse(row.error)),
createdAt: row.created_at,
updatedAt: row.updated_at,

View file

@ -1,6 +1,7 @@
import { z } from "zod";
import {
AssistantMessage,
Message,
MessageList,
ToolCallPart,
} from "@x/shared/dist/message.js";
@ -47,6 +48,17 @@ export const DispatchedTool = z.object({
dispatchedAt: z.string(),
});
// One entry per model call. Token counts are as reported by the provider —
// null when the provider did not report that field. Aggregate via totalUsage.
export const ModelUsage = z.object({
inputTokens: z.number().nullable(),
outputTokens: z.number().nullable(),
totalTokens: z.number().nullable(),
reasoningTokens: z.number().nullable(),
cachedInputTokens: z.number().nullable(),
at: z.string(),
});
export const AgentLoopError = z.object({
message: z.string(),
code: z.string().optional(),
@ -74,6 +86,7 @@ export const AgentLoopTurn = z.object({
permissionDecisions: z.array(PermissionDecision),
startedTools: z.array(StartedTool),
dispatchedTools: z.array(DispatchedTool),
modelUsage: z.array(ModelUsage),
// set-once scalars
error: AgentLoopError.nullable(),
@ -200,3 +213,57 @@ export function deriveTurnStatus(turn: z.infer<typeof AgentLoopTurn>): TurnStatu
}
return "idle";
}
// The transcript as a successor turn would see it: a terminal turn's dangling
// tool calls are closed out with synthetic ToolMessages so a follow-up never
// re-executes — or hangs on — stale calls. Pure and deterministic over an
// immutable (terminal) turn, which is what lets the sessions layer build the
// next turn's input from it AND lets stores reproduce it byte-for-byte.
export function closedTranscript(
turn: z.infer<typeof AgentLoopTurn>,
): z.infer<typeof Message>[] {
const messages = [...turn.messages];
for (const call of unresolvedToolCalls(turn)) {
messages.push({
role: "tool",
content: closureContent(deriveToolCallState(turn, call.toolCallId)),
toolCallId: call.toolCallId,
toolName: call.toolName,
});
}
return messages;
}
// Honest per-state wording for a dangling call: how far did it actually get?
function closureContent(state: ToolCallState): string {
switch (state) {
case "interrupted":
// execution began in-process; the side effect may have landed
return "Tool execution was interrupted before completing. It may or may not have taken effect; do not assume it ran.";
case "dispatched":
// delegated to an external runner; it may still finish out there
return "Tool was dispatched but its result never arrived; it may have completed externally. Do not assume it ran or that it failed.";
default:
// never reached execution (unevaluated / awaiting permission / cleared-but-not-started)
return "Tool was not executed: the turn was stopped before this call ran.";
}
}
// Sum of all model calls in the turn. A field is null only if no call
// reported it; otherwise unreported entries count as 0 toward the sum.
export function totalUsage(
turn: z.infer<typeof AgentLoopTurn>,
): Omit<z.infer<typeof ModelUsage>, "at"> {
const sum = (field: "inputTokens" | "outputTokens" | "totalTokens" | "reasoningTokens" | "cachedInputTokens") => {
const reported = turn.modelUsage.map((u) => u[field]).filter((v) => v !== null);
if (reported.length === 0) return null;
return reported.reduce((a, b) => a + b, 0);
};
return {
inputTokens: sum("inputTokens"),
outputTokens: sum("outputTokens"),
totalTokens: sum("totalTokens"),
reasoningTokens: sum("reasoningTokens"),
cachedInputTokens: sum("cachedInputTokens"),
};
}

View file

@ -8,7 +8,11 @@ import {
import { AgentLoopImpl } from "../agent-loop/agent-loop.js";
import { EventStream } from "../agent-loop/event-stream.js";
import { InMemoryTurnStore } from "../agent-loop/in-memory-turn-store.js";
import type { ModelAdapter, ModelStreamRequest } from "../agent-loop/model-adapter.js";
import type {
ModelAdapter,
ModelStepResult,
ModelStreamRequest,
} from "../agent-loop/model-adapter.js";
import type { PermissionGate } from "../agent-loop/permission-gate.js";
import type { ToolRunner, ToolRunResult } from "../agent-loop/tool-runner.js";
import {
@ -48,9 +52,9 @@ class FakeModelAdapter implements ModelAdapter {
constructor(private steps: ModelStep[]) {}
stream(req: ModelStreamRequest): EventStream<ModelStreamEvent, z.infer<typeof AssistantMessage>> {
stream(req: ModelStreamRequest): EventStream<ModelStreamEvent, ModelStepResult> {
this.calls++;
const out = new EventStream<ModelStreamEvent, z.infer<typeof AssistantMessage>>();
const out = new EventStream<ModelStreamEvent, ModelStepResult>();
const step = this.steps.shift();
void (async () => {
await Promise.resolve();
@ -66,7 +70,7 @@ class FakeModelAdapter implements ModelAdapter {
return;
}
out.push({ type: "finish", message: step.message });
out.end(step.message);
out.end({ message: step.message, usage: null });
})();
return out;
}
@ -132,6 +136,7 @@ function turnFixture(
permissionDecisions: [],
startedTools: [],
dispatchedTools: [],
modelUsage: [],
error: null,
completedAt: null,
createdAt: now,
@ -348,6 +353,37 @@ describe("SessionsImpl", () => {
]);
});
it("a stopped turn's dispatched call is closed out as possibly completed externally", async () => {
const { sessions, loop, turnStore } = makeSessions({
steps: [{ kind: "message", message: assistantText("noted") }],
});
const session = await sessions.createSession();
// crafted waiting turn: tc1 was delegated to an external runner
// (pending), then the user stopped the turn before the result arrived
await turnStore.create(turnFixture("t1", {
sessionId: session.id,
sessionSeq: 1,
messages: [userMsg("run the job"), assistantToolCalls(toolCall("tc1", "background-job"))],
startedTools: [{ toolCallId: "tc1", startedAt: "2026-06-12T00:00:00Z" }],
dispatchedTools: [{ toolCallId: "tc1", dispatchedAt: "2026-06-12T00:00:01Z" }],
}));
await loop.stopTurn("t1");
const closure = {
role: "tool" as const,
content: "Tool was dispatched but its result never arrived; it may have completed externally. Do not assume it ran or that it failed.",
toolCallId: "tc1",
toolName: "background-job",
};
const history = await sessions.getHistory(session.id);
expect(history[history.length - 1]).toEqual(closure);
// the next turn carries the same closure forward
const turn2 = await (await sessions.sendMessage(session.id, [userMsg("ok")])).result;
expect(turn2.messages).toEqual([...history, userMsg("ok"), assistantText("noted")]);
});
it("builds on an errored turn's persisted transcript", async () => {
const { sessions, turnStore } = makeSessions({
steps: [{ kind: "message", message: assistantText("recovered") }],

View file

@ -1,14 +1,13 @@
import crypto from "node:crypto";
import { z } from "zod";
import { Message, MessageList } from "@x/shared/dist/message.js";
import { MessageList } from "@x/shared/dist/message.js";
import type { AgentLoop, TurnHandle } from "../agent-loop/agent-loop.js";
import { KeyedMutex } from "../agent-loop/mutex.js";
import type { TurnStore } from "../agent-loop/turn-store.js";
import {
AgentLoopTurn,
deriveToolCallState,
closedTranscript,
deriveTurnStatus,
unresolvedToolCalls,
} from "../agent-loop/types.js";
import type { SessionStore } from "./session-store.js";
import { CreateSessionInput, SendMessageOptions, Session } from "./types.js";
@ -113,7 +112,7 @@ export class SessionsImpl implements Sessions {
: {}),
sessionId,
sessionSeq: (latest?.sessionSeq ?? 0) + 1,
messages: [...(latest ? historyFrom(latest) : []), ...newMessages],
messages: [...(latest ? closedTranscript(latest) : []), ...newMessages],
});
});
}
@ -123,7 +122,7 @@ export class SessionsImpl implements Sessions {
async getHistory(sessionId: string): Promise<z.infer<typeof MessageList>> {
await this.mustGetSession(sessionId);
const latest = await this.turnStore.latestForSession(sessionId);
return latest ? historyFrom(latest) : [];
return latest ? closedTranscript(latest) : [];
}
async listTurns(sessionId: string): Promise<z.infer<typeof AgentLoopTurn>[]> {
@ -137,26 +136,3 @@ export class SessionsImpl implements Sessions {
return session;
}
}
// Copy-forward history: the next turn's input is the previous turn's full
// transcript. A stopped turn can carry unresolved tool calls; they are closed
// out with synthetic ToolMessages so the new turn never re-executes — or
// hangs on — stale calls. This is the sessions-layer analogue of the
// reducer's interrupted-call handling.
function historyFrom(
turn: z.infer<typeof AgentLoopTurn>,
): z.infer<typeof Message>[] {
const messages = [...turn.messages];
for (const call of unresolvedToolCalls(turn)) {
const interrupted = deriveToolCallState(turn, call.toolCallId) === "interrupted";
messages.push({
role: "tool",
content: interrupted
? "Tool execution was interrupted before completing. It may or may not have taken effect; do not assume it ran."
: "Tool was not executed: the turn was stopped before this call ran.",
toolCallId: call.toolCallId,
toolName: call.toolName,
});
}
return messages;
}

View file

@ -100,6 +100,31 @@ const migrations: Record<string, Migration> = {
await db.schema.dropTable("sessions").ifExists().execute();
},
},
"2026-06-12_0004_turn_model_usage": {
async up(db: MigrationDb): Promise<void> {
await db.schema
.alterTable("agent_loop_turns")
.addColumn("model_usage", "text", (col) => col.notNull().defaultTo("[]"))
.execute();
},
async down(db: MigrationDb): Promise<void> {
await db.schema.alterTable("agent_loop_turns").dropColumn("model_usage").execute();
},
},
"2026-06-12_0005_turn_transcript_dedup": {
async up(db: MigrationDb): Promise<void> {
// 0 = messages stored whole; N = the first N messages are the
// previous session turn's closed transcript, recomputed on read.
// Existing rows default to 0, so they keep reading back unchanged.
await db.schema
.alterTable("agent_loop_turns")
.addColumn("prefix_length", "integer", (col) => col.notNull().defaultTo(0))
.execute();
},
async down(db: MigrationDb): Promise<void> {
await db.schema.alterTable("agent_loop_turns").dropColumn("prefix_length").execute();
},
},
};
class InCodeMigrationProvider implements MigrationProvider {

View file

@ -16,11 +16,13 @@ export interface AgentLoopTurnsTable {
permission_mode: string;
session_id: string | null;
session_seq: number | null;
messages: string; // JSON: MessageList
messages: string; // JSON: MessageList (delta past prefix_length)
prefix_length: number; // copy-forward prefix deduped at rest; 0 = stored whole
permission_requests: string; // JSON: PermissionRequest[]
permission_decisions: string; // JSON: PermissionDecision[]
started_tools: string; // JSON: StartedTool[]
dispatched_tools: string; // JSON: DispatchedTool[]
model_usage: string; // JSON: ModelUsage[]
error: string | null; // JSON: AgentLoopError
created_at: TimestampColumn;
updated_at: TimestampColumn;