diff --git a/apps/x/packages/core/src/agent-loop/agent-loop.test.ts b/apps/x/packages/core/src/agent-loop/agent-loop.test.ts new file mode 100644 index 00000000..708d4acb --- /dev/null +++ b/apps/x/packages/core/src/agent-loop/agent-loop.test.ts @@ -0,0 +1,692 @@ +import { describe, expect, it } from "vitest"; +import { z } from "zod"; +import { + AssistantMessage, + Message, + ToolCallPart, +} from "@x/shared/dist/message.js"; +import { AgentLoopImpl } from "./agent-loop.js"; +import { EventStream } from "./event-stream.js"; +import { InMemoryTurnStore } from "./in-memory-turn-store.js"; +import type { ModelAdapter, ModelStreamRequest } from "./model-adapter.js"; +import type { PermissionClassification, PermissionGate } from "./permission-gate.js"; +import type { ToolRunner, ToolRunResult } from "./tool-runner.js"; +import type { TurnStore } from "./turn-store.js"; +import { + AgentLoopTurn, + deriveTurnStatus, + type ModelStreamEvent, +} from "./types.js"; + +// ─── helpers ──────────────────────────────────────────────────────────────── + +function userMsg(text: string): z.infer { + return { role: "user", content: text }; +} + +function assistantText(text: string): z.infer { + return { role: "assistant", content: text }; +} + +function toolCall(toolCallId: string, toolName: string): z.infer { + return { type: "tool-call", toolCallId, toolName, arguments: {} }; +} + +function assistantToolCalls( + ...calls: z.infer[] +): z.infer { + return { role: "assistant", content: calls }; +} + +type ModelStep = + | { kind: "message"; message: z.infer; deltas?: string[] } + | { kind: "error"; error: unknown } + | { kind: "hang" }; + +class FakeModelAdapter implements ModelAdapter { + calls = 0; + + constructor(private steps: ModelStep[]) {} + + stream(req: ModelStreamRequest): EventStream> { + this.calls++; + const out = new EventStream>(); + const step = this.steps.shift(); + void (async () => { + await Promise.resolve(); + if (!step) { + const error = new 
Error("FakeModelAdapter: no scripted step left"); + out.push({ type: "error", error }); + out.fail(error); + return; + } + if (step.kind === "hang") { + out.push({ type: "text-delta", delta: "partial" }); + // Mirror VercelModelAdapter's abort behavior: push an error + // event AND fail the result (the loop must rely only on the + // latter — regression for the stop-bricks-turn bug). + req.signal.addEventListener("abort", () => { + const error = new Error("aborted"); + out.push({ type: "error", error }); + out.fail(error); + }, { once: true }); + return; + } + if (step.kind === "error") { + out.push({ type: "error", error: step.error }); + out.fail(step.error); + return; + } + for (const delta of step.deltas ?? []) { + out.push({ type: "text-delta", delta }); + } + if (typeof step.message.content !== "string") { + for (const part of step.message.content) { + if (part.type === "tool-call") out.push({ type: "tool-call", toolCall: part }); + } + } + out.push({ type: "finish", message: step.message }); + out.end(step.message); + })(); + return out; + } +} + +class FakeToolRunner implements ToolRunner { + ran: string[] = []; + + constructor( + private behaviors: Record) => ToolRunResult> = {}, + ) {} + + definitions() { + return []; + } + + async run(call: z.infer): Promise { + this.ran.push(call.toolCallId); + const behavior = this.behaviors[call.toolName]; + return behavior ? behavior(call) : { type: "result", value: `ok:${call.toolName}` }; + } +} + +class FakePermissionGate implements PermissionGate { + checkCalls = 0; + classifyCalls = 0; + + constructor( + private opts: { + required?: (toolName: string) => boolean; + classify?: (call: z.infer) => PermissionClassification; + } = {}, + ) {} + + async check(call: z.infer) { + this.checkCalls++; + return this.opts.required?.(call.toolName) + ? 
{ required: true as const, request: { tool: call.toolName } } + : { required: false as const }; + } + + async classify(call: z.infer): Promise { + this.classifyCalls++; + return this.opts.classify + ? this.opts.classify(call) + : { decision: "abstained", reason: "unsure" }; + } +} + +// Records a snapshot per store write so tests can assert write batching. +class SnapshottingStore implements TurnStore { + private inner = new InMemoryTurnStore(); + snapshots: z.infer[] = []; + + async create(turn: z.infer) { + this.snapshots.push(structuredClone(turn)); + await this.inner.create(turn); + } + + async get(id: string) { + return this.inner.get(id); + } + + async update(turn: z.infer) { + this.snapshots.push(structuredClone(turn)); + await this.inner.update(turn); + } +} + +function makeLoop(opts: { + steps?: ModelStep[]; + runner?: FakeToolRunner; + gate?: FakePermissionGate; + store?: TurnStore; + maxIterations?: number; +} = {}) { + const store = opts.store ?? new InMemoryTurnStore(); + const adapter = new FakeModelAdapter(opts.steps ?? []); + const runner = opts.runner ?? new FakeToolRunner(); + const gate = opts.gate ?? new FakePermissionGate(); + const loop = new AgentLoopImpl({ + store, + modelAdapter: adapter, + toolRunner: runner, + permissionGate: gate, + ...(opts.maxIterations !== undefined ? 
{ maxIterations: opts.maxIterations } : {}), + }); + return { loop, store, adapter, runner, gate }; +} + +function emptyTurn( + id: string, + overrides: Partial> = {}, +): z.infer { + const now = new Date().toISOString(); + return { + id, + agentId: null, + provider: null, + model: null, + permissionMode: "manual", + messages: [], + permissionRequests: [], + permissionDecisions: [], + startedTools: [], + dispatchedTools: [], + error: null, + completedAt: null, + createdAt: now, + updatedAt: now, + ...overrides, + }; +} + +function toolMessages(turn: z.infer) { + return turn.messages.filter((m) => m.role === "tool"); +} + +// ─── tests ────────────────────────────────────────────────────────────────── + +describe("AgentLoopImpl", () => { + it("completes a simple text turn and persists input metadata", async () => { + const { loop } = makeLoop({ steps: [{ kind: "message", message: assistantText("hi!") }] }); + + const turn = await loop.createTurn({ + agentId: "a1", + provider: "openai", + model: "gpt-x", + messages: [userMsg("hello")], + }).result; + + expect(turn.agentId).toBe("a1"); + expect(turn.provider).toBe("openai"); + expect(turn.model).toBe("gpt-x"); + expect(turn.permissionMode).toBe("manual"); + expect(turn.messages).toEqual([userMsg("hello"), assistantText("hi!")]); + expect(turn.completedAt).not.toBeNull(); + expect(deriveTurnStatus(turn)).toBe("completed"); + }); + + it("rejects a turn with no input messages", async () => { + const { loop } = makeLoop(); + await expect(loop.createTurn({ messages: [] }).result).rejects.toThrow(); + }); + + it("runs tool calls: result and error both append ToolMessages and continue", async () => { + const calls = [toolCall("tc1", "good"), toolCall("tc2", "bad")]; + const { loop, runner } = makeLoop({ + steps: [ + { kind: "message", message: assistantToolCalls(...calls) }, + { kind: "message", message: assistantText("done") }, + ], + runner: new FakeToolRunner({ + good: () => ({ type: "result", value: { answer: 42 } }), + 
bad: () => ({ type: "error", value: "boom" }), + }), + }); + + const turn = await loop.createTurn({ messages: [userMsg("go")] }).result; + + expect(runner.ran).toEqual(["tc1", "tc2"]); + expect(toolMessages(turn)).toEqual([ + { role: "tool", content: '{"answer":42}', toolCallId: "tc1", toolName: "good" }, + { role: "tool", content: "boom", toolCallId: "tc2", toolName: "bad" }, + ]); + // tool error is NOT a turn error + expect(turn.error).toBeNull(); + expect(deriveTurnStatus(turn)).toBe("completed"); + expect(turn.startedTools.map((t) => t.toolCallId)).toEqual(["tc1", "tc2"]); + }); + + it("suspends on a pending tool and resumes via setToolResult", async () => { + const { loop, adapter } = makeLoop({ + steps: [ + { kind: "message", message: assistantToolCalls(toolCall("tc1", "ask-human")) }, + { kind: "message", message: assistantText("thanks") }, + ], + runner: new FakeToolRunner({ "ask-human": () => ({ type: "pending" }) }), + }); + + const waiting = await loop.createTurn({ messages: [userMsg("go")] }).result; + expect(deriveTurnStatus(waiting)).toBe("waiting"); + expect(waiting.dispatchedTools.map((t) => t.toolCallId)).toEqual(["tc1"]); + expect(adapter.calls).toBe(1); // no model call while waiting + + const done = await loop.setToolResult(waiting.id, { toolCallId: "tc1", result: "blue" }).result; + expect(toolMessages(done)).toEqual([ + { role: "tool", content: "blue", toolCallId: "tc1", toolName: "ask-human" }, + ]); + expect(deriveTurnStatus(done)).toBe("completed"); + }); + + it("requires all dispatched results before the loop continues", async () => { + const { loop, adapter } = makeLoop({ + steps: [ + { + kind: "message", + message: assistantToolCalls(toolCall("tc1", "job"), toolCall("tc2", "job")), + }, + { kind: "message", message: assistantText("done") }, + ], + runner: new FakeToolRunner({ job: () => ({ type: "pending" }) }), + }); + + const waiting = await loop.createTurn({ messages: [userMsg("go")] }).result; + 
expect(waiting.dispatchedTools).toHaveLength(2); + + const stillWaiting = await loop.setToolResult(waiting.id, { toolCallId: "tc1", result: "r1" }).result; + expect(deriveTurnStatus(stillWaiting)).toBe("waiting"); + expect(adapter.calls).toBe(1); + + const done = await loop.setToolResult(waiting.id, { toolCallId: "tc2", result: "r2" }).result; + expect(deriveTurnStatus(done)).toBe("completed"); + expect(adapter.calls).toBe(2); + }); + + it("rejects setToolResult for a tool call that is not awaiting an external result", async () => { + const { loop } = makeLoop({ + steps: [{ kind: "message", message: assistantToolCalls(toolCall("tc1", "job")) }], + runner: new FakeToolRunner({ job: () => ({ type: "pending" }) }), + }); + + const waiting = await loop.createTurn({ messages: [userMsg("go")] }).result; + await expect( + loop.setToolResult(waiting.id, { toolCallId: "bogus", result: "x" }).result, + ).rejects.toThrow("not awaiting an external result"); + }); + + describe("permissions (manual mode)", () => { + it("batch-creates all permission requests in one store write", async () => { + const store = new SnapshottingStore(); + const { loop } = makeLoop({ + store, + steps: [{ + kind: "message", + message: assistantToolCalls(toolCall("tc1", "write"), toolCall("tc2", "exec")), + }], + gate: new FakePermissionGate({ required: () => true }), + }); + + const turn = await loop.createTurn({ messages: [userMsg("go")] }).result; + expect(deriveTurnStatus(turn)).toBe("waiting"); + expect(turn.permissionRequests.map((r) => r.toolCallId)).toEqual(["tc1", "tc2"]); + + // no snapshot with exactly one request — both landed in a single write + const counts = store.snapshots.map((s) => s.permissionRequests.length); + expect(counts).not.toContain(1); + expect(counts).toContain(2); + }); + + it("executes on grant; denial appends decision + ToolMessage atomically", async () => { + const store = new SnapshottingStore(); + const { loop, runner } = makeLoop({ + store, + steps: [ + { + kind: 
"message", + message: assistantToolCalls(toolCall("tc1", "write"), toolCall("tc2", "exec")), + }, + { kind: "message", message: assistantText("done") }, + ], + gate: new FakePermissionGate({ required: () => true }), + }); + + const waiting = await loop.createTurn({ messages: [userMsg("go")] }).result; + + const afterDeny = await loop + .respondToPermission(waiting.id, "tc2", "denied", "nope").result; + expect(deriveTurnStatus(afterDeny)).toBe("waiting"); // tc1 still open + expect(toolMessages(afterDeny)).toEqual([ + { role: "tool", content: "Permission denied by the user: nope", toolCallId: "tc2", toolName: "exec" }, + ]); + // decision + denial ToolMessage landed in the same write + const denyWrite = store.snapshots.find((s) => + s.permissionDecisions.some((d) => d.toolCallId === "tc2")); + expect(denyWrite?.messages.some((m) => m.role === "tool" && m.toolCallId === "tc2")).toBe(true); + + const done = await loop.respondToPermission(waiting.id, "tc1", "granted").result; + expect(runner.ran).toEqual(["tc1"]); // denied tool never ran + expect(deriveTurnStatus(done)).toBe("completed"); + expect(done.permissionDecisions).toHaveLength(2); + expect(done.permissionDecisions.every((d) => d.decidedBy === "user")).toBe(true); + }); + + it("rejects a response for a call without an open request", async () => { + const { loop } = makeLoop({ + steps: [{ kind: "message", message: assistantToolCalls(toolCall("tc1", "write")) }], + gate: new FakePermissionGate({ required: () => true }), + }); + const waiting = await loop.createTurn({ messages: [userMsg("go")] }).result; + await expect( + loop.respondToPermission(waiting.id, "bogus", "granted").result, + ).rejects.toThrow("No open permission request"); + }); + + it("does not re-ask after a grant: granted fact persists across resume", async () => { + const store = new InMemoryTurnStore(); + const gate = new FakePermissionGate({ required: () => true }); + const { loop, runner } = makeLoop({ + store, + gate, + steps: [{ kind: 
"message", message: assistantText("done") }], + }); + + // crafted state: granted decision persisted, tool not yet started + await store.create(emptyTurn("t1", { + messages: [userMsg("go"), assistantToolCalls(toolCall("tc1", "write"))], + permissionRequests: [{ toolCallId: "tc1", request: { tool: "write" }, requestedAt: "2026-06-12T00:00:00Z" }], + permissionDecisions: [{ + toolCallId: "tc1", decidedBy: "user", decision: "granted", + reason: null, decidedAt: "2026-06-12T00:00:01Z", + }], + })); + + const turn = await loop.resumeTurn("t1").result; + expect(gate.checkCalls).toBe(0); // never re-evaluated + expect(runner.ran).toEqual(["tc1"]); + expect(deriveTurnStatus(turn)).toBe("completed"); + }); + + it("never calls the classifier in manual mode", async () => { + const gate = new FakePermissionGate({ required: () => true }); + const { loop } = makeLoop({ + gate, + steps: [{ kind: "message", message: assistantToolCalls(toolCall("tc1", "write")) }], + }); + const turn = await loop.createTurn({ messages: [userMsg("go")] }).result; + expect(deriveTurnStatus(turn)).toBe("waiting"); + expect(gate.classifyCalls).toBe(0); + }); + }); + + describe("permissions (auto mode / classifier)", () => { + it("applies classifier grant and deny with reasons", async () => { + const { loop, runner } = makeLoop({ + steps: [ + { + kind: "message", + message: assistantToolCalls(toolCall("tc1", "read"), toolCall("tc2", "exec")), + }, + { kind: "message", message: assistantText("done") }, + ], + gate: new FakePermissionGate({ + required: () => true, + classify: (call) => call.toolName === "read" + ? 
{ decision: "granted", reason: "read-only" } + : { decision: "denied", reason: "destructive" }, + }), + }); + + const turn = await loop.createTurn({ + messages: [userMsg("go")], + permissionMode: "auto", + }).result; + + expect(deriveTurnStatus(turn)).toBe("completed"); + expect(runner.ran).toEqual(["tc1"]); + expect(turn.permissionDecisions).toEqual([ + expect.objectContaining({ toolCallId: "tc1", decidedBy: "classifier", decision: "granted", reason: "read-only" }), + expect.objectContaining({ toolCallId: "tc2", decidedBy: "classifier", decision: "denied", reason: "destructive" }), + ]); + expect(toolMessages(turn).find((m) => m.toolCallId === "tc2")?.content) + .toBe("Permission denied by the auto-permission classifier: destructive"); + }); + + it("abstain waits on the user, persists, and is not re-run on resume", async () => { + const gate = new FakePermissionGate({ + required: () => true, + classify: () => ({ decision: "abstained", reason: "unsure" }), + }); + const { loop, runner } = makeLoop({ + gate, + steps: [ + { kind: "message", message: assistantToolCalls(toolCall("tc1", "write")) }, + { kind: "message", message: assistantText("done") }, + ], + }); + + const waiting = await loop.createTurn({ + messages: [userMsg("go")], + permissionMode: "auto", + }).result; + expect(deriveTurnStatus(waiting)).toBe("waiting"); + expect(gate.classifyCalls).toBe(1); + + // resume must not re-run the classifier — the abstention is a fact + const stillWaiting = await loop.resumeTurn(waiting.id).result; + expect(gate.classifyCalls).toBe(1); + expect(deriveTurnStatus(stillWaiting)).toBe("waiting"); + + // user can decide after an abstention + const done = await loop.respondToPermission(waiting.id, "tc1", "granted").result; + expect(runner.ran).toEqual(["tc1"]); + expect(deriveTurnStatus(done)).toBe("completed"); + }); + }); + + describe("crash recovery", () => { + it("resume treats started-but-unresolved tools as interrupted; never re-runs", async () => { + const store = new 
InMemoryTurnStore(); + const { loop, runner } = makeLoop({ + store, + steps: [{ kind: "message", message: assistantText("recovered") }], + }); + + await store.create(emptyTurn("t1", { + messages: [userMsg("go"), assistantToolCalls(toolCall("tc1", "send-email"))], + startedTools: [{ toolCallId: "tc1", startedAt: "2026-06-12T00:00:00Z" }], + })); + + const turn = await loop.resumeTurn("t1").result; + expect(runner.ran).toEqual([]); // NOT re-run + expect(toolMessages(turn)[0]?.content).toContain("interrupted"); + expect(deriveTurnStatus(turn)).toBe("completed"); + }); + + it("resume runs tools that were committed but never started", async () => { + const store = new InMemoryTurnStore(); + const { loop, runner } = makeLoop({ + store, + steps: [{ kind: "message", message: assistantText("done") }], + }); + + await store.create(emptyTurn("t1", { + messages: [userMsg("go"), assistantToolCalls(toolCall("tc1", "calc"))], + })); + + const turn = await loop.resumeTurn("t1").result; + expect(runner.ran).toEqual(["tc1"]); + expect(deriveTurnStatus(turn)).toBe("completed"); + }); + }); + + it("stopTurn mid-stream persists no partial message; turn is idle and resumable", async () => { + const { loop } = makeLoop({ + steps: [ + { kind: "hang" }, + { kind: "message", message: assistantText("second try") }, + ], + }); + + const handle = loop.createTurn({ messages: [userMsg("go")] }); + // wait for the first streamed delta, then stop + const iterator = handle.events[Symbol.asyncIterator](); + const first = await iterator.next(); + expect(first.value).toEqual({ type: "text-delta", delta: "partial" }); + + const stopped = await loop.stopTurn(handle.id); + expect(stopped.messages).toEqual([userMsg("go")]); // no partial persisted + expect(stopped.error).toBeNull(); + expect(deriveTurnStatus(stopped)).toBe("idle"); + await expect(handle.result).resolves.toBeTruthy(); // original handle also rests + + const resumed = await loop.resumeTurn(handle.id).result; + 
expect(resumed.messages).toEqual([userMsg("go"), assistantText("second try")]); + expect(deriveTurnStatus(resumed)).toBe("completed"); + }); + + it("stopTurn immediately after createTurn aborts the queued advance", async () => { + const { loop, adapter } = makeLoop({ + steps: [{ kind: "message", message: assistantText("should not run") }], + }); + + // No await between create and stop — the advance is still queued + // behind the mutex and its controller must already be abortable. + const handle = loop.createTurn({ messages: [userMsg("go")] }); + const stopped = await loop.stopTurn(handle.id); + + expect(adapter.calls).toBe(0); // model never called + expect(stopped.messages).toEqual([userMsg("go")]); // input fact persisted + expect(deriveTurnStatus(stopped)).toBe("idle"); + + const resumed = await loop.resumeTurn(handle.id).result; + expect(deriveTurnStatus(resumed)).toBe("completed"); + }); + + it("a throwing tool runner becomes an error ToolMessage, not a turn error", async () => { + const { loop } = makeLoop({ + steps: [ + { kind: "message", message: assistantToolCalls(toolCall("tc1", "flaky")) }, + { kind: "message", message: assistantText("recovered") }, + ], + runner: new FakeToolRunner({ + flaky: () => { throw new Error("ECONNRESET"); }, + }), + }); + + const turn = await loop.createTurn({ messages: [userMsg("go")] }).result; + expect(turn.error).toBeNull(); + expect(toolMessages(turn)[0]?.content).toBe("Tool execution failed: ECONNRESET"); + expect(deriveTurnStatus(turn)).toBe("completed"); + }); + + it("an unpaired denied decision never derives as executable", async () => { + const store = new InMemoryTurnStore(); + const { loop, runner } = makeLoop({ store }); + + // anomalous state: denied decision WITHOUT its paired denial ToolMessage + await store.create(emptyTurn("t1", { + messages: [userMsg("go"), assistantToolCalls(toolCall("tc1", "write"))], + permissionRequests: [{ toolCallId: "tc1", request: {}, requestedAt: "2026-06-12T00:00:00Z" }], + 
permissionDecisions: [{ + toolCallId: "tc1", decidedBy: "user", decision: "denied", + reason: null, decidedAt: "2026-06-12T00:00:01Z", + }], + })); + + const turn = await loop.resumeTurn("t1").result; + expect(runner.ran).toEqual([]); // the denied tool must NOT execute + expect(deriveTurnStatus(turn)).toBe("waiting"); + }); + + it("model failure is a terminal turn error; further mutations reject", async () => { + const { loop } = makeLoop({ steps: [{ kind: "error", error: new Error("rate limited") }] }); + + const turn = await loop.createTurn({ messages: [userMsg("go")] }).result; + expect(deriveTurnStatus(turn)).toBe("error"); + expect(turn.error?.message).toBe("rate limited"); + + await expect( + loop.setToolResult(turn.id, { toolCallId: "x", result: "y" }).result, + ).rejects.toThrow("terminal error"); + await expect( + loop.respondToPermission(turn.id, "x", "granted").result, + ).rejects.toThrow("terminal error"); + }); + + it("errors out when the iteration cap is exceeded", async () => { + const { loop } = makeLoop({ + maxIterations: 3, + steps: [ + { kind: "message", message: assistantToolCalls(toolCall("tc1", "calc")) }, + { kind: "message", message: assistantToolCalls(toolCall("tc2", "calc")) }, + ], + }); + + const turn = await loop.createTurn({ messages: [userMsg("go")] }).result; + expect(deriveTurnStatus(turn)).toBe("error"); + expect(turn.error?.message).toContain("exceeded 3 iterations"); + }); + + it("serializes concurrent mutations without losing updates", async () => { + const { loop } = makeLoop({ + steps: [ + { + kind: "message", + message: assistantToolCalls(toolCall("tc1", "job"), toolCall("tc2", "job")), + }, + { kind: "message", message: assistantText("done") }, + ], + runner: new FakeToolRunner({ job: () => ({ type: "pending" }) }), + }); + + const waiting = await loop.createTurn({ messages: [userMsg("go")] }).result; + + const [a, b] = await Promise.all([ + loop.setToolResult(waiting.id, { toolCallId: "tc1", result: "r1" }).result, + 
loop.setToolResult(waiting.id, { toolCallId: "tc2", result: "r2" }).result, + ]); + + const final = [a, b].find((t) => deriveTurnStatus(t) === "completed"); + expect(final).toBeTruthy(); + expect(toolMessages(final!).map((m) => m.content).sort()).toEqual(["r1", "r2"]); + }); + + it("streams events in order; result resolves identically without a consumer", async () => { + const { loop } = makeLoop({ + steps: [ + { + kind: "message", + message: assistantToolCalls(toolCall("tc1", "calc")), + deltas: ["thinking..."], + }, + { kind: "message", message: assistantText("done"), deltas: ["d", "one"] }, + ], + }); + + const handle = loop.createTurn({ messages: [userMsg("go")] }); + const types: string[] = []; + for await (const event of handle.events) types.push(event.type); + + expect(types).toEqual([ + "text-delta", // thinking... + "tool-call", + "finish", + "tool-execution-start", + "tool-result", + "text-delta", "text-delta", + "finish", + ]); + const turn = await handle.result; + expect(deriveTurnStatus(turn)).toBe("completed"); + }); + + it("getTurn returns the persisted turn; unknown ids reject", async () => { + const { loop } = makeLoop({ steps: [{ kind: "message", message: assistantText("hi") }] }); + const created = await loop.createTurn({ messages: [userMsg("hello")] }).result; + + const fetched = await loop.getTurn(created.id); + expect(fetched).toEqual(created); + await expect(loop.getTurn("missing")).rejects.toThrow("Turn not found"); + }); +}); diff --git a/apps/x/packages/core/src/agent-loop/agent-loop.ts b/apps/x/packages/core/src/agent-loop/agent-loop.ts new file mode 100644 index 00000000..5eb05b7c --- /dev/null +++ b/apps/x/packages/core/src/agent-loop/agent-loop.ts @@ -0,0 +1,449 @@ +import crypto from "node:crypto"; +import { z } from "zod"; +import { ToolCallPart, ToolMessage } from "@x/shared/dist/message.js"; +import { EventStream } from "./event-stream.js"; +import type { ModelAdapter } from "./model-adapter.js"; +import type { PermissionGate } 
from "./permission-gate.js"; +import type { ToolRunner, ToolRunResult } from "./tool-runner.js"; +import type { TurnStore } from "./turn-store.js"; +import { + AgentLoopInput, + AgentLoopTurn, + deriveToolCallState, + unresolvedToolCalls, + type TurnEvent, +} from "./types.js"; + +const DEFAULT_MAX_ITERATIONS = 50; + +export type TurnHandle = { + id: string; + events: AsyncIterable; + result: Promise>; +}; + +export interface AgentLoop { + createTurn(input: z.infer): TurnHandle; + respondToPermission( + turnId: string, + toolCallId: string, + decision: "granted" | "denied", + reason?: string, + ): TurnHandle; + setToolResult(turnId: string, r: { toolCallId: string; result: unknown }): TurnHandle; + resumeTurn(turnId: string): TurnHandle; + getTurn(turnId: string): Promise>; + stopTurn(turnId: string): Promise>; +} + +// Serializes async work per key. Unlike the try-lock in runs/lock.ts, callers +// queue instead of failing — public mutations on the same turn run in order. +class TurnMutex { + private chains = new Map>(); + + run(key: string, fn: () => Promise): Promise { + const prev = this.chains.get(key) ?? Promise.resolve(); + const next = prev.then(fn, fn); + const tail: Promise = next + .catch(() => undefined) + .then(() => { + // Drop the entry once the chain is fully drained. + if (this.chains.get(key) === tail) this.chains.delete(key); + }); + this.chains.set(key, tail); + return next; + } +} + +function nowIso(): string { + return new Date().toISOString(); +} + +// Renders a tool result as ToolMessage text. Strings pass through untouched; +// everything else is JSON-encoded. Values JSON cannot represent (undefined, +// functions, symbols) and values JSON.stringify throws on fall back to +// String(value), so an odd result value never throws out of this helper. +function stringifyToolResult(value: unknown): string { + if (typeof value === "string") return value; + try { + const json = JSON.stringify(value); + if (json !== undefined) return json; + } catch { + // JSON.stringify throws on BigInt values, cyclic structures and throwing + // toJSON() hooks; fall through to the plain string rendering. + } + return String(value); +} + 
+function toolMessage( + call: z.infer, + content: string, +): z.infer { + return { + role: "tool", + content, + toolCallId: call.toolCallId, + toolName: call.toolName, + }; +} + +export class AgentLoopImpl implements AgentLoop { + private store: TurnStore; + private modelAdapter: ModelAdapter; + private toolRunner: ToolRunner; + private permissionGate: PermissionGate; + private maxIterations: number; + private mutex = new TurnMutex(); + // All not-yet-finished entries per turn (running AND queued behind the + // mutex) — registered synchronously so stopTurn can never race past one. + private active = new Map>(); + + constructor(deps: { + store: TurnStore; + modelAdapter: ModelAdapter; + toolRunner: ToolRunner; + permissionGate: PermissionGate; + maxIterations?: number; + }) { + this.store = deps.store; + this.modelAdapter = deps.modelAdapter; + this.toolRunner = deps.toolRunner; + this.permissionGate = deps.permissionGate; + this.maxIterations = deps.maxIterations ?? DEFAULT_MAX_ITERATIONS; + } + + createTurn(input: z.infer): TurnHandle { + const turnId = crypto.randomUUID(); + return this.enter(turnId, async () => { + const parsed = AgentLoopInput.parse(input); + const now = nowIso(); + await this.store.create({ + id: turnId, + agentId: parsed.agentId ?? null, + provider: parsed.provider ?? null, + model: parsed.model ?? null, + permissionMode: parsed.permissionMode ?? 
"manual", + messages: parsed.messages, + permissionRequests: [], + permissionDecisions: [], + startedTools: [], + dispatchedTools: [], + error: null, + completedAt: null, + createdAt: now, + updatedAt: now, + }); + }); + } + + respondToPermission( + turnId: string, + toolCallId: string, + decision: "granted" | "denied", + reason?: string, + ): TurnHandle { + return this.enter(turnId, async () => { + const turn = await this.mustGet(turnId); + this.assertMutable(turn); + const state = deriveToolCallState(turn, toolCallId); + if (state !== "awaiting-user" && state !== "needs-classifier") { + throw new Error(`No open permission request for tool call: ${toolCallId}`); + } + turn.permissionDecisions.push({ + toolCallId, + decidedBy: "user", + decision, + reason: reason ?? null, + decidedAt: nowIso(), + }); + if (decision === "denied") { + // Denial resolves the call: decision + denial ToolMessage in one write. + const call = this.mustFindToolCall(turn, toolCallId); + turn.messages.push(toolMessage( + call, + `Permission denied by the user${reason ? `: ${reason}` : ""}`, + )); + } + await this.persist(turn); + }); + } + + setToolResult(turnId: string, r: { toolCallId: string; result: unknown }): TurnHandle { + return this.enter(turnId, async () => { + const turn = await this.mustGet(turnId); + this.assertMutable(turn); + const state = deriveToolCallState(turn, r.toolCallId); + if (state !== "dispatched") { + throw new Error(`Tool call is not awaiting an external result: ${r.toolCallId}`); + } + const call = this.mustFindToolCall(turn, r.toolCallId); + turn.messages.push(toolMessage(call, stringifyToolResult(r.result))); + await this.persist(turn); + }); + } + + resumeTurn(turnId: string): TurnHandle { + return this.enter(turnId, async () => { + await this.mustGet(turnId); + }); + } + + async getTurn(turnId: string): Promise> { + return this.mustGet(turnId); + } + + async stopTurn(turnId: string): Promise> { + for (const controller of this.active.get(turnId) ?? 
[]) { + controller.abort(); + } + // Queue behind the in-flight advance so it has fully wound down. + return this.mutex.run(turnId, () => this.mustGet(turnId)); + } + + // ── internals ─────────────────────────────────────────────────────────── + + // Every loop-entering method: persist its fact (prepare), then re-enter the + // advance() reducer, all under the per-turn mutex. The handle's result + // resolves when the turn reaches a rest state. + private enter(turnId: string, prepare: () => Promise): TurnHandle { + const stream = new EventStream>(); + const controller = new AbortController(); + const controllers = this.active.get(turnId) ?? new Set(); + controllers.add(controller); + this.active.set(turnId, controllers); + void this.mutex.run(turnId, async () => { + try { + // prepare() persists the entry's fact even if already stopped; + // a stop aborts execution, never the recording of facts. + await prepare(); + await this.advance(turnId, stream, controller.signal); + stream.end(await this.mustGet(turnId)); + } catch (error) { + stream.fail(error); + } finally { + controllers.delete(controller); + if (controllers.size === 0) this.active.delete(turnId); + } + }); + return { id: turnId, events: stream, result: stream.result }; + } + + // The reducer. Driven purely by persisted facts: re-reads the turn from the + // store at the top of every iteration; no in-memory state is carried over. + // Crash recovery is free — this IS the resume function. + private async advance( + turnId: string, + stream: EventStream>, + signal: AbortSignal, + ): Promise { + for (let iteration = 0; iteration < this.maxIterations; iteration++) { + if (signal.aborted) return; + const turn = await this.mustGet(turnId); + + // 1. terminal states + if (turn.error !== null || turn.completedAt !== null) return; + + const unresolved = unresolvedToolCalls(turn); + const stateOf = new Map(unresolved.map((call) => [ + call.toolCallId, + deriveToolCallState(turn, call.toolCallId), + ])); + + // 2. 
waiting → stop. The "never call the model while anything is + // waiting" invariant lives here and only here. + if ([...stateOf.values()].some((s) => s === "awaiting-user" || s === "dispatched")) { + return; + } + + try { + // 3. classifier (auto mode): classify all, persist in one write + const needsClassifier = unresolved.filter( + (call) => stateOf.get(call.toolCallId) === "needs-classifier", + ); + if (needsClassifier.length > 0) { + for (const call of needsClassifier) { + const request = turn.permissionRequests + .find((r) => r.toolCallId === call.toolCallId)?.request; + const verdict = await this.permissionGate.classify(call, request); + turn.permissionDecisions.push({ + toolCallId: call.toolCallId, + decidedBy: "classifier", + decision: verdict.decision, + reason: verdict.reason, + decidedAt: nowIso(), + }); + if (verdict.decision === "denied") { + turn.messages.push(toolMessage( + call, + `Permission denied by the auto-permission classifier: ${verdict.reason}`, + )); + } + } + await this.persist(turn); + continue; + } + + // 4. permission-not-yet-evaluated calls: batch-evaluate ALL of + // them and append all requests in ONE write — the user sees one + // approval moment, not N serial prompts. 
+ const unevaluated = unresolved.filter( + (call) => stateOf.get(call.toolCallId) === "unevaluated", + ); + const executable = unresolved.filter( + (call) => stateOf.get(call.toolCallId) === "cleared", + ); + if (unevaluated.length > 0) { + const requested: string[] = []; + for (const call of unevaluated) { + const check = await this.permissionGate.check(call); + if (check.required) { + turn.permissionRequests.push({ + toolCallId: call.toolCallId, + request: check.request, + requestedAt: nowIso(), + }); + requested.push(call.toolCallId); + } else { + executable.push(call); + } + } + if (requested.length > 0) { + await this.persist(turn); + for (const toolCallId of requested) { + stream.push({ type: "permission-requested", toolCallId }); + } + continue; // re-derive: waiting (manual) or classifier (auto) + } + } + + // 5. interrupted calls (started, never resolved nor dispatched): + // never silently re-run a started tool — tell the model instead. + const interrupted = unresolved.filter( + (call) => stateOf.get(call.toolCallId) === "interrupted", + ); + if (interrupted.length > 0) { + for (const call of interrupted) { + turn.messages.push(toolMessage( + call, + "Tool execution was interrupted before completing. It may or may not have taken effect; do not assume it ran.", + )); + } + await this.persist(turn); + continue; + } + + // 6. execute cleared calls + if (executable.length > 0) { + for (const call of executable) { + signal.throwIfAborted(); + // Record the start fact BEFORE side effects — this is what + // makes resume side-effect-safe. + turn.startedTools.push({ + toolCallId: call.toolCallId, + startedAt: nowIso(), + }); + await this.persist(turn); + stream.push({ type: "tool-execution-start", toolCallId: call.toolCallId }); + + // Tool failures are conversational: a throwing runner + // becomes an error ToolMessage the model can react to, + // never a terminal turn error. Aborts still propagate. 
+ const outcome = await this.toolRunner + .run(call, { turnId, signal }) + .catch((error: unknown): ToolRunResult => { + signal.throwIfAborted(); + return { + type: "error", + value: `Tool execution failed: ${error instanceof Error ? error.message : String(error)}`, + }; + }); + if (outcome.type === "pending") { + turn.dispatchedTools.push({ + toolCallId: call.toolCallId, + dispatchedAt: nowIso(), + }); + } else { + turn.messages.push(toolMessage(call, stringifyToolResult(outcome.value))); + } + await this.persist(turn); + if (outcome.type !== "pending") { + stream.push({ type: "tool-result", toolCallId: call.toolCallId }); + } + } + continue; + } + + // 7. nothing unresolved + complete assistant message last → done + const last = turn.messages[turn.messages.length - 1]; + if (last && last.role === "assistant") { + turn.completedAt = nowIso(); + await this.persist(turn); + return; + } + + // 8. model call: accumulate in memory, commit only the complete + // AssistantMessage. Deltas flow to the handle, never to disk. + // The stream's result promise is the single source of failure: + // it rejects on model error AND on abort, and the catch below + // tells those apart via signal.aborted. + const modelStream = this.modelAdapter.stream({ + provider: turn.provider, + model: turn.model, + messages: turn.messages, + tools: this.toolRunner.definitions(), + signal, + }); + for await (const event of modelStream) { + stream.push(event); + } + const assistantMessage = await modelStream.result; + turn.messages.push(assistantMessage); + await this.persist(turn); + } catch (error) { + if (signal.aborted) return; // stopped: turn stays as persisted (idle) + await this.setTurnError(turnId, error); + return; + } + } + + // 9. 
/**
 * Records a terminal error on the turn and persists it.
 *
 * The turn is re-read from the store first, so the error is attached to the
 * persisted state rather than to any in-memory copy the caller was mutating.
 *
 * @param turnId Id of the turn to mark as failed.
 * @param error  The thrown value. Its message becomes `error.message`; when it
 *               is an `Error` carrying a `cause`, a JSON-safe rendering of
 *               that cause is stored as `error.details`.
 * @throws If the turn does not exist or the store rejects the write.
 */
private async setTurnError(turnId: string, error: unknown): Promise<void> {
    // Stores serialize the error record (SqliteTurnStore via JSON.stringify,
    // InMemoryTurnStore via structuredClone). A raw cause is unsafe there:
    // an Error instance stringifies to `{}`, and a circular or otherwise
    // unserializable cause makes the write itself throw, so the turn would
    // end up with no error recorded at all. Reduce it to plain data first.
    const describeCause = (cause: unknown): unknown => {
        if (cause instanceof Error) {
            return { name: cause.name, message: cause.message };
        }
        try {
            const encoded: string | undefined = JSON.stringify(cause);
            // `undefined` here means functions/symbols: not representable.
            return encoded === undefined ? String(cause) : JSON.parse(encoded);
        } catch {
            return String(cause);
        }
    };

    const turn = await this.mustGet(turnId);
    turn.error = {
        message: error instanceof Error ? error.message : String(error),
        ...(error instanceof Error && error.cause !== undefined
            ? { details: describeCause(error.cause) }
            : {}),
        at: nowIso(),
    };
    await this.persist(turn);
}
it("yields pushed events and completes on end()", async () => { + const stream = new EventStream(); + const collecting = collect(stream); + + stream.push(1); + stream.push(2); + stream.end("done"); + + expect(await collecting).toEqual([1, 2]); + expect(await stream.result).toBe("done"); + }); + + it("delivers events buffered before iteration starts", async () => { + const stream = new EventStream(); + stream.push(1); + stream.push(2); + stream.end("done"); + + expect(await collect(stream)).toEqual([1, 2]); + }); + + it("resolves result without any event consumer", async () => { + const stream = new EventStream(); + stream.push(1); + stream.end("done"); + + expect(await stream.result).toBe("done"); + }); + + it("rejects result and terminates iteration on fail()", async () => { + const stream = new EventStream(); + const collecting = collect(stream); + + stream.push(1); + stream.fail(new Error("boom")); + + expect(await collecting).toEqual([1]); + await expect(stream.result).rejects.toThrow("boom"); + }); + + it("ignores pushes after completion", async () => { + const stream = new EventStream(); + stream.end("done"); + stream.push(99); + + expect(await collect(stream)).toEqual([]); + }); +}); diff --git a/apps/x/packages/core/src/agent-loop/event-stream.ts b/apps/x/packages/core/src/agent-loop/event-stream.ts new file mode 100644 index 00000000..1f1db7a9 --- /dev/null +++ b/apps/x/packages/core/src/agent-loop/event-stream.ts @@ -0,0 +1,61 @@ +// Tiny EventStream: push events, complete with a result. +// Consumers can iterate events (streaming) or just await the result +// (await-to-rest); consuming events is never required for correctness. 
/**
 * A push-based stream of events that also completes with one result.
 *
 * Producers call `push()` any number of times and then exactly one of
 * `end(result)` or `fail(error)`; every call after completion is ignored.
 * Consumers may iterate the events with `for await`, await `result`, or both.
 * Events are kept in an internal buffer and each iterator reads it from the
 * start, so an iterator created late (even after completion) still sees all
 * events.
 *
 * Failure is reported only through `result`: after `fail()`, iteration ends
 * normally once the buffered events are drained, and `result` rejects.
 *
 * @typeParam TEvent  Type of the streamed events.
 * @typeParam TResult Type of the value `result` resolves with.
 */
export class EventStream<TEvent, TResult> implements AsyncIterable<TEvent> {
    private buffer: TEvent[] = [];
    // Resolvers of iterators currently parked waiting for the next event.
    private waiters: Array<() => void> = [];
    private done = false;

    /** Settles when the stream completes: resolved by `end`, rejected by `fail`. */
    readonly result: Promise<TResult>;
    private resolveResult!: (value: TResult) => void;
    private rejectResult!: (error: unknown) => void;

    constructor() {
        this.result = new Promise<TResult>((resolve, reject) => {
            this.resolveResult = resolve;
            this.rejectResult = reject;
        });
        // Attach a no-op handler so a failure nobody awaits is not reported as
        // an unhandled rejection. Awaiting `result` still rejects as usual.
        this.result.catch(() => undefined);
    }

    /** Appends an event and wakes parked iterators. No-op after completion. */
    push(event: TEvent): void {
        if (this.done) return;
        this.buffer.push(event);
        this.wake();
    }

    /** Completes the stream successfully. No-op if already completed. */
    end(result: TResult): void {
        if (this.done) return;
        this.done = true;
        this.resolveResult(result);
        this.wake();
    }

    /** Completes the stream with a failure. No-op if already completed. */
    fail(error: unknown): void {
        if (this.done) return;
        this.done = true;
        this.rejectResult(error);
        this.wake();
    }

    // Swap the list out before calling, so a waiter that parks again during
    // the wake-up lands in the fresh list instead of the one being drained.
    private wake(): void {
        const waiters = this.waiters;
        this.waiters = [];
        for (const waiter of waiters) waiter();
    }

    async *[Symbol.asyncIterator](): AsyncIterator<TEvent> {
        // Each iterator keeps its own cursor into the shared buffer.
        let index = 0;
        for (;;) {
            while (index < this.buffer.length) {
                yield this.buffer[index++];
            }
            if (this.done) return;
            await new Promise<void>((resolve) => this.waiters.push(resolve));
        }
    }
}
exists: ${turn.id}`); + } + this.turns.set(turn.id, structuredClone(turn)); + } + + async get(id: string): Promise | null> { + const turn = this.turns.get(id); + return turn ? structuredClone(turn) : null; + } + + async update(turn: z.infer): Promise { + if (!this.turns.has(turn.id)) { + throw new Error(`Turn not found: ${turn.id}`); + } + this.turns.set(turn.id, structuredClone(turn)); + } +} diff --git a/apps/x/packages/core/src/agent-loop/index.ts b/apps/x/packages/core/src/agent-loop/index.ts new file mode 100644 index 00000000..035ff1ec --- /dev/null +++ b/apps/x/packages/core/src/agent-loop/index.ts @@ -0,0 +1,9 @@ +export * from "./types.js"; +export * from "./event-stream.js"; +export * from "./turn-store.js"; +export * from "./in-memory-turn-store.js"; +export * from "./tool-runner.js"; +export * from "./permission-gate.js"; +export * from "./model-adapter.js"; +export * from "./agent-loop.js"; +export * from "./sqlite-turn-store.js"; diff --git a/apps/x/packages/core/src/agent-loop/model-adapter.ts b/apps/x/packages/core/src/agent-loop/model-adapter.ts new file mode 100644 index 00000000..a9a90a93 --- /dev/null +++ b/apps/x/packages/core/src/agent-loop/model-adapter.ts @@ -0,0 +1,141 @@ +import { jsonSchema, stepCountIs, streamText, tool, type ToolSet } from "ai"; +import { z } from "zod"; +import { + AssistantContentPart, + AssistantMessage, + MessageList, + ToolCallPart, +} from "@x/shared/dist/message.js"; +import { convertFromMessages } from "../agents/runtime.js"; +import { createProvider } from "../models/models.js"; +import { resolveProviderConfig } from "../models/defaults.js"; +import { EventStream } from "./event-stream.js"; +import type { ModelStreamEvent, ToolDefinition } from "./types.js"; + +export type ModelStreamRequest = { + provider: string | null; + model: string | null; + messages: z.infer; + tools: ToolDefinition[]; + signal: AbortSignal; +}; + +// Streams one model step. 
/**
 * Starts one model step and returns its event stream immediately.
 *
 * The work happens in `run()`, which is launched without being awaited.
 * If it rejects, the failure is pushed as an `error` event and then used to
 * fail the stream, so the stream's `result` rejects.
 */
stream(req: ModelStreamRequest): EventStream<ModelStreamEvent, z.infer<typeof AssistantMessage>> {
    const events = new EventStream<ModelStreamEvent, z.infer<typeof AssistantMessage>>();

    const reportFailure = (error: unknown): void => {
        events.push({ type: "error", error });
        events.fail(error);
    };
    void this.run(req, events).catch(reportFailure);

    return events;
}
+ const parts: z.infer[] = []; + const lastPart = () => parts[parts.length - 1]; + + for await (const event of result.fullStream) { + req.signal.throwIfAborted(); + switch (event.type) { + case "text-delta": { + const last = lastPart(); + if (last?.type === "text") { + last.text += event.text; + } else { + parts.push({ type: "text", text: event.text }); + } + out.push({ type: "text-delta", delta: event.text }); + break; + } + case "reasoning-delta": { + const last = lastPart(); + if (last?.type === "reasoning") { + last.text += event.text; + } else { + parts.push({ type: "reasoning", text: event.text }); + } + out.push({ type: "reasoning-delta", delta: event.text }); + break; + } + case "tool-call": { + const toolCall: z.infer = { + type: "tool-call", + toolCallId: event.toolCallId, + toolName: event.toolName, + arguments: event.input, + }; + parts.push(toolCall); + out.push({ type: "tool-call", toolCall }); + break; + } + case "error": + throw event.error instanceof Error + ? event.error + : new Error(formatStreamError(event.error)); + default: + break; + } + } + + const message: z.infer = { + role: "assistant", + content: parts.length > 0 ? 
/**
 * Renders a non-Error failure payload as a message string.
 *
 * Strings pass through unchanged; anything else is JSON-encoded when
 * possible and otherwise converted with `String()`.
 *
 * @param error The payload to describe.
 * @returns Always a string, including for `undefined`, functions, symbols
 *          and values that cannot be JSON-encoded.
 */
function formatStreamError(error: unknown): string {
    if (typeof error === "string") return error;
    try {
        // JSON.stringify yields `undefined` (not a string) for `undefined`,
        // functions and symbols. Fall back so callers never pass `undefined`
        // on as an Error message.
        const encoded: string | undefined = JSON.stringify(error);
        return encoded ?? String(error);
    } catch {
        // Circular structures, BigInt, throwing toJSON, ...
        return String(error);
    }
}
+export interface PermissionGate { + check(toolCall: z.infer): Promise; + classify( + toolCall: z.infer, + request: unknown, + ): Promise; +} diff --git a/apps/x/packages/core/src/agent-loop/sqlite-turn-store.test.ts b/apps/x/packages/core/src/agent-loop/sqlite-turn-store.test.ts new file mode 100644 index 00000000..455b2e54 --- /dev/null +++ b/apps/x/packages/core/src/agent-loop/sqlite-turn-store.test.ts @@ -0,0 +1,139 @@ +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; +import { sql } from "kysely"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { z } from "zod"; +import type { AgentLoopTurn } from "./types.js"; + +let tmpDir: string; +let workspaceDir: string; +let storageModule: typeof import("../storage/index.js") | null = null; + +beforeEach(async () => { + tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), "rowboat-agent-loop-test-")); + workspaceDir = path.join(tmpDir, "workspace"); + process.env.ROWBOAT_WORKDIR = workspaceDir; + vi.resetModules(); + // config.ts kicks off knowledge-repo init as an import side effect; mock it + // out so tests don't touch git (same pattern as storage/storage.test.ts). 
/**
 * Builds a turn fixture that populates every persisted field: one tool call
 * ("tc1") with a transcript entry, a permission request, a classifier
 * decision and a start record. `dispatchedTools` is empty and both `error`
 * and `completedAt` are null.
 *
 * @param id Id to give the turn.
 */
function sampleTurn(id: string): z.infer<typeof AgentLoopTurn> {
    const toolCallId = "tc1";
    const toolName = "read";
    const createdAt = "2026-06-12T00:00:00Z";

    return {
        id,
        agentId: "agent-1",
        provider: "openai",
        model: "gpt-x",
        permissionMode: "auto",
        messages: [
            { role: "user", content: "hello" },
            {
                role: "assistant",
                content: [
                    { type: "text", text: "let me check" },
                    { type: "tool-call", toolCallId, toolName, arguments: { path: "/a" } },
                ],
            },
            { role: "tool", content: "file contents", toolCallId, toolName },
        ],
        permissionRequests: [
            { toolCallId, request: { fileAccess: ["/a"] }, requestedAt: "2026-06-12T00:00:00Z" },
        ],
        permissionDecisions: [
            {
                toolCallId,
                decidedBy: "classifier",
                decision: "granted",
                reason: "read-only access",
                decidedAt: "2026-06-12T00:00:01Z",
            },
        ],
        startedTools: [{ toolCallId, startedAt: "2026-06-12T00:00:02Z" }],
        dispatchedTools: [],
        error: null,
        completedAt: null,
        createdAt,
        updatedAt: "2026-06-12T00:00:03Z",
    };
}
db } = await loadStore(); + + const tables = await sql<{ name: string }>` + select name from sqlite_master + where type = 'table' and name = 'agent_loop_turns' + `.execute(db); + expect(tables.rows).toHaveLength(1); + + const indexes = await sql<{ name: string }>` + select name from sqlite_master + where type = 'index' and name = 'agent_loop_turns_created_at_idx' + `.execute(db); + expect(indexes.rows).toHaveLength(1); + }); + + it("round-trips every column", async () => { + const { store } = await loadStore(); + const turn = sampleTurn("t1"); + + await store.create(turn); + expect(await store.get("t1")).toEqual(turn); + + const updated = { + ...turn, + dispatchedTools: [{ toolCallId: "tc1", dispatchedAt: "2026-06-12T00:00:04Z" }], + error: { message: "boom", code: "E1", details: { hint: "x" }, at: "2026-06-12T00:00:05Z" }, + completedAt: "2026-06-12T00:00:06Z", + updatedAt: "2026-06-12T00:00:06Z", + }; + await store.update(updated); + expect(await store.get("t1")).toEqual(updated); + }); + + it("returns null for unknown ids and rejects updates to missing turns", async () => { + const { store } = await loadStore(); + expect(await store.get("missing")).toBeNull(); + await expect(store.update(sampleTurn("missing"))).rejects.toThrow("Turn not found"); + }); + + it("fails loudly on a corrupted JSON column", async () => { + const { store, db } = await loadStore(); + await store.create(sampleTurn("t1")); + + await db + .updateTable("agent_loop_turns") + .set({ permission_decisions: JSON.stringify([{ bogus: true }]) }) + .where("id", "=", "t1") + .execute(); + + await expect(store.get("t1")).rejects.toThrow(); + }); +}); diff --git a/apps/x/packages/core/src/agent-loop/sqlite-turn-store.ts b/apps/x/packages/core/src/agent-loop/sqlite-turn-store.ts new file mode 100644 index 00000000..9ef201bd --- /dev/null +++ b/apps/x/packages/core/src/agent-loop/sqlite-turn-store.ts @@ -0,0 +1,87 @@ +import type { Insertable, Kysely, Selectable } from "kysely"; +import { z } from "zod"; 
/**
 * TurnStore backed by the `agent_loop_turns` table.
 *
 * The Kysely instance is injected; this class only issues queries against it.
 * Conversion between turns and rows is delegated to `toRow` / `fromRow`.
 */
export class SqliteTurnStore implements TurnStore {
    constructor(private db: Kysely<Database>) {}

    /** Inserts a new row for the turn. */
    async create(turn: z.infer<typeof AgentLoopTurn>): Promise<void> {
        const row = toRow(turn);
        await this.db.insertInto("agent_loop_turns").values(row).execute();
    }

    /** Loads a turn by id, or returns null when no row matches. */
    async get(id: string): Promise<z.infer<typeof AgentLoopTurn> | null> {
        const found = await this.db
            .selectFrom("agent_loop_turns")
            .selectAll()
            .where("id", "=", id)
            .executeTakeFirst();
        if (!found) return null;
        return fromRow(found);
    }

    /**
     * Overwrites every non-id column of an existing turn.
     *
     * @throws Error("Turn not found: …") when the update touched no row.
     */
    async update(turn: z.infer<typeof AgentLoopTurn>): Promise<void> {
        const { id, ...columns } = toRow(turn);
        const outcome = await this.db
            .updateTable("agent_loop_turns")
            .set(columns)
            .where("id", "=", id)
            .executeTakeFirst();
        if (outcome.numUpdatedRows !== 0n) return;
        throw new Error(`Turn not found: ${id}`);
    }
}
/**
 * Rebuilds a turn from a database row.
 *
 * `permission_mode` and every JSON text column are validated with their zod
 * schema, so a row that no longer matches the schema throws here instead of
 * yielding a malformed turn. Columns are decoded in the order below; the
 * first failure is the one that propagates.
 */
function fromRow(row: Selectable<AgentLoopTurnsTable>): z.infer<typeof AgentLoopTurn> {
    const permissionMode = PermissionMode.parse(row.permission_mode);
    const messages = MessageList.parse(JSON.parse(row.messages));
    const permissionRequests = z
        .array(PermissionRequest)
        .parse(JSON.parse(row.permission_requests));
    const permissionDecisions = z
        .array(PermissionDecision)
        .parse(JSON.parse(row.permission_decisions));
    const startedTools = z.array(StartedTool).parse(JSON.parse(row.started_tools));
    const dispatchedTools = z.array(DispatchedTool).parse(JSON.parse(row.dispatched_tools));

    // `error` is the only nullable JSON column.
    let error: z.infer<typeof AgentLoopError> | null = null;
    if (row.error !== null) {
        error = AgentLoopError.parse(JSON.parse(row.error));
    }

    return {
        id: row.id,
        agentId: row.agent_id,
        provider: row.provider,
        model: row.model,
        permissionMode,
        messages,
        permissionRequests,
        permissionDecisions,
        startedTools,
        dispatchedTools,
        error,
        createdAt: row.created_at,
        updatedAt: row.updated_at,
        completedAt: row.completed_at,
    };
}
+ definitions(): ToolDefinition[]; + run(toolCall: z.infer, ctx: ToolRunContext): Promise; +} diff --git a/apps/x/packages/core/src/agent-loop/turn-store.ts b/apps/x/packages/core/src/agent-loop/turn-store.ts new file mode 100644 index 00000000..fc198f08 --- /dev/null +++ b/apps/x/packages/core/src/agent-loop/turn-store.ts @@ -0,0 +1,10 @@ +import { z } from "zod"; +import { AgentLoopTurn } from "./types.js"; + +// Durable storage for turns. The per-turn mutex lives ABOVE the store (in +// AgentLoopImpl), not in it — stores only read and write whole turns. +export interface TurnStore { + create(turn: z.infer): Promise; + get(id: string): Promise | null>; + update(turn: z.infer): Promise; +} diff --git a/apps/x/packages/core/src/agent-loop/types.ts b/apps/x/packages/core/src/agent-loop/types.ts new file mode 100644 index 00000000..73634790 --- /dev/null +++ b/apps/x/packages/core/src/agent-loop/types.ts @@ -0,0 +1,192 @@ +import { z } from "zod"; +import { + AssistantMessage, + MessageList, + ToolCallPart, +} from "@x/shared/dist/message.js"; + +// ─── Persisted fact schemas ───────────────────────────────────────────────── +// +// A turn is five append-only fact logs + set-once scalars. Records are never +// mutated or deleted; every field records exactly one non-derivable fact. +// Everything else (status, per-call lifecycle) is derived. + +export const PermissionRequest = z.object({ + toolCallId: z.string(), + // What the user is approving (file access, command, ...). Computed from + // tool args by the PermissionGate, so it must be persisted to pin down + // exactly what was asked. 
// Schema of one persisted turn: identity/configuration fields, five fact
// logs, two terminal markers, and bookkeeping timestamps.
export const AgentLoopTurn = z.object({
    id: z.string(),
    agentId: z.string().nullable(),
    // Passed to the model adapter on every model call; both are nullable in
    // the schema.
    provider: z.string().nullable(),
    model: z.string().nullable(),
    // "manual" or "auto"; in auto mode an open permission request is first
    // routed to the classifier (see deriveToolCallState).
    permissionMode: PermissionMode,

    // append-only fact logs
    // Transcript. Tool results and permission denials are appended here as
    // tool messages, which is what marks a tool call as resolved.
    messages: MessageList,
    // One entry per tool call for which the permission gate reported that
    // approval is required.
    permissionRequests: z.array(PermissionRequest),
    // Verdicts on those requests (decidedBy: "user" | "classifier").
    permissionDecisions: z.array(PermissionDecision),
    // The loop appends and persists an entry here BEFORE invoking the tool
    // runner; a started call with no result or dispatch derives "interrupted".
    startedTools: z.array(StartedTool),
    // The loop appends here when the runner returns `pending`.
    dispatchedTools: z.array(DispatchedTool),

    // set-once scalars
    // Either being non-null makes the turn terminal for the loop.
    error: AgentLoopError.nullable(),
    completedAt: z.string().nullable(),

    createdAt: z.string(),
    // Refreshed by the loop on every persist.
    updatedAt: z.string(),
});
/**
 * Collects every tool-call part from the turn's assistant messages, in
 * transcript order. Assistant messages with plain string content carry no
 * parts and are skipped.
 */
export function toolCallParts(
    turn: z.infer<typeof AgentLoopTurn>,
): z.infer<typeof ToolCallPart>[] {
    const parts: z.infer<typeof ToolCallPart>[] = [];
    for (const msg of turn.messages) {
        if (msg.role !== "assistant" || typeof msg.content === "string") continue;
        for (const part of msg.content) {
            if (part.type === "tool-call") parts.push(part);
        }
    }
    return parts;
}

/** Ids of all tool calls that already have a tool message in the transcript. */
export function resolvedToolCallIds(turn: z.infer<typeof AgentLoopTurn>): Set<string> {
    const ids = new Set<string>();
    for (const msg of turn.messages) {
        if (msg.role === "tool") ids.add(msg.toolCallId);
    }
    return ids;
}

/** Tool calls from the transcript that have no matching tool message yet. */
export function unresolvedToolCalls(
    turn: z.infer<typeof AgentLoopTurn>,
): z.infer<typeof ToolCallPart>[] {
    const resolved = resolvedToolCallIds(turn);
    return toolCallParts(turn).filter((part) => !resolved.has(part.toolCallId));
}

/**
 * Derives the lifecycle state of one tool call purely from the turn's
 * persisted facts. Checks run from most to least advanced:
 * resolved → dispatched → interrupted (started only) → permission states →
 * unevaluated.
 *
 * @param turn       The turn holding the facts.
 * @param toolCallId Id of the tool call to classify.
 */
export function deriveToolCallState(
    turn: z.infer<typeof AgentLoopTurn>,
    toolCallId: string,
): ToolCallState {
    if (resolvedToolCallIds(turn).has(toolCallId)) return "resolved";
    if (turn.dispatchedTools.some((t) => t.toolCallId === toolCallId)) return "dispatched";
    if (turn.startedTools.some((t) => t.toolCallId === toolCallId)) return "interrupted";

    const request = turn.permissionRequests.find((r) => r.toolCallId === toolCallId);
    if (request) {
        const decisions = turn.permissionDecisions.filter((d) => d.toolCallId === toolCallId);
        // The log is append-only, so the MOST RECENT granted/denied decision
        // is the authoritative one. Taking the first would pin the call to a
        // stale verdict: an unpaired denial could never be superseded by a
        // fresh grant, and a denial recorded after a grant would be ignored
        // and the call would still derive as executable.
        const terminals = decisions.filter(
            (d) => d.decision === "granted" || d.decision === "denied",
        );
        const latest = terminals[terminals.length - 1];
        if (latest) {
            // A denial normally arrives together with its tool message, which
            // makes the call "resolved" above. A denial without one never
            // clears the call; it waits for another decision.
            return latest.decision === "granted" ? "cleared" : "awaiting-user";
        }
        if (turn.permissionMode === "auto" && !decisions.some((d) => d.decidedBy === "classifier")) {
            return "needs-classifier";
        }
        return "awaiting-user";
    }

    return "unevaluated";
}

/**
 * Derives the turn-level status: terminal markers win; otherwise the turn is
 * "waiting" when any unresolved call is awaiting a user decision or has been
 * dispatched, and "idle" in every other case.
 */
export function deriveTurnStatus(turn: z.infer<typeof AgentLoopTurn>): TurnStatus {
    if (turn.error !== null) return "error";
    if (turn.completedAt !== null) return "completed";
    for (const call of unresolvedToolCalls(turn)) {
        const state = deriveToolCallState(turn, call.toolCallId);
        if (state === "awaiting-user" || state === "dispatched") return "waiting";
    }
    return "idle";
}
}, + async down(db: MigrationDb): Promise { + await db.schema.dropIndex("agent_loop_turns_created_at_idx").ifExists().execute(); + await db.schema.dropTable("agent_loop_turns").ifExists().execute(); + }, + }, }; class InCodeMigrationProvider implements MigrationProvider { diff --git a/apps/x/packages/core/src/storage/schema.ts b/apps/x/packages/core/src/storage/schema.ts index 7a348fe0..781e4e14 100644 --- a/apps/x/packages/core/src/storage/schema.ts +++ b/apps/x/packages/core/src/storage/schema.ts @@ -8,6 +8,24 @@ export interface StorageMetadataTable { updated_at: TimestampColumn; } +export interface AgentLoopTurnsTable { + id: string; + agent_id: string | null; + provider: string | null; + model: string | null; + permission_mode: string; + messages: string; // JSON: MessageList + permission_requests: string; // JSON: PermissionRequest[] + permission_decisions: string; // JSON: PermissionDecision[] + started_tools: string; // JSON: StartedTool[] + dispatched_tools: string; // JSON: DispatchedTool[] + error: string | null; // JSON: AgentLoopError + created_at: TimestampColumn; + updated_at: TimestampColumn; + completed_at: string | null; +} + export interface Database { storage_metadata: StorageMetadataTable; + agent_loop_turns: AgentLoopTurnsTable; }