Add standalone agent-loop module with turn-based SQLite persistence

New @x/core module (src/agent-loop) — groundwork for replacing the JSONL
event runtime. Development-only; not wired into the app yet.

- Durable unit is a turn: append-only fact logs (messages, permission
  requests/decisions, started/dispatched tools) + set-once error/completedAt;
  status is derived from facts, never stored
- advance() reducer drives the loop purely from persisted state; crash
  recovery = resumeTurn() re-entering the reducer
- Permission gating as data: batched requests, user/classifier decisions
  with reasons, denials materialized as ToolMessages
- TurnHandle API per entry point: stream events or await the rest state
- Kysely migration (agent_loop_turns) + SqliteTurnStore with zod-parsed
  reads; InMemoryTurnStore for tests
- VercelModelAdapter over the existing createProvider()/streamText
- 33 vitest tests: reducer, permissions, classifier, crash recovery,
  abort, concurrency, SQLite round-trips

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
Ramnique Singh 2026-06-12 18:20:19 +05:30
parent 883872064f
commit 655ebc77b9
15 changed files with 1959 additions and 0 deletions

View file

@ -0,0 +1,692 @@
import { describe, expect, it } from "vitest";
import { z } from "zod";
import {
AssistantMessage,
Message,
ToolCallPart,
} from "@x/shared/dist/message.js";
import { AgentLoopImpl } from "./agent-loop.js";
import { EventStream } from "./event-stream.js";
import { InMemoryTurnStore } from "./in-memory-turn-store.js";
import type { ModelAdapter, ModelStreamRequest } from "./model-adapter.js";
import type { PermissionClassification, PermissionGate } from "./permission-gate.js";
import type { ToolRunner, ToolRunResult } from "./tool-runner.js";
import type { TurnStore } from "./turn-store.js";
import {
AgentLoopTurn,
deriveTurnStatus,
type ModelStreamEvent,
} from "./types.js";
// ─── helpers ────────────────────────────────────────────────────────────────
function userMsg(text: string): z.infer<typeof Message> {
return { role: "user", content: text };
}
function assistantText(text: string): z.infer<typeof AssistantMessage> {
return { role: "assistant", content: text };
}
function toolCall(toolCallId: string, toolName: string): z.infer<typeof ToolCallPart> {
return { type: "tool-call", toolCallId, toolName, arguments: {} };
}
function assistantToolCalls(
...calls: z.infer<typeof ToolCallPart>[]
): z.infer<typeof AssistantMessage> {
return { role: "assistant", content: calls };
}
type ModelStep =
| { kind: "message"; message: z.infer<typeof AssistantMessage>; deltas?: string[] }
| { kind: "error"; error: unknown }
| { kind: "hang" };
class FakeModelAdapter implements ModelAdapter {
calls = 0;
constructor(private steps: ModelStep[]) {}
stream(req: ModelStreamRequest): EventStream<ModelStreamEvent, z.infer<typeof AssistantMessage>> {
this.calls++;
const out = new EventStream<ModelStreamEvent, z.infer<typeof AssistantMessage>>();
const step = this.steps.shift();
void (async () => {
await Promise.resolve();
if (!step) {
const error = new Error("FakeModelAdapter: no scripted step left");
out.push({ type: "error", error });
out.fail(error);
return;
}
if (step.kind === "hang") {
out.push({ type: "text-delta", delta: "partial" });
// Mirror VercelModelAdapter's abort behavior: push an error
// event AND fail the result (the loop must rely only on the
// latter — regression for the stop-bricks-turn bug).
req.signal.addEventListener("abort", () => {
const error = new Error("aborted");
out.push({ type: "error", error });
out.fail(error);
}, { once: true });
return;
}
if (step.kind === "error") {
out.push({ type: "error", error: step.error });
out.fail(step.error);
return;
}
for (const delta of step.deltas ?? []) {
out.push({ type: "text-delta", delta });
}
if (typeof step.message.content !== "string") {
for (const part of step.message.content) {
if (part.type === "tool-call") out.push({ type: "tool-call", toolCall: part });
}
}
out.push({ type: "finish", message: step.message });
out.end(step.message);
})();
return out;
}
}
class FakeToolRunner implements ToolRunner {
ran: string[] = [];
constructor(
private behaviors: Record<string, (call: z.infer<typeof ToolCallPart>) => ToolRunResult> = {},
) {}
definitions() {
return [];
}
async run(call: z.infer<typeof ToolCallPart>): Promise<ToolRunResult> {
this.ran.push(call.toolCallId);
const behavior = this.behaviors[call.toolName];
return behavior ? behavior(call) : { type: "result", value: `ok:${call.toolName}` };
}
}
class FakePermissionGate implements PermissionGate {
checkCalls = 0;
classifyCalls = 0;
constructor(
private opts: {
required?: (toolName: string) => boolean;
classify?: (call: z.infer<typeof ToolCallPart>) => PermissionClassification;
} = {},
) {}
async check(call: z.infer<typeof ToolCallPart>) {
this.checkCalls++;
return this.opts.required?.(call.toolName)
? { required: true as const, request: { tool: call.toolName } }
: { required: false as const };
}
async classify(call: z.infer<typeof ToolCallPart>): Promise<PermissionClassification> {
this.classifyCalls++;
return this.opts.classify
? this.opts.classify(call)
: { decision: "abstained", reason: "unsure" };
}
}
// Records a snapshot per store write so tests can assert write batching.
class SnapshottingStore implements TurnStore {
private inner = new InMemoryTurnStore();
snapshots: z.infer<typeof AgentLoopTurn>[] = [];
async create(turn: z.infer<typeof AgentLoopTurn>) {
this.snapshots.push(structuredClone(turn));
await this.inner.create(turn);
}
async get(id: string) {
return this.inner.get(id);
}
async update(turn: z.infer<typeof AgentLoopTurn>) {
this.snapshots.push(structuredClone(turn));
await this.inner.update(turn);
}
}
function makeLoop(opts: {
steps?: ModelStep[];
runner?: FakeToolRunner;
gate?: FakePermissionGate;
store?: TurnStore;
maxIterations?: number;
} = {}) {
const store = opts.store ?? new InMemoryTurnStore();
const adapter = new FakeModelAdapter(opts.steps ?? []);
const runner = opts.runner ?? new FakeToolRunner();
const gate = opts.gate ?? new FakePermissionGate();
const loop = new AgentLoopImpl({
store,
modelAdapter: adapter,
toolRunner: runner,
permissionGate: gate,
...(opts.maxIterations !== undefined ? { maxIterations: opts.maxIterations } : {}),
});
return { loop, store, adapter, runner, gate };
}
function emptyTurn(
id: string,
overrides: Partial<z.infer<typeof AgentLoopTurn>> = {},
): z.infer<typeof AgentLoopTurn> {
const now = new Date().toISOString();
return {
id,
agentId: null,
provider: null,
model: null,
permissionMode: "manual",
messages: [],
permissionRequests: [],
permissionDecisions: [],
startedTools: [],
dispatchedTools: [],
error: null,
completedAt: null,
createdAt: now,
updatedAt: now,
...overrides,
};
}
function toolMessages(turn: z.infer<typeof AgentLoopTurn>) {
return turn.messages.filter((m) => m.role === "tool");
}
// ─── tests ──────────────────────────────────────────────────────────────────
describe("AgentLoopImpl", () => {
it("completes a simple text turn and persists input metadata", async () => {
const { loop } = makeLoop({ steps: [{ kind: "message", message: assistantText("hi!") }] });
const turn = await loop.createTurn({
agentId: "a1",
provider: "openai",
model: "gpt-x",
messages: [userMsg("hello")],
}).result;
expect(turn.agentId).toBe("a1");
expect(turn.provider).toBe("openai");
expect(turn.model).toBe("gpt-x");
expect(turn.permissionMode).toBe("manual");
expect(turn.messages).toEqual([userMsg("hello"), assistantText("hi!")]);
expect(turn.completedAt).not.toBeNull();
expect(deriveTurnStatus(turn)).toBe("completed");
});
it("rejects a turn with no input messages", async () => {
const { loop } = makeLoop();
await expect(loop.createTurn({ messages: [] }).result).rejects.toThrow();
});
it("runs tool calls: result and error both append ToolMessages and continue", async () => {
const calls = [toolCall("tc1", "good"), toolCall("tc2", "bad")];
const { loop, runner } = makeLoop({
steps: [
{ kind: "message", message: assistantToolCalls(...calls) },
{ kind: "message", message: assistantText("done") },
],
runner: new FakeToolRunner({
good: () => ({ type: "result", value: { answer: 42 } }),
bad: () => ({ type: "error", value: "boom" }),
}),
});
const turn = await loop.createTurn({ messages: [userMsg("go")] }).result;
expect(runner.ran).toEqual(["tc1", "tc2"]);
expect(toolMessages(turn)).toEqual([
{ role: "tool", content: '{"answer":42}', toolCallId: "tc1", toolName: "good" },
{ role: "tool", content: "boom", toolCallId: "tc2", toolName: "bad" },
]);
// tool error is NOT a turn error
expect(turn.error).toBeNull();
expect(deriveTurnStatus(turn)).toBe("completed");
expect(turn.startedTools.map((t) => t.toolCallId)).toEqual(["tc1", "tc2"]);
});
it("suspends on a pending tool and resumes via setToolResult", async () => {
const { loop, adapter } = makeLoop({
steps: [
{ kind: "message", message: assistantToolCalls(toolCall("tc1", "ask-human")) },
{ kind: "message", message: assistantText("thanks") },
],
runner: new FakeToolRunner({ "ask-human": () => ({ type: "pending" }) }),
});
const waiting = await loop.createTurn({ messages: [userMsg("go")] }).result;
expect(deriveTurnStatus(waiting)).toBe("waiting");
expect(waiting.dispatchedTools.map((t) => t.toolCallId)).toEqual(["tc1"]);
expect(adapter.calls).toBe(1); // no model call while waiting
const done = await loop.setToolResult(waiting.id, { toolCallId: "tc1", result: "blue" }).result;
expect(toolMessages(done)).toEqual([
{ role: "tool", content: "blue", toolCallId: "tc1", toolName: "ask-human" },
]);
expect(deriveTurnStatus(done)).toBe("completed");
});
it("requires all dispatched results before the loop continues", async () => {
const { loop, adapter } = makeLoop({
steps: [
{
kind: "message",
message: assistantToolCalls(toolCall("tc1", "job"), toolCall("tc2", "job")),
},
{ kind: "message", message: assistantText("done") },
],
runner: new FakeToolRunner({ job: () => ({ type: "pending" }) }),
});
const waiting = await loop.createTurn({ messages: [userMsg("go")] }).result;
expect(waiting.dispatchedTools).toHaveLength(2);
const stillWaiting = await loop.setToolResult(waiting.id, { toolCallId: "tc1", result: "r1" }).result;
expect(deriveTurnStatus(stillWaiting)).toBe("waiting");
expect(adapter.calls).toBe(1);
const done = await loop.setToolResult(waiting.id, { toolCallId: "tc2", result: "r2" }).result;
expect(deriveTurnStatus(done)).toBe("completed");
expect(adapter.calls).toBe(2);
});
it("rejects setToolResult for a tool call that is not awaiting an external result", async () => {
const { loop } = makeLoop({
steps: [{ kind: "message", message: assistantToolCalls(toolCall("tc1", "job")) }],
runner: new FakeToolRunner({ job: () => ({ type: "pending" }) }),
});
const waiting = await loop.createTurn({ messages: [userMsg("go")] }).result;
await expect(
loop.setToolResult(waiting.id, { toolCallId: "bogus", result: "x" }).result,
).rejects.toThrow("not awaiting an external result");
});
describe("permissions (manual mode)", () => {
it("batch-creates all permission requests in one store write", async () => {
const store = new SnapshottingStore();
const { loop } = makeLoop({
store,
steps: [{
kind: "message",
message: assistantToolCalls(toolCall("tc1", "write"), toolCall("tc2", "exec")),
}],
gate: new FakePermissionGate({ required: () => true }),
});
const turn = await loop.createTurn({ messages: [userMsg("go")] }).result;
expect(deriveTurnStatus(turn)).toBe("waiting");
expect(turn.permissionRequests.map((r) => r.toolCallId)).toEqual(["tc1", "tc2"]);
// no snapshot with exactly one request — both landed in a single write
const counts = store.snapshots.map((s) => s.permissionRequests.length);
expect(counts).not.toContain(1);
expect(counts).toContain(2);
});
it("executes on grant; denial appends decision + ToolMessage atomically", async () => {
const store = new SnapshottingStore();
const { loop, runner } = makeLoop({
store,
steps: [
{
kind: "message",
message: assistantToolCalls(toolCall("tc1", "write"), toolCall("tc2", "exec")),
},
{ kind: "message", message: assistantText("done") },
],
gate: new FakePermissionGate({ required: () => true }),
});
const waiting = await loop.createTurn({ messages: [userMsg("go")] }).result;
const afterDeny = await loop
.respondToPermission(waiting.id, "tc2", "denied", "nope").result;
expect(deriveTurnStatus(afterDeny)).toBe("waiting"); // tc1 still open
expect(toolMessages(afterDeny)).toEqual([
{ role: "tool", content: "Permission denied by the user: nope", toolCallId: "tc2", toolName: "exec" },
]);
// decision + denial ToolMessage landed in the same write
const denyWrite = store.snapshots.find((s) =>
s.permissionDecisions.some((d) => d.toolCallId === "tc2"));
expect(denyWrite?.messages.some((m) => m.role === "tool" && m.toolCallId === "tc2")).toBe(true);
const done = await loop.respondToPermission(waiting.id, "tc1", "granted").result;
expect(runner.ran).toEqual(["tc1"]); // denied tool never ran
expect(deriveTurnStatus(done)).toBe("completed");
expect(done.permissionDecisions).toHaveLength(2);
expect(done.permissionDecisions.every((d) => d.decidedBy === "user")).toBe(true);
});
it("rejects a response for a call without an open request", async () => {
const { loop } = makeLoop({
steps: [{ kind: "message", message: assistantToolCalls(toolCall("tc1", "write")) }],
gate: new FakePermissionGate({ required: () => true }),
});
const waiting = await loop.createTurn({ messages: [userMsg("go")] }).result;
await expect(
loop.respondToPermission(waiting.id, "bogus", "granted").result,
).rejects.toThrow("No open permission request");
});
it("does not re-ask after a grant: granted fact persists across resume", async () => {
const store = new InMemoryTurnStore();
const gate = new FakePermissionGate({ required: () => true });
const { loop, runner } = makeLoop({
store,
gate,
steps: [{ kind: "message", message: assistantText("done") }],
});
// crafted state: granted decision persisted, tool not yet started
await store.create(emptyTurn("t1", {
messages: [userMsg("go"), assistantToolCalls(toolCall("tc1", "write"))],
permissionRequests: [{ toolCallId: "tc1", request: { tool: "write" }, requestedAt: "2026-06-12T00:00:00Z" }],
permissionDecisions: [{
toolCallId: "tc1", decidedBy: "user", decision: "granted",
reason: null, decidedAt: "2026-06-12T00:00:01Z",
}],
}));
const turn = await loop.resumeTurn("t1").result;
expect(gate.checkCalls).toBe(0); // never re-evaluated
expect(runner.ran).toEqual(["tc1"]);
expect(deriveTurnStatus(turn)).toBe("completed");
});
it("never calls the classifier in manual mode", async () => {
const gate = new FakePermissionGate({ required: () => true });
const { loop } = makeLoop({
gate,
steps: [{ kind: "message", message: assistantToolCalls(toolCall("tc1", "write")) }],
});
const turn = await loop.createTurn({ messages: [userMsg("go")] }).result;
expect(deriveTurnStatus(turn)).toBe("waiting");
expect(gate.classifyCalls).toBe(0);
});
});
describe("permissions (auto mode / classifier)", () => {
it("applies classifier grant and deny with reasons", async () => {
const { loop, runner } = makeLoop({
steps: [
{
kind: "message",
message: assistantToolCalls(toolCall("tc1", "read"), toolCall("tc2", "exec")),
},
{ kind: "message", message: assistantText("done") },
],
gate: new FakePermissionGate({
required: () => true,
classify: (call) => call.toolName === "read"
? { decision: "granted", reason: "read-only" }
: { decision: "denied", reason: "destructive" },
}),
});
const turn = await loop.createTurn({
messages: [userMsg("go")],
permissionMode: "auto",
}).result;
expect(deriveTurnStatus(turn)).toBe("completed");
expect(runner.ran).toEqual(["tc1"]);
expect(turn.permissionDecisions).toEqual([
expect.objectContaining({ toolCallId: "tc1", decidedBy: "classifier", decision: "granted", reason: "read-only" }),
expect.objectContaining({ toolCallId: "tc2", decidedBy: "classifier", decision: "denied", reason: "destructive" }),
]);
expect(toolMessages(turn).find((m) => m.toolCallId === "tc2")?.content)
.toBe("Permission denied by the auto-permission classifier: destructive");
});
it("abstain waits on the user, persists, and is not re-run on resume", async () => {
const gate = new FakePermissionGate({
required: () => true,
classify: () => ({ decision: "abstained", reason: "unsure" }),
});
const { loop, runner } = makeLoop({
gate,
steps: [
{ kind: "message", message: assistantToolCalls(toolCall("tc1", "write")) },
{ kind: "message", message: assistantText("done") },
],
});
const waiting = await loop.createTurn({
messages: [userMsg("go")],
permissionMode: "auto",
}).result;
expect(deriveTurnStatus(waiting)).toBe("waiting");
expect(gate.classifyCalls).toBe(1);
// resume must not re-run the classifier — the abstention is a fact
const stillWaiting = await loop.resumeTurn(waiting.id).result;
expect(gate.classifyCalls).toBe(1);
expect(deriveTurnStatus(stillWaiting)).toBe("waiting");
// user can decide after an abstention
const done = await loop.respondToPermission(waiting.id, "tc1", "granted").result;
expect(runner.ran).toEqual(["tc1"]);
expect(deriveTurnStatus(done)).toBe("completed");
});
});
describe("crash recovery", () => {
it("resume treats started-but-unresolved tools as interrupted; never re-runs", async () => {
const store = new InMemoryTurnStore();
const { loop, runner } = makeLoop({
store,
steps: [{ kind: "message", message: assistantText("recovered") }],
});
await store.create(emptyTurn("t1", {
messages: [userMsg("go"), assistantToolCalls(toolCall("tc1", "send-email"))],
startedTools: [{ toolCallId: "tc1", startedAt: "2026-06-12T00:00:00Z" }],
}));
const turn = await loop.resumeTurn("t1").result;
expect(runner.ran).toEqual([]); // NOT re-run
expect(toolMessages(turn)[0]?.content).toContain("interrupted");
expect(deriveTurnStatus(turn)).toBe("completed");
});
it("resume runs tools that were committed but never started", async () => {
const store = new InMemoryTurnStore();
const { loop, runner } = makeLoop({
store,
steps: [{ kind: "message", message: assistantText("done") }],
});
await store.create(emptyTurn("t1", {
messages: [userMsg("go"), assistantToolCalls(toolCall("tc1", "calc"))],
}));
const turn = await loop.resumeTurn("t1").result;
expect(runner.ran).toEqual(["tc1"]);
expect(deriveTurnStatus(turn)).toBe("completed");
});
});
it("stopTurn mid-stream persists no partial message; turn is idle and resumable", async () => {
const { loop } = makeLoop({
steps: [
{ kind: "hang" },
{ kind: "message", message: assistantText("second try") },
],
});
const handle = loop.createTurn({ messages: [userMsg("go")] });
// wait for the first streamed delta, then stop
const iterator = handle.events[Symbol.asyncIterator]();
const first = await iterator.next();
expect(first.value).toEqual({ type: "text-delta", delta: "partial" });
const stopped = await loop.stopTurn(handle.id);
expect(stopped.messages).toEqual([userMsg("go")]); // no partial persisted
expect(stopped.error).toBeNull();
expect(deriveTurnStatus(stopped)).toBe("idle");
await expect(handle.result).resolves.toBeTruthy(); // original handle also rests
const resumed = await loop.resumeTurn(handle.id).result;
expect(resumed.messages).toEqual([userMsg("go"), assistantText("second try")]);
expect(deriveTurnStatus(resumed)).toBe("completed");
});
it("stopTurn immediately after createTurn aborts the queued advance", async () => {
const { loop, adapter } = makeLoop({
steps: [{ kind: "message", message: assistantText("should not run") }],
});
// No await between create and stop — the advance is still queued
// behind the mutex and its controller must already be abortable.
const handle = loop.createTurn({ messages: [userMsg("go")] });
const stopped = await loop.stopTurn(handle.id);
expect(adapter.calls).toBe(0); // model never called
expect(stopped.messages).toEqual([userMsg("go")]); // input fact persisted
expect(deriveTurnStatus(stopped)).toBe("idle");
const resumed = await loop.resumeTurn(handle.id).result;
expect(deriveTurnStatus(resumed)).toBe("completed");
});
it("a throwing tool runner becomes an error ToolMessage, not a turn error", async () => {
const { loop } = makeLoop({
steps: [
{ kind: "message", message: assistantToolCalls(toolCall("tc1", "flaky")) },
{ kind: "message", message: assistantText("recovered") },
],
runner: new FakeToolRunner({
flaky: () => { throw new Error("ECONNRESET"); },
}),
});
const turn = await loop.createTurn({ messages: [userMsg("go")] }).result;
expect(turn.error).toBeNull();
expect(toolMessages(turn)[0]?.content).toBe("Tool execution failed: ECONNRESET");
expect(deriveTurnStatus(turn)).toBe("completed");
});
it("an unpaired denied decision never derives as executable", async () => {
const store = new InMemoryTurnStore();
const { loop, runner } = makeLoop({ store });
// anomalous state: denied decision WITHOUT its paired denial ToolMessage
await store.create(emptyTurn("t1", {
messages: [userMsg("go"), assistantToolCalls(toolCall("tc1", "write"))],
permissionRequests: [{ toolCallId: "tc1", request: {}, requestedAt: "2026-06-12T00:00:00Z" }],
permissionDecisions: [{
toolCallId: "tc1", decidedBy: "user", decision: "denied",
reason: null, decidedAt: "2026-06-12T00:00:01Z",
}],
}));
const turn = await loop.resumeTurn("t1").result;
expect(runner.ran).toEqual([]); // the denied tool must NOT execute
expect(deriveTurnStatus(turn)).toBe("waiting");
});
it("model failure is a terminal turn error; further mutations reject", async () => {
const { loop } = makeLoop({ steps: [{ kind: "error", error: new Error("rate limited") }] });
const turn = await loop.createTurn({ messages: [userMsg("go")] }).result;
expect(deriveTurnStatus(turn)).toBe("error");
expect(turn.error?.message).toBe("rate limited");
await expect(
loop.setToolResult(turn.id, { toolCallId: "x", result: "y" }).result,
).rejects.toThrow("terminal error");
await expect(
loop.respondToPermission(turn.id, "x", "granted").result,
).rejects.toThrow("terminal error");
});
it("errors out when the iteration cap is exceeded", async () => {
const { loop } = makeLoop({
maxIterations: 3,
steps: [
{ kind: "message", message: assistantToolCalls(toolCall("tc1", "calc")) },
{ kind: "message", message: assistantToolCalls(toolCall("tc2", "calc")) },
],
});
const turn = await loop.createTurn({ messages: [userMsg("go")] }).result;
expect(deriveTurnStatus(turn)).toBe("error");
expect(turn.error?.message).toContain("exceeded 3 iterations");
});
it("serializes concurrent mutations without losing updates", async () => {
const { loop } = makeLoop({
steps: [
{
kind: "message",
message: assistantToolCalls(toolCall("tc1", "job"), toolCall("tc2", "job")),
},
{ kind: "message", message: assistantText("done") },
],
runner: new FakeToolRunner({ job: () => ({ type: "pending" }) }),
});
const waiting = await loop.createTurn({ messages: [userMsg("go")] }).result;
const [a, b] = await Promise.all([
loop.setToolResult(waiting.id, { toolCallId: "tc1", result: "r1" }).result,
loop.setToolResult(waiting.id, { toolCallId: "tc2", result: "r2" }).result,
]);
const final = [a, b].find((t) => deriveTurnStatus(t) === "completed");
expect(final).toBeTruthy();
expect(toolMessages(final!).map((m) => m.content).sort()).toEqual(["r1", "r2"]);
});
it("streams events in order; result resolves identically without a consumer", async () => {
const { loop } = makeLoop({
steps: [
{
kind: "message",
message: assistantToolCalls(toolCall("tc1", "calc")),
deltas: ["thinking..."],
},
{ kind: "message", message: assistantText("done"), deltas: ["d", "one"] },
],
});
const handle = loop.createTurn({ messages: [userMsg("go")] });
const types: string[] = [];
for await (const event of handle.events) types.push(event.type);
expect(types).toEqual([
"text-delta", // thinking...
"tool-call",
"finish",
"tool-execution-start",
"tool-result",
"text-delta", "text-delta",
"finish",
]);
const turn = await handle.result;
expect(deriveTurnStatus(turn)).toBe("completed");
});
it("getTurn returns the persisted turn; unknown ids reject", async () => {
const { loop } = makeLoop({ steps: [{ kind: "message", message: assistantText("hi") }] });
const created = await loop.createTurn({ messages: [userMsg("hello")] }).result;
const fetched = await loop.getTurn(created.id);
expect(fetched).toEqual(created);
await expect(loop.getTurn("missing")).rejects.toThrow("Turn not found");
});
});

View file

@ -0,0 +1,449 @@
import crypto from "node:crypto";
import { z } from "zod";
import { ToolCallPart, ToolMessage } from "@x/shared/dist/message.js";
import { EventStream } from "./event-stream.js";
import type { ModelAdapter } from "./model-adapter.js";
import type { PermissionGate } from "./permission-gate.js";
import type { ToolRunner, ToolRunResult } from "./tool-runner.js";
import type { TurnStore } from "./turn-store.js";
import {
AgentLoopInput,
AgentLoopTurn,
deriveToolCallState,
unresolvedToolCalls,
type TurnEvent,
} from "./types.js";
const DEFAULT_MAX_ITERATIONS = 50;
export type TurnHandle = {
id: string;
events: AsyncIterable<TurnEvent>;
result: Promise<z.infer<typeof AgentLoopTurn>>;
};
export interface AgentLoop {
createTurn(input: z.infer<typeof AgentLoopInput>): TurnHandle;
respondToPermission(
turnId: string,
toolCallId: string,
decision: "granted" | "denied",
reason?: string,
): TurnHandle;
setToolResult(turnId: string, r: { toolCallId: string; result: unknown }): TurnHandle;
resumeTurn(turnId: string): TurnHandle;
getTurn(turnId: string): Promise<z.infer<typeof AgentLoopTurn>>;
stopTurn(turnId: string): Promise<z.infer<typeof AgentLoopTurn>>;
}
// Serializes async work per key. Unlike the try-lock in runs/lock.ts, callers
// queue instead of failing — public mutations on the same turn run in order.
class TurnMutex {
private chains = new Map<string, Promise<unknown>>();
run<T>(key: string, fn: () => Promise<T>): Promise<T> {
const prev = this.chains.get(key) ?? Promise.resolve();
const next = prev.then(fn, fn);
const tail: Promise<void> = next
.catch(() => undefined)
.then(() => {
// Drop the entry once the chain is fully drained.
if (this.chains.get(key) === tail) this.chains.delete(key);
});
this.chains.set(key, tail);
return next;
}
}
function nowIso(): string {
return new Date().toISOString();
}
function stringifyToolResult(value: unknown): string {
if (typeof value === "string") return value;
const json = JSON.stringify(value);
return json === undefined ? String(value) : json;
}
function toolMessage(
call: z.infer<typeof ToolCallPart>,
content: string,
): z.infer<typeof ToolMessage> {
return {
role: "tool",
content,
toolCallId: call.toolCallId,
toolName: call.toolName,
};
}
export class AgentLoopImpl implements AgentLoop {
private store: TurnStore;
private modelAdapter: ModelAdapter;
private toolRunner: ToolRunner;
private permissionGate: PermissionGate;
private maxIterations: number;
private mutex = new TurnMutex();
// All not-yet-finished entries per turn (running AND queued behind the
// mutex) — registered synchronously so stopTurn can never race past one.
private active = new Map<string, Set<AbortController>>();
constructor(deps: {
store: TurnStore;
modelAdapter: ModelAdapter;
toolRunner: ToolRunner;
permissionGate: PermissionGate;
maxIterations?: number;
}) {
this.store = deps.store;
this.modelAdapter = deps.modelAdapter;
this.toolRunner = deps.toolRunner;
this.permissionGate = deps.permissionGate;
this.maxIterations = deps.maxIterations ?? DEFAULT_MAX_ITERATIONS;
}
createTurn(input: z.infer<typeof AgentLoopInput>): TurnHandle {
const turnId = crypto.randomUUID();
return this.enter(turnId, async () => {
const parsed = AgentLoopInput.parse(input);
const now = nowIso();
await this.store.create({
id: turnId,
agentId: parsed.agentId ?? null,
provider: parsed.provider ?? null,
model: parsed.model ?? null,
permissionMode: parsed.permissionMode ?? "manual",
messages: parsed.messages,
permissionRequests: [],
permissionDecisions: [],
startedTools: [],
dispatchedTools: [],
error: null,
completedAt: null,
createdAt: now,
updatedAt: now,
});
});
}
respondToPermission(
turnId: string,
toolCallId: string,
decision: "granted" | "denied",
reason?: string,
): TurnHandle {
return this.enter(turnId, async () => {
const turn = await this.mustGet(turnId);
this.assertMutable(turn);
const state = deriveToolCallState(turn, toolCallId);
if (state !== "awaiting-user" && state !== "needs-classifier") {
throw new Error(`No open permission request for tool call: ${toolCallId}`);
}
turn.permissionDecisions.push({
toolCallId,
decidedBy: "user",
decision,
reason: reason ?? null,
decidedAt: nowIso(),
});
if (decision === "denied") {
// Denial resolves the call: decision + denial ToolMessage in one write.
const call = this.mustFindToolCall(turn, toolCallId);
turn.messages.push(toolMessage(
call,
`Permission denied by the user${reason ? `: ${reason}` : ""}`,
));
}
await this.persist(turn);
});
}
setToolResult(turnId: string, r: { toolCallId: string; result: unknown }): TurnHandle {
return this.enter(turnId, async () => {
const turn = await this.mustGet(turnId);
this.assertMutable(turn);
const state = deriveToolCallState(turn, r.toolCallId);
if (state !== "dispatched") {
throw new Error(`Tool call is not awaiting an external result: ${r.toolCallId}`);
}
const call = this.mustFindToolCall(turn, r.toolCallId);
turn.messages.push(toolMessage(call, stringifyToolResult(r.result)));
await this.persist(turn);
});
}
resumeTurn(turnId: string): TurnHandle {
return this.enter(turnId, async () => {
await this.mustGet(turnId);
});
}
async getTurn(turnId: string): Promise<z.infer<typeof AgentLoopTurn>> {
return this.mustGet(turnId);
}
async stopTurn(turnId: string): Promise<z.infer<typeof AgentLoopTurn>> {
for (const controller of this.active.get(turnId) ?? []) {
controller.abort();
}
// Queue behind the in-flight advance so it has fully wound down.
return this.mutex.run(turnId, () => this.mustGet(turnId));
}
// ── internals ───────────────────────────────────────────────────────────
// Every loop-entering method: persist its fact (prepare), then re-enter the
// advance() reducer, all under the per-turn mutex. The handle's result
// resolves when the turn reaches a rest state.
private enter(turnId: string, prepare: () => Promise<void>): TurnHandle {
const stream = new EventStream<TurnEvent, z.infer<typeof AgentLoopTurn>>();
const controller = new AbortController();
const controllers = this.active.get(turnId) ?? new Set();
controllers.add(controller);
this.active.set(turnId, controllers);
void this.mutex.run(turnId, async () => {
try {
// prepare() persists the entry's fact even if already stopped;
// a stop aborts execution, never the recording of facts.
await prepare();
await this.advance(turnId, stream, controller.signal);
stream.end(await this.mustGet(turnId));
} catch (error) {
stream.fail(error);
} finally {
controllers.delete(controller);
if (controllers.size === 0) this.active.delete(turnId);
}
});
return { id: turnId, events: stream, result: stream.result };
}
// The reducer. Driven purely by persisted facts: re-reads the turn from the
// store at the top of every iteration; no in-memory state is carried over.
// Crash recovery is free — this IS the resume function.
private async advance(
turnId: string,
stream: EventStream<TurnEvent, z.infer<typeof AgentLoopTurn>>,
signal: AbortSignal,
): Promise<void> {
for (let iteration = 0; iteration < this.maxIterations; iteration++) {
if (signal.aborted) return;
const turn = await this.mustGet(turnId);
// 1. terminal states
if (turn.error !== null || turn.completedAt !== null) return;
const unresolved = unresolvedToolCalls(turn);
const stateOf = new Map(unresolved.map((call) => [
call.toolCallId,
deriveToolCallState(turn, call.toolCallId),
]));
// 2. waiting → stop. The "never call the model while anything is
// waiting" invariant lives here and only here.
if ([...stateOf.values()].some((s) => s === "awaiting-user" || s === "dispatched")) {
return;
}
try {
// 3. classifier (auto mode): classify all, persist in one write
const needsClassifier = unresolved.filter(
(call) => stateOf.get(call.toolCallId) === "needs-classifier",
);
if (needsClassifier.length > 0) {
for (const call of needsClassifier) {
const request = turn.permissionRequests
.find((r) => r.toolCallId === call.toolCallId)?.request;
const verdict = await this.permissionGate.classify(call, request);
turn.permissionDecisions.push({
toolCallId: call.toolCallId,
decidedBy: "classifier",
decision: verdict.decision,
reason: verdict.reason,
decidedAt: nowIso(),
});
if (verdict.decision === "denied") {
turn.messages.push(toolMessage(
call,
`Permission denied by the auto-permission classifier: ${verdict.reason}`,
));
}
}
await this.persist(turn);
continue;
}
// 4. permission-not-yet-evaluated calls: batch-evaluate ALL of
// them and append all requests in ONE write — the user sees one
// approval moment, not N serial prompts.
const unevaluated = unresolved.filter(
(call) => stateOf.get(call.toolCallId) === "unevaluated",
);
const executable = unresolved.filter(
(call) => stateOf.get(call.toolCallId) === "cleared",
);
if (unevaluated.length > 0) {
const requested: string[] = [];
for (const call of unevaluated) {
const check = await this.permissionGate.check(call);
if (check.required) {
turn.permissionRequests.push({
toolCallId: call.toolCallId,
request: check.request,
requestedAt: nowIso(),
});
requested.push(call.toolCallId);
} else {
executable.push(call);
}
}
if (requested.length > 0) {
await this.persist(turn);
for (const toolCallId of requested) {
stream.push({ type: "permission-requested", toolCallId });
}
continue; // re-derive: waiting (manual) or classifier (auto)
}
}
// 5. interrupted calls (started, never resolved nor dispatched):
// never silently re-run a started tool — tell the model instead.
const interrupted = unresolved.filter(
(call) => stateOf.get(call.toolCallId) === "interrupted",
);
if (interrupted.length > 0) {
for (const call of interrupted) {
turn.messages.push(toolMessage(
call,
"Tool execution was interrupted before completing. It may or may not have taken effect; do not assume it ran.",
));
}
await this.persist(turn);
continue;
}
// 6. execute cleared calls
if (executable.length > 0) {
for (const call of executable) {
signal.throwIfAborted();
// Record the start fact BEFORE side effects — this is what
// makes resume side-effect-safe.
turn.startedTools.push({
toolCallId: call.toolCallId,
startedAt: nowIso(),
});
await this.persist(turn);
stream.push({ type: "tool-execution-start", toolCallId: call.toolCallId });
// Tool failures are conversational: a throwing runner
// becomes an error ToolMessage the model can react to,
// never a terminal turn error. Aborts still propagate.
const outcome = await this.toolRunner
.run(call, { turnId, signal })
.catch((error: unknown): ToolRunResult => {
signal.throwIfAborted();
return {
type: "error",
value: `Tool execution failed: ${error instanceof Error ? error.message : String(error)}`,
};
});
if (outcome.type === "pending") {
turn.dispatchedTools.push({
toolCallId: call.toolCallId,
dispatchedAt: nowIso(),
});
} else {
turn.messages.push(toolMessage(call, stringifyToolResult(outcome.value)));
}
await this.persist(turn);
if (outcome.type !== "pending") {
stream.push({ type: "tool-result", toolCallId: call.toolCallId });
}
}
continue;
}
// 7. nothing unresolved + complete assistant message last → done
const last = turn.messages[turn.messages.length - 1];
if (last && last.role === "assistant") {
turn.completedAt = nowIso();
await this.persist(turn);
return;
}
// 8. model call: accumulate in memory, commit only the complete
// AssistantMessage. Deltas flow to the handle, never to disk.
// The stream's result promise is the single source of failure:
// it rejects on model error AND on abort, and the catch below
// tells those apart via signal.aborted.
const modelStream = this.modelAdapter.stream({
provider: turn.provider,
model: turn.model,
messages: turn.messages,
tools: this.toolRunner.definitions(),
signal,
});
for await (const event of modelStream) {
stream.push(event);
}
const assistantMessage = await modelStream.result;
turn.messages.push(assistantMessage);
await this.persist(turn);
} catch (error) {
if (signal.aborted) return; // stopped: turn stays as persisted (idle)
await this.setTurnError(turnId, error);
return;
}
}
// 9. iteration cap exceeded
await this.setTurnError(turnId, new Error(
`Agent loop exceeded ${this.maxIterations} iterations`,
));
}
private async setTurnError(turnId: string, error: unknown): Promise<void> {
const turn = await this.mustGet(turnId);
turn.error = {
message: error instanceof Error ? error.message : String(error),
...(error instanceof Error && error.cause !== undefined
? { details: error.cause }
: {}),
at: nowIso(),
};
await this.persist(turn);
}
private async persist(turn: z.infer<typeof AgentLoopTurn>): Promise<void> {
turn.updatedAt = nowIso();
await this.store.update(turn);
}
private async mustGet(turnId: string): Promise<z.infer<typeof AgentLoopTurn>> {
const turn = await this.store.get(turnId);
if (!turn) throw new Error(`Turn not found: ${turnId}`);
return turn;
}
private assertMutable(turn: z.infer<typeof AgentLoopTurn>): void {
if (turn.error !== null) {
throw new Error(`Turn has a terminal error: ${turn.id}`);
}
if (turn.completedAt !== null) {
throw new Error(`Turn is already completed: ${turn.id}`);
}
}
private mustFindToolCall(
turn: z.infer<typeof AgentLoopTurn>,
toolCallId: string,
): z.infer<typeof ToolCallPart> {
for (const msg of turn.messages) {
if (msg.role !== "assistant" || typeof msg.content === "string") continue;
for (const part of msg.content) {
if (part.type === "tool-call" && part.toolCallId === toolCallId) return part;
}
}
throw new Error(`Tool call not found in transcript: ${toolCallId}`);
}
}

View file

@ -0,0 +1,58 @@
import { describe, expect, it } from "vitest";
import { EventStream } from "./event-stream.js";
async function collect<T>(iterable: AsyncIterable<T>): Promise<T[]> {
const items: T[] = [];
for await (const item of iterable) items.push(item);
return items;
}
describe("EventStream", () => {
it("yields pushed events and completes on end()", async () => {
const stream = new EventStream<number, string>();
const collecting = collect(stream);
stream.push(1);
stream.push(2);
stream.end("done");
expect(await collecting).toEqual([1, 2]);
expect(await stream.result).toBe("done");
});
it("delivers events buffered before iteration starts", async () => {
const stream = new EventStream<number, string>();
stream.push(1);
stream.push(2);
stream.end("done");
expect(await collect(stream)).toEqual([1, 2]);
});
it("resolves result without any event consumer", async () => {
const stream = new EventStream<number, string>();
stream.push(1);
stream.end("done");
expect(await stream.result).toBe("done");
});
it("rejects result and terminates iteration on fail()", async () => {
const stream = new EventStream<number, string>();
const collecting = collect(stream);
stream.push(1);
stream.fail(new Error("boom"));
expect(await collecting).toEqual([1]);
await expect(stream.result).rejects.toThrow("boom");
});
it("ignores pushes after completion", async () => {
const stream = new EventStream<number, string>();
stream.end("done");
stream.push(99);
expect(await collect(stream)).toEqual([]);
});
});

View file

@ -0,0 +1,61 @@
// Tiny EventStream<TEvent, TResult>: push events, complete with a result.
// Consumers can iterate events (streaming) or just await the result
// (await-to-rest); consuming events is never required for correctness.
export class EventStream<TEvent, TResult> implements AsyncIterable<TEvent> {
private buffer: TEvent[] = [];
private waiters: Array<() => void> = [];
private done = false;
readonly result: Promise<TResult>;
private resolveResult!: (value: TResult) => void;
private rejectResult!: (error: unknown) => void;
constructor() {
this.result = new Promise<TResult>((resolve, reject) => {
this.resolveResult = resolve;
this.rejectResult = reject;
});
// Mark rejections as handled so callers that only iterate events (or
// ignore the handle entirely) don't trigger unhandled-rejection noise.
// Awaiting `result` still rejects as expected.
this.result.catch(() => undefined);
}
push(event: TEvent): void {
if (this.done) return;
this.buffer.push(event);
this.wake();
}
end(result: TResult): void {
if (this.done) return;
this.done = true;
this.resolveResult(result);
this.wake();
}
fail(error: unknown): void {
if (this.done) return;
this.done = true;
this.rejectResult(error);
this.wake();
}
private wake(): void {
const waiters = this.waiters;
this.waiters = [];
for (const waiter of waiters) waiter();
}
async *[Symbol.asyncIterator](): AsyncIterator<TEvent> {
let index = 0;
for (;;) {
while (index < this.buffer.length) {
yield this.buffer[index++];
}
if (this.done) return;
await new Promise<void>((resolve) => this.waiters.push(resolve));
}
}
}

View file

@ -0,0 +1,26 @@
import { z } from "zod";
import { AgentLoopTurn } from "./types.js";
import type { TurnStore } from "./turn-store.js";
export class InMemoryTurnStore implements TurnStore {
private turns = new Map<string, z.infer<typeof AgentLoopTurn>>();
async create(turn: z.infer<typeof AgentLoopTurn>): Promise<void> {
if (this.turns.has(turn.id)) {
throw new Error(`Turn already exists: ${turn.id}`);
}
this.turns.set(turn.id, structuredClone(turn));
}
async get(id: string): Promise<z.infer<typeof AgentLoopTurn> | null> {
const turn = this.turns.get(id);
return turn ? structuredClone(turn) : null;
}
async update(turn: z.infer<typeof AgentLoopTurn>): Promise<void> {
if (!this.turns.has(turn.id)) {
throw new Error(`Turn not found: ${turn.id}`);
}
this.turns.set(turn.id, structuredClone(turn));
}
}

View file

@ -0,0 +1,9 @@
export * from "./types.js";
export * from "./event-stream.js";
export * from "./turn-store.js";
export * from "./in-memory-turn-store.js";
export * from "./tool-runner.js";
export * from "./permission-gate.js";
export * from "./model-adapter.js";
export * from "./agent-loop.js";
export * from "./sqlite-turn-store.js";

View file

@ -0,0 +1,141 @@
import { jsonSchema, stepCountIs, streamText, tool, type ToolSet } from "ai";
import { z } from "zod";
import {
AssistantContentPart,
AssistantMessage,
MessageList,
ToolCallPart,
} from "@x/shared/dist/message.js";
import { convertFromMessages } from "../agents/runtime.js";
import { createProvider } from "../models/models.js";
import { resolveProviderConfig } from "../models/defaults.js";
import { EventStream } from "./event-stream.js";
import type { ModelStreamEvent, ToolDefinition } from "./types.js";
export type ModelStreamRequest = {
provider: string | null;
model: string | null;
messages: z.infer<typeof MessageList>;
tools: ToolDefinition[];
signal: AbortSignal;
};
// Streams one model step. Iterate for deltas, or just await `.result` for the
// final complete AssistantMessage. The loop commits only the complete message;
// deltas are never persisted.
//
// Contract: `.result` is authoritative — it MUST resolve with the complete
// message or reject on failure/abort (the loop distinguishes the two via its
// own AbortSignal). `error` events are observational only; the loop ignores
// them.
export interface ModelAdapter {
stream(req: ModelStreamRequest): EventStream<ModelStreamEvent, z.infer<typeof AssistantMessage>>;
}
// Thin adapter over the existing provider factory + Vercel AI SDK streamText.
// All retry/failover policy stays out of the agent loop; if a step fails, the
// stream emits an `error` event and the loop records a turn-level error.
export class VercelModelAdapter implements ModelAdapter {
stream(req: ModelStreamRequest): EventStream<ModelStreamEvent, z.infer<typeof AssistantMessage>> {
const out = new EventStream<ModelStreamEvent, z.infer<typeof AssistantMessage>>();
void this.run(req, out).catch((error: unknown) => {
out.push({ type: "error", error });
out.fail(error);
});
return out;
}
private async run(
req: ModelStreamRequest,
out: EventStream<ModelStreamEvent, z.infer<typeof AssistantMessage>>,
): Promise<void> {
if (!req.provider || !req.model) {
throw new Error("Agent loop turn has no provider/model configured");
}
const providerConfig = await resolveProviderConfig(req.provider);
const provider = createProvider(providerConfig);
const model = provider.languageModel(req.model);
const tools: ToolSet = {};
for (const def of req.tools) {
tools[def.name] = tool({
...(def.description ? { description: def.description } : {}),
inputSchema: jsonSchema(
(def.inputSchema ?? { type: "object", properties: {} }) as Parameters<typeof jsonSchema>[0],
),
});
}
const result = streamText({
model,
messages: convertFromMessages(req.messages),
tools,
stopWhen: stepCountIs(1),
abortSignal: req.signal,
});
// Accumulate complete assistant content parts in stream order. Deltas
// append into the current text/reasoning part; tool calls are discrete.
const parts: z.infer<typeof AssistantContentPart>[] = [];
const lastPart = () => parts[parts.length - 1];
for await (const event of result.fullStream) {
req.signal.throwIfAborted();
switch (event.type) {
case "text-delta": {
const last = lastPart();
if (last?.type === "text") {
last.text += event.text;
} else {
parts.push({ type: "text", text: event.text });
}
out.push({ type: "text-delta", delta: event.text });
break;
}
case "reasoning-delta": {
const last = lastPart();
if (last?.type === "reasoning") {
last.text += event.text;
} else {
parts.push({ type: "reasoning", text: event.text });
}
out.push({ type: "reasoning-delta", delta: event.text });
break;
}
case "tool-call": {
const toolCall: z.infer<typeof ToolCallPart> = {
type: "tool-call",
toolCallId: event.toolCallId,
toolName: event.toolName,
arguments: event.input,
};
parts.push(toolCall);
out.push({ type: "tool-call", toolCall });
break;
}
case "error":
throw event.error instanceof Error
? event.error
: new Error(formatStreamError(event.error));
default:
break;
}
}
const message: z.infer<typeof AssistantMessage> = {
role: "assistant",
content: parts.length > 0 ? parts : "",
};
out.push({ type: "finish", message });
out.end(message);
}
}
function formatStreamError(error: unknown): string {
if (typeof error === "string") return error;
try {
return JSON.stringify(error);
} catch {
return String(error);
}
}

View file

@ -0,0 +1,22 @@
import { z } from "zod";
import { ToolCallPart } from "@x/shared/dist/message.js";
export type PermissionCheckResult =
| { required: false }
| { required: true; request: unknown };
export type PermissionClassification = {
decision: "granted" | "denied" | "abstained";
reason: string;
};
// Decides whether a tool call needs user approval, and (in auto mode)
// classifies it. The real implementation (bridging getToolPermissionMetadata /
// classifyToolPermissions) is integration-phase work; v1 uses fakes in tests.
export interface PermissionGate {
check(toolCall: z.infer<typeof ToolCallPart>): Promise<PermissionCheckResult>;
classify(
toolCall: z.infer<typeof ToolCallPart>,
request: unknown,
): Promise<PermissionClassification>;
}

View file

@ -0,0 +1,139 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { sql } from "kysely";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { z } from "zod";
import type { AgentLoopTurn } from "./types.js";
let tmpDir: string;
let workspaceDir: string;
let storageModule: typeof import("../storage/index.js") | null = null;
beforeEach(async () => {
tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), "rowboat-agent-loop-test-"));
workspaceDir = path.join(tmpDir, "workspace");
process.env.ROWBOAT_WORKDIR = workspaceDir;
vi.resetModules();
// config.ts kicks off knowledge-repo init as an import side effect; mock it
// out so tests don't touch git (same pattern as storage/storage.test.ts).
vi.doMock("../knowledge/version_history.js", () => ({
initRepo: vi.fn(async () => undefined),
}));
vi.doMock("../knowledge/deprecate_today_note.js", () => ({
deprecateTodayNote: vi.fn(async () => undefined),
}));
});
afterEach(async () => {
if (storageModule) {
await storageModule.shutdownStorage().catch(() => undefined);
storageModule = null;
}
delete process.env.ROWBOAT_WORKDIR;
vi.doUnmock("../knowledge/version_history.js");
vi.doUnmock("../knowledge/deprecate_today_note.js");
vi.resetModules();
await fs.rm(tmpDir, { recursive: true, force: true });
});
async function loadStore() {
storageModule = await import("../storage/index.js");
await storageModule.initStorage();
const { SqliteTurnStore } = await import("./sqlite-turn-store.js");
return { store: new SqliteTurnStore(storageModule.getDb()), db: storageModule.getDb() };
}
function sampleTurn(id: string): z.infer<typeof AgentLoopTurn> {
return {
id,
agentId: "agent-1",
provider: "openai",
model: "gpt-x",
permissionMode: "auto",
messages: [
{ role: "user", content: "hello" },
{
role: "assistant",
content: [
{ type: "text", text: "let me check" },
{ type: "tool-call", toolCallId: "tc1", toolName: "read", arguments: { path: "/a" } },
],
},
{ role: "tool", content: "file contents", toolCallId: "tc1", toolName: "read" },
],
permissionRequests: [
{ toolCallId: "tc1", request: { fileAccess: ["/a"] }, requestedAt: "2026-06-12T00:00:00Z" },
],
permissionDecisions: [
{
toolCallId: "tc1",
decidedBy: "classifier",
decision: "granted",
reason: "read-only access",
decidedAt: "2026-06-12T00:00:01Z",
},
],
startedTools: [{ toolCallId: "tc1", startedAt: "2026-06-12T00:00:02Z" }],
dispatchedTools: [],
error: null,
completedAt: null,
createdAt: "2026-06-12T00:00:00Z",
updatedAt: "2026-06-12T00:00:03Z",
};
}
describe("SqliteTurnStore", () => {
it("migration creates the agent_loop_turns table and index", async () => {
const { db } = await loadStore();
const tables = await sql<{ name: string }>`
select name from sqlite_master
where type = 'table' and name = 'agent_loop_turns'
`.execute(db);
expect(tables.rows).toHaveLength(1);
const indexes = await sql<{ name: string }>`
select name from sqlite_master
where type = 'index' and name = 'agent_loop_turns_created_at_idx'
`.execute(db);
expect(indexes.rows).toHaveLength(1);
});
it("round-trips every column", async () => {
const { store } = await loadStore();
const turn = sampleTurn("t1");
await store.create(turn);
expect(await store.get("t1")).toEqual(turn);
const updated = {
...turn,
dispatchedTools: [{ toolCallId: "tc1", dispatchedAt: "2026-06-12T00:00:04Z" }],
error: { message: "boom", code: "E1", details: { hint: "x" }, at: "2026-06-12T00:00:05Z" },
completedAt: "2026-06-12T00:00:06Z",
updatedAt: "2026-06-12T00:00:06Z",
};
await store.update(updated);
expect(await store.get("t1")).toEqual(updated);
});
it("returns null for unknown ids and rejects updates to missing turns", async () => {
const { store } = await loadStore();
expect(await store.get("missing")).toBeNull();
await expect(store.update(sampleTurn("missing"))).rejects.toThrow("Turn not found");
});
it("fails loudly on a corrupted JSON column", async () => {
const { store, db } = await loadStore();
await store.create(sampleTurn("t1"));
await db
.updateTable("agent_loop_turns")
.set({ permission_decisions: JSON.stringify([{ bogus: true }]) })
.where("id", "=", "t1")
.execute();
await expect(store.get("t1")).rejects.toThrow();
});
});

View file

@ -0,0 +1,87 @@
import type { Insertable, Kysely, Selectable } from "kysely";
import { z } from "zod";
import { MessageList } from "@x/shared/dist/message.js";
import type { AgentLoopTurnsTable, Database } from "../storage/schema.js";
import type { TurnStore } from "./turn-store.js";
import {
AgentLoopError,
AgentLoopTurn,
DispatchedTool,
PermissionDecision,
PermissionMode,
PermissionRequest,
StartedTool,
} from "./types.js";
// Accepts a Kysely<Database> from the existing getDb(); it does not own the
// storage lifecycle (never calls initStorage()). Every JSON column is
// zod-parsed on read so schema drift fails loudly at the boundary.
export class SqliteTurnStore implements TurnStore {
constructor(private db: Kysely<Database>) {}
async create(turn: z.infer<typeof AgentLoopTurn>): Promise<void> {
await this.db
.insertInto("agent_loop_turns")
.values(toRow(turn))
.execute();
}
async get(id: string): Promise<z.infer<typeof AgentLoopTurn> | null> {
const row = await this.db
.selectFrom("agent_loop_turns")
.selectAll()
.where("id", "=", id)
.executeTakeFirst();
return row ? fromRow(row) : null;
}
async update(turn: z.infer<typeof AgentLoopTurn>): Promise<void> {
const { id, ...rest } = toRow(turn);
const result = await this.db
.updateTable("agent_loop_turns")
.set(rest)
.where("id", "=", id)
.executeTakeFirst();
if (result.numUpdatedRows === 0n) {
throw new Error(`Turn not found: ${id}`);
}
}
}
function toRow(turn: z.infer<typeof AgentLoopTurn>): Insertable<AgentLoopTurnsTable> {
return {
id: turn.id,
agent_id: turn.agentId,
provider: turn.provider,
model: turn.model,
permission_mode: turn.permissionMode,
messages: JSON.stringify(turn.messages),
permission_requests: JSON.stringify(turn.permissionRequests),
permission_decisions: JSON.stringify(turn.permissionDecisions),
started_tools: JSON.stringify(turn.startedTools),
dispatched_tools: JSON.stringify(turn.dispatchedTools),
error: turn.error === null ? null : JSON.stringify(turn.error),
created_at: turn.createdAt,
updated_at: turn.updatedAt,
completed_at: turn.completedAt,
};
}
function fromRow(row: Selectable<AgentLoopTurnsTable>): z.infer<typeof AgentLoopTurn> {
return {
id: row.id,
agentId: row.agent_id,
provider: row.provider,
model: row.model,
permissionMode: PermissionMode.parse(row.permission_mode),
messages: MessageList.parse(JSON.parse(row.messages)),
permissionRequests: z.array(PermissionRequest).parse(JSON.parse(row.permission_requests)),
permissionDecisions: z.array(PermissionDecision).parse(JSON.parse(row.permission_decisions)),
startedTools: z.array(StartedTool).parse(JSON.parse(row.started_tools)),
dispatchedTools: z.array(DispatchedTool).parse(JSON.parse(row.dispatched_tools)),
error: row.error === null ? null : AgentLoopError.parse(JSON.parse(row.error)),
createdAt: row.created_at,
updatedAt: row.updated_at,
completedAt: row.completed_at,
};
}

View file

@ -0,0 +1,22 @@
import { z } from "zod";
import { ToolCallPart } from "@x/shared/dist/message.js";
import type { ToolDefinition } from "./types.js";
export type ToolRunResult =
| { type: "result"; value: unknown } // → ToolMessage
| { type: "error"; value: unknown } // → ToolMessage (model sees it; NOT a turn error)
| { type: "pending" }; // → DispatchedTool; result arrives via setToolResult
export type ToolRunContext = {
turnId: string;
signal: AbortSignal;
};
// Executes tool calls. The real implementation (bridging exec-tool.ts / MCP)
// is integration-phase work; v1 uses fakes in tests.
export interface ToolRunner {
// Tool definitions advertised to the model. Environment, not turn state:
// resume works because the loop is reconstructed with the same runner.
definitions(): ToolDefinition[];
run(toolCall: z.infer<typeof ToolCallPart>, ctx: ToolRunContext): Promise<ToolRunResult>;
}

View file

@ -0,0 +1,10 @@
import { z } from "zod";
import { AgentLoopTurn } from "./types.js";
// Durable storage for turns. The per-turn mutex lives ABOVE the store (in
// AgentLoopImpl), not in it — stores only read and write whole turns.
export interface TurnStore {
create(turn: z.infer<typeof AgentLoopTurn>): Promise<void>;
get(id: string): Promise<z.infer<typeof AgentLoopTurn> | null>;
update(turn: z.infer<typeof AgentLoopTurn>): Promise<void>;
}

View file

@ -0,0 +1,192 @@
import { z } from "zod";
import {
AssistantMessage,
MessageList,
ToolCallPart,
} from "@x/shared/dist/message.js";
// ─── Persisted fact schemas ─────────────────────────────────────────────────
//
// A turn is five append-only fact logs + set-once scalars. Records are never
// mutated or deleted; every field records exactly one non-derivable fact.
// Everything else (status, per-call lifecycle) is derived.
export const PermissionRequest = z.object({
toolCallId: z.string(),
// What the user is approving (file access, command, ...). Computed from
// tool args by the PermissionGate, so it must be persisted to pin down
// exactly what was asked.
request: z.unknown(),
requestedAt: z.string(),
});
export const PermissionDecision = z.discriminatedUnion("decidedBy", [
z.object({
toolCallId: z.string(),
decidedBy: z.literal("user"),
decision: z.enum(["granted", "denied"]),
reason: z.string().nullable(),
decidedAt: z.string(),
}),
z.object({
toolCallId: z.string(),
decidedBy: z.literal("classifier"),
decision: z.enum(["granted", "denied", "abstained"]),
reason: z.string(),
decidedAt: z.string(),
}),
]);
export const StartedTool = z.object({
toolCallId: z.string(),
startedAt: z.string(),
});
export const DispatchedTool = z.object({
toolCallId: z.string(),
dispatchedAt: z.string(),
});
export const AgentLoopError = z.object({
message: z.string(),
code: z.string().optional(),
details: z.unknown().optional(),
at: z.string(),
});
export const PermissionMode = z.enum(["manual", "auto"]);
export const AgentLoopTurn = z.object({
id: z.string(),
agentId: z.string().nullable(),
provider: z.string().nullable(),
model: z.string().nullable(),
permissionMode: PermissionMode,
// append-only fact logs
messages: MessageList,
permissionRequests: z.array(PermissionRequest),
permissionDecisions: z.array(PermissionDecision),
startedTools: z.array(StartedTool),
dispatchedTools: z.array(DispatchedTool),
// set-once scalars
error: AgentLoopError.nullable(),
completedAt: z.string().nullable(),
createdAt: z.string(),
updatedAt: z.string(),
});
export const AgentLoopInput = z.object({
agentId: z.string().nullable().optional(),
provider: z.string().nullable().optional(),
model: z.string().nullable().optional(),
permissionMode: PermissionMode.optional(),
// May include prior-conversation history; turns are self-contained by design.
messages: MessageList.min(1),
});
// ─── Tool definitions (environment, not turn state) ────────────────────────
export type ToolDefinition = {
name: string;
description?: string;
// JSON Schema for the tool input
inputSchema?: unknown;
};
// ─── Live (never persisted) event types ─────────────────────────────────────
export type ModelStreamEvent =
| { type: "text-delta"; delta: string }
| { type: "reasoning-delta"; delta: string }
| { type: "tool-call"; toolCall: z.infer<typeof ToolCallPart> }
| { type: "finish"; message: z.infer<typeof AssistantMessage> }
| { type: "error"; error: unknown };
export type TurnEvent =
| ModelStreamEvent
| { type: "tool-execution-start"; toolCallId: string }
| { type: "tool-result"; toolCallId: string }
| { type: "permission-requested"; toolCallId: string };
// ─── Derived state ──────────────────────────────────────────────────────────
export type TurnStatus = "waiting" | "completed" | "error" | "idle";
export type ToolCallState =
| "resolved" // matching ToolMessage exists — terminal
| "dispatched" // delegated; result arrives via setToolResult
| "interrupted" // started but never resolved nor dispatched (crash/abort)
| "needs-classifier" // open request, auto mode, classifier has not spoken
| "awaiting-user" // open request, waiting on a user decision
| "cleared" // terminal `granted` decision; ready to execute
| "unevaluated"; // no facts yet; permission gate has not been consulted
export function toolCallParts(
turn: z.infer<typeof AgentLoopTurn>,
): z.infer<typeof ToolCallPart>[] {
const parts: z.infer<typeof ToolCallPart>[] = [];
for (const msg of turn.messages) {
if (msg.role !== "assistant" || typeof msg.content === "string") continue;
for (const part of msg.content) {
if (part.type === "tool-call") parts.push(part);
}
}
return parts;
}
export function resolvedToolCallIds(turn: z.infer<typeof AgentLoopTurn>): Set<string> {
const ids = new Set<string>();
for (const msg of turn.messages) {
if (msg.role === "tool") ids.add(msg.toolCallId);
}
return ids;
}
export function unresolvedToolCalls(
turn: z.infer<typeof AgentLoopTurn>,
): z.infer<typeof ToolCallPart>[] {
const resolved = resolvedToolCallIds(turn);
return toolCallParts(turn).filter((part) => !resolved.has(part.toolCallId));
}
export function deriveToolCallState(
turn: z.infer<typeof AgentLoopTurn>,
toolCallId: string,
): ToolCallState {
if (resolvedToolCallIds(turn).has(toolCallId)) return "resolved";
if (turn.dispatchedTools.some((t) => t.toolCallId === toolCallId)) return "dispatched";
if (turn.startedTools.some((t) => t.toolCallId === toolCallId)) return "interrupted";
const request = turn.permissionRequests.find((r) => r.toolCallId === toolCallId);
if (request) {
const decisions = turn.permissionDecisions.filter((d) => d.toolCallId === toolCallId);
const terminal = decisions.find((d) => d.decision === "granted" || d.decision === "denied");
if (terminal) {
// A denied call always has its denial ToolMessage appended atomically
// with the decision, so an unresolved terminal decision should be
// `granted` — but check explicitly: an unpaired denial (a buggy
// future writer) must never derive as executable. It falls back to
// awaiting-user, which self-heals via a fresh decision.
return terminal.decision === "granted" ? "cleared" : "awaiting-user";
}
if (turn.permissionMode === "auto" && !decisions.some((d) => d.decidedBy === "classifier")) {
return "needs-classifier";
}
return "awaiting-user";
}
return "unevaluated";
}
export function deriveTurnStatus(turn: z.infer<typeof AgentLoopTurn>): TurnStatus {
if (turn.error !== null) return "error";
if (turn.completedAt !== null) return "completed";
for (const call of unresolvedToolCalls(turn)) {
const state = deriveToolCallState(turn, call.toolCallId);
if (state === "awaiting-user" || state === "dispatched") return "waiting";
}
return "idle";
}

View file

@ -20,6 +20,39 @@ const migrations: Record<string, Migration> = {
await db.schema.dropTable("storage_metadata").ifExists().execute();
},
},
"2026-06-12_0002_agent_loop_turns": {
async up(db: MigrationDb): Promise<void> {
await db.schema
.createTable("agent_loop_turns")
.ifNotExists()
.addColumn("id", "text", (col) => col.primaryKey())
.addColumn("agent_id", "text")
.addColumn("provider", "text")
.addColumn("model", "text")
.addColumn("permission_mode", "text", (col) => col.notNull())
.addColumn("messages", "text", (col) => col.notNull())
.addColumn("permission_requests", "text", (col) => col.notNull())
.addColumn("permission_decisions", "text", (col) => col.notNull())
.addColumn("started_tools", "text", (col) => col.notNull())
.addColumn("dispatched_tools", "text", (col) => col.notNull())
.addColumn("error", "text")
.addColumn("created_at", "text", (col) => col.notNull())
.addColumn("updated_at", "text", (col) => col.notNull())
.addColumn("completed_at", "text")
.execute();
await db.schema
.createIndex("agent_loop_turns_created_at_idx")
.ifNotExists()
.on("agent_loop_turns")
.column("created_at")
.execute();
},
async down(db: MigrationDb): Promise<void> {
await db.schema.dropIndex("agent_loop_turns_created_at_idx").ifExists().execute();
await db.schema.dropTable("agent_loop_turns").ifExists().execute();
},
},
};
class InCodeMigrationProvider implements MigrationProvider {

View file

@ -8,6 +8,24 @@ export interface StorageMetadataTable {
updated_at: TimestampColumn;
}
export interface AgentLoopTurnsTable {
id: string;
agent_id: string | null;
provider: string | null;
model: string | null;
permission_mode: string;
messages: string; // JSON: MessageList
permission_requests: string; // JSON: PermissionRequest[]
permission_decisions: string; // JSON: PermissionDecision[]
started_tools: string; // JSON: StartedTool[]
dispatched_tools: string; // JSON: DispatchedTool[]
error: string | null; // JSON: AgentLoopError
created_at: TimestampColumn;
updated_at: TimestampColumn;
completed_at: string | null;
}
export interface Database {
storage_metadata: StorageMetadataTable;
agent_loop_turns: AgentLoopTurnsTable;
}