From 91bb3cc8cdef9d940092e7380151939e3dfc4af5 Mon Sep 17 00:00:00 2001
From: Ramnique Singh <30795890+ramnique@users.noreply.github.com>
Date: Fri, 12 Jun 2026 21:44:53 +0530
Subject: [PATCH] Dedup turn transcripts, record model usage, make event delivery live

Three follow-ups from the runtime design review:

Transcript prefix dedup (storage-level, transparent):
- A session turn's input starts with the previous turn's closed
  transcript (copy-forward); storing it again made session storage
  quadratic and every fact append rewrote the whole transcript. Stores
  now keep only the suffix past prefix_length and recompute the prefix
  on read from the immutable previous turn (closedTranscript moved to
  agent-loop/types.ts; shared split/join helpers in prefix-dedup.ts
  with loud tripwires).
- Opportunistic: input that doesn't extend the previous transcript
  (e.g. future compaction summaries) falls back to whole-row storage.
  Nothing above the stores changed; InMemoryTurnStore mirrors SQLite
  exactly.
- Migration 0005 adds prefix_length (default 0 = stored whole).

Per-model-call usage recording:
- New modelUsage fact log on the turn: one entry per model call,
  committed in the same write as the assistant message it paid for.
  ModelAdapter results now carry {message, usage}; VercelModelAdapter
  treats usage reporting failures as null rather than failing a
  completed step.
- totalUsage() derives turn aggregates; null means never reported.
- Migration 0004 adds the model_usage column.

EventStream: replay removed, bus-style live delivery:
- Events go to consumers attached at push time and are dropped
  otherwise, matching the runtime's IBus philosophy (facts persisted,
  deltas cosmetic, renderer reconciles from snapshots). No buffering
  without consumers; per-iterator queues bounded by consumer lag.
  Iterators attach synchronously at creation so the loop's own
  for-await never misses events.

Also: dispatched-call closures get honest wording ("may have completed externally") distinct from interrupted and never-ran; SqliteTurnStore.update keeps the no-op write guard alongside the prefix_length read. 8 new tests (87 total). Co-Authored-By: Claude Fable 5 --- .../core/src/agent-loop/agent-loop.test.ts | 65 ++++++++++- .../core/src/agent-loop/agent-loop.ts | 10 +- .../core/src/agent-loop/event-stream.test.ts | 20 +++- .../core/src/agent-loop/event-stream.ts | 49 +++++--- .../src/agent-loop/in-memory-turn-store.ts | 79 +++++++++++-- .../core/src/agent-loop/model-adapter.ts | 37 ++++-- .../core/src/agent-loop/prefix-dedup.ts | 63 +++++++++++ .../src/agent-loop/sqlite-turn-store.test.ts | 102 +++++++++++++++++ .../core/src/agent-loop/sqlite-turn-store.ts | 105 +++++++++++++++--- apps/x/packages/core/src/agent-loop/types.ts | 67 +++++++++++ .../core/src/sessions/sessions.test.ts | 44 +++++++- apps/x/packages/core/src/sessions/sessions.ts | 32 +----- .../x/packages/core/src/storage/migrations.ts | 25 +++++ apps/x/packages/core/src/storage/schema.ts | 4 +- 14 files changed, 608 insertions(+), 94 deletions(-) create mode 100644 apps/x/packages/core/src/agent-loop/prefix-dedup.ts diff --git a/apps/x/packages/core/src/agent-loop/agent-loop.test.ts b/apps/x/packages/core/src/agent-loop/agent-loop.test.ts index 37503dd2..fca16bb4 100644 --- a/apps/x/packages/core/src/agent-loop/agent-loop.test.ts +++ b/apps/x/packages/core/src/agent-loop/agent-loop.test.ts @@ -8,13 +8,19 @@ import { import { AgentLoopImpl } from "./agent-loop.js"; import { EventStream } from "./event-stream.js"; import { InMemoryTurnStore } from "./in-memory-turn-store.js"; -import type { ModelAdapter, ModelStreamRequest } from "./model-adapter.js"; +import type { + ModelAdapter, + ModelStepResult, + ModelStepUsage, + ModelStreamRequest, +} from "./model-adapter.js"; import type { PermissionClassification, PermissionGate } from "./permission-gate.js"; import type { ToolRunner, ToolRunResult } from 
"./tool-runner.js"; import type { TurnStore } from "./turn-store.js"; import { AgentLoopTurn, deriveTurnStatus, + totalUsage, type ModelStreamEvent, } from "./types.js"; @@ -39,7 +45,12 @@ function assistantToolCalls( } type ModelStep = - | { kind: "message"; message: z.infer; deltas?: string[] } + | { + kind: "message"; + message: z.infer; + deltas?: string[]; + usage?: ModelStepUsage; + } | { kind: "error"; error: unknown } | { kind: "hang" }; @@ -48,9 +59,9 @@ class FakeModelAdapter implements ModelAdapter { constructor(private steps: ModelStep[]) {} - stream(req: ModelStreamRequest): EventStream> { + stream(req: ModelStreamRequest): EventStream { this.calls++; - const out = new EventStream>(); + const out = new EventStream(); const step = this.steps.shift(); void (async () => { await Promise.resolve(); @@ -86,7 +97,7 @@ class FakeModelAdapter implements ModelAdapter { } } out.push({ type: "finish", message: step.message }); - out.end(step.message); + out.end({ message: step.message, usage: step.usage ?? 
null }); })(); return out; } @@ -203,6 +214,7 @@ function emptyTurn( permissionDecisions: [], startedTools: [], dispatchedTools: [], + modelUsage: [], error: null, completedAt: null, createdAt: now, @@ -740,6 +752,49 @@ describe("AgentLoopImpl", () => { expect(deriveTurnStatus(turn)).toBe("completed"); }); + it("records one usage fact per model call and derives the turn total", async () => { + const usage = (inputTokens: number, outputTokens: number): ModelStepUsage => ({ + inputTokens, + outputTokens, + totalTokens: inputTokens + outputTokens, + reasoningTokens: null, + cachedInputTokens: null, + }); + const { loop } = makeLoop({ + steps: [ + { + kind: "message", + message: assistantToolCalls(toolCall("tc1", "calc")), + usage: usage(100, 20), + }, + { kind: "message", message: assistantText("done"), usage: usage(150, 30) }, + ], + }); + + const turn = await (await loop.createTurn({ messages: [userMsg("go")] })).result; + + expect(turn.modelUsage).toHaveLength(2); + expect(turn.modelUsage[0]).toMatchObject({ inputTokens: 100, outputTokens: 20 }); + expect(turn.modelUsage[1]).toMatchObject({ inputTokens: 150, outputTokens: 30 }); + expect(turn.modelUsage.every((u) => typeof u.at === "string")).toBe(true); + expect(totalUsage(turn)).toEqual({ + inputTokens: 250, + outputTokens: 50, + totalTokens: 300, + reasoningTokens: null, + cachedInputTokens: null, + }); + }); + + it("a model step without reported usage records no usage fact", async () => { + const { loop } = makeLoop({ + steps: [{ kind: "message", message: assistantText("hi") }], + }); + const turn = await (await loop.createTurn({ messages: [userMsg("go")] })).result; + expect(turn.modelUsage).toEqual([]); + expect(totalUsage(turn).totalTokens).toBeNull(); + }); + it("getTurn returns the persisted turn; unknown ids reject", async () => { const { loop } = makeLoop({ steps: [{ kind: "message", message: assistantText("hi") }] }); const created = await (await loop.createTurn({ messages: [userMsg("hello")] 
})).result; diff --git a/apps/x/packages/core/src/agent-loop/agent-loop.ts b/apps/x/packages/core/src/agent-loop/agent-loop.ts index 68579efc..283aec23 100644 --- a/apps/x/packages/core/src/agent-loop/agent-loop.ts +++ b/apps/x/packages/core/src/agent-loop/agent-loop.ts @@ -115,6 +115,7 @@ export class AgentLoopImpl implements AgentLoop { permissionDecisions: [], startedTools: [], dispatchedTools: [], + modelUsage: [], error: null, completedAt: null, createdAt: now, @@ -398,8 +399,13 @@ export class AgentLoopImpl implements AgentLoop { for await (const event of modelStream) { stream.push(event); } - const assistantMessage = await modelStream.result; - turn.messages.push(assistantMessage); + const step = await modelStream.result; + turn.messages.push(step.message); + // The step's usage is a fact like any other — committed in the + // same write as the message it paid for. + if (step.usage !== null) { + turn.modelUsage.push({ ...step.usage, at: nowIso() }); + } await this.persist(turn); } catch (error) { // stopped: facts stay as persisted; stopTurn's queued job diff --git a/apps/x/packages/core/src/agent-loop/event-stream.test.ts b/apps/x/packages/core/src/agent-loop/event-stream.test.ts index 805b895a..4400ca70 100644 --- a/apps/x/packages/core/src/agent-loop/event-stream.test.ts +++ b/apps/x/packages/core/src/agent-loop/event-stream.test.ts @@ -20,13 +20,27 @@ describe("EventStream", () => { expect(await stream.result).toBe("done"); }); - it("delivers events buffered before iteration starts", async () => { + it("is live: events pushed before a consumer attaches are dropped", async () => { const stream = new EventStream(); - stream.push(1); + stream.push(1); // nobody listening — dropped, not buffered + + const collecting = collect(stream); stream.push(2); stream.end("done"); - expect(await collect(stream)).toEqual([1, 2]); + expect(await collecting).toEqual([2]); + }); + + it("delivers to every consumer attached at push time", async () => { + const stream = new 
EventStream(); + const a = collect(stream); + stream.push(1); + const b = collect(stream); // late subscriber: gets only what follows + stream.push(2); + stream.end("done"); + + expect(await a).toEqual([1, 2]); + expect(await b).toEqual([2]); }); it("resolves result without any event consumer", async () => { diff --git a/apps/x/packages/core/src/agent-loop/event-stream.ts b/apps/x/packages/core/src/agent-loop/event-stream.ts index 1f1db7a9..f5a963fe 100644 --- a/apps/x/packages/core/src/agent-loop/event-stream.ts +++ b/apps/x/packages/core/src/agent-loop/event-stream.ts @@ -1,9 +1,17 @@ // Tiny EventStream: push events, complete with a result. -// Consumers can iterate events (streaming) or just await the result -// (await-to-rest); consuming events is never required for correctness. +// +// Live, bus-style delivery (same philosophy as the runtime's IBus): an event +// is delivered to the consumers attached at the moment it is pushed, and +// dropped otherwise — there is NO replay for late subscribers and no buffering +// when nobody listens. Events are cosmetic; every fact is persisted, so a +// consumer that attaches late (or misses events entirely) reconciles from the +// stored turn. Awaiting `result` never requires consuming events. +// +// Memory: a push with no consumers costs nothing; an attached consumer buffers +// only its own lag, freed as it iterates. 
export class EventStream implements AsyncIterable { - private buffer: TEvent[] = []; + private listeners = new Set(); private waiters: Array<() => void> = []; private done = false; @@ -24,7 +32,7 @@ export class EventStream implements AsyncIterable { push(event: TEvent): void { if (this.done) return; - this.buffer.push(event); + for (const queue of this.listeners) queue.push(event); this.wake(); } @@ -48,14 +56,29 @@ export class EventStream implements AsyncIterable { for (const waiter of waiters) waiter(); } - async *[Symbol.asyncIterator](): AsyncIterator { - let index = 0; - for (;;) { - while (index < this.buffer.length) { - yield this.buffer[index++]; - } - if (this.done) return; - await new Promise((resolve) => this.waiters.push(resolve)); - } + // Hand-rolled (not an async generator) so the consumer attaches + // SYNCHRONOUSLY when the iterator is created — `for await` does this at + // loop entry. A generator body would only run on the first next(), one + // microtask later, silently losing the events pushed in between. 
+ [Symbol.asyncIterator](): AsyncIterator { + const queue: TEvent[] = []; + this.listeners.add(queue); + const detach = async (): Promise> => { + this.listeners.delete(queue); + return { value: undefined, done: true }; + }; + return { + next: async (): Promise> => { + for (;;) { + if (queue.length > 0) { + return { value: queue.shift()!, done: false }; + } + if (this.done) return detach(); + await new Promise((resolve) => this.waiters.push(resolve)); + } + }, + // for-await calls this on break/throw — drop the queue eagerly + return: detach, + }; } } diff --git a/apps/x/packages/core/src/agent-loop/in-memory-turn-store.ts b/apps/x/packages/core/src/agent-loop/in-memory-turn-store.ts index a6898259..f88e3f3a 100644 --- a/apps/x/packages/core/src/agent-loop/in-memory-turn-store.ts +++ b/apps/x/packages/core/src/agent-loop/in-memory-turn-store.ts @@ -1,18 +1,27 @@ import { z } from "zod"; import { AgentLoopTurn } from "./types.js"; +import { joinTranscript, splitTranscript } from "./prefix-dedup.js"; import type { TurnStore } from "./turn-store.js"; +type StoredTurn = { + // turn with `messages` holding only the delta past prefixLength + turn: z.infer; + prefixLength: number; +}; + +// Mirrors SqliteTurnStore's behavior — including transcript prefix dedup — +// so unit tests exercise the same storage semantics as production. export class InMemoryTurnStore implements TurnStore { - private turns = new Map>(); + private rows = new Map(); async create(turn: z.infer): Promise { - if (this.turns.has(turn.id)) { + if (this.rows.has(turn.id)) { throw new Error(`Turn already exists: ${turn.id}`); } // Mirror the SQLite UNIQUE(session_id, session_seq) tripwire — NULL // seqs never conflict, matching SQLite's distinct-NULLs semantics. 
if (turn.sessionId !== null && turn.sessionSeq !== null) { - for (const existing of this.turns.values()) { + for (const { turn: existing } of this.rows.values()) { if (existing.sessionId === turn.sessionId && existing.sessionSeq === turn.sessionSeq) { throw new Error( `Turn with session seq already exists: ${turn.sessionId}#${turn.sessionSeq}`, @@ -20,19 +29,31 @@ export class InMemoryTurnStore implements TurnStore { } } } - this.turns.set(turn.id, structuredClone(turn)); + const prev = this.previousTurn(turn.sessionId, turn.sessionSeq); + const { prefixLength, delta } = splitTranscript(turn, prev); + this.rows.set(turn.id, { + turn: structuredClone({ ...turn, messages: delta }), + prefixLength, + }); } async get(id: string): Promise | null> { - const turn = this.turns.get(id); - return turn ? structuredClone(turn) : null; + const row = this.rows.get(id); + return row ? this.materialize(row) : null; } async update(turn: z.infer): Promise { - if (!this.turns.has(turn.id)) { + const row = this.rows.get(turn.id); + if (!row) { throw new Error(`Turn not found: ${turn.id}`); } - this.turns.set(turn.id, structuredClone(turn)); + if (turn.messages.length < row.prefixLength) { + throw new Error(`Turn ${turn.id} shrank below its stored transcript prefix`); + } + this.rows.set(turn.id, { + turn: structuredClone({ ...turn, messages: turn.messages.slice(row.prefixLength) }), + prefixLength: row.prefixLength, + }); } async latestForSession(sessionId: string): Promise | null> { @@ -41,9 +62,43 @@ export class InMemoryTurnStore implements TurnStore { } async listBySession(sessionId: string): Promise[]> { - return [...this.turns.values()] - .filter((turn) => turn.sessionId === sessionId) - .sort((a, b) => (a.sessionSeq ?? 0) - (b.sessionSeq ?? 0)) - .map((turn) => structuredClone(turn)); + const rows = [...this.rows.values()] + .filter(({ turn }) => turn.sessionId === sessionId) + .sort((a, b) => (a.turn.sessionSeq ?? 0) - (b.turn.sessionSeq ?? 
0)); + // Fold forward: each materialized turn is the prefix source for the next. + const out: z.infer[] = []; + let prev: z.infer | null = null; + for (const row of rows) { + const turn = structuredClone(row.turn); + const prefixSource = + prev !== null && prev.sessionSeq === (turn.sessionSeq ?? 0) - 1 ? prev : null; + turn.messages = joinTranscript(turn.id, prefixSource, row.prefixLength, turn.messages); + out.push(turn); + prev = turn; + } + return out; + } + + private materialize(row: StoredTurn): z.infer { + const turn = structuredClone(row.turn); + if (row.prefixLength > 0) { + // recursion bottoms out at seq 1 (or the first non-deduped turn) + const prev = this.previousTurn(turn.sessionId, turn.sessionSeq); + turn.messages = joinTranscript(turn.id, prev, row.prefixLength, turn.messages); + } + return turn; + } + + private previousTurn( + sessionId: string | null, + sessionSeq: number | null, + ): z.infer | null { + if (sessionId === null || sessionSeq === null || sessionSeq <= 1) return null; + for (const row of this.rows.values()) { + if (row.turn.sessionId === sessionId && row.turn.sessionSeq === sessionSeq - 1) { + return this.materialize(row); + } + } + return null; } } diff --git a/apps/x/packages/core/src/agent-loop/model-adapter.ts b/apps/x/packages/core/src/agent-loop/model-adapter.ts index a9a90a93..733b9c82 100644 --- a/apps/x/packages/core/src/agent-loop/model-adapter.ts +++ b/apps/x/packages/core/src/agent-loop/model-adapter.ts @@ -10,7 +10,7 @@ import { convertFromMessages } from "../agents/runtime.js"; import { createProvider } from "../models/models.js"; import { resolveProviderConfig } from "../models/defaults.js"; import { EventStream } from "./event-stream.js"; -import type { ModelStreamEvent, ToolDefinition } from "./types.js"; +import type { ModelStreamEvent, ModelUsage, ToolDefinition } from "./types.js"; export type ModelStreamRequest = { provider: string | null; @@ -20,24 +20,33 @@ export type ModelStreamRequest = { signal: 
AbortSignal; }; +// Usage as reported by the provider for one model step; null when the +// provider reported nothing (the loop then records no usage fact). +export type ModelStepUsage = Omit, "at">; + +export type ModelStepResult = { + message: z.infer; + usage: ModelStepUsage | null; +}; + // Streams one model step. Iterate for deltas, or just await `.result` for the -// final complete AssistantMessage. The loop commits only the complete message; -// deltas are never persisted. +// final complete AssistantMessage + usage. The loop commits only the complete +// message; deltas are never persisted. // // Contract: `.result` is authoritative — it MUST resolve with the complete // message or reject on failure/abort (the loop distinguishes the two via its // own AbortSignal). `error` events are observational only; the loop ignores // them. export interface ModelAdapter { - stream(req: ModelStreamRequest): EventStream>; + stream(req: ModelStreamRequest): EventStream; } // Thin adapter over the existing provider factory + Vercel AI SDK streamText. // All retry/failover policy stays out of the agent loop; if a step fails, the // stream emits an `error` event and the loop records a turn-level error. export class VercelModelAdapter implements ModelAdapter { - stream(req: ModelStreamRequest): EventStream> { - const out = new EventStream>(); + stream(req: ModelStreamRequest): EventStream { + const out = new EventStream(); void this.run(req, out).catch((error: unknown) => { out.push({ type: "error", error }); out.fail(error); @@ -47,7 +56,7 @@ export class VercelModelAdapter implements ModelAdapter { private async run( req: ModelStreamRequest, - out: EventStream>, + out: EventStream, ): Promise { if (!req.provider || !req.model) { throw new Error("Agent loop turn has no provider/model configured"); @@ -126,8 +135,20 @@ export class VercelModelAdapter implements ModelAdapter { role: "assistant", content: parts.length > 0 ? 
parts : "", }; + // Usage is best-effort: a provider that fails to report it must not + // fail the step the model itself completed. + const usage = await result.usage.then( + (u) => ({ + inputTokens: u.inputTokens ?? null, + outputTokens: u.outputTokens ?? null, + totalTokens: u.totalTokens ?? null, + reasoningTokens: u.reasoningTokens ?? null, + cachedInputTokens: u.cachedInputTokens ?? null, + }), + () => null, + ); out.push({ type: "finish", message }); - out.end(message); + out.end({ message, usage }); } } diff --git a/apps/x/packages/core/src/agent-loop/prefix-dedup.ts b/apps/x/packages/core/src/agent-loop/prefix-dedup.ts new file mode 100644 index 00000000..0a52cefd --- /dev/null +++ b/apps/x/packages/core/src/agent-loop/prefix-dedup.ts @@ -0,0 +1,63 @@ +import { z } from "zod"; +import { Message } from "@x/shared/dist/message.js"; +import { AgentLoopTurn, closedTranscript } from "./types.js"; + +// Transcript prefix dedup, shared by the turn stores. +// +// A session turn's input begins with the previous turn's closed transcript +// (copy-forward history). Storing that prefix again in every turn makes +// session storage quadratic and rewrites the whole transcript on every fact +// append. Instead, stores keep only the suffix the turn adds, plus the prefix +// LENGTH; on read, the prefix is recomputed from the (immutable, terminal) +// previous turn via closedTranscript and re-attached. +// +// The dedup is OPPORTUNISTIC: if a turn's messages do not start with exactly +// the previous turn's closed transcript (e.g. a future compaction feature +// sends a summary instead), the full messages are stored and nothing breaks. +// Callers above the store never see any of this — turns read back whole. + +export type SplitTranscript = { + prefixLength: number; + delta: z.infer[]; +}; + +// prev is the MATERIALIZED previous session turn (seq - 1), or null if there +// is none. Returns the storable suffix; prefixLength 0 means "stored whole". 
+export function splitTranscript( + turn: z.infer, + prev: z.infer | null, +): SplitTranscript { + if (prev === null) return { prefixLength: 0, delta: turn.messages }; + const closed = closedTranscript(prev); + if (closed.length === 0 || closed.length > turn.messages.length) { + return { prefixLength: 0, delta: turn.messages }; + } + const head = turn.messages.slice(0, closed.length); + if (JSON.stringify(head) !== JSON.stringify(closed)) { + return { prefixLength: 0, delta: turn.messages }; + } + return { prefixLength: closed.length, delta: turn.messages.slice(closed.length) }; +} + +// Inverse of splitTranscript. prev must be the materialized previous session +// turn whenever prefixLength > 0 — its absence means the chain is broken +// (a deleted or missing predecessor), which must fail loudly, never return a +// transcript with a silently missing prefix. +export function joinTranscript( + turnId: string, + prev: z.infer | null, + prefixLength: number, + delta: z.infer[], +): z.infer[] { + if (prefixLength === 0) return delta; + if (prev === null) { + throw new Error(`Turn ${turnId} requires its previous session turn to materialize`); + } + const closed = closedTranscript(prev); + if (closed.length !== prefixLength) { + throw new Error( + `Transcript prefix mismatch for turn ${turnId}: stored ${prefixLength}, derived ${closed.length}`, + ); + } + return [...closed, ...delta]; +} diff --git a/apps/x/packages/core/src/agent-loop/sqlite-turn-store.test.ts b/apps/x/packages/core/src/agent-loop/sqlite-turn-store.test.ts index b56276d6..6c8e281c 100644 --- a/apps/x/packages/core/src/agent-loop/sqlite-turn-store.test.ts +++ b/apps/x/packages/core/src/agent-loop/sqlite-turn-store.test.ts @@ -81,6 +81,16 @@ function sampleTurn( ], startedTools: [{ toolCallId: "tc1", startedAt: "2026-06-12T00:00:02Z" }], dispatchedTools: [], + modelUsage: [ + { + inputTokens: 120, + outputTokens: 45, + totalTokens: 165, + reasoningTokens: null, + cachedInputTokens: 80, + at: 
"2026-06-12T00:00:03Z", + }, + ], error: null, completedAt: null, createdAt: "2026-06-12T00:00:00Z", @@ -159,6 +169,98 @@ describe("SqliteTurnStore", () => { await store.create(sampleTurn("t4")); }); + describe("transcript prefix dedup", () => { + // t1's tool call is resolved, so its closed transcript IS its messages + // (3 of them) — t2 extends it with a new exchange. + function chainTurns() { + const t1 = sampleTurn("t1", { sessionId: "s1", sessionSeq: 1 }); + const t2 = sampleTurn("t2", { + sessionId: "s1", + sessionSeq: 2, + messages: [ + ...t1.messages, + { role: "user", content: "second question" }, + { role: "assistant", content: "second answer" }, + ], + }); + return { t1, t2 }; + } + + it("stores only the delta at rest; reads materialize transparently", async () => { + const { store, db } = await loadStore(); + const { t1, t2 } = chainTurns(); + await store.create(t1); + await store.create(t2); + + expect(await store.get("t2")).toEqual(t2); + expect(await store.listBySession("s1")).toEqual([t1, t2]); + expect(await store.latestForSession("s1")).toEqual(t2); + + const raw = await db + .selectFrom("agent_loop_turns") + .select(["messages", "prefix_length"]) + .where("id", "=", "t2") + .executeTakeFirstOrThrow(); + expect(raw.prefix_length).toBe(3); + expect(JSON.parse(raw.messages)).toHaveLength(2); // only the new exchange + expect(raw.messages).not.toContain("let me check"); // t1 content not duplicated + }); + + it("updates rewrite only the delta; the prefix stays deduped", async () => { + const { store, db } = await loadStore(); + const { t1, t2 } = chainTurns(); + await store.create(t1); + await store.create(t2); + + const updated = { + ...t2, + messages: [...t2.messages, { role: "user" as const, content: "follow-up" }], + updatedAt: "2026-06-12T00:01:00Z", + }; + await store.update(updated); + + expect(await store.get("t2")).toEqual(updated); + const raw = await db + .selectFrom("agent_loop_turns") + .select(["messages", "prefix_length"]) + 
.where("id", "=", "t2") + .executeTakeFirstOrThrow(); + expect(raw.prefix_length).toBe(3); + expect(JSON.parse(raw.messages)).toHaveLength(3); + }); + + it("stores the whole transcript when the input does not extend the previous turn", async () => { + const { store, db } = await loadStore(); + const { t1 } = chainTurns(); + await store.create(t1); + // compaction-style input: a summary instead of the prior transcript + const t2 = sampleTurn("t2", { + sessionId: "s1", + sessionSeq: 2, + messages: [{ role: "user", content: "summary of the conversation so far" }], + }); + await store.create(t2); + + expect(await store.get("t2")).toEqual(t2); + const raw = await db + .selectFrom("agent_loop_turns") + .select("prefix_length") + .where("id", "=", "t2") + .executeTakeFirstOrThrow(); + expect(raw.prefix_length).toBe(0); + }); + + it("fails loudly when a deduped turn's predecessor is missing", async () => { + const { store, db } = await loadStore(); + const { t1, t2 } = chainTurns(); + await store.create(t1); + await store.create(t2); + + await db.deleteFrom("agent_loop_turns").where("id", "=", "t1").execute(); + await expect(store.get("t2")).rejects.toThrow("previous session turn"); + }); + }); + it("fails loudly on a corrupted JSON column", async () => { const { store, db } = await loadStore(); await store.create(sampleTurn("t1")); diff --git a/apps/x/packages/core/src/agent-loop/sqlite-turn-store.ts b/apps/x/packages/core/src/agent-loop/sqlite-turn-store.ts index 2ff90bb1..57974ff3 100644 --- a/apps/x/packages/core/src/agent-loop/sqlite-turn-store.ts +++ b/apps/x/packages/core/src/agent-loop/sqlite-turn-store.ts @@ -1,12 +1,14 @@ import type { Insertable, Kysely, Selectable } from "kysely"; import { z } from "zod"; -import { MessageList } from "@x/shared/dist/message.js"; +import { Message, MessageList } from "@x/shared/dist/message.js"; import type { AgentLoopTurnsTable, Database } from "../storage/schema.js"; +import { joinTranscript, splitTranscript } from 
"./prefix-dedup.js"; import type { TurnStore } from "./turn-store.js"; import { AgentLoopError, AgentLoopTurn, DispatchedTool, + ModelUsage, PermissionDecision, PermissionMode, PermissionRequest, @@ -16,13 +18,19 @@ import { // Accepts a Kysely from the existing getDb(); it does not own the // storage lifecycle (never calls initStorage()). Every JSON column is // zod-parsed on read so schema drift fails loudly at the boundary. +// +// Session turns are stored with their copy-forward prefix deduplicated (see +// prefix-dedup.ts): the messages column holds only the suffix past +// prefix_length, and reads re-attach the prefix from the previous turn. export class SqliteTurnStore implements TurnStore { constructor(private db: Kysely) {} async create(turn: z.infer): Promise { + const prev = await this.previousTurn(turn.sessionId, turn.sessionSeq); + const { prefixLength, delta } = splitTranscript(turn, prev); await this.db .insertInto("agent_loop_turns") - .values(toRow(turn)) + .values(toRow(turn, delta, prefixLength)) .execute(); } @@ -32,44 +40,100 @@ export class SqliteTurnStore implements TurnStore { .selectAll() .where("id", "=", id) .executeTakeFirst(); - return row ? fromRow(row) : null; + if (!row) return null; + if (row.prefix_length === 0) return fromRow(row); + if (row.session_id === null || row.session_seq === null) { + // only session turns are ever stored deduped + throw new Error(`Turn ${id} has a transcript prefix but no session linkage`); + } + // Materializing requires the chain up to this turn; fold forward. 
+ const chain = await this.foldSession(row.session_id, row.session_seq); + const turn = chain[chain.length - 1]; + if (!turn || turn.id !== id) { + throw new Error(`Turn ${id} requires its previous session turn to materialize`); + } + return turn; } async update(turn: z.infer): Promise { - const { id, ...rest } = toRow(turn); + const existing = await this.db + .selectFrom("agent_loop_turns") + .select("prefix_length") + .where("id", "=", turn.id) + .executeTakeFirst(); + if (!existing) { + throw new Error(`Turn not found: ${turn.id}`); + } + if (turn.messages.length < existing.prefix_length) { + throw new Error(`Turn ${turn.id} shrank below its stored transcript prefix`); + } + const delta = turn.messages.slice(existing.prefix_length); + const { id, ...rest } = toRow(turn, delta, existing.prefix_length); const result = await this.db .updateTable("agent_loop_turns") .set(rest) .where("id", "=", id) .executeTakeFirst(); + // The SELECT above proved existence, but keep the write itself honest: + // a row vanishing in between must never silently no-op. if (result.numUpdatedRows === 0n) { throw new Error(`Turn not found: ${id}`); } } async latestForSession(sessionId: string): Promise | null> { - const row = await this.db - .selectFrom("agent_loop_turns") - .selectAll() - .where("session_id", "=", sessionId) - .orderBy("session_seq", "desc") - .limit(1) - .executeTakeFirst(); - return row ? fromRow(row) : null; + const turns = await this.foldSession(sessionId, null); + return turns.length > 0 ? turns[turns.length - 1] : null; } async listBySession(sessionId: string): Promise[]> { - const rows = await this.db + return this.foldSession(sessionId, null); + } + + // Loads a session's turns in seq order (up to and including uptoSeq, or + // all) and materializes each transcript from the previous turn's. 
+ private async foldSession( + sessionId: string, + uptoSeq: number | null, + ): Promise[]> { + let query = this.db .selectFrom("agent_loop_turns") .selectAll() .where("session_id", "=", sessionId) - .orderBy("session_seq", "asc") - .execute(); - return rows.map(fromRow); + .orderBy("session_seq", "asc"); + if (uptoSeq !== null) { + query = query.where("session_seq", "<=", uptoSeq); + } + const rows = await query.execute(); + const out: z.infer[] = []; + let prev: z.infer | null = null; + for (const row of rows) { + const turn = fromRow(row); + const prefixSource = + prev !== null && prev.sessionSeq === (turn.sessionSeq ?? 0) - 1 ? prev : null; + turn.messages = joinTranscript(turn.id, prefixSource, row.prefix_length, turn.messages); + out.push(turn); + prev = turn; + } + return out; + } + + private async previousTurn( + sessionId: string | null, + sessionSeq: number | null, + ): Promise | null> { + if (sessionId === null || sessionSeq === null || sessionSeq <= 1) return null; + const chain = await this.foldSession(sessionId, sessionSeq - 1); + const prev = chain[chain.length - 1]; + return prev && prev.sessionSeq === sessionSeq - 1 ? prev : null; } } -function toRow(turn: z.infer): Insertable { +function toRow( + turn: z.infer, + delta: z.infer[], + prefixLength: number, +): Insertable { return { id: turn.id, agent_id: turn.agentId, @@ -78,11 +142,13 @@ function toRow(turn: z.infer): Insertable): Insertable): z.infer { return { id: row.id, @@ -104,6 +172,7 @@ function fromRow(row: Selectable): z.infer): TurnStatu } return "idle"; } + +// The transcript as a successor turn would see it: a terminal turn's dangling +// tool calls are closed out with synthetic ToolMessages so a follow-up never +// re-executes — or hangs on — stale calls. Pure and deterministic over an +// immutable (terminal) turn, which is what lets the sessions layer build the +// next turn's input from it AND lets stores reproduce it byte-for-byte. 
+export function closedTranscript( + turn: z.infer, +): z.infer[] { + const messages = [...turn.messages]; + for (const call of unresolvedToolCalls(turn)) { + messages.push({ + role: "tool", + content: closureContent(deriveToolCallState(turn, call.toolCallId)), + toolCallId: call.toolCallId, + toolName: call.toolName, + }); + } + return messages; +} + +// Honest per-state wording for a dangling call: how far did it actually get? +function closureContent(state: ToolCallState): string { + switch (state) { + case "interrupted": + // execution began in-process; the side effect may have landed + return "Tool execution was interrupted before completing. It may or may not have taken effect; do not assume it ran."; + case "dispatched": + // delegated to an external runner; it may still finish out there + return "Tool was dispatched but its result never arrived; it may have completed externally. Do not assume it ran or that it failed."; + default: + // never reached execution (unevaluated / awaiting permission / cleared-but-not-started) + return "Tool was not executed: the turn was stopped before this call ran."; + } +} + +// Sum of all model calls in the turn. A field is null only if no call +// reported it; otherwise unreported entries count as 0 toward the sum. 
+export function totalUsage( + turn: z.infer, +): Omit, "at"> { + const sum = (field: "inputTokens" | "outputTokens" | "totalTokens" | "reasoningTokens" | "cachedInputTokens") => { + const reported = turn.modelUsage.map((u) => u[field]).filter((v) => v !== null); + if (reported.length === 0) return null; + return reported.reduce((a, b) => a + b, 0); + }; + return { + inputTokens: sum("inputTokens"), + outputTokens: sum("outputTokens"), + totalTokens: sum("totalTokens"), + reasoningTokens: sum("reasoningTokens"), + cachedInputTokens: sum("cachedInputTokens"), + }; +} diff --git a/apps/x/packages/core/src/sessions/sessions.test.ts b/apps/x/packages/core/src/sessions/sessions.test.ts index 8d3cc85c..e0c0f01c 100644 --- a/apps/x/packages/core/src/sessions/sessions.test.ts +++ b/apps/x/packages/core/src/sessions/sessions.test.ts @@ -8,7 +8,11 @@ import { import { AgentLoopImpl } from "../agent-loop/agent-loop.js"; import { EventStream } from "../agent-loop/event-stream.js"; import { InMemoryTurnStore } from "../agent-loop/in-memory-turn-store.js"; -import type { ModelAdapter, ModelStreamRequest } from "../agent-loop/model-adapter.js"; +import type { + ModelAdapter, + ModelStepResult, + ModelStreamRequest, +} from "../agent-loop/model-adapter.js"; import type { PermissionGate } from "../agent-loop/permission-gate.js"; import type { ToolRunner, ToolRunResult } from "../agent-loop/tool-runner.js"; import { @@ -48,9 +52,9 @@ class FakeModelAdapter implements ModelAdapter { constructor(private steps: ModelStep[]) {} - stream(req: ModelStreamRequest): EventStream> { + stream(req: ModelStreamRequest): EventStream { this.calls++; - const out = new EventStream>(); + const out = new EventStream(); const step = this.steps.shift(); void (async () => { await Promise.resolve(); @@ -66,7 +70,7 @@ class FakeModelAdapter implements ModelAdapter { return; } out.push({ type: "finish", message: step.message }); - out.end(step.message); + out.end({ message: step.message, usage: null }); 
})(); return out; } @@ -132,6 +136,7 @@ function turnFixture( permissionDecisions: [], startedTools: [], dispatchedTools: [], + modelUsage: [], error: null, completedAt: null, createdAt: now, @@ -348,6 +353,37 @@ describe("SessionsImpl", () => { ]); }); + it("a stopped turn's dispatched call is closed out as possibly completed externally", async () => { + const { sessions, loop, turnStore } = makeSessions({ + steps: [{ kind: "message", message: assistantText("noted") }], + }); + const session = await sessions.createSession(); + + // crafted waiting turn: tc1 was delegated to an external runner + // (pending), then the user stopped the turn before the result arrived + await turnStore.create(turnFixture("t1", { + sessionId: session.id, + sessionSeq: 1, + messages: [userMsg("run the job"), assistantToolCalls(toolCall("tc1", "background-job"))], + startedTools: [{ toolCallId: "tc1", startedAt: "2026-06-12T00:00:00Z" }], + dispatchedTools: [{ toolCallId: "tc1", dispatchedAt: "2026-06-12T00:00:01Z" }], + })); + await loop.stopTurn("t1"); + + const closure = { + role: "tool" as const, + content: "Tool was dispatched but its result never arrived; it may have completed externally. 
Do not assume it ran or that it failed.", + toolCallId: "tc1", + toolName: "background-job", + }; + const history = await sessions.getHistory(session.id); + expect(history[history.length - 1]).toEqual(closure); + + // the next turn carries the same closure forward + const turn2 = await (await sessions.sendMessage(session.id, [userMsg("ok")])).result; + expect(turn2.messages).toEqual([...history, userMsg("ok"), assistantText("noted")]); + }); + it("builds on an errored turn's persisted transcript", async () => { const { sessions, turnStore } = makeSessions({ steps: [{ kind: "message", message: assistantText("recovered") }], diff --git a/apps/x/packages/core/src/sessions/sessions.ts b/apps/x/packages/core/src/sessions/sessions.ts index c5eae7f2..da9b781e 100644 --- a/apps/x/packages/core/src/sessions/sessions.ts +++ b/apps/x/packages/core/src/sessions/sessions.ts @@ -1,14 +1,13 @@ import crypto from "node:crypto"; import { z } from "zod"; -import { Message, MessageList } from "@x/shared/dist/message.js"; +import { MessageList } from "@x/shared/dist/message.js"; import type { AgentLoop, TurnHandle } from "../agent-loop/agent-loop.js"; import { KeyedMutex } from "../agent-loop/mutex.js"; import type { TurnStore } from "../agent-loop/turn-store.js"; import { AgentLoopTurn, - deriveToolCallState, + closedTranscript, deriveTurnStatus, - unresolvedToolCalls, } from "../agent-loop/types.js"; import type { SessionStore } from "./session-store.js"; import { CreateSessionInput, SendMessageOptions, Session } from "./types.js"; @@ -113,7 +112,7 @@ export class SessionsImpl implements Sessions { : {}), sessionId, sessionSeq: (latest?.sessionSeq ?? 0) + 1, - messages: [...(latest ? historyFrom(latest) : []), ...newMessages], + messages: [...(latest ? 
closedTranscript(latest) : []), ...newMessages], }); }); } @@ -123,7 +122,7 @@ export class SessionsImpl implements Sessions { async getHistory(sessionId: string): Promise> { await this.mustGetSession(sessionId); const latest = await this.turnStore.latestForSession(sessionId); - return latest ? historyFrom(latest) : []; + return latest ? closedTranscript(latest) : []; } async listTurns(sessionId: string): Promise[]> { @@ -137,26 +136,3 @@ export class SessionsImpl implements Sessions { return session; } } - -// Copy-forward history: the next turn's input is the previous turn's full -// transcript. A stopped turn can carry unresolved tool calls; they are closed -// out with synthetic ToolMessages so the new turn never re-executes — or -// hangs on — stale calls. This is the sessions-layer analogue of the -// reducer's interrupted-call handling. -function historyFrom( - turn: z.infer, -): z.infer[] { - const messages = [...turn.messages]; - for (const call of unresolvedToolCalls(turn)) { - const interrupted = deriveToolCallState(turn, call.toolCallId) === "interrupted"; - messages.push({ - role: "tool", - content: interrupted - ? "Tool execution was interrupted before completing. It may or may not have taken effect; do not assume it ran." 
- : "Tool was not executed: the turn was stopped before this call ran.", - toolCallId: call.toolCallId, - toolName: call.toolName, - }); - } - return messages; -} diff --git a/apps/x/packages/core/src/storage/migrations.ts b/apps/x/packages/core/src/storage/migrations.ts index d298536b..e605b516 100644 --- a/apps/x/packages/core/src/storage/migrations.ts +++ b/apps/x/packages/core/src/storage/migrations.ts @@ -100,6 +100,31 @@ const migrations: Record = { await db.schema.dropTable("sessions").ifExists().execute(); }, }, + "2026-06-12_0004_turn_model_usage": { + async up(db: MigrationDb): Promise { + await db.schema + .alterTable("agent_loop_turns") + .addColumn("model_usage", "text", (col) => col.notNull().defaultTo("[]")) + .execute(); + }, + async down(db: MigrationDb): Promise { + await db.schema.alterTable("agent_loop_turns").dropColumn("model_usage").execute(); + }, + }, + "2026-06-12_0005_turn_transcript_dedup": { + async up(db: MigrationDb): Promise { + // 0 = messages stored whole; N = the first N messages are the + // previous session turn's closed transcript, recomputed on read. + // Existing rows default to 0, so they keep reading back unchanged. 
+ await db.schema + .alterTable("agent_loop_turns") + .addColumn("prefix_length", "integer", (col) => col.notNull().defaultTo(0)) + .execute(); + }, + async down(db: MigrationDb): Promise { + await db.schema.alterTable("agent_loop_turns").dropColumn("prefix_length").execute(); + }, + }, }; class InCodeMigrationProvider implements MigrationProvider { diff --git a/apps/x/packages/core/src/storage/schema.ts b/apps/x/packages/core/src/storage/schema.ts index 5fd9ec52..dfc53c11 100644 --- a/apps/x/packages/core/src/storage/schema.ts +++ b/apps/x/packages/core/src/storage/schema.ts @@ -16,11 +16,13 @@ export interface AgentLoopTurnsTable { permission_mode: string; session_id: string | null; session_seq: number | null; - messages: string; // JSON: MessageList + messages: string; // JSON: MessageList (delta past prefix_length) + prefix_length: number; // copy-forward prefix deduped at rest; 0 = stored whole permission_requests: string; // JSON: PermissionRequest[] permission_decisions: string; // JSON: PermissionDecision[] started_tools: string; // JSON: StartedTool[] dispatched_tools: string; // JSON: DispatchedTool[] + model_usage: string; // JSON: ModelUsage[] error: string | null; // JSON: AgentLoopError created_at: TimestampColumn; updated_at: TimestampColumn;