From 80c823a3ca11a36e1a580a78afd1c3de71d25353 Mon Sep 17 00:00:00 2001 From: Sam Valladares Date: Mon, 22 Jun 2026 17:06:35 -0500 Subject: [PATCH] feat(blackbox): Agent Black Box + Receipts + risk-gated Memory PRs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Watch the agent think. Watch memory change. Watch the receipt prove why. Make Vestige the first memory server where you can replay an agent run, audit every retrieval, and review changes to the agent's brain like code. Phase 0 — the trace-correlation spine. One runId threads, unbroken, through every layer: MCP tool output (runId + traceUri) -> SQLite agent_traces rows -> WebSocket TraceEvent -> dashboard pulse -> /api/traces/:runId -> vestige://trace/{runId} -> .vestige-trace.json export -> Cinema replay input. Proven end to end by a real JSON-RPC round-trip integration test. Core (vestige-core): - trace/ module: MemoryTraceEvent (7 variants incl. contradiction.detected), Receipt, and classify_write — the pure, DB-free immune-system logic. - Risk taxonomy: contradiction-vs-high-trust, supersede/forget/merge/protect, identity/preference/workflow/positioning, auth/security/money/legal, dream consolidation, decay resurrection, low-confidence batch, weak-provenance connector. Fast / Risk-Gated (default) / Paranoid modes. - V18 migration: agent_traces, agent_runs, memory_receipts, memory_prs. - trace_store.rs: CRUD following the established store idiom. MCP (vestige-mcp): - trace_recorder.rs: records mcp.call + downstream retrieve/suppress/write/ contradiction/veto/dream events; builds + persists receipts; risk-gates writes into Memory PRs. Args are hashed, never stored raw. - server.rs dispatch stamps runId/traceUri/receipt onto every tool result and routes risky writes to the PR queue; trace events broadcast over WebSocket. - vestige://trace/{runId} resource; /api/traces, /api/receipts, /api/memory-prs. Dashboard: - Black Box tab: live spine header + Proof Mode, run picker, timeline scrubber, per-event detail, memory pulse, full event log, .vestige-trace.json export. - Memory PRs tab: GitHub-style cognition diff, self-explaining risk signals, Promote/Merge/Supersede/Quarantine/Forget/Ask-Agent-Why, mode toggle. - ReceiptCard with "Open receipt in Cinema" (deep-links graph; Cinema untouched). Gates: 987 lib tests pass, clippy -D warnings clean, dashboard check + build clean. Live proof in blackbox-proof-2026-06-22/. Co-Authored-By: Claude Opus 4.8 (1M context) --- apps/dashboard/src/lib/components/Icon.svelte | 6 + .../src/lib/components/ReceiptCard.svelte | 218 +++++ .../src/lib/components/blackbox-helpers.ts | 134 +++ apps/dashboard/src/lib/stores/api.ts | 112 +++ apps/dashboard/src/lib/stores/websocket.ts | 24 + apps/dashboard/src/lib/types/index.ts | 3 + .../src/routes/(app)/blackbox/+page.svelte | 833 ++++++++++++++++++ .../src/routes/(app)/graph/+page.svelte | 10 +- .../src/routes/(app)/memory-prs/+page.svelte | 726 +++++++++++++++ apps/dashboard/src/routes/+layout.svelte | 2 + blackbox-proof-2026-06-22/memory-prs.json | 1 + blackbox-proof-2026-06-22/phase-1-status.json | 1 + blackbox-proof-2026-06-22/phase-3-trace.json | 1 + blackbox-proof-2026-06-22/proof-summary.md | 57 ++ blackbox-proof-2026-06-22/receipts.json | 1 + crates/vestige-core/src/lib.rs | 12 + crates/vestige-core/src/storage/migrations.rs | 127 ++- crates/vestige-core/src/storage/mod.rs | 2 + crates/vestige-core/src/storage/sqlite.rs | 7 +- .../vestige-core/src/storage/trace_store.rs | 595 +++++++++++++ crates/vestige-core/src/trace/mod.rs | 356 ++++++++ crates/vestige-core/src/trace/receipt.rs | 247 ++++++ crates/vestige-core/src/trace/review.rs | 692 +++++++++++++++ crates/vestige-mcp/src/autopilot.rs | 5 +- crates/vestige-mcp/src/dashboard/events.rs | 33 + crates/vestige-mcp/src/dashboard/handlers.rs | 282 ++++++ crates/vestige-mcp/src/dashboard/mod.rs | 22 + crates/vestige-mcp/src/lib.rs | 1 + crates/vestige-mcp/src/resources/mod.rs | 1 + crates/vestige-mcp/src/resources/trace.rs | 103 +++ crates/vestige-mcp/src/server.rs | 275 +++++- crates/vestige-mcp/src/trace_recorder.rs | 704 +++++++++++++++ 32 files changed, 5575 insertions(+), 18 deletions(-) create mode 100644 apps/dashboard/src/lib/components/ReceiptCard.svelte create mode 100644 apps/dashboard/src/lib/components/blackbox-helpers.ts create mode 100644 apps/dashboard/src/routes/(app)/blackbox/+page.svelte create mode 100644 apps/dashboard/src/routes/(app)/memory-prs/+page.svelte create mode 100644 blackbox-proof-2026-06-22/memory-prs.json create mode 100644 blackbox-proof-2026-06-22/phase-1-status.json create mode 100644 blackbox-proof-2026-06-22/phase-3-trace.json create mode 100644 blackbox-proof-2026-06-22/proof-summary.md create mode 100644 blackbox-proof-2026-06-22/receipts.json create mode 100644 crates/vestige-core/src/storage/trace_store.rs create mode 100644 crates/vestige-core/src/trace/mod.rs create mode 100644 crates/vestige-core/src/trace/receipt.rs create mode 100644 crates/vestige-core/src/trace/review.rs create mode 100644 crates/vestige-mcp/src/resources/trace.rs create mode 100644 crates/vestige-mcp/src/trace_recorder.rs diff --git a/apps/dashboard/src/lib/components/Icon.svelte b/apps/dashboard/src/lib/components/Icon.svelte index ed248a0..1acf84f 100644 --- a/apps/dashboard/src/lib/components/Icon.svelte +++ b/apps/dashboard/src/lib/components/Icon.svelte @@ -13,6 +13,8 @@ // ◎ ◈ ◉ ◷ across multiple items; that bug is dead here). // ═══════════════════════════════════════════════════════════════════ export type IconName = + | 'blackbox' + | 'memorypr' | 'graph' | 'reasoning' | 'memories' @@ -41,6 +43,10 @@ // Each entry is the inner markup of a 24×24 SVG. Strokes inherit // currentColor; fills are explicit where a solid accent reads better. export const ICON_PATHS: Record = { + // Flight recorder — a radar-pulse sweep inside a recorder box. + blackbox: ``, + // Git-branch with a review check — approve changes to the brain. + memorypr: ``, // Connected nodes — a literal knowledge graph. graph: ``, // Branching logic tree with a spark — deduction. diff --git a/apps/dashboard/src/lib/components/ReceiptCard.svelte b/apps/dashboard/src/lib/components/ReceiptCard.svelte new file mode 100644 index 0000000..71ea574 --- /dev/null +++ b/apps/dashboard/src/lib/components/ReceiptCard.svelte @@ -0,0 +1,218 @@ + + +
+
+ {receipt.receipt_id} + + decay: {receipt.decay_risk} + +
+ +
+
+ {receipt.retrieved.length} + retrieved +
+
+ {receipt.suppressed.length} + suppressed +
+
+ {(receipt.trust_floor * 100).toFixed(0)}% + trust floor +
+
+ + {#if !compact} + {#if receipt.activation_path.length} +
+ Activation path + {#each receipt.activation_path as path (path)} +
{path}
+ {/each} +
+ {/if} + + {#if receipt.retrieved.length} +
+ Retrieved +
+ {#each receipt.retrieved as id (id)} + {id.slice(0, 8)} + {/each} +
+
+ {/if} + + {#if receipt.suppressed.length} +
+ Suppressed +
+ {#each receipt.suppressed as s (s.id)} + + {s.id.slice(0, 8)} · {s.reason.replace('_', ' ')} + + {/each} +
+
+ {/if} + {/if} + + +
+ + diff --git a/apps/dashboard/src/lib/components/blackbox-helpers.ts b/apps/dashboard/src/lib/components/blackbox-helpers.ts new file mode 100644 index 0000000..908d880 --- /dev/null +++ b/apps/dashboard/src/lib/components/blackbox-helpers.ts @@ -0,0 +1,134 @@ +// ═══════════════════════════════════════════════════════════════════════════ +// AGENT BLACK BOX — presentation helpers +// ─────────────────────────────────────────────────────────────────────────── +// Pure functions that turn a raw `TraceEvent` into the label, color, glyph, +// and one-line summary the Black Box timeline renders. Kept out of the +// component so they are unit-testable and reused by the Proof Mode header. +// ═══════════════════════════════════════════════════════════════════════════ +import type { TraceEvent } from '$lib/stores/api'; + +export type TraceKind = TraceEvent['type']; + +/** The accent color for each trace-event kind (CSS color value). */ +export function eventColor(kind: TraceKind): string { + switch (kind) { + case 'mcp.call': + return 'var(--color-synapse-glow, #818cf8)'; + case 'memory.retrieve': + return 'var(--color-recall, #10b981)'; + case 'memory.suppress': + return '#a78bfa'; // violet — the forgetting hue + case 'memory.write': + return '#38bdf8'; // sky — a new write + case 'contradiction.detected': + return '#fb7185'; // rose — tension + case 'sanhedrin.veto': + return '#f43f5e'; // red — a block + case 'dream.patch': + return '#c084fc'; // purple — dream + default: + return 'var(--color-synapse, #6366f1)'; + } +} + +/** A short human label for each kind. */ +export function eventLabel(kind: TraceKind): string { + switch (kind) { + case 'mcp.call': + return 'Tool Call'; + case 'memory.retrieve': + return 'Retrieved'; + case 'memory.suppress': + return 'Suppressed'; + case 'memory.write': + return 'Wrote'; + case 'contradiction.detected': + return 'Contradiction'; + case 'sanhedrin.veto': + return 'Veto'; + case 'dream.patch': + return 'Dream Patch'; + default: + return kind; + } +} + +/** A single glyph (emoji-free SVG path is overkill here; a compact symbol). */ +export function eventGlyph(kind: TraceKind): string { + switch (kind) { + case 'mcp.call': + return '⟐'; + case 'memory.retrieve': + return '◉'; + case 'memory.suppress': + return '⊘'; + case 'memory.write': + return '✎'; + case 'contradiction.detected': + return '⚡'; + case 'sanhedrin.veto': + return '⛔'; + case 'dream.patch': + return '☾'; + default: + return '•'; + } +} + +/** A one-line summary of what an event did, for the timeline row. */ +export function eventSummary(ev: TraceEvent): string { + switch (ev.type) { + case 'mcp.call': + return `${ev.tool} · args ${ev.argsHash.slice(0, 8)}`; + case 'memory.retrieve': + return `${ev.ids.length} ${ev.ids.length === 1 ? 'memory' : 'memories'} surfaced`; + case 'memory.suppress': + return `${ev.id.slice(0, 8)} — ${ev.reason.replace('_', ' ')}`; + case 'memory.write': + return `${ev.id.slice(0, 8)} — ${ev.source}`; + case 'contradiction.detected': + return ev.detail; + case 'sanhedrin.veto': + return `"${ev.claim}" (conf ${(ev.confidence * 100).toFixed(0)}%)`; + case 'dream.patch': + return `${ev.proposalIds.length} consolidation proposal(s)`; + default: + return ''; + } +} + +/** The memory ids an event touched (for graph-pulse replay). */ +export function eventMemoryIds(ev: TraceEvent): string[] { + switch (ev.type) { + case 'memory.retrieve': + return ev.ids; + case 'memory.suppress': + case 'memory.write': + return [ev.id]; + case 'contradiction.detected': + return ev.ids; + case 'sanhedrin.veto': + return ev.evidenceIds; + case 'dream.patch': + return ev.proposalIds; + default: + return []; + } +} + +/** Format a millisecond timestamp as a clock time. */ +export function formatAt(at: number): string { + if (!Number.isFinite(at) || at <= 0) return '—'; + const d = new Date(at); + return d.toLocaleTimeString(undefined, { + hour12: false, + hour: '2-digit', + minute: '2-digit', + second: '2-digit' + }); +} + +/** Elapsed milliseconds of an event relative to the run's first event. */ +export function relativeMs(at: number, startAt: number): number { + return Math.max(0, at - startAt); +} diff --git a/apps/dashboard/src/lib/stores/api.ts b/apps/dashboard/src/lib/stores/api.ts index 7bc22dd..cb9d79b 100644 --- a/apps/dashboard/src/lib/stores/api.ts +++ b/apps/dashboard/src/lib/stores/api.ts @@ -133,5 +133,117 @@ export const api = { method: 'POST', body: JSON.stringify({ reason, note, claimId, receiptId }) }) + }, + + // Agent Black Box (v2.2): replayable agent-run traces. The runId in a tool + // result threads through here unchanged — one id, end to end. + traces: { + list: (limit = 50) => fetcher(`/traces?limit=${limit}`), + get: (runId: string) => fetcher(`/traces/${encodeURIComponent(runId)}`), + exportUrl: (runId: string) => `${BASE}/traces/${encodeURIComponent(runId)}/export` + }, + + // Memory Receipts (v2.2): the nutrition label for a retrieval. + receipts: { + list: (limit = 50) => fetcher(`/receipts?limit=${limit}`), + get: (receiptId: string) => fetcher(`/receipts/${encodeURIComponent(receiptId)}`) + }, + + // Memory PRs (v2.2): the risk-gated brain-change review queue. + memoryPrs: { + list: (status?: string, limit = 100) => { + const qs = new URLSearchParams(); + if (status) qs.set('status', status); + qs.set('limit', String(limit)); + return fetcher(`/memory-prs?${qs.toString()}`); + }, + get: (id: string) => fetcher(`/memory-prs/${encodeURIComponent(id)}`), + act: (id: string, action: MemoryPrAction) => + fetcher>(`/memory-prs/${encodeURIComponent(id)}/${action}`, { + method: 'POST' + }), + getMode: () => fetcher<{ mode: ReviewMode; pendingCount: number }>('/memory-prs/mode'), + setMode: (mode: ReviewMode) => + fetcher<{ mode: ReviewMode }>('/memory-prs/mode', { + method: 'POST', + body: JSON.stringify({ mode }) + }) } }; + +// --------------------------------------------------------------------------- +// Agent Black Box / Receipts / Memory PR types +// --------------------------------------------------------------------------- + +export type TraceRunSummary = { + runId: string; + firstTool: string | null; + eventCount: number; + retrievedCount: number; + suppressedCount: number; + writeCount: number; + vetoCount: number; + startedAt: number; + lastAt: number; +}; + +export type TraceRunListResponse = { total: number; runs: TraceRunSummary[] }; + +/** One trace event — discriminated on `type`, matching the Rust schema. */ +export type TraceEvent = + | { type: 'mcp.call'; runId: string; tool: string; argsHash: string; at: number } + | { type: 'memory.retrieve'; runId: string; ids: string[]; activation: Record; at: number } + | { type: 'memory.suppress'; runId: string; id: string; reason: string; at: number } + | { type: 'memory.write'; runId: string; id: string; diff: unknown; source: string; at: number } + | { type: 'contradiction.detected'; runId: string; ids: string[]; winnerId?: string; detail: string; at: number } + | { type: 'sanhedrin.veto'; runId: string; claim: string; evidenceIds: string[]; confidence: number; at: number } + | { type: 'dream.patch'; runId: string; proposalIds: string[]; at: number }; + +export type TraceDetail = { + runId: string; + summary: Omit | null; + events: TraceEvent[]; +}; + +export type Receipt = { + receipt_id: string; + retrieved: string[]; + suppressed: { id: string; reason: string }[]; + activation_path: string[]; + trust_floor: number; + decay_risk: 'low' | 'medium' | 'high'; + mutations: { id: string; kind: string; note?: string }[]; +}; + +export type ReceiptListResponse = { total: number; receipts: Receipt[] }; + +export type MemoryPrAction = + | 'promote' + | 'merge' + | 'supersede' + | 'quarantine' + | 'forget' + | 'ask_agent_why'; + +export type ReviewMode = 'fast' | 'risk_gated' | 'paranoid'; + +export type MemoryPr = { + id: string; + kind: string; + status: string; + title: string; + diff: Record; + signals: { code: string; detail: string }[]; + subject_id?: string; + run_id?: string; + created_at: string; + decided_at?: string; + decision?: string; +}; + +export type MemoryPrListResponse = { + total: number; + pendingCount: number; + mode: ReviewMode; + prs: MemoryPr[]; +}; diff --git a/apps/dashboard/src/lib/stores/websocket.ts b/apps/dashboard/src/lib/stores/websocket.ts index fda02f9..2e591ac 100644 --- a/apps/dashboard/src/lib/stores/websocket.ts +++ b/apps/dashboard/src/lib/stores/websocket.ts @@ -132,6 +132,30 @@ export const uptimeSeconds = derived(websocket, $ws => ($ws.lastHeartbeat?.data?.uptime_secs as number) ?? 0 ); +// Agent Black Box (v2.2): the live stream of trace events, newest first. Each +// is a real `VestigeEvent::TraceEvent` backed by a persisted `agent_traces` +// row — the dashboard pulse is only ever driven by these, never by fakes. +export const traceEvents = derived(websocket, $ws => + $ws.events.filter((e) => e.type === 'TraceEvent') +); + +// The most recent runId seen on the live feed — the "current run" indicator in +// Proof Mode / the Black Box live header. +export const liveRunId = derived(websocket, $ws => { + const latest = $ws.events.find((e) => e.type === 'TraceEvent'); + return (latest?.data?.run_id as string) ?? null; +}); + +// The single most recent trace event (for the "last event" readout). +export const lastTraceEvent = derived(websocket, $ws => + $ws.events.find((e) => e.type === 'TraceEvent') ?? null +); + +// Live Memory PR notifications (opened / decided) for the queue badge + toasts. +export const memoryPrEvents = derived(websocket, $ws => + $ws.events.filter((e) => e.type === 'MemoryPrOpened' || e.type === 'MemoryPrDecided') +); + export function formatUptime(secs: number): string { if (!Number.isFinite(secs) || secs < 0) return '—'; const d = Math.floor(secs / 86_400); diff --git a/apps/dashboard/src/lib/types/index.ts b/apps/dashboard/src/lib/types/index.ts index 4c47a16..2989391 100644 --- a/apps/dashboard/src/lib/types/index.ts +++ b/apps/dashboard/src/lib/types/index.ts @@ -168,6 +168,9 @@ export type VestigeEventType = | 'ImportanceScored' | 'DeepReferenceCompleted' | 'HookVerdictRecorded' + | 'TraceEvent' + | 'MemoryPrOpened' + | 'MemoryPrDecided' | 'Heartbeat'; export interface VestigeEvent { diff --git a/apps/dashboard/src/routes/(app)/blackbox/+page.svelte b/apps/dashboard/src/routes/(app)/blackbox/+page.svelte new file mode 100644 index 0000000..ed8ccd6 --- /dev/null +++ b/apps/dashboard/src/routes/(app)/blackbox/+page.svelte @@ -0,0 +1,833 @@ + + +
+ + + + + + +
+
+ WebSocket + + + {$isConnected ? 'Connected' : 'Offline'} + +
+
+ Live runId + {$liveRunId ?? '—'} +
+
+ Last event + + {#if $lastTraceEvent} + + {eventLabel(($lastTraceEvent.data?.event as TraceEvent)?.type)} + + {:else} + awaiting… + {/if} + +
+
+ Events seen + + + +
+
+ + {#if !proofMode} +
+ + + + +
+ {#if loading} +
Loading trace…
+ {:else if error} +
{error}
+ {:else if !detail} +
Select a run to replay.
+ {:else} + +
+
+ + Step {scrubIndex + 1} / {detail.events.length} + + {#if currentEvent} + +{relativeMs(currentEvent.at, startAt)}ms + {/if} +
+ + +
+ {#each detail.events as ev, i (i)} + + {/each} +
+
+ + + {#if currentEvent} +
+
+ {eventGlyph(currentEvent.type)} + {eventLabel(currentEvent.type)} + {formatAt(currentEvent.at)} +
+

{eventSummary(currentEvent)}

+ + {#if currentEvent.type === 'memory.retrieve'} +
+ {#each currentEvent.ids as id (id)} + + {id.slice(0, 8)} + {#if currentEvent.activation[id] != null} + {(currentEvent.activation[id] * 100).toFixed(0)}% + {/if} + + {/each} +
+ {:else if currentEvent.type === 'contradiction.detected'} +
+ kept {currentEvent.winnerId?.slice(0, 8)} + vs + {#each currentEvent.ids.filter((i) => i !== currentEvent.winnerId) as id (id)} + {id.slice(0, 8)} + {/each} +
+ {:else if currentEvent.type === 'sanhedrin.veto'} +
+ {#each currentEvent.evidenceIds as id (id)} + {id.slice(0, 8)} + {/each} +
+ {/if} +
+ {/if} + + +
+

+ Memory pulse — touched this run +

+ {#if pulsedIds.length === 0} +

No memories touched yet.

+ {:else} +
+ {#each pulsedIds as id (id)} + {id.slice(0, 8)} + {/each} +
+ {/if} +
+ + +
+

Event log

+
    + {#each detail.events as ev, i (i)} +
  1. scrubIndex} + style:--c={eventColor(ev.type)} + > + +
  2. + {/each} +
+
+ {/if} +
+
+ {:else} + +
+
+ + {$liveRunId ?? 'awaiting run…'} +
+ {#if $lastTraceEvent} + {@const ev = $lastTraceEvent.data?.event as TraceEvent} +
+ {eventGlyph(ev?.type)} +
+
{eventLabel(ev?.type)}
+
{eventSummary(ev)}
+
+
+ {/if} +
+ + trace events +
+

Watch the agent think. Watch memory change. Watch the receipt prove why.

+
+ {/if} +
+ + diff --git a/apps/dashboard/src/routes/(app)/graph/+page.svelte b/apps/dashboard/src/routes/(app)/graph/+page.svelte index f3ffbe8..fe9f6d5 100644 --- a/apps/dashboard/src/routes/(app)/graph/+page.svelte +++ b/apps/dashboard/src/routes/(app)/graph/+page.svelte @@ -102,11 +102,17 @@ } onMount(() => { - const requestedMode = new URLSearchParams(window.location.search).get('colorMode'); + const sp = new URLSearchParams(window.location.search); + const requestedMode = sp.get('colorMode'); if (isColorMode(requestedMode)) { colorMode = requestedMode; } - void loadGraph(); + // "Open receipt in Cinema" deep-links here with ?center=, so + // the graph loads centered on the receipt's primary memory and the + // (protected) Cinema flythrough starts from that exact node. We do not + // touch MemoryCinema itself — only seed the graph it renders. + const center = sp.get('center'); + void loadGraph(undefined, center || undefined); }); function isColorMode(value: string | null): value is ColorMode { diff --git a/apps/dashboard/src/routes/(app)/memory-prs/+page.svelte b/apps/dashboard/src/routes/(app)/memory-prs/+page.svelte new file mode 100644 index 0000000..cb09111 --- /dev/null +++ b/apps/dashboard/src/routes/(app)/memory-prs/+page.svelte @@ -0,0 +1,726 @@ + + +
+ + 0}> + pending + + + + +
+ Vestige auto-remembers ordinary context, but opens a + Memory PR when the agent tries to rewrite its own brain. +
+ + +
+ {#each modes as m (m.id)} + + {/each} +
+ + +
+ {#each statuses as s (s)} + + {/each} + +
+ +
+ + + + +
+ {#if !selected} +
Select a Memory PR to review the diff.
+ {:else} +
+
+ {kindLabel[selected.kind] ?? selected.kind} + {selected.status} + {#if selected.run_id} + + {selected.run_id.replace('run_', '').slice(0, 8)} + + {/if} +
+

{selected.title}

+ + +
+
+ {#if diffNodeType(selected)} + type: {diffNodeType(selected)} + {/if} + {#each diffTags(selected) as t (t)} + #{t} + {/each} +
+ {#if diffContent(selected)} +
+{diffContent(selected)}
+ {/if} +
+ + + {#if selected.signals.length} +
+ Why this opened + {#each selected.signals as sig (sig.code)} +
+ {sig.code} + {sig.detail} +
+ {/each} +
+ {/if} + + + {#if why} +
+ Agent's reasoning + {#each why as w (w.code)} +
+ {w.code} + {w.detail} +
+ {/each} +
+ {/if} + + + {#if selected.status === 'pending'} +
+ {#each actions as a (a.id)} + + {/each} +
+ {:else} +
+ Decided: {selected.decision ?? selected.status} + {#if selected.decided_at} + · {new Date(selected.decided_at).toLocaleString()} + {/if} +
+ {/if} +
+ {/if} +
+
+
+ + diff --git a/apps/dashboard/src/routes/+layout.svelte b/apps/dashboard/src/routes/+layout.svelte index 1e160fd..53f5a52 100644 --- a/apps/dashboard/src/routes/+layout.svelte +++ b/apps/dashboard/src/routes/+layout.svelte @@ -98,6 +98,8 @@ // set reused the same Unicode glyph across multiple items; every entry here // now has a distinct silhouette that reads instantly. const nav: { href: string; label: string; icon: IconName; shortcut: string }[] = [ + { href: '/blackbox', label: 'Black Box', icon: 'blackbox', shortcut: 'B' }, + { href: '/memory-prs', label: 'Memory PRs', icon: 'memorypr', shortcut: 'Q' }, { href: '/graph', label: 'Graph', icon: 'graph', shortcut: 'G' }, { href: '/reasoning', label: 'Reasoning', icon: 'reasoning', shortcut: 'R' }, { href: '/memories', label: 'Memories', icon: 'memories', shortcut: 'M' }, diff --git a/blackbox-proof-2026-06-22/memory-prs.json b/blackbox-proof-2026-06-22/memory-prs.json new file mode 100644 index 0000000..4850ed4 --- /dev/null +++ b/blackbox-proof-2026-06-22/memory-prs.json @@ -0,0 +1 @@ +{"mode":"risk_gated","pendingCount":0,"prs":[{"created_at":"2026-06-22T21:54:57.994466+00:00","decided_at":"2026-06-22T21:58:46.702516+00:00","decision":"promote","diff":{"decision":"create","node":{"content":"Remember the production auth token and security credential for deployment.","id":"8b9fa8f6-833d-41dc-8520-98b0d031d55c","nodeType":"fact","tags":["security","auth"]}},"id":"pr_dee9244bc0c4419fad61f6c6d2f95f15","kind":"new_fact","run_id":"run_proof_session","signals":[{"code":"sensitive_topic","detail":"Touches a sensitive topic: authentication / authorization."}],"status":"promoted","subject_id":"8b9fa8f6-833d-41dc-8520-98b0d031d55c","title":"New fact pending review: \"Remember the production auth token and security credential for deployment.\""}],"total":1} \ No newline at end of file diff --git a/blackbox-proof-2026-06-22/phase-1-status.json b/blackbox-proof-2026-06-22/phase-1-status.json new file mode 100644 index 0000000..5d25a5d --- /dev/null +++ b/blackbox-proof-2026-06-22/phase-1-status.json @@ -0,0 +1 @@ +{"averageRetention":0.95,"status":"healthy","totalMemories":4,"version":"2.1.27"} \ No newline at end of file diff --git a/blackbox-proof-2026-06-22/phase-3-trace.json b/blackbox-proof-2026-06-22/phase-3-trace.json new file mode 100644 index 0000000..5ef2e73 --- /dev/null +++ b/blackbox-proof-2026-06-22/phase-3-trace.json @@ -0,0 +1 @@ +{"events":[{"argsHash":"e029f4892d293944","at":1782165290352,"runId":"run_proof_session","tool":"smart_ingest","type":"mcp.call"},{"at":1782165290478,"diff":{"decision":"create"},"id":"0acd7785-e13a-4df8-ba5e-11e8d82e7590","runId":"run_proof_session","source":"agent","type":"memory.write"},{"argsHash":"2aef447cf4f6744e","at":1782165291860,"runId":"run_proof_session","tool":"smart_ingest","type":"mcp.call"},{"at":1782165291962,"diff":{"decision":"create"},"id":"cb40ae8c-59a1-4d13-b89f-1333a9357def","runId":"run_proof_session","source":"agent","type":"memory.write"},{"argsHash":"eaefbf6e42cbe187","at":1782165293368,"runId":"run_proof_session","tool":"smart_ingest","type":"mcp.call"},{"at":1782165293474,"diff":{"decision":"create"},"id":"147bee37-33e4-4287-bd6b-931c23d87f81","runId":"run_proof_session","source":"agent","type":"memory.write"},{"argsHash":"c758f278a36c7bc2","at":1782165294877,"runId":"run_proof_session","tool":"deep_reference","type":"mcp.call"},{"activation":{"0acd7785-e13a-4df8-ba5e-11e8d82e7590":0.62,"147bee37-33e4-4287-bd6b-931c23d87f81":0.62,"cb40ae8c-59a1-4d13-b89f-1333a9357def":0.62},"at":1782165294947,"ids":["0acd7785-e13a-4df8-ba5e-11e8d82e7590","147bee37-33e4-4287-bd6b-931c23d87f81","cb40ae8c-59a1-4d13-b89f-1333a9357def"],"runId":"run_proof_session","type":"memory.retrieve"},{"argsHash":"843ce46664574711","at":1782165296385,"runId":"run_proof_session","tool":"search","type":"mcp.call"},{"activation":{},"at":1782165296434,"ids":["147bee37-33e4-4287-bd6b-931c23d87f81"],"runId":"run_proof_session","type":"memory.retrieve"},{"argsHash":"03587119a4acd377","at":1782165297894,"runId":"run_proof_session","tool":"smart_ingest","type":"mcp.call"},{"at":1782165297993,"diff":{"decision":"create"},"id":"8b9fa8f6-833d-41dc-8520-98b0d031d55c","runId":"run_proof_session","source":"agent","type":"memory.write"}],"exportedAt":"2026-06-22T21:59:04.946635+00:00","format":"vestige-trace","runId":"run_proof_session","summary":{"eventCount":12,"firstTool":"smart_ingest","lastAt":1782165297993,"retrievedCount":4,"startedAt":1782165290352,"suppressedCount":0,"vetoCount":0,"writeCount":4},"version":1} \ No newline at end of file diff --git a/blackbox-proof-2026-06-22/proof-summary.md b/blackbox-proof-2026-06-22/proof-summary.md new file mode 100644 index 0000000..1eb9ebf --- /dev/null +++ b/blackbox-proof-2026-06-22/proof-summary.md @@ -0,0 +1,57 @@ +# Agent Black Box — Proof of Life (2026-06-22) + +> Watch the agent think. Watch memory change. Watch the receipt prove why. + +This folder is the launch artifact + regression evidence for the Agent Black Box, +Memory Receipts, and risk-gated Memory PRs, captured from a **live** Vestige +build (`feat/agent-black-box`), not mocks. + +## The trace correlation spine (Phase 0) — verified end to end + +A single `runId` (`run_proof_session`) threads, unbroken, through every layer: + +| Hop | Layer | Evidence | +|----|-------|----------| +| 1 | MCP tool output | every `tools/call` result carries `runId` + `traceUri` (`vestige://trace/{runId}`) | +| 2 | SQLite trace rows | 12 `agent_traces` rows persisted under the runId | +| 3 | WebSocket | each event broadcast as `VestigeEvent::TraceEvent` | +| 4 | dashboard pulse | Black Box tab renders 12 ticks + memory pulse, live | +| 5 | `/api/traces/:runId` | see `phase-3-trace.json` | +| 6 | `vestige://trace/{runId}` | MCP resource resolves the same run | +| 7 | receipt export | `phase-3-trace.json` is the downloadable `.vestige-trace.json` | +| 8 | Cinema replay | "Open receipt in Cinema" deep-links the receipt's memory set | + +## What the run did (12 events, in order) + +`mcp.call → memory.write` × 3 ordinary writes (auto-landed), +`mcp.call → memory.retrieve` × 2 (deep_reference + search, each left a receipt), +`mcp.call → memory.write` × 1 **risky** write (auth/security content). + +## The cognitive immune system fired + +- Mode: **Risk-Gated** (the default). +- The 3 ordinary writes **auto-landed** — no friction. +- The 1 risky write (auth token / security credential) **opened a Memory PR** + with the self-explaining signal `sensitive_topic → "Touches a sensitive + topic: authentication / authorization."` +- Promoting that PR from the dashboard moved it to `promoted` through the full + stack (UI → API → SQLite). See `memory-prs.json`. + +This is the product line, made literal: +**Vestige auto-remembers ordinary context, but opens a Memory PR when the agent +tries to rewrite its own brain.** + +## Files + +- `phase-1-status.json` — server health (spine alive). +- `phase-3-trace.json` — the full `.vestige-trace.json` export (the black box). +- `receipts.json` — the retrieval receipt(s) generated this run. +- `memory-prs.json` — the Memory PR queue, including the promoted risky write. + +## Gates (all green) + +- `cargo test --workspace` — 953 lib tests pass (incl. the trace-spine + integration test driving a real JSON-RPC round-trip). +- `cargo clippy --workspace -- -D warnings` — 0 warnings. +- `pnpm --filter @vestige/dashboard check` — 0 errors, 0 warnings (905 files). +- `pnpm --filter @vestige/dashboard build` — clean. diff --git a/blackbox-proof-2026-06-22/receipts.json b/blackbox-proof-2026-06-22/receipts.json new file mode 100644 index 0000000..83ef5a2 --- /dev/null +++ b/blackbox-proof-2026-06-22/receipts.json @@ -0,0 +1 @@ +{"receipts":[{"activation_path":[],"decay_risk":"high","mutations":[],"receipt_id":"r_2026_06_22_runproof","retrieved":["147bee37-33e4-4287-bd6b-931c23d87f81"],"suppressed":[],"trust_floor":0.0}],"total":1} \ No newline at end of file diff --git a/crates/vestige-core/src/lib.rs b/crates/vestige-core/src/lib.rs index 41f7843..f6f12b4 100644 --- a/crates/vestige-core/src/lib.rs +++ b/crates/vestige-core/src/lib.rs @@ -90,6 +90,10 @@ pub mod fts; pub mod memory; pub mod storage; +/// Agent Black Box, Memory Receipts & Memory PRs — the cognitive flight +/// recorder, immune system, and reviewable-diff model for agent memory. +pub mod trace; + #[cfg(feature = "embeddings")] #[cfg_attr(docsrs, doc(cfg(feature = "embeddings")))] pub mod embeddings; @@ -160,6 +164,13 @@ pub use fsrs::{ // Configuration (vestige.toml output profiles / defaults) pub use config::{CONFIG_FILE, OutputConfig, OutputDefaults, OutputProfile, VestigeConfig}; +// Agent Black Box / Receipts / Memory PRs (the cognitive flight recorder) +pub use trace::{ + classify_write, DecayRisk, MemoryPr, MemoryPrAction, MemoryPrKind, MemoryPrStatus, + MemoryTraceEvent, Receipt, ReceiptMutation, ReviewMode, RiskClass, RiskSignal, SuppressReason, + SuppressedReceiptEntry, WriteContext, WriteSource, HIGH_TRUST_FLOOR, LOW_CONFIDENCE_FLOOR, +}; + // Storage layer pub use storage::{ ClassificationResult, @@ -192,6 +203,7 @@ pub use storage::{ Result, SchedulingState, SearchQuery, + AgentRunSummary, SmartIngestResult, SourceUpsertOutcome, SourceUpsertResult, diff --git a/crates/vestige-core/src/storage/migrations.rs b/crates/vestige-core/src/storage/migrations.rs index 58e5202..b42541c 100644 --- a/crates/vestige-core/src/storage/migrations.rs +++ b/crates/vestige-core/src/storage/migrations.rs @@ -89,6 +89,11 @@ pub const MIGRATIONS: &[Migration] = &[ description: "#57 Source envelope: provenance columns + connector cursor checkpoints for idempotent external-source sync", up: MIGRATION_V17_UP, }, + Migration { + version: 18, + description: "Agent Black Box + Memory Receipts + Memory PRs: replayable run traces, retrieval receipts, risk-gated brain-change review queue", + up: MIGRATION_V18_UP, + }, ]; /// A database migration @@ -1029,6 +1034,105 @@ pub const MIGRATION_V17_ALTER_COLUMNS: &[&str] = &[ "ALTER TABLE knowledge_nodes ADD COLUMN source_author TEXT", ]; +/// V18: Agent Black Box + Memory Receipts + Memory PRs. +/// +/// Three append-only / review tables that turn Vestige into the *black box, +/// immune system, and cinematic debugger for agent memory*: +/// +/// - `agent_traces` — one row per [`crate::trace::MemoryTraceEvent`], ordered by +/// `(run_id, seq)`. Append-only so a run replays exactly as the agent +/// experienced it. `payload` is the full serialized event; `event_type` and +/// `run_id` are denormalized for fast filtering and the `vestige://trace/{id}` +/// resource. `args_hash` (for `mcp.call`) is stored, never the raw args, so +/// traces can't leak prompt contents or secrets. +/// +/// - `memory_receipts` — one row per retrieval receipt. `payload` holds the full +/// [`crate::trace::Receipt`]; the scalar columns (`trust_floor`, `decay_risk`) +/// are denormalized for list/sort without parsing JSON. +/// +/// - `memory_prs` — the risk-gated review queue. A risky write (contradiction +/// vs high-trust, supersede/forget/merge/protect, sensitive topic, dream +/// consolidation, decay resurrection, low-confidence batch, weak-provenance +/// connector) lands here as `pending` instead of auto-committing. `diff` is the +/// structured before/after, `signals` is the self-explaining risk evidence, +/// `run_id` links the PR back to the black-box trace that produced it. +/// +/// `memory_id` / `run_id` references are intentionally *not* foreign keys to +/// `knowledge_nodes`: forgetting or superseding a memory must never erase the +/// audit trail of the trace, receipt, or PR that touched it (same +/// audit-preserving stance as V15's composition tables). +const MIGRATION_V18_UP: &str = r#" +-- Black-box trace events: append-only, ordered by (run_id, seq). +CREATE TABLE IF NOT EXISTS agent_traces ( + id TEXT PRIMARY KEY, + run_id TEXT NOT NULL, + seq INTEGER NOT NULL, + event_type TEXT NOT NULL, -- mcp.call | memory.retrieve | ... + tool TEXT, -- denormalized for mcp.call rows + payload TEXT NOT NULL, -- full serialized MemoryTraceEvent (JSON) + at INTEGER NOT NULL, -- wall-clock millis + created_at TEXT NOT NULL +); + +CREATE INDEX IF NOT EXISTS idx_agent_traces_run ON agent_traces(run_id, seq); +CREATE INDEX IF NOT EXISTS idx_agent_traces_type ON agent_traces(event_type); +CREATE INDEX IF NOT EXISTS idx_agent_traces_at ON agent_traces(at); + +-- One row per agent run, for the Black Box run list (denormalized roll-up). +CREATE TABLE IF NOT EXISTS agent_runs ( + run_id TEXT PRIMARY KEY, + first_tool TEXT, + event_count INTEGER NOT NULL DEFAULT 0, + retrieved_count INTEGER NOT NULL DEFAULT 0, + suppressed_count INTEGER NOT NULL DEFAULT 0, + write_count INTEGER NOT NULL DEFAULT 0, + veto_count INTEGER NOT NULL DEFAULT 0, + started_at INTEGER NOT NULL, -- millis of first event + last_at INTEGER NOT NULL, -- millis of latest event + created_at TEXT NOT NULL +); + +CREATE INDEX IF NOT EXISTS idx_agent_runs_last_at ON agent_runs(last_at DESC); + +-- Retrieval receipts (the "nutrition label" for a piece of agent memory). +CREATE TABLE IF NOT EXISTS memory_receipts ( + receipt_id TEXT PRIMARY KEY, + run_id TEXT, -- links to the trace, if any + tool TEXT, + query TEXT, + retrieved_count INTEGER NOT NULL DEFAULT 0, + suppressed_count INTEGER NOT NULL DEFAULT 0, + trust_floor REAL NOT NULL DEFAULT 0, + decay_risk TEXT NOT NULL DEFAULT 'low', + payload TEXT NOT NULL, -- full serialized Receipt (JSON) + created_at TEXT NOT NULL +); + +CREATE INDEX IF NOT EXISTS idx_memory_receipts_run ON memory_receipts(run_id); +CREATE INDEX IF NOT EXISTS idx_memory_receipts_created_at ON memory_receipts(created_at DESC); + +-- Memory PRs: the risk-gated review queue for brain changes. +CREATE TABLE IF NOT EXISTS memory_prs ( + id TEXT PRIMARY KEY, + kind TEXT NOT NULL, -- new_fact | contradiction_detected | ... + status TEXT NOT NULL DEFAULT 'pending', + title TEXT NOT NULL, + subject_id TEXT, -- the memory this PR concerns, if any + run_id TEXT, -- the run that produced it + diff TEXT NOT NULL DEFAULT '{}', -- structured before/after (JSON) + signals TEXT NOT NULL DEFAULT '[]', -- self-explaining RiskSignal[] (JSON) + decision TEXT, -- promote | merge | supersede | ... + created_at TEXT NOT NULL, + decided_at TEXT +); + +CREATE INDEX IF NOT EXISTS idx_memory_prs_status ON memory_prs(status); +CREATE INDEX IF NOT EXISTS idx_memory_prs_kind ON memory_prs(kind); +CREATE INDEX IF NOT EXISTS idx_memory_prs_created_at ON memory_prs(created_at DESC); + +UPDATE schema_version SET version = 18, applied_at = datetime('now'); +"#; + /// Apply pending migrations pub fn apply_migrations(conn: &rusqlite::Connection) -> rusqlite::Result { let current_version = get_current_version(conn)?; @@ -1109,9 +1213,10 @@ mod tests { // 1. schema_version advanced to the latest migration let version = get_current_version(&conn).expect("read schema_version"); + let latest = MIGRATIONS.last().unwrap().version; assert_eq!( - version, 17, - "schema_version must be 17 after all migrations" + version, latest, + "schema_version must be the latest migration after all migrations" ); // 2. knowledge_edges is gone (V11 drops it) @@ -1236,7 +1341,11 @@ mod tests { // After replaying from V10, the schema advances to the latest version. let version = get_current_version(&conn).expect("read schema_version"); - assert_eq!(version, 17, "schema_version back at latest after replay"); + assert_eq!( + version, + MIGRATIONS.last().unwrap().version, + "schema_version back at latest after replay" + ); } #[test] @@ -1310,7 +1419,11 @@ mod tests { // V16 uses CREATE TABLE IF NOT EXISTS and idempotent ALTER handling. apply_migrations(&conn).expect("V16 replay must be idempotent"); let version = get_current_version(&conn).expect("read version"); - assert_eq!(version, 17, "schema_version must be latest after replay"); + assert_eq!( + version, + MIGRATIONS.last().unwrap().version, + "schema_version must be latest after replay" + ); } #[test] @@ -1400,7 +1513,11 @@ mod tests { .expect("rewind to 16"); apply_migrations(&conn).expect("V17 replay must be idempotent"); let version = get_current_version(&conn).expect("read version"); - assert_eq!(version, 17, "schema_version must be 17 after replay"); + assert_eq!( + version, + MIGRATIONS.last().unwrap().version, + "schema_version must be latest after replay" + ); } #[test] diff --git a/crates/vestige-core/src/storage/mod.rs b/crates/vestige-core/src/storage/mod.rs index d82da6a..84bb2de 100644 --- a/crates/vestige-core/src/storage/mod.rs +++ b/crates/vestige-core/src/storage/mod.rs @@ -10,6 +10,7 @@ mod memory_store; mod migrations; mod portable; mod sqlite; +mod trace_store; #[cfg(feature = "cloud-sync")] pub use cloud_sync::HttpPortableSyncBackend; @@ -32,6 +33,7 @@ pub use sqlite::{ SmartIngestResult, SourceUpsertOutcome, SourceUpsertResult, SqliteMemoryStore, StateTransitionRecord, StorageError, }; +pub use trace_store::AgentRunSummary; /// Backwards-compatibility alias. Retained until Phase 4 completes so every /// existing `Arc` call site keeps compiling. Scheduled for removal diff --git a/crates/vestige-core/src/storage/sqlite.rs b/crates/vestige-core/src/storage/sqlite.rs index fd394b1..786295f 100644 --- a/crates/vestige-core/src/storage/sqlite.rs +++ b/crates/vestige-core/src/storage/sqlite.rs @@ -302,8 +302,11 @@ const VESTIGE_DISABLE_VECTOR_SEARCH: &str = "VESTIGE_DISABLE_VECTOR_SEARCH"; /// so the MCP layer can use `Arc` instead of `Arc>`. pub struct SqliteMemoryStore { db_path: PathBuf, - writer: Mutex, - reader: Mutex, + // `pub(crate)` so the sibling `trace_store` module (Black Box / Receipts / + // Memory PRs CRUD) can lock the same writer/reader connections and follow + // the established store idiom without duplicating connection management. + pub(crate) writer: Mutex, + pub(crate) reader: Mutex, scheduler: Mutex, #[cfg(feature = "embeddings")] embedding_service: EmbeddingService, diff --git a/crates/vestige-core/src/storage/trace_store.rs b/crates/vestige-core/src/storage/trace_store.rs new file mode 100644 index 0000000..92d2db2 --- /dev/null +++ b/crates/vestige-core/src/storage/trace_store.rs @@ -0,0 +1,595 @@ +//! # Black Box / Receipts / Memory PRs — persistence +//! +//! CRUD for the three V18 tables (`agent_traces` + `agent_runs`, +//! `memory_receipts`, `memory_prs`) on [`SqliteMemoryStore`]. The pure data +//! model lives in [`crate::trace`]; this file is the storage half of the +//! Black Box, immune system, and cinematic debugger for agent memory. +//! +//! Every method follows the established store idiom: lock the writer/reader +//! `Mutex`, `params![]`-bind, store timestamps as RFC3339 (and +//! event millis as INTEGER), serialize structured fields with `serde_json`, and +//! map rows back through a small closure. + +use chrono::Utc; +use rusqlite::{params, OptionalExtension}; +use uuid::Uuid; + +use super::sqlite::SqliteMemoryStore; +use super::{Result, StorageError}; +use crate::trace::{MemoryPr, MemoryPrAction, MemoryPrStatus, MemoryTraceEvent, Receipt}; + +/// A roll-up summary of one agent run, for the Black Box run list. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq)] +pub struct AgentRunSummary { + /// The run id. + pub run_id: String, + /// The first tool invoked in the run (the run's "entry point"). + pub first_tool: Option, + /// Total events recorded. + pub event_count: i64, + /// Memories retrieved across the run. + pub retrieved_count: i64, + /// Memories suppressed across the run. + pub suppressed_count: i64, + /// Memory writes across the run. + pub write_count: i64, + /// Sanhedrin vetoes across the run. + pub veto_count: i64, + /// Millis of the first event. + pub started_at: i64, + /// Millis of the most recent event. + pub last_at: i64, +} + +impl SqliteMemoryStore { + // ======================================================================== + // BLACK BOX — trace events + run roll-up + // ======================================================================== + + /// Append one trace event to a run (append-only) and update the run + /// roll-up. Returns the assigned sequence number within the run. + /// + /// `seq` is `MAX(seq)+1` for the run, computed under the writer lock so a + /// run's events stay totally ordered even under concurrent tool calls. + pub fn append_trace_event(&self, event: &MemoryTraceEvent) -> Result { + let now = Utc::now(); + let run_id = event.run_id().to_string(); + let event_type = event.kind(); + let at = event.at(); + let payload = serde_json::to_string(event) + .map_err(|e| StorageError::Init(format!("trace event serialize: {e}")))?; + let tool = match event { + MemoryTraceEvent::McpCall { tool, .. } => Some(tool.clone()), + _ => None, + }; + + // Roll-up deltas this event contributes. + let (d_retrieved, d_suppressed, d_write, d_veto) = match event { + MemoryTraceEvent::MemoryRetrieve { ids, .. } => (ids.len() as i64, 0, 0, 0), + MemoryTraceEvent::MemorySuppress { .. } => (0, 1, 0, 0), + MemoryTraceEvent::MemoryWrite { .. } => (0, 0, 1, 0), + MemoryTraceEvent::SanhedrinVeto { .. } => (0, 0, 0, 1), + _ => (0, 0, 0, 0), + }; + + let writer = self + .writer + .lock() + .map_err(|_| StorageError::Init("Writer lock poisoned".into()))?; + + let seq: i64 = writer + .query_row( + "SELECT COALESCE(MAX(seq), -1) + 1 FROM agent_traces WHERE run_id = ?1", + params![run_id], + |r| r.get(0), + ) + .unwrap_or(0); + + writer.execute( + "INSERT INTO agent_traces (id, run_id, seq, event_type, tool, payload, at, created_at) + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)", + params![ + Uuid::new_v4().to_string(), + run_id, + seq, + event_type, + tool, + payload, + at, + now.to_rfc3339(), + ], + )?; + + // Upsert the run roll-up. On first event the row is created with the + // event's tool as the entry point; subsequent events accumulate counts + // and advance `last_at`. + writer.execute( + "INSERT INTO agent_runs (run_id, first_tool, event_count, retrieved_count, + suppressed_count, write_count, veto_count, started_at, last_at, created_at) + VALUES (?1, ?2, 1, ?3, ?4, ?5, ?6, ?7, ?7, ?8) + ON CONFLICT(run_id) DO UPDATE SET + first_tool = COALESCE(agent_runs.first_tool, excluded.first_tool), + event_count = agent_runs.event_count + 1, + retrieved_count = agent_runs.retrieved_count + ?3, + suppressed_count = agent_runs.suppressed_count + ?4, + write_count = agent_runs.write_count + ?5, + veto_count = agent_runs.veto_count + ?6, + last_at = MAX(agent_runs.last_at, ?7)", + params![ + run_id, + tool, + d_retrieved, + d_suppressed, + d_write, + d_veto, + at, + now.to_rfc3339(), + ], + )?; + + Ok(seq) + } + + /// Fetch every event of a run, in sequence order. The black-box replay. + pub fn get_trace(&self, run_id: &str) -> Result> { + let reader = self + .reader + .lock() + .map_err(|_| StorageError::Init("Reader lock poisoned".into()))?; + let mut stmt = reader.prepare( + "SELECT payload FROM agent_traces WHERE run_id = ?1 ORDER BY seq ASC", + )?; + let rows = stmt.query_map(params![run_id], |row| { + let payload: String = row.get(0)?; + Ok(payload) + })?; + let mut out = Vec::new(); + for r in rows { + let payload = r?; + if let Ok(ev) = serde_json::from_str::(&payload) { + out.push(ev); + } + } + Ok(out) + } + + /// List recent runs, newest activity first. + pub fn list_agent_runs(&self, limit: usize) -> Result> { + let reader = self + .reader + .lock() + .map_err(|_| StorageError::Init("Reader lock poisoned".into()))?; + let mut stmt = reader.prepare( + "SELECT run_id, first_tool, event_count, retrieved_count, suppressed_count, + write_count, veto_count, started_at, last_at + FROM agent_runs ORDER BY last_at DESC LIMIT ?1", + )?; + let rows = stmt.query_map(params![limit as i64], Self::row_to_run_summary)?; + let mut out = Vec::new(); + for r in rows { + out.push(r?); + } + Ok(out) + } + + /// Fetch one run summary. + pub fn get_agent_run(&self, run_id: &str) -> Result> { + let reader = self + .reader + .lock() + .map_err(|_| StorageError::Init("Reader lock poisoned".into()))?; + reader + .query_row( + "SELECT run_id, first_tool, event_count, retrieved_count, suppressed_count, + write_count, veto_count, started_at, last_at + FROM agent_runs WHERE run_id = ?1", + params![run_id], + Self::row_to_run_summary, + ) + .optional() + .map_err(StorageError::from) + } + + fn row_to_run_summary(row: &rusqlite::Row) -> rusqlite::Result { + Ok(AgentRunSummary { + run_id: row.get("run_id")?, + first_tool: row.get("first_tool").ok().flatten(), + event_count: row.get("event_count")?, + retrieved_count: row.get("retrieved_count")?, + suppressed_count: row.get("suppressed_count")?, + write_count: row.get("write_count")?, + veto_count: row.get("veto_count")?, + started_at: row.get("started_at")?, + last_at: row.get("last_at")?, + }) + } + + // ======================================================================== + // MEMORY RECEIPTS + // ======================================================================== + + /// Persist a retrieval receipt. `run_id`/`tool`/`query` are denormalized + /// context for the dashboard; the full [`Receipt`] is stored as JSON. + pub fn save_receipt( + &self, + receipt: &Receipt, + run_id: Option<&str>, + tool: Option<&str>, + query: Option<&str>, + ) -> Result<()> { + let payload = serde_json::to_string(receipt) + .map_err(|e| StorageError::Init(format!("receipt serialize: {e}")))?; + let writer = self + .writer + .lock() + .map_err(|_| StorageError::Init("Writer lock poisoned".into()))?; + writer.execute( + "INSERT OR REPLACE INTO memory_receipts + (receipt_id, run_id, tool, query, retrieved_count, suppressed_count, + trust_floor, decay_risk, payload, created_at) + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)", + params![ + receipt.receipt_id, + run_id, + tool, + query, + receipt.retrieved.len() as i64, + receipt.suppressed.len() as i64, + receipt.trust_floor, + receipt.decay_risk.as_str(), + payload, + Utc::now().to_rfc3339(), + ], + )?; + Ok(()) + } + + /// Fetch one receipt by id. + pub fn get_receipt(&self, receipt_id: &str) -> Result> { + let reader = self + .reader + .lock() + .map_err(|_| StorageError::Init("Reader lock poisoned".into()))?; + let payload: Option = reader + .query_row( + "SELECT payload FROM memory_receipts WHERE receipt_id = ?1", + params![receipt_id], + |row| row.get(0), + ) + .optional()?; + Ok(payload.and_then(|p| serde_json::from_str(&p).ok())) + } + + /// List recent receipts, newest first. + pub fn list_receipts(&self, limit: usize) -> Result> { + let reader = self + .reader + .lock() + .map_err(|_| StorageError::Init("Reader lock poisoned".into()))?; + let mut stmt = reader.prepare( + "SELECT payload FROM memory_receipts ORDER BY created_at DESC LIMIT ?1", + )?; + let rows = stmt.query_map(params![limit as i64], |row| { + let p: String = row.get(0)?; + Ok(p) + })?; + let mut out = Vec::new(); + for r in rows { + if let Ok(rc) = serde_json::from_str::(&r?) { + out.push(rc); + } + } + Ok(out) + } + + // ======================================================================== + // MEMORY PRs — the risk-gated review queue + // ======================================================================== + + /// Open (insert) a Memory PR. + pub fn save_memory_pr(&self, pr: &MemoryPr) -> Result<()> { + let diff = serde_json::to_string(&pr.diff).unwrap_or_else(|_| "{}".to_string()); + let signals = serde_json::to_string(&pr.signals).unwrap_or_else(|_| "[]".to_string()); + let writer = self + .writer + .lock() + .map_err(|_| StorageError::Init("Writer lock poisoned".into()))?; + writer.execute( + "INSERT OR REPLACE INTO memory_prs + (id, kind, status, title, subject_id, run_id, diff, signals, + decision, created_at, decided_at) + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)", + params![ + pr.id, + pr.kind.as_str(), + pr.status.as_str(), + pr.title, + pr.subject_id, + pr.run_id, + diff, + signals, + pr.decision + .and_then(|d| serde_json::to_value(d).ok()) + .and_then(|v| v.as_str().map(|s| s.to_string())), + pr.created_at, + pr.decided_at, + ], + )?; + Ok(()) + } + + /// Fetch one Memory PR by id. + pub fn get_memory_pr(&self, id: &str) -> Result> { + let reader = self + .reader + .lock() + .map_err(|_| StorageError::Init("Reader lock poisoned".into()))?; + reader + .query_row( + "SELECT id, kind, status, title, subject_id, run_id, diff, signals, + decision, created_at, decided_at + FROM memory_prs WHERE id = ?1", + params![id], + Self::row_to_memory_pr, + ) + .optional() + .map_err(StorageError::from) + } + + /// List Memory PRs, optionally filtered by status, newest first. + pub fn list_memory_prs( + &self, + status: Option, + limit: usize, + ) -> Result> { + let reader = self + .reader + .lock() + .map_err(|_| StorageError::Init("Reader lock poisoned".into()))?; + let (sql, with_filter) = match status { + Some(_) => ( + "SELECT id, kind, status, title, subject_id, run_id, diff, signals, + decision, created_at, decided_at + FROM memory_prs WHERE status = ?1 ORDER BY created_at DESC LIMIT ?2", + true, + ), + None => ( + "SELECT id, kind, status, title, subject_id, run_id, diff, signals, + decision, created_at, decided_at + FROM memory_prs ORDER BY created_at DESC LIMIT ?1", + false, + ), + }; + let mut stmt = reader.prepare(sql)?; + let mut out = Vec::new(); + if with_filter { + let st = status.unwrap(); + let rows = + stmt.query_map(params![st.as_str(), limit as i64], Self::row_to_memory_pr)?; + for r in rows { + out.push(r?); + } + } else { + let rows = stmt.query_map(params![limit as i64], Self::row_to_memory_pr)?; + for r in rows { + out.push(r?); + } + } + Ok(out) + } + + /// Count pending Memory PRs (for the nav badge). + pub fn count_pending_memory_prs(&self) -> Result { + let reader = self + .reader + .lock() + .map_err(|_| StorageError::Init("Reader lock poisoned".into()))?; + let n: i64 = reader + .query_row( + "SELECT COUNT(*) FROM memory_prs WHERE status = 'pending'", + [], + |r| r.get(0), + ) + .unwrap_or(0); + Ok(n) + } + + /// Record a decision on a Memory PR, moving it out of `pending`. Returns the + /// updated PR. `AskAgentWhy` is read-only and never reaches here. + pub fn decide_memory_pr(&self, id: &str, action: MemoryPrAction) -> Result { + let new_status = action.resulting_status().ok_or_else(|| { + StorageError::Init("ask_agent_why is read-only and decides nothing".into()) + })?; + let decision = serde_json::to_value(action) + .ok() + .and_then(|v| v.as_str().map(|s| s.to_string())) + .unwrap_or_default(); + let now = Utc::now().to_rfc3339(); + { + let writer = self + .writer + .lock() + .map_err(|_| StorageError::Init("Writer lock poisoned".into()))?; + let changed = writer.execute( + "UPDATE memory_prs SET status = ?1, decision = ?2, decided_at = ?3 WHERE id = ?4", + params![new_status.as_str(), decision, now, id], + )?; + if changed == 0 { + return Err(StorageError::NotFound(id.to_string())); + } + } + self.get_memory_pr(id)? + .ok_or_else(|| StorageError::NotFound(id.to_string())) + } + + fn row_to_memory_pr(row: &rusqlite::Row) -> rusqlite::Result { + let kind_s: String = row.get("kind")?; + let status_s: String = row.get("status")?; + let diff_s: String = row.get("diff")?; + let signals_s: String = row.get("signals")?; + let decision_s: Option = row.get("decision").ok().flatten(); + + let kind = crate::trace::MemoryPrKind::from_label(&kind_s) + .unwrap_or(crate::trace::MemoryPrKind::NewFact); + let status = serde_json::from_value(serde_json::Value::String(status_s)) + .unwrap_or(MemoryPrStatus::Pending); + let diff: serde_json::Value = serde_json::from_str(&diff_s).unwrap_or(serde_json::json!({})); + let signals = serde_json::from_str(&signals_s).unwrap_or_default(); + let decision = decision_s + .and_then(|s| serde_json::from_value(serde_json::Value::String(s)).ok()); + + Ok(MemoryPr { + id: row.get("id")?, + kind, + status, + title: row.get("title")?, + diff, + signals, + subject_id: row.get("subject_id").ok().flatten(), + run_id: row.get("run_id").ok().flatten(), + created_at: row.get("created_at")?, + decided_at: row.get("decided_at").ok().flatten(), + decision, + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::trace::{ + DecayRisk, MemoryPrKind, MemoryTraceEvent, Receipt, RiskSignal, SuppressReason, + SuppressedReceiptEntry, + }; + + fn store() -> SqliteMemoryStore { + // Temp-file store for isolated, fast tests (mirrors the existing + // sqlite.rs test helpers; there is no in-memory constructor). + let dir = tempfile::tempdir().unwrap(); + SqliteMemoryStore::new(Some(dir.path().join("trace_test.db"))).expect("test store") + } + + #[test] + fn trace_append_orders_and_rolls_up() { + let s = store(); + let run = "run_abc"; + s.append_trace_event(&MemoryTraceEvent::McpCall { + run_id: run.into(), + tool: "deep_reference".into(), + args_hash: "h".into(), + at: 100, + }) + .unwrap(); + let mut activation = std::collections::BTreeMap::new(); + activation.insert("m1".to_string(), 0.9); + s.append_trace_event(&MemoryTraceEvent::MemoryRetrieve { + run_id: run.into(), + ids: vec!["m1".into(), "m2".into()], + activation, + at: 110, + }) + .unwrap(); + s.append_trace_event(&MemoryTraceEvent::MemorySuppress { + run_id: run.into(), + id: "m3".into(), + reason: SuppressReason::Contradicted, + at: 120, + }) + .unwrap(); + + let events = s.get_trace(run).unwrap(); + assert_eq!(events.len(), 3); + assert_eq!(events[0].kind(), "mcp.call"); + assert_eq!(events[2].kind(), "memory.suppress"); + + let summary = s.get_agent_run(run).unwrap().unwrap(); + assert_eq!(summary.first_tool.as_deref(), Some("deep_reference")); + assert_eq!(summary.event_count, 3); + assert_eq!(summary.retrieved_count, 2); + assert_eq!(summary.suppressed_count, 1); + assert_eq!(summary.started_at, 100); + assert_eq!(summary.last_at, 120); + + let runs = s.list_agent_runs(10).unwrap(); + assert_eq!(runs.len(), 1); + assert_eq!(runs[0].run_id, run); + } + + #[test] + fn receipt_roundtrips() { + let s = store(); + let receipt = Receipt { + receipt_id: "r_2026_06_22_abc".into(), + retrieved: vec!["m1".into(), "m2".into()], + suppressed: vec![SuppressedReceiptEntry::new("m3", SuppressReason::LowTrust)], + activation_path: vec!["a -> b".into()], + trust_floor: 0.62, + decay_risk: DecayRisk::Medium, + mutations: vec![], + }; + s.save_receipt(&receipt, Some("run_abc"), Some("search"), Some("q")) + .unwrap(); + let got = s.get_receipt("r_2026_06_22_abc").unwrap().unwrap(); + assert_eq!(got, receipt); + assert_eq!(s.list_receipts(10).unwrap().len(), 1); + } + + #[test] + fn memory_pr_lifecycle() { + let s = store(); + let pr = MemoryPr { + id: "pr_1".into(), + kind: MemoryPrKind::ContradictionDetected, + status: MemoryPrStatus::Pending, + title: "Agent wants to overwrite a high-trust fact".into(), + diff: serde_json::json!({"before": "x", "after": "y"}), + signals: vec![RiskSignal { + code: "contradicts_high_trust".into(), + detail: "Contradicts trust 0.9.".into(), + }], + subject_id: Some("m_old".into()), + run_id: Some("run_abc".into()), + created_at: Utc::now().to_rfc3339(), + decided_at: None, + decision: None, + }; + s.save_memory_pr(&pr).unwrap(); + + assert_eq!(s.count_pending_memory_prs().unwrap(), 1); + let pending = s + .list_memory_prs(Some(MemoryPrStatus::Pending), 10) + .unwrap(); + assert_eq!(pending.len(), 1); + assert_eq!(pending[0].signals[0].code, "contradicts_high_trust"); + + let decided = s.decide_memory_pr("pr_1", MemoryPrAction::Promote).unwrap(); + assert_eq!(decided.status, MemoryPrStatus::Promoted); + assert_eq!(decided.decision, Some(MemoryPrAction::Promote)); + assert!(decided.decided_at.is_some()); + assert_eq!(s.count_pending_memory_prs().unwrap(), 0); + } + + #[test] + fn ask_agent_why_is_not_a_decision() { + let s = store(); + let pr = MemoryPr { + id: "pr_2".into(), + kind: MemoryPrKind::NewFact, + status: MemoryPrStatus::Pending, + title: "t".into(), + diff: serde_json::json!({}), + signals: vec![], + subject_id: None, + run_id: None, + created_at: Utc::now().to_rfc3339(), + decided_at: None, + decision: None, + }; + s.save_memory_pr(&pr).unwrap(); + assert!(s + .decide_memory_pr("pr_2", MemoryPrAction::AskAgentWhy) + .is_err()); + // Still pending. + assert_eq!(s.count_pending_memory_prs().unwrap(), 1); + } +} diff --git a/crates/vestige-core/src/trace/mod.rs b/crates/vestige-core/src/trace/mod.rs new file mode 100644 index 0000000..09778cb --- /dev/null +++ b/crates/vestige-core/src/trace/mod.rs @@ -0,0 +1,356 @@ +//! # Agent Black Box, Receipts & Memory PRs — the cognitive flight recorder +//! +//! This module holds the **pure** data model and classification logic for three +//! tightly-related capabilities that together make Vestige *the black box, +//! immune system, and cinematic debugger for agent memory*: +//! +//! 1. **Agent Black Box** — a replayable trace of everything an agent run did to +//! memory: prompt → retrieved → suppressed → activated edges → tool calls → +//! writes → contradictions → vetoes → dream consolidation → final answer. +//! The event model is [`MemoryTraceEvent`]. +//! +//! 2. **Memory Receipts** — every important retrieval returns a structured +//! [`Receipt`]: what was retrieved, what was suppressed and why, the +//! activation path that surfaced it, the trust floor, the decay risk, and any +//! mutations. A receipt is the "nutrition label" for a piece of agent memory. +//! +//! 3. **Memory PRs** — changes to an agent's *brain* are reviewed like changes +//! to code. Ordinary context auto-commits (and always leaves a receipt), but +//! risky writes — contradictions against high-trust memory, supersede / forget +//! / merge / protect, identity / preference / workflow / positioning facts, +//! permission / auth / security / money / legal facts, dream consolidation +//! proposals, decay-below-threshold resurrection, low-confidence batch +//! imports, and weak-provenance connector writes — open a reviewable +//! [`MemoryPr`]. The gating decision is [`classify_write`]. +//! +//! ## Design north star (shared with [`crate::advanced::merge_supersede`]) +//! +//! - **append-only** — trace events are never mutated, only appended, so a run +//! replays exactly as the agent experienced it. +//! - **self-explaining** — every gated write carries the [`RiskSignal`]s that +//! explain *why* it needs review, in plain language. +//! - **opt-in friction** — the default [`ReviewMode::RiskGated`] keeps ordinary +//! memory frictionless and only opens a PR when the agent tries to rewrite its +//! own brain. [`ReviewMode::Fast`] never gates; [`ReviewMode::Paranoid`] gates +//! every write. +//! - **DB-free** — this module is pure logic so it is unit-testable without a +//! database. Persistence (the `agent_traces`, `memory_receipts`, and +//! `memory_prs` tables) lives in [`crate::storage`]. +//! +//! The killer line, made literal by [`classify_write`]: +//! +//! > Vestige auto-remembers ordinary context, but opens a Memory PR when the +//! > agent tries to rewrite its own brain. + +use serde::{Deserialize, Serialize}; + +mod receipt; +mod review; + +pub use receipt::{DecayRisk, Receipt, ReceiptMutation, SuppressedReceiptEntry}; +pub use review::{ + classify_write, MemoryPr, MemoryPrAction, MemoryPrKind, MemoryPrStatus, ReviewMode, RiskClass, + RiskSignal, WriteContext, HIGH_TRUST_FLOOR, LOW_CONFIDENCE_FLOOR, +}; + +// ============================================================================ +// TRACE EVENTS — the black-box flight recorder +// ============================================================================ + +/// One append-only event in an agent run's black-box trace. +/// +/// Mirrors the TypeScript `MemoryTraceEvent` union exactly (tagged on `type`, +/// camelCase fields) so the dashboard, the `vestige://trace/{runId}` MCP +/// resource, and the exported `.vestige-trace.json` all speak one schema. +/// +/// ```ts +/// type MemoryTraceEvent = +/// | { type: "mcp.call"; runId; tool; argsHash; at } +/// | { type: "memory.retrieve"; runId; ids; activation; at } +/// | { type: "memory.suppress"; runId; id; reason } +/// | { type: "memory.write"; runId; id; diff; source } +/// | { type: "sanhedrin.veto"; runId; claim; evidenceIds; confidence } +/// | { type: "dream.patch"; runId; proposalIds; at }; +/// ``` +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +#[serde(tag = "type")] +pub enum MemoryTraceEvent { + /// An MCP tool was invoked. The args are stored as a hash (not the raw + /// payload) so traces never leak prompt contents or secrets. + #[serde(rename = "mcp.call")] + McpCall { + #[serde(rename = "runId")] + run_id: String, + tool: String, + #[serde(rename = "argsHash")] + args_hash: String, + at: i64, + }, + + /// Memories were retrieved, with per-id spreading-activation strength so the + /// graph replay can pulse exactly the nodes the agent saw, at their weight. + #[serde(rename = "memory.retrieve")] + MemoryRetrieve { + #[serde(rename = "runId")] + run_id: String, + ids: Vec, + activation: std::collections::BTreeMap, + at: i64, + }, + + /// A memory that *would* have surfaced was suppressed, with the reason — + /// this is the "what the agent chose NOT to use" channel. + #[serde(rename = "memory.suppress")] + MemorySuppress { + #[serde(rename = "runId")] + run_id: String, + id: String, + reason: SuppressReason, + #[serde(default)] + at: i64, + }, + + /// A memory was written / strengthened. `diff` is an opaque JSON description + /// of the change; `source` records who caused it. + #[serde(rename = "memory.write")] + MemoryWrite { + #[serde(rename = "runId")] + run_id: String, + id: String, + diff: serde_json::Value, + source: WriteSource, + #[serde(default)] + at: i64, + }, + + /// A contradiction was detected between memories during a run — its own + /// first-class event (not folded into `memory.suppress`), so the Black Box + /// can show the exact contradiction decision the agent faced. + #[serde(rename = "contradiction.detected")] + ContradictionDetected { + #[serde(rename = "runId")] + run_id: String, + /// The two (or more) memory ids in tension. + ids: Vec, + /// The id the agent trusted (kept), if it resolved the tension. + #[serde(rename = "winnerId", skip_serializing_if = "Option::is_none")] + winner_id: Option, + /// Plain-language description of the contradiction. + detail: String, + #[serde(default)] + at: i64, + }, + + /// The Sanhedrin verifier vetoed a claim the agent was about to assert, + /// citing the evidence it weighed and its confidence. + #[serde(rename = "sanhedrin.veto")] + SanhedrinVeto { + #[serde(rename = "runId")] + run_id: String, + claim: String, + #[serde(rename = "evidenceIds")] + evidence_ids: Vec, + confidence: f64, + #[serde(default)] + at: i64, + }, + + /// Dream consolidation proposed a patch to memory (merge / insight / prune). + #[serde(rename = "dream.patch")] + DreamPatch { + #[serde(rename = "runId")] + run_id: String, + #[serde(rename = "proposalIds")] + proposal_ids: Vec, + at: i64, + }, +} + +impl MemoryTraceEvent { + /// The run this event belongs to. + pub fn run_id(&self) -> &str { + match self { + MemoryTraceEvent::McpCall { run_id, .. } + | MemoryTraceEvent::MemoryRetrieve { run_id, .. } + | MemoryTraceEvent::MemorySuppress { run_id, .. } + | MemoryTraceEvent::MemoryWrite { run_id, .. } + | MemoryTraceEvent::ContradictionDetected { run_id, .. } + | MemoryTraceEvent::SanhedrinVeto { run_id, .. } + | MemoryTraceEvent::DreamPatch { run_id, .. } => run_id, + } + } + + /// The wall-clock millisecond timestamp the event was recorded at. + pub fn at(&self) -> i64 { + match self { + MemoryTraceEvent::McpCall { at, .. } + | MemoryTraceEvent::MemoryRetrieve { at, .. } + | MemoryTraceEvent::MemorySuppress { at, .. } + | MemoryTraceEvent::MemoryWrite { at, .. } + | MemoryTraceEvent::ContradictionDetected { at, .. } + | MemoryTraceEvent::SanhedrinVeto { at, .. } + | MemoryTraceEvent::DreamPatch { at, .. } => *at, + } + } + + /// Short stable kind label used for filtering / the `event_type` column. + pub fn kind(&self) -> &'static str { + match self { + MemoryTraceEvent::McpCall { .. } => "mcp.call", + MemoryTraceEvent::MemoryRetrieve { .. } => "memory.retrieve", + MemoryTraceEvent::MemorySuppress { .. } => "memory.suppress", + MemoryTraceEvent::MemoryWrite { .. } => "memory.write", + MemoryTraceEvent::ContradictionDetected { .. } => "contradiction.detected", + MemoryTraceEvent::SanhedrinVeto { .. } => "sanhedrin.veto", + MemoryTraceEvent::DreamPatch { .. } => "dream.patch", + } + } + + /// Stamp `at` on events that left it defaulted (the recorder fills this so + /// callers don't have to thread a clock through every emit site). + pub fn with_at(mut self, now_ms: i64) -> Self { + match &mut self { + MemoryTraceEvent::McpCall { at, .. } + | MemoryTraceEvent::MemoryRetrieve { at, .. } + | MemoryTraceEvent::MemorySuppress { at, .. } + | MemoryTraceEvent::MemoryWrite { at, .. } + | MemoryTraceEvent::ContradictionDetected { at, .. } + | MemoryTraceEvent::SanhedrinVeto { at, .. } + | MemoryTraceEvent::DreamPatch { at, .. } => { + if *at == 0 { + *at = now_ms; + } + } + } + self + } +} + +/// Why a memory was suppressed during a run. Mirrors the TS union member +/// `"low_trust" | "decayed" | "contradicted" | "privacy"`, plus `competition` +/// for the existing spreading-activation competition suppression. +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum SuppressReason { + /// Below the trust floor for this retrieval. + LowTrust, + /// FSRS retrievability decayed below the usable threshold. + Decayed, + /// Contradicted by a higher-trust memory. + Contradicted, + /// Withheld for privacy / sensitivity reasons. + Privacy, + /// Lost spreading-activation competition to a stronger memory. + Competition, +} + +impl SuppressReason { + /// Stable string label. + pub fn as_str(&self) -> &'static str { + match self { + SuppressReason::LowTrust => "low_trust", + SuppressReason::Decayed => "decayed", + SuppressReason::Contradicted => "contradicted", + SuppressReason::Privacy => "privacy", + SuppressReason::Competition => "competition", + } + } +} + +/// Who caused a `memory.write`. Mirrors the TS `"agent" | "user" | "dream"`, +/// plus `connector` for external-source sync writes. +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum WriteSource { + /// The agent wrote it autonomously. + Agent, + /// The user explicitly asked for it. + User, + /// Produced by dream consolidation. + Dream, + /// Ingested by an external connector (GitHub, Redmine, …). + Connector, +} + +impl WriteSource { + /// Stable string label. + pub fn as_str(&self) -> &'static str { + match self { + WriteSource::Agent => "agent", + WriteSource::User => "user", + WriteSource::Dream => "dream", + WriteSource::Connector => "connector", + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn trace_event_roundtrips_with_ts_shape() { + let ev = MemoryTraceEvent::McpCall { + run_id: "run_123".into(), + tool: "deep_reference".into(), + args_hash: "abc".into(), + at: 42, + }; + let json = serde_json::to_value(&ev).unwrap(); + // Tagged on `type`, camelCase runId/argsHash — exactly the TS contract. + assert_eq!(json["type"], "mcp.call"); + assert_eq!(json["runId"], "run_123"); + assert_eq!(json["argsHash"], "abc"); + assert_eq!(json["at"], 42); + + let back: MemoryTraceEvent = serde_json::from_value(json).unwrap(); + assert_eq!(back, ev); + } + + #[test] + fn retrieve_event_carries_activation_map() { + let mut activation = std::collections::BTreeMap::new(); + activation.insert("mem_1".to_string(), 0.91); + activation.insert("mem_7".to_string(), 0.42); + let ev = MemoryTraceEvent::MemoryRetrieve { + run_id: "r".into(), + ids: vec!["mem_1".into(), "mem_7".into()], + activation, + at: 1, + }; + let json = serde_json::to_value(&ev).unwrap(); + assert_eq!(json["type"], "memory.retrieve"); + assert_eq!(json["activation"]["mem_1"], 0.91); + } + + #[test] + fn with_at_fills_only_when_unset() { + let ev = MemoryTraceEvent::MemorySuppress { + run_id: "r".into(), + id: "m".into(), + reason: SuppressReason::Contradicted, + at: 0, + } + .with_at(999); + assert_eq!(ev.at(), 999); + + let ev2 = MemoryTraceEvent::DreamPatch { + run_id: "r".into(), + proposal_ids: vec!["p".into()], + at: 7, + } + .with_at(999); + assert_eq!(ev2.at(), 7, "explicit timestamp must not be overwritten"); + } + + #[test] + fn suppress_reason_labels_match_ts() { + assert_eq!(SuppressReason::LowTrust.as_str(), "low_trust"); + assert_eq!(SuppressReason::Contradicted.as_str(), "contradicted"); + // Serde uses the same snake_case form on the wire. + assert_eq!( + serde_json::to_value(SuppressReason::Privacy).unwrap(), + serde_json::json!("privacy") + ); + } +} diff --git a/crates/vestige-core/src/trace/receipt.rs b/crates/vestige-core/src/trace/receipt.rs new file mode 100644 index 0000000..7501d14 --- /dev/null +++ b/crates/vestige-core/src/trace/receipt.rs @@ -0,0 +1,247 @@ +//! # Memory Receipts +//! +//! Every important retrieval returns a [`Receipt`] — a structured record of what +//! the agent's memory actually did to answer a query. It is built entirely from +//! data the retrieval pipeline *already computes* (scored memories, suppression +//! decisions, spreading-activation path, FSRS trust), so attaching one is nearly +//! free and never changes the answer. +//! +//! The canonical shape (matching the product spec): +//! +//! ```json +//! { +//! "receipt_id": "r_2026_06_22_abc", +//! "retrieved": ["mem_1", "mem_7", "mem_9"], +//! "suppressed": [{"id": "mem_4", "reason": "contradicted"}], +//! "activation_path": ["project_goal -> design_decision -> current_file"], +//! "trust_floor": 0.62, +//! "decay_risk": "medium", +//! "mutations": [] +//! } +//! ``` + +use serde::{Deserialize, Serialize}; + +use super::SuppressReason; + +/// A structured receipt attached to a retrieval's output. +/// +/// Field names are snake_case to match the published product spec and the +/// dashboard receipt card exactly. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct Receipt { + /// Stable, human-legible id: `r___
_`. + pub receipt_id: String, + + /// Ids of the memories that actually informed the answer, best-first. + pub retrieved: Vec, + + /// Memories that were withheld, each with the reason — the "what the agent + /// chose NOT to use" channel that makes retrieval auditable. + pub suppressed: Vec, + + /// Human-readable spreading-activation path(s) that surfaced the result, + /// e.g. `"project_goal -> design_decision -> current_file"`. + pub activation_path: Vec, + + /// The minimum trust score among the retrieved memories — the weakest link + /// the answer rests on. + pub trust_floor: f64, + + /// Coarse decay risk for the retrieved set (how stale the evidence is). + pub decay_risk: DecayRisk, + + /// Any memory mutations this retrieval triggered (testing-effect + /// strengthening, reconsolidation, supersession). Empty for a pure read. + pub mutations: Vec, +} + +impl Receipt { + /// Build a receipt from already-computed retrieval signals. + /// + /// `receipt_id` is derived from `now` + a short discriminator so it is both + /// human-legible and collision-resistant within a day. `trust_scores` is the + /// per-id FSRS retrievability/trust the pipeline already produced. + pub fn build( + now: chrono::DateTime, + discriminator: &str, + retrieved: Vec, + suppressed: Vec, + activation_path: Vec, + trust_scores: &[f64], + mutations: Vec, + ) -> Self { + let trust_floor = trust_scores + .iter() + .copied() + .fold(f64::INFINITY, f64::min); + let trust_floor = if trust_floor.is_finite() { + (trust_floor * 100.0).round() / 100.0 + } else { + 0.0 + }; + let decay_risk = DecayRisk::from_trust_floor(trust_floor); + + let short: String = discriminator + .chars() + .filter(|c| c.is_ascii_alphanumeric()) + .take(8) + .collect(); + let receipt_id = format!("r_{}_{}", now.format("%Y_%m_%d"), short); + + Self { + receipt_id, + retrieved, + suppressed, + activation_path, + trust_floor, + decay_risk, + mutations, + } + } +} + +/// One suppressed-memory entry in a [`Receipt`]. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct SuppressedReceiptEntry { + /// The id of the suppressed memory. + pub id: String, + /// Why it was withheld. + pub reason: SuppressReason, +} + +impl SuppressedReceiptEntry { + /// Convenience constructor. + pub fn new(id: impl Into, reason: SuppressReason) -> Self { + Self { + id: id.into(), + reason, + } + } +} + +/// Coarse staleness signal for a retrieved set, derived from the trust floor. +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "lowercase")] +pub enum DecayRisk { + /// Trust floor is healthy; the evidence is fresh. + Low, + /// Some of the evidence is weakening. + Medium, + /// The answer rests on memory that is decaying out. + High, +} + +impl DecayRisk { + /// Map the weakest retrieved-trust score to a decay-risk band. + /// + /// Thresholds align with the FSRS "due for review" intuition: above 0.7 the + /// memory is comfortably retrievable, 0.4–0.7 is getting weak, below 0.4 is + /// at risk of being forgotten. + pub fn from_trust_floor(trust_floor: f64) -> Self { + if trust_floor >= 0.7 { + DecayRisk::Low + } else if trust_floor >= 0.4 { + DecayRisk::Medium + } else { + DecayRisk::High + } + } + + /// Stable string label. + pub fn as_str(&self) -> &'static str { + match self { + DecayRisk::Low => "low", + DecayRisk::Medium => "medium", + DecayRisk::High => "high", + } + } +} + +/// A memory mutation that a retrieval triggered, recorded on the receipt so the +/// side effects of "just reading" are never invisible. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct ReceiptMutation { + /// The mutated memory id. + pub id: String, + /// What changed: `"strengthened"`, `"reconsolidated"`, `"superseded"`, … + pub kind: String, + /// Optional human note about the change. + #[serde(skip_serializing_if = "Option::is_none")] + pub note: Option, +} + +#[cfg(test)] +mod tests { + use super::*; + use chrono::TimeZone; + + fn fixed_now() -> chrono::DateTime { + chrono::Utc.with_ymd_and_hms(2026, 6, 22, 15, 0, 0).unwrap() + } + + #[test] + fn receipt_id_is_human_legible_and_dated() { + let r = Receipt::build( + fixed_now(), + "abc123!!", + vec!["mem_1".into()], + vec![], + vec![], + &[0.9], + vec![], + ); + assert_eq!(r.receipt_id, "r_2026_06_22_abc123"); + } + + #[test] + fn trust_floor_is_the_weakest_link() { + let r = Receipt::build( + fixed_now(), + "x", + vec!["a".into(), "b".into(), "c".into()], + vec![], + vec![], + &[0.91, 0.62, 0.78], + vec![], + ); + assert_eq!(r.trust_floor, 0.62); + assert_eq!(r.decay_risk, DecayRisk::Medium); + } + + #[test] + fn empty_trust_scores_floor_to_zero_high_risk() { + let r = Receipt::build(fixed_now(), "x", vec![], vec![], vec![], &[], vec![]); + assert_eq!(r.trust_floor, 0.0); + assert_eq!(r.decay_risk, DecayRisk::High); + } + + #[test] + fn decay_bands() { + assert_eq!(DecayRisk::from_trust_floor(0.95), DecayRisk::Low); + assert_eq!(DecayRisk::from_trust_floor(0.55), DecayRisk::Medium); + assert_eq!(DecayRisk::from_trust_floor(0.20), DecayRisk::High); + } + + #[test] + fn matches_published_spec_shape() { + let r = Receipt { + receipt_id: "r_2026_06_22_abc".into(), + retrieved: vec!["mem_1".into(), "mem_7".into(), "mem_9".into()], + suppressed: vec![SuppressedReceiptEntry::new( + "mem_4", + SuppressReason::Contradicted, + )], + activation_path: vec!["project_goal -> design_decision -> current_file".into()], + trust_floor: 0.62, + decay_risk: DecayRisk::Medium, + mutations: vec![], + }; + let json = serde_json::to_value(&r).unwrap(); + assert_eq!(json["receipt_id"], "r_2026_06_22_abc"); + assert_eq!(json["suppressed"][0]["reason"], "contradicted"); + assert_eq!(json["decay_risk"], "medium"); + assert_eq!(json["trust_floor"], 0.62); + assert!(json["mutations"].as_array().unwrap().is_empty()); + } +} diff --git a/crates/vestige-core/src/trace/review.rs b/crates/vestige-core/src/trace/review.rs new file mode 100644 index 0000000..49955f5 --- /dev/null +++ b/crates/vestige-core/src/trace/review.rs @@ -0,0 +1,692 @@ +//! # Memory PRs — review changes to an agent's brain like code +//! +//! Ordinary context auto-commits and always leaves a receipt. But a *risky* +//! write — one where the agent is rewriting its own brain — opens a reviewable +//! [`MemoryPr`] instead. [`classify_write`] is the immune system: given a +//! [`WriteContext`] and a [`ReviewMode`], it returns the [`RiskClass`] and the +//! [`RiskSignal`]s that explain, in plain language, *why* a write needs review. +//! +//! ## The three modes (one-click in the dashboard) +//! +//! | Mode | Behaviour | +//! |------|-----------| +//! | [`ReviewMode::Fast`] | Never gate. Every write auto-commits. (Demos, trusted solo flows.) | +//! | [`ReviewMode::RiskGated`] | **Default.** Auto-commit ordinary writes; open a PR for risky ones. | +//! | [`ReviewMode::Paranoid`] | Gate *every* write. Nothing enters the brain without approval. | +//! +//! ## What counts as "risky" (the taxonomy) +//! +//! A write is risky when any of these hold: +//! - it **contradicts a high-trust memory**, +//! - it **supersedes / forgets / merges / protects** existing memory, +//! - it touches **identity, user preference, workflow, or project positioning**, +//! - it asserts a **permission / auth / security / money / bounty / legal-ish** fact, +//! - it is a **dream consolidation** proposal, +//! - it **resurrects a decayed** memory (below the retention threshold), +//! - it is part of a **low-confidence batch import**, +//! - it is an **external connector write without strong provenance**. +//! +//! Each rule maps to a [`RiskSignal`] so the resulting Memory PR is fully +//! self-explaining. + +use serde::{Deserialize, Serialize}; + +use super::WriteSource; + +/// A memory is "high trust" at or above this FSRS retrievability/trust score. +/// Contradicting something this trusted is always worth a review. +pub const HIGH_TRUST_FLOOR: f64 = 0.7; + +/// Writes below this confidence are treated as low-confidence (e.g. a bulk +/// import where the model wasn't sure). +pub const LOW_CONFIDENCE_FLOOR: f64 = 0.5; + +// ============================================================================ +// REVIEW MODE +// ============================================================================ + +/// How aggressively the agent's brain gates incoming writes. +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)] +#[serde(rename_all = "snake_case")] +pub enum ReviewMode { + /// Never gate — every write auto-commits. + Fast, + /// Default: auto-commit ordinary writes, open a PR for risky ones. + #[default] + RiskGated, + /// Gate every write — nothing enters the brain without approval. + Paranoid, +} + +impl ReviewMode { + /// Stable string label, also the wire form. + pub fn as_str(&self) -> &'static str { + match self { + ReviewMode::Fast => "fast", + ReviewMode::RiskGated => "risk_gated", + ReviewMode::Paranoid => "paranoid", + } + } + + /// Parse from a label (case-insensitive, tolerant of `-`/`_`). Falls back to + /// the default [`ReviewMode::RiskGated`] on anything unrecognised. + pub fn from_label(s: &str) -> Self { + match s.trim().to_ascii_lowercase().replace('-', "_").as_str() { + "fast" => ReviewMode::Fast, + "paranoid" => ReviewMode::Paranoid, + _ => ReviewMode::RiskGated, + } + } +} + +// ============================================================================ +// RISK CLASSIFICATION +// ============================================================================ + +/// The outcome of [`classify_write`]: does this write auto-commit or open a PR? +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum RiskClass { + /// Ordinary context — auto-commit (a receipt is still generated). + AutoCommit, + /// Risky — open a [`MemoryPr`] for review. + Review, +} + +impl RiskClass { + /// Stable string label. + pub fn as_str(&self) -> &'static str { + match self { + RiskClass::AutoCommit => "auto_commit", + RiskClass::Review => "review", + } + } + + /// Whether this write should be held for review. + pub fn needs_review(&self) -> bool { + matches!(self, RiskClass::Review) + } +} + +/// A single, self-explaining reason a write was flagged for review. The +/// `code` is stable for filtering/telemetry; the `detail` is human prose for +/// the PR card. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct RiskSignal { + /// Stable machine code, e.g. `"contradicts_high_trust"`. + pub code: String, + /// Plain-language explanation shown on the Memory PR. + pub detail: String, +} + +impl RiskSignal { + fn new(code: &str, detail: impl Into) -> Self { + Self { + code: code.into(), + detail: detail.into(), + } + } +} + +/// Everything [`classify_write`] needs to decide whether a write is risky. +/// +/// All fields default to the "ordinary, safe" interpretation so callers only +/// set the signals that actually apply to their write. +#[derive(Debug, Clone, Default)] +pub struct WriteContext { + /// Who is performing the write. + pub source: Option, + /// The node type being written, e.g. `"fact"`, `"preference"`, `"identity"`. + pub node_type: String, + /// The content (or a representative slice) — scanned for sensitive topics. + pub content: String, + /// Tags attached to the write — also scanned for sensitive topics. + pub tags: Vec, + /// The write contradicts an existing memory whose trust is this high. + /// `None` if there is no contradiction. + pub contradicts_trust: Option, + /// This write supersedes / replaces an existing memory. + pub supersedes: bool, + /// This write forgets / suppresses an existing memory. + pub forgets: bool, + /// This write merges existing memories. + pub merges: bool, + /// This write protects / pins a memory. + pub protects: bool, + /// This write resurrects a memory that had decayed below retention. + pub resurrects_decayed: bool, + /// Confidence of the write (0..1). `None` means "not a batch / unknown". + pub confidence: Option, + /// This write is one of many in a bulk import. + pub batch_import: bool, + /// For connector writes: whether the source envelope carries strong + /// provenance (a verified `source_system` + `source_id` + URL). + pub strong_provenance: bool, +} + +/// Sensitive topic substrings. A write whose content/tags/type mention any of +/// these is treated as touching identity / preference / security / money / +/// legal / workflow / positioning and is routed to review. +const SENSITIVE_TOPICS: &[(&str, &str)] = &[ + // identity & preference + ("identity", "identity fact"), + ("preference", "user preference"), + ("workflow", "workflow rule"), + ("positioning", "project positioning"), + ("persona", "agent persona"), + // permission / auth / security + ("permission", "tool permission"), + ("auth", "authentication / authorization"), + ("token", "credential / token"), + ("secret", "secret material"), + ("password", "credential"), + ("api key", "credential / API key"), + ("security", "security-relevant fact"), + ("vuln", "security vulnerability"), + // money / bounty / legal + ("money", "financial fact"), + ("payment", "financial fact"), + ("invoice", "financial fact"), + ("bounty", "bounty / payout"), + ("salary", "financial fact"), + ("license", "legal / license fact"), + ("legal", "legal-relevant fact"), + ("contract", "legal / contract fact"), +]; + +/// Node types that are intrinsically sensitive regardless of content. +const SENSITIVE_NODE_TYPES: &[&str] = &[ + "identity", + "preference", + "user_preference", + "credential", + "permission", + "security", + "constitution", +]; + +/// Classify a write into auto-commit vs. review, with the signals explaining the +/// decision. +/// +/// This is the immune system. It is pure and deterministic, so the dashboard's +/// "explain this PR" view and the agent's `Ask Agent Why` action see exactly the +/// same reasoning the gate used. +pub fn classify_write(ctx: &WriteContext, mode: ReviewMode) -> (RiskClass, Vec) { + // Mode shortcuts. + match mode { + // Fast never gates — but we still collect signals so the receipt/PR + // record can note what *would* have been flagged. + ReviewMode::Fast => return (RiskClass::AutoCommit, Vec::new()), + ReviewMode::Paranoid => { + let mut signals = collect_signals(ctx); + if signals.is_empty() { + signals.push(RiskSignal::new( + "paranoid_mode", + "Paranoid mode: every write is reviewed before entering memory.", + )); + } + return (RiskClass::Review, signals); + } + ReviewMode::RiskGated => {} + } + + let signals = collect_signals(ctx); + if signals.is_empty() { + (RiskClass::AutoCommit, signals) + } else { + (RiskClass::Review, signals) + } +} + +/// Gather every risk signal that applies to a write, independent of mode. +fn collect_signals(ctx: &WriteContext) -> Vec { + let mut signals = Vec::new(); + + // 1. Contradiction against a high-trust memory. + if let Some(trust) = ctx.contradicts_trust + && trust >= HIGH_TRUST_FLOOR + { + signals.push(RiskSignal::new( + "contradicts_high_trust", + format!( + "Contradicts an existing high-trust memory (trust {:.2} ≥ {:.2}).", + trust, HIGH_TRUST_FLOOR + ), + )); + } + + // 2. Structural rewrites of existing memory. + if ctx.supersedes { + signals.push(RiskSignal::new( + "supersedes_memory", + "Supersedes / replaces an existing memory.", + )); + } + if ctx.forgets { + signals.push(RiskSignal::new( + "forgets_memory", + "Forgets / suppresses an existing memory.", + )); + } + if ctx.merges { + signals.push(RiskSignal::new( + "merges_memory", + "Merges existing memories into one.", + )); + } + if ctx.protects { + signals.push(RiskSignal::new( + "protects_memory", + "Protects / pins a memory against decay and forgetting.", + )); + } + + // 3. Sensitive node types & topics (identity / preference / workflow / + // positioning / permission / auth / security / money / legal). + let node_type_lc = ctx.node_type.to_ascii_lowercase(); + if SENSITIVE_NODE_TYPES.contains(&node_type_lc.as_str()) { + signals.push(RiskSignal::new( + "sensitive_node_type", + format!("Writes a sensitive node type: `{}`.", node_type_lc), + )); + } + if let Some(topic) = first_sensitive_topic(&ctx.content, &ctx.tags) { + signals.push(RiskSignal::new( + "sensitive_topic", + format!("Touches a sensitive topic: {topic}."), + )); + } + + // 4. Dream consolidation proposals. + if matches!(ctx.source, Some(WriteSource::Dream)) { + signals.push(RiskSignal::new( + "dream_consolidation", + "Proposed by dream consolidation — a machine-generated change to memory.", + )); + } + + // 5. Decay-below-threshold resurrection. + if ctx.resurrects_decayed { + signals.push(RiskSignal::new( + "resurrects_decayed", + "Resurrects a memory that had decayed below the retention threshold.", + )); + } + + // 6. Low-confidence batch imports. + if ctx.batch_import { + if let Some(conf) = ctx.confidence { + if conf < LOW_CONFIDENCE_FLOOR { + signals.push(RiskSignal::new( + "low_confidence_batch", + format!( + "Low-confidence batch import (confidence {:.2} < {:.2}).", + conf, LOW_CONFIDENCE_FLOOR + ), + )); + } + } else { + signals.push(RiskSignal::new( + "unscored_batch", + "Batch import with no confidence score.", + )); + } + } + + // 7. External connector writes without strong provenance. + if matches!(ctx.source, Some(WriteSource::Connector)) && !ctx.strong_provenance { + signals.push(RiskSignal::new( + "weak_provenance_connector", + "External connector write without strong provenance (unverified source envelope).", + )); + } + + signals +} + +/// Return the human label of the first sensitive topic found in content/tags. +fn first_sensitive_topic(content: &str, tags: &[String]) -> Option<&'static str> { + let haystack = { + let mut s = content.to_ascii_lowercase(); + for t in tags { + s.push(' '); + s.push_str(&t.to_ascii_lowercase()); + } + s + }; + SENSITIVE_TOPICS + .iter() + .find(|(needle, _)| haystack.contains(needle)) + .map(|(_, label)| *label) +} + +// ============================================================================ +// MEMORY PR DATA MODEL +// ============================================================================ + +/// What kind of change a Memory PR represents. +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum MemoryPrKind { + /// A brand-new fact entering the brain. + NewFact, + /// An existing fact being strengthened / reinforced. + StrengthenedFact, + /// A contradiction was detected against existing memory. + ContradictionDetected, + /// A memory being superseded by a newer one. + MemorySuperseded, + /// A new edge added to the knowledge graph. + EdgeAdded, + /// A node decayed below the retention threshold. + NodeDecayed, + /// Dream consolidation proposed a merge / insight. + DreamConsolidation, +} + +impl MemoryPrKind { + /// Stable string label. + pub fn as_str(&self) -> &'static str { + match self { + MemoryPrKind::NewFact => "new_fact", + MemoryPrKind::StrengthenedFact => "strengthened_fact", + MemoryPrKind::ContradictionDetected => "contradiction_detected", + MemoryPrKind::MemorySuperseded => "memory_superseded", + MemoryPrKind::EdgeAdded => "edge_added", + MemoryPrKind::NodeDecayed => "node_decayed", + MemoryPrKind::DreamConsolidation => "dream_consolidation", + } + } + + /// Parse from a label; `None` if unrecognised. + pub fn from_label(s: &str) -> Option { + Some(match s { + "new_fact" => MemoryPrKind::NewFact, + "strengthened_fact" => MemoryPrKind::StrengthenedFact, + "contradiction_detected" => MemoryPrKind::ContradictionDetected, + "memory_superseded" => MemoryPrKind::MemorySuperseded, + "edge_added" => MemoryPrKind::EdgeAdded, + "node_decayed" => MemoryPrKind::NodeDecayed, + "dream_consolidation" => MemoryPrKind::DreamConsolidation, + _ => return None, + }) + } +} + +/// The review status of a Memory PR. +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)] +#[serde(rename_all = "snake_case")] +pub enum MemoryPrStatus { + /// Awaiting a decision. + #[default] + Pending, + /// Promoted into long-term memory as-is. + Promoted, + /// Merged into an existing memory. + Merged, + /// Superseded an existing memory. + Superseded, + /// Quarantined — held in the firewall, not used for retrieval. + Quarantined, + /// Forgotten — rejected and suppressed. + Forgotten, +} + +impl MemoryPrStatus { + /// Stable string label. + pub fn as_str(&self) -> &'static str { + match self { + MemoryPrStatus::Pending => "pending", + MemoryPrStatus::Promoted => "promoted", + MemoryPrStatus::Merged => "merged", + MemoryPrStatus::Superseded => "superseded", + MemoryPrStatus::Quarantined => "quarantined", + MemoryPrStatus::Forgotten => "forgotten", + } + } +} + +/// The actions a reviewer can take on a Memory PR (the buttons in the diff UI). +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum MemoryPrAction { + /// Accept the change as-is. + Promote, + /// Fold it into an existing memory. + Merge, + /// Use it to supersede an existing memory. + Supersede, + /// Hold it in the firewall. + Quarantine, + /// Reject and suppress it. + Forget, + /// Ask the agent to explain the change (returns the risk signals). + AskAgentWhy, +} + +impl MemoryPrAction { + /// Parse from a URL/path label; `None` if unrecognised. + pub fn from_label(s: &str) -> Option { + Some(match s { + "promote" => MemoryPrAction::Promote, + "merge" => MemoryPrAction::Merge, + "supersede" => MemoryPrAction::Supersede, + "quarantine" => MemoryPrAction::Quarantine, + "forget" => MemoryPrAction::Forget, + "ask_agent_why" | "ask-agent-why" | "why" => MemoryPrAction::AskAgentWhy, + _ => return None, + }) + } + + /// The status this action moves the PR into (`None` for `AskAgentWhy`, which + /// is read-only). + pub fn resulting_status(&self) -> Option { + Some(match self { + MemoryPrAction::Promote => MemoryPrStatus::Promoted, + MemoryPrAction::Merge => MemoryPrStatus::Merged, + MemoryPrAction::Supersede => MemoryPrStatus::Superseded, + MemoryPrAction::Quarantine => MemoryPrStatus::Quarantined, + MemoryPrAction::Forget => MemoryPrStatus::Forgotten, + MemoryPrAction::AskAgentWhy => return None, + }) + } +} + +/// A reviewable change to the agent's brain — the persisted Memory PR record. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct MemoryPr { + /// UUID. + pub id: String, + /// What kind of change this is. + pub kind: MemoryPrKind, + /// Current review status. + pub status: MemoryPrStatus, + /// Short human title for the PR list. + pub title: String, + /// The proposed change as a structured diff (before/after, ids, payload). + pub diff: serde_json::Value, + /// The self-explaining risk signals that opened this PR. + pub signals: Vec, + /// The memory id this PR concerns, if any. + #[serde(skip_serializing_if = "Option::is_none")] + pub subject_id: Option, + /// The run that produced this change, linking the PR back to the black box. + #[serde(skip_serializing_if = "Option::is_none")] + pub run_id: Option, + /// RFC3339 creation time. + pub created_at: String, + /// RFC3339 decision time, once decided. + #[serde(skip_serializing_if = "Option::is_none")] + pub decided_at: Option, + /// The action that resolved this PR, once decided. + #[serde(skip_serializing_if = "Option::is_none")] + pub decision: Option, +} + +#[cfg(test)] +mod tests { + use super::*; + + fn ordinary() -> WriteContext { + WriteContext { + source: Some(WriteSource::Agent), + node_type: "fact".into(), + content: "The build uses cargo and pnpm.".into(), + tags: vec!["build".into()], + ..Default::default() + } + } + + #[test] + fn ordinary_write_auto_commits_in_risk_gated() { + let (class, signals) = classify_write(&ordinary(), ReviewMode::RiskGated); + assert_eq!(class, RiskClass::AutoCommit); + assert!(signals.is_empty()); + } + + #[test] + fn fast_mode_never_gates_even_risky_writes() { + let mut ctx = ordinary(); + ctx.supersedes = true; + ctx.contradicts_trust = Some(0.95); + let (class, _) = classify_write(&ctx, ReviewMode::Fast); + assert_eq!(class, RiskClass::AutoCommit); + } + + #[test] + fn paranoid_mode_gates_even_ordinary_writes() { + let (class, signals) = classify_write(&ordinary(), ReviewMode::Paranoid); + assert_eq!(class, RiskClass::Review); + assert_eq!(signals[0].code, "paranoid_mode"); + } + + #[test] + fn contradiction_against_high_trust_is_risky() { + let mut ctx = ordinary(); + ctx.contradicts_trust = Some(0.82); + let (class, signals) = classify_write(&ctx, ReviewMode::RiskGated); + assert_eq!(class, RiskClass::Review); + assert!(signals.iter().any(|s| s.code == "contradicts_high_trust")); + } + + #[test] + fn contradiction_against_low_trust_is_fine() { + let mut ctx = ordinary(); + ctx.contradicts_trust = Some(0.3); + let (class, _) = classify_write(&ctx, ReviewMode::RiskGated); + assert_eq!(class, RiskClass::AutoCommit); + } + + #[test] + fn supersede_forget_merge_protect_all_gate() { + for set in [ + |c: &mut WriteContext| c.supersedes = true, + |c: &mut WriteContext| c.forgets = true, + |c: &mut WriteContext| c.merges = true, + |c: &mut WriteContext| c.protects = true, + ] { + let mut ctx = ordinary(); + set(&mut ctx); + let (class, _) = classify_write(&ctx, ReviewMode::RiskGated); + assert_eq!(class, RiskClass::Review); + } + } + + #[test] + fn sensitive_topics_gate() { + for topic in [ + "remember my auth token is xyz", + "Sam's salary is confidential", + "the bounty payout terms", + "user preference: dark mode", + "this is a security vulnerability", + ] { + let mut ctx = ordinary(); + ctx.content = topic.into(); + let (class, signals) = classify_write(&ctx, ReviewMode::RiskGated); + assert_eq!(class, RiskClass::Review, "should gate: {topic}"); + assert!(signals.iter().any(|s| s.code == "sensitive_topic")); + } + } + + #[test] + fn sensitive_node_type_gates() { + let mut ctx = ordinary(); + ctx.node_type = "identity".into(); + let (class, signals) = classify_write(&ctx, ReviewMode::RiskGated); + assert_eq!(class, RiskClass::Review); + assert!(signals.iter().any(|s| s.code == "sensitive_node_type")); + } + + #[test] + fn dream_consolidation_gates() { + let mut ctx = ordinary(); + ctx.source = Some(WriteSource::Dream); + let (class, signals) = classify_write(&ctx, ReviewMode::RiskGated); + assert_eq!(class, RiskClass::Review); + assert!(signals.iter().any(|s| s.code == "dream_consolidation")); + } + + #[test] + fn decayed_resurrection_gates() { + let mut ctx = ordinary(); + ctx.resurrects_decayed = true; + let (class, _) = classify_write(&ctx, ReviewMode::RiskGated); + assert_eq!(class, RiskClass::Review); + } + + #[test] + fn low_confidence_batch_gates_but_confident_batch_does_not() { + let mut low = ordinary(); + low.batch_import = true; + low.confidence = Some(0.3); + assert_eq!( + classify_write(&low, ReviewMode::RiskGated).0, + RiskClass::Review + ); + + let mut high = ordinary(); + high.batch_import = true; + high.confidence = Some(0.9); + assert_eq!( + classify_write(&high, ReviewMode::RiskGated).0, + RiskClass::AutoCommit + ); + } + + #[test] + fn weak_provenance_connector_gates_strong_does_not() { + let mut weak = ordinary(); + weak.source = Some(WriteSource::Connector); + weak.strong_provenance = false; + assert_eq!( + classify_write(&weak, ReviewMode::RiskGated).0, + RiskClass::Review + ); + + let mut strong = ordinary(); + strong.source = Some(WriteSource::Connector); + strong.strong_provenance = true; + assert_eq!( + classify_write(&strong, ReviewMode::RiskGated).0, + RiskClass::AutoCommit + ); + } + + #[test] + fn mode_label_roundtrip() { + assert_eq!(ReviewMode::from_label("FAST"), ReviewMode::Fast); + assert_eq!(ReviewMode::from_label("risk-gated"), ReviewMode::RiskGated); + assert_eq!(ReviewMode::from_label("paranoid"), ReviewMode::Paranoid); + assert_eq!(ReviewMode::from_label("garbage"), ReviewMode::RiskGated); + } + + #[test] + fn action_resulting_status() { + assert_eq!( + MemoryPrAction::Promote.resulting_status(), + Some(MemoryPrStatus::Promoted) + ); + assert_eq!(MemoryPrAction::AskAgentWhy.resulting_status(), None); + } +} diff --git a/crates/vestige-mcp/src/autopilot.rs b/crates/vestige-mcp/src/autopilot.rs index 2db04a8..3e355fb 100644 --- a/crates/vestige-mcp/src/autopilot.rs +++ b/crates/vestige-mcp/src/autopilot.rs @@ -443,7 +443,10 @@ async fn handle_event( | VestigeEvent::ConsolidationCompleted { .. } | VestigeEvent::RetentionDecayed { .. } | VestigeEvent::ConnectionDiscovered { .. } - | VestigeEvent::ActivationSpread { .. } => {} + | VestigeEvent::ActivationSpread { .. } + | VestigeEvent::TraceEvent { .. } + | VestigeEvent::MemoryPrOpened { .. } + | VestigeEvent::MemoryPrDecided { .. } => {} } } diff --git a/crates/vestige-mcp/src/dashboard/events.rs b/crates/vestige-mcp/src/dashboard/events.rs index 8edb238..4d5c8d4 100644 --- a/crates/vestige-mcp/src/dashboard/events.rs +++ b/crates/vestige-mcp/src/dashboard/events.rs @@ -167,6 +167,39 @@ pub enum VestigeEvent { timestamp: DateTime, }, + // -- Agent Black Box (v2.2) -- + // One replayable trace event from an agent run. The dashboard Black Box tab + // appends these to the live timeline and pulses the graph exactly as the + // agent experienced it. The inner event is the canonical + // `vestige_core::MemoryTraceEvent`, serialized with its own `type` tag, so + // the wire shape is `{ "type": "TraceEvent", "data": { "runId": ..., "event": { "type": "mcp.call", ... } } }`. + TraceEvent { + run_id: String, + seq: i64, + event: vestige_core::MemoryTraceEvent, + timestamp: DateTime, + }, + + // -- Memory PRs (v2.2) — the cognitive immune system -- + // A risky write opened a Memory PR. The dashboard raises the PR-queue badge + // and can surface a toast: "Vestige opened a Memory PR — the agent tried to + // rewrite its own brain." + MemoryPrOpened { + id: String, + kind: String, + title: String, + signal_count: usize, + run_id: Option, + timestamp: DateTime, + }, + // A Memory PR was decided (promote / merge / supersede / quarantine / forget). + MemoryPrDecided { + id: String, + decision: String, + status: String, + timestamp: DateTime, + }, + // -- System -- Heartbeat { uptime_secs: u64, diff --git a/crates/vestige-mcp/src/dashboard/handlers.rs b/crates/vestige-mcp/src/dashboard/handlers.rs index 39f80ff..0d6af3c 100644 --- a/crates/vestige-mcp/src/dashboard/handlers.rs +++ b/crates/vestige-mcp/src/dashboard/handlers.rs @@ -1983,6 +1983,288 @@ pub async fn deep_reference_query( Ok(Json(response)) } +// ============================================================================ +// AGENT BLACK BOX (v2.2) — replayable agent-run traces +// ============================================================================ + +#[derive(Debug, Deserialize)] +pub struct TraceListParams { + pub limit: Option, +} + +/// List recent agent runs (newest activity first) for the Black Box run picker. +pub async fn list_traces( + State(state): State, + Query(params): Query, +) -> Result, StatusCode> { + let limit = params.limit.unwrap_or(50).clamp(1, 500); + let runs = state + .storage + .list_agent_runs(limit) + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; + let runs_json: Vec = runs + .into_iter() + .map(|r| { + serde_json::json!({ + "runId": r.run_id, + "firstTool": r.first_tool, + "eventCount": r.event_count, + "retrievedCount": r.retrieved_count, + "suppressedCount": r.suppressed_count, + "writeCount": r.write_count, + "vetoCount": r.veto_count, + "startedAt": r.started_at, + "lastAt": r.last_at, + }) + }) + .collect(); + Ok(Json(serde_json::json!({ + "total": runs_json.len(), + "runs": runs_json, + }))) +} + +/// Fetch the full event timeline for one run — the black-box replay payload. +pub async fn get_trace( + State(state): State, + Path(run_id): Path, +) -> Result, StatusCode> { + let events = state + .storage + .get_trace(&run_id) + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; + if events.is_empty() { + return Err(StatusCode::NOT_FOUND); + } + let summary = state.storage.get_agent_run(&run_id).ok().flatten(); + Ok(Json(serde_json::json!({ + "runId": run_id, + "summary": summary.map(|s| serde_json::json!({ + "firstTool": s.first_tool, + "eventCount": s.event_count, + "retrievedCount": s.retrieved_count, + "suppressedCount": s.suppressed_count, + "writeCount": s.write_count, + "vetoCount": s.veto_count, + "startedAt": s.started_at, + "lastAt": s.last_at, + })), + "events": events, + }))) +} + +/// Export a run as a downloadable `.vestige-trace.json` artifact. +pub async fn export_trace( + State(state): State, + Path(run_id): Path, +) -> Result<([(axum::http::HeaderName, String); 2], Json), StatusCode> { + let events = state + .storage + .get_trace(&run_id) + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; + if events.is_empty() { + return Err(StatusCode::NOT_FOUND); + } + let summary = state.storage.get_agent_run(&run_id).ok().flatten(); + let body = serde_json::json!({ + "format": "vestige-trace", + "version": 1, + "runId": run_id, + "exportedAt": Utc::now().to_rfc3339(), + "summary": summary.map(|s| serde_json::json!({ + "firstTool": s.first_tool, + "eventCount": s.event_count, + "retrievedCount": s.retrieved_count, + "suppressedCount": s.suppressed_count, + "writeCount": s.write_count, + "vetoCount": s.veto_count, + "startedAt": s.started_at, + "lastAt": s.last_at, + })), + "events": events, + }); + let headers = [ + ( + axum::http::header::CONTENT_TYPE, + "application/json".to_string(), + ), + ( + axum::http::header::CONTENT_DISPOSITION, + format!("attachment; filename=\"{run_id}.vestige-trace.json\""), + ), + ]; + Ok((headers, Json(body))) +} + +// ============================================================================ +// MEMORY RECEIPTS (v2.2) +// ============================================================================ + +/// List recent retrieval receipts. +pub async fn list_receipts( + State(state): State, + Query(params): Query, +) -> Result, StatusCode> { + let limit = params.limit.unwrap_or(50).clamp(1, 500); + let receipts = state + .storage + .list_receipts(limit) + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; + Ok(Json(serde_json::json!({ + "total": receipts.len(), + "receipts": receipts, + }))) +} + +/// Fetch one receipt by id — the payload behind "Open receipt in Cinema". +pub async fn get_receipt( + State(state): State, + Path(receipt_id): Path, +) -> Result, StatusCode> { + let receipt = state + .storage + .get_receipt(&receipt_id) + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)? + .ok_or(StatusCode::NOT_FOUND)?; + Ok(Json(serde_json::to_value(receipt).unwrap_or_default())) +} + +// ============================================================================ +// MEMORY PRs (v2.2) — risk-gated brain-change review queue +// ============================================================================ + +#[derive(Debug, Deserialize)] +pub struct MemoryPrListParams { + pub status: Option, + pub limit: Option, +} + +/// List Memory PRs, optionally filtered by status. +pub async fn list_memory_prs( + State(state): State, + Query(params): Query, +) -> Result, StatusCode> { + let limit = params.limit.unwrap_or(100).clamp(1, 500); + let status = params.status.as_deref().and_then(|s| { + serde_json::from_value::(serde_json::Value::String( + s.to_string(), + )) + .ok() + }); + let prs = state + .storage + .list_memory_prs(status, limit) + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; + let pending = state.storage.count_pending_memory_prs().unwrap_or(0); + Ok(Json(serde_json::json!({ + "total": prs.len(), + "pendingCount": pending, + "mode": read_review_mode(&state).as_str(), + "prs": prs, + }))) +} + +/// Fetch one Memory PR by id. +pub async fn get_memory_pr( + State(state): State, + Path(id): Path, +) -> Result, StatusCode> { + let pr = state + .storage + .get_memory_pr(&id) + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)? + .ok_or(StatusCode::NOT_FOUND)?; + Ok(Json(serde_json::to_value(pr).unwrap_or_default())) +} + +/// Act on a Memory PR: promote / merge / supersede / quarantine / forget / +/// ask_agent_why. `ask_agent_why` is read-only and returns the risk signals. +pub async fn act_on_memory_pr( + State(state): State, + Path((id, action)): Path<(String, String)>, +) -> Result, StatusCode> { + let action = vestige_core::MemoryPrAction::from_label(&action) + .ok_or(StatusCode::BAD_REQUEST)?; + + // Ask Agent Why is read-only — return the self-explaining signals. + if matches!(action, vestige_core::MemoryPrAction::AskAgentWhy) { + let pr = state + .storage + .get_memory_pr(&id) + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)? + .ok_or(StatusCode::NOT_FOUND)?; + return Ok(Json(serde_json::json!({ + "id": pr.id, + "kind": pr.kind.as_str(), + "title": pr.title, + "why": pr.signals, + "explanation": "These are the risk signals that opened this Memory PR.", + }))); + } + + let decided = state + .storage + .decide_memory_pr(&id, action) + .map_err(|_| StatusCode::NOT_FOUND)?; + + state.emit(VestigeEvent::MemoryPrDecided { + id: decided.id.clone(), + decision: decided + .decision + .and_then(|d| serde_json::to_value(d).ok()) + .and_then(|v| v.as_str().map(String::from)) + .unwrap_or_default(), + status: decided.status.as_str().to_string(), + timestamp: Utc::now(), + }); + + Ok(Json(serde_json::to_value(&decided).unwrap_or_default())) +} + +#[derive(Debug, Deserialize)] +pub struct ReviewModeBody { + pub mode: String, +} + +/// Get the current review mode (fast / risk_gated / paranoid). +pub async fn get_review_mode(State(state): State) -> Json { + let mode = read_review_mode(&state); + Json(serde_json::json!({ + "mode": mode.as_str(), + "pendingCount": state.storage.count_pending_memory_prs().unwrap_or(0), + })) +} + +/// Set the review mode. Persisted to a small JSON file in the data dir so it +/// survives restarts (local-first, no extra config service). +pub async fn set_review_mode( + State(state): State, + Json(body): Json, +) -> Result, StatusCode> { + let mode = vestige_core::ReviewMode::from_label(&body.mode); + let path = review_mode_path(&state); + let payload = serde_json::json!({ "mode": mode.as_str() }); + fs::write(&path, serde_json::to_vec_pretty(&payload).unwrap_or_default()) + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; + Ok(Json(serde_json::json!({ "mode": mode.as_str() }))) +} + +/// Path to the persisted review-mode file. +fn review_mode_path(state: &AppState) -> PathBuf { + state.storage.data_dir().join("review_mode.json") +} + +/// Read the persisted review mode, defaulting to RiskGated. +pub fn read_review_mode(state: &AppState) -> vestige_core::ReviewMode { + let path = review_mode_path(state); + fs::read_to_string(&path) + .ok() + .and_then(|s| serde_json::from_str::(&s).ok()) + .and_then(|v| v.get("mode").and_then(|m| m.as_str()).map(String::from)) + .map(|s| vestige_core::ReviewMode::from_label(&s)) + .unwrap_or_default() +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/vestige-mcp/src/dashboard/mod.rs b/crates/vestige-mcp/src/dashboard/mod.rs index e6336cb..55c9662 100644 --- a/crates/vestige-mcp/src/dashboard/mod.rs +++ b/crates/vestige-mcp/src/dashboard/mod.rs @@ -183,6 +183,28 @@ fn build_router_inner(state: AppState, port: u16) -> (Router, AppState) { get(handlers::get_sanhedrin_telemetry), ) .route("/api/sanhedrin/appeal", post(handlers::appeal_sanhedrin)) + // ============================================================ + // AGENT BLACK BOX (v2.2) — replayable agent-run traces + // ============================================================ + .route("/api/traces", get(handlers::list_traces)) + .route("/api/traces/{run_id}", get(handlers::get_trace)) + .route("/api/traces/{run_id}/export", get(handlers::export_trace)) + // ============================================================ + // MEMORY RECEIPTS (v2.2) — the nutrition label for a retrieval + // ============================================================ + .route("/api/receipts", get(handlers::list_receipts)) + .route("/api/receipts/{receipt_id}", get(handlers::get_receipt)) + // ============================================================ + // MEMORY PRs (v2.2) — risk-gated brain-change review queue + // ============================================================ + .route("/api/memory-prs", get(handlers::list_memory_prs)) + .route("/api/memory-prs/{id}", get(handlers::get_memory_pr)) + .route( + "/api/memory-prs/{id}/{action}", + post(handlers::act_on_memory_pr), + ) + .route("/api/memory-prs/mode", get(handlers::get_review_mode)) + .route("/api/memory-prs/mode", post(handlers::set_review_mode)) .layer( ServiceBuilder::new() .concurrency_limit(50) diff --git a/crates/vestige-mcp/src/lib.rs b/crates/vestige-mcp/src/lib.rs index 8784409..1df4fc5 100644 --- a/crates/vestige-mcp/src/lib.rs +++ b/crates/vestige-mcp/src/lib.rs @@ -9,3 +9,4 @@ pub mod protocol; pub mod resources; pub mod server; pub mod tools; +pub mod trace_recorder; diff --git a/crates/vestige-mcp/src/resources/mod.rs b/crates/vestige-mcp/src/resources/mod.rs index e021c06..63b728f 100644 --- a/crates/vestige-mcp/src/resources/mod.rs +++ b/crates/vestige-mcp/src/resources/mod.rs @@ -4,3 +4,4 @@ pub mod codebase; pub mod memory; +pub mod trace; diff --git a/crates/vestige-mcp/src/resources/trace.rs b/crates/vestige-mcp/src/resources/trace.rs new file mode 100644 index 0000000..e6f0e35 --- /dev/null +++ b/crates/vestige-mcp/src/resources/trace.rs @@ -0,0 +1,103 @@ +//! Agent Black Box Resources +//! +//! `trace://` URI scheme — exposes replayable agent-run traces as MCP resources +//! so a coding agent can read its *own* black box back. This closes the trace +//! correlation spine on the MCP side: the same `runId` an agent received in a +//! tool result's `traceUri` resolves here to the full event timeline. +//! +//! - `trace://{runId}` — the full ordered event log for a run. +//! - `trace://{runId}/summary` — just the roll-up counts. +//! - `trace://runs` — recent runs (the run picker). +//! - `trace://latest` — the most recently active run's full trace. + +use std::sync::Arc; + +use vestige_core::Storage; + +/// Read a `trace://` resource. +pub async fn read(storage: &Arc, uri: &str) -> Result { + let path = uri.strip_prefix("trace://").unwrap_or(""); + let (path, _query) = match path.split_once('?') { + Some((p, q)) => (p, Some(q)), + None => (path, None), + }; + + match path { + "" | "runs" => read_runs(storage).await, + "latest" => read_latest(storage).await, + other => { + if let Some(run_id) = other.strip_suffix("/summary") { + read_summary(storage, run_id).await + } else { + read_run(storage, other).await + } + } + } +} + +async fn read_runs(storage: &Arc) -> Result { + let runs = storage.list_agent_runs(50).map_err(|e| e.to_string())?; + let json: Vec<_> = runs + .into_iter() + .map(|r| { + serde_json::json!({ + "runId": r.run_id, + "firstTool": r.first_tool, + "eventCount": r.event_count, + "retrievedCount": r.retrieved_count, + "suppressedCount": r.suppressed_count, + "writeCount": r.write_count, + "vetoCount": r.veto_count, + "startedAt": r.started_at, + "lastAt": r.last_at, + }) + }) + .collect(); + serde_json::to_string_pretty(&serde_json::json!({ "runs": json })) + .map_err(|e| e.to_string()) +} + +async fn read_latest(storage: &Arc) -> Result { + let runs = storage.list_agent_runs(1).map_err(|e| e.to_string())?; + let run = runs + .into_iter() + .next() + .ok_or_else(|| "No agent runs recorded yet".to_string())?; + read_run(storage, &run.run_id).await +} + +async fn read_run(storage: &Arc, run_id: &str) -> Result { + let events = storage.get_trace(run_id).map_err(|e| e.to_string())?; + if events.is_empty() { + return Err(format!("No trace found for run: {run_id}")); + } + let summary = storage.get_agent_run(run_id).ok().flatten(); + let body = serde_json::json!({ + "runId": run_id, + "summary": summary.map(summary_json), + "events": events, + }); + serde_json::to_string_pretty(&body).map_err(|e| e.to_string()) +} + +async fn read_summary(storage: &Arc, run_id: &str) -> Result { + let summary = storage + .get_agent_run(run_id) + .map_err(|e| e.to_string())? + .ok_or_else(|| format!("No run: {run_id}"))?; + serde_json::to_string_pretty(&summary_json(summary)).map_err(|e| e.to_string()) +} + +fn summary_json(s: vestige_core::AgentRunSummary) -> serde_json::Value { + serde_json::json!({ + "runId": s.run_id, + "firstTool": s.first_tool, + "eventCount": s.event_count, + "retrievedCount": s.retrieved_count, + "suppressedCount": s.suppressed_count, + "writeCount": s.write_count, + "vetoCount": s.veto_count, + "startedAt": s.started_at, + "lastAt": s.last_at, + }) +} diff --git a/crates/vestige-mcp/src/server.rs b/crates/vestige-mcp/src/server.rs index 6b919fa..d6ca897 100644 --- a/crates/vestige-mcp/src/server.rs +++ b/crates/vestige-mcp/src/server.rs @@ -129,6 +129,23 @@ impl McpServer { } } + /// Read the active Memory PR review mode from `/review_mode.json`, + /// defaulting to `RiskGated`. Shared shape with the dashboard handler so the + /// MCP write path and the UI agree on the mode. + fn review_mode(&self) -> vestige_core::ReviewMode { + let path = self.storage.data_dir().join("review_mode.json"); + std::fs::read_to_string(&path) + .ok() + .and_then(|s| serde_json::from_str::(&s).ok()) + .and_then(|v| { + v.get("mode") + .and_then(|m| m.as_str()) + .map(|s| s.to_string()) + }) + .map(|s| vestige_core::ReviewMode::from_label(&s)) + .unwrap_or_default() + } + /// Handle an incoming JSON-RPC request pub async fn handle_request(&mut self, request: JsonRpcRequest) -> Option { debug!("Handling request: {}", request.method); @@ -240,8 +257,8 @@ impl McpServer { /// Handle tools/list request async fn handle_tools_list(&self) -> Result { - // v2.1.21: 25 tools (verified by the `tools.len() == 25` assertion in the - // handle_tools_list test below — the `suppress` tool landed in v2.0.5). + // v2.1.27: 34 tools (verified by the `tools.len() == 34` assertion in the + // handle_tools_list test below). // Deprecated tools still work via redirects in handle_tools_call. let mut tools = vec![ // ================================================================ @@ -503,7 +520,7 @@ impl McpServer { // Per-tool caps below are sized at ~2× observed peak with growth // headroom; max permitted by Anthropic is 500_000. Only the four // empirically-measured high-payload tools carry the annotation today; - // the remaining 21 tools deliberately do NOT (cargo-cult prevention — + // the remaining 30 tools deliberately do NOT (cargo-cult prevention — // annotating a small-payload tool dilutes the signal). // // Other tools that COULD plausibly grow into the annotated set with @@ -563,6 +580,21 @@ impl McpServer { None }; + // ================================================================ + // AGENT BLACK BOX (v2.2) + // Open/continue a run for this call and record the opening `mcp.call` + // event (args are hashed, never stored raw). Downstream memory events + // are recorded from the result after dispatch. + // ================================================================ + let run_id = crate::trace_recorder::run_id_for(&request.arguments); + crate::trace_recorder::record_call( + &self.storage, + self.event_tx.as_ref(), + &run_id, + &request.name, + &request.arguments, + ); + let result = match request.name.as_str() { // ================================================================ // UNIFIED TOOLS (v1.1+) - Preferred API @@ -1083,10 +1115,77 @@ impl McpServer { // ================================================================ if let Ok(ref content) = result { self.emit_tool_event(&request.name, &saved_args, content); + // Black Box: record the downstream memory events (retrieve / + // suppress / write / veto / dream) the agent experienced. + crate::trace_recorder::record_result( + &self.storage, + self.event_tx.as_ref(), + &run_id, + &request.name, + content, + ); } + // ================================================================ + // RISK-GATED MEMORY PRs (v2.2) — the cognitive immune system + // Normal writes auto-land; risky writes (contradiction vs high-trust, + // supersede/forget/merge, sensitive topics, …) are quarantined and a + // Memory PR is opened. Computed here so the gate stays centralized and + // tools remain untouched. + // ================================================================ + let opened_prs = if let Ok(ref content) = result { + crate::trace_recorder::gate_writes( + &self.storage, + self.event_tx.as_ref(), + &run_id, + &request.name, + content, + self.review_mode(), + ) + } else { + Vec::new() + }; + let response = match result { - Ok(content) => { + Ok(mut content) => { + // ============================================================ + // TRACE SPINE (Phase 0) + // Stamp the runId + a pointer to the full trace onto the tool + // output itself. This is the first hop of the correlation + // chain: the same runId now appears in the tool result, the + // SQLite trace rows, the WebSocket events, /api/traces/{runId}, + // and vestige://trace/{runId}. One id, end to end. + // ============================================================ + // Memory Receipt: for retrieval tools, build + persist a + // receipt from what the tool already computed and attach it. + // Done before the runId stamp so the receipt's own suppressed + // list is part of the same payload the agent reads. + let receipt = + crate::trace_recorder::build_and_save_receipt(&self.storage, &run_id, &request.name, &content); + if let Some(obj) = content.as_object_mut() { + obj.insert("runId".to_string(), serde_json::json!(run_id)); + obj.insert( + "traceUri".to_string(), + serde_json::json!(format!("vestige://trace/{run_id}")), + ); + if let Some(r) = receipt { + obj.insert("receipt".to_string(), r); + } + // Surface opened Memory PRs so the agent learns its risky + // write is held for review, not silently committed. + if !opened_prs.is_empty() { + obj.insert( + "memoryPrsOpened".to_string(), + serde_json::json!(opened_prs), + ); + obj.insert( + "memoryPrNotice".to_string(), + serde_json::json!( + "Vestige opened a Memory PR — this write touches the agent's own brain and is held for review. See the Memory PRs queue." + ), + ); + } + } let call_result = CallToolResult { content: vec![crate::protocol::messages::ToolResultContent { content_type: "text".to_string(), @@ -1228,6 +1327,27 @@ impl McpServer { description: Some("Intentions that have been triggered or are overdue".to_string()), mime_type: Some("application/json".to_string()), }, + // Agent Black Box (v2.2) — replayable agent-run traces. Individual + // runs are read via the templated `vestige://trace/{runId}` (or + // `trace://{runId}`) URI; these concrete entries list the runs and + // the latest trace so a client can discover them. + ResourceDescription { + uri: "trace://runs".to_string(), + name: "Agent Runs (Black Box)".to_string(), + description: Some( + "Recent agent runs. Read vestige://trace/{runId} for a full replayable trace." + .to_string(), + ), + mime_type: Some("application/json".to_string()), + }, + ResourceDescription { + uri: "trace://latest".to_string(), + name: "Latest Agent Trace".to_string(), + description: Some( + "The most recently active agent run's full black-box trace.".to_string(), + ), + mime_type: Some("application/json".to_string()), + }, ]; let result = ListResourcesResult { resources }; @@ -1250,7 +1370,17 @@ impl McpServer { // OpenCode and other MCP clients may send "vestige/memory://recent" // but we register resources as "memory://recent" let normalized_uri = uri.strip_prefix("vestige/").unwrap_or(uri); - let content = if normalized_uri.starts_with("memory://") { + // The trace resource is specced as `vestige://trace/{runId}`. Accept + // both that form and the bare `trace://{runId}` scheme, normalizing the + // former to the latter so the resource module sees one shape. + let trace_uri = normalized_uri + .strip_prefix("vestige://trace/") + .map(|rest| format!("trace://{rest}")); + let content = if let Some(ref tu) = trace_uri { + resources::trace::read(&self.storage, tu).await + } else if normalized_uri.starts_with("trace://") { + resources::trace::read(&self.storage, normalized_uri).await + } else if normalized_uri.starts_with("memory://") { resources::memory::read(&self.storage, normalized_uri).await } else if normalized_uri.starts_with("codebase://") { resources::codebase::read(&self.storage, normalized_uri).await @@ -1820,9 +1950,9 @@ mod tests { let result = response.result.unwrap(); let tools = result["tools"].as_array().unwrap(); - // 34 tools: 25 from v2.1.21 + 7 Phase 3 merge/supersede tools - // (merge_candidates, plan_merge, plan_supersede, apply_plan, merge_undo, - // protect, merge_policy, composed_graph) + 1 connector tool (source_sync, #57). + // 34 tools in v2.1.27: the unified memory surface, Phase 3 + // merge/supersede controls, ComposedGraph, and the #57 source_sync + // connector tool. assert_eq!(tools.len(), 34, "Expected exactly 34 tools"); let tool_names: Vec<&str> = tools.iter().map(|t| t["name"].as_str().unwrap()).collect(); @@ -2248,4 +2378,133 @@ mod tests { "search tool has un-renamed `meta` key (regression — serde rename broke)" ); } + + // ======================================================================== + // TRACE SPINE (Phase 0) — one runId, end to end + // ======================================================================== + + /// Every tools/call must stamp a runId + a trace pointer onto its output, + /// persist an `mcp.call` trace row under that same runId, and that runId + /// must resolve through the `vestige://trace/{runId}` resource. This is the + /// load-bearing correlation guarantee. + #[tokio::test] + async fn test_trace_spine_runid_end_to_end() { + let (mut server, _dir) = test_server().await; + server + .handle_request(make_request("initialize", Some(init_params()))) + .await; + + // A client-supplied runId must be honoured so a whole session + // correlates under one id. + let call = make_request( + "tools/call", + Some(serde_json::json!({ + "name": "memory_health", + "arguments": { "runId": "run_spine_test" } + })), + ); + let response = server.handle_request(call).await.unwrap(); + let result = response.result.expect("tools/call ok"); + + // 1. The tool output itself carries the runId + trace pointer. + let structured = &result["structuredContent"]; + assert_eq!( + structured["runId"].as_str(), + Some("run_spine_test"), + "tool output must echo the runId (spine hop 1)" + ); + assert_eq!( + structured["traceUri"].as_str(), + Some("vestige://trace/run_spine_test"), + "tool output must carry the trace resource pointer" + ); + + // 2. The same runId persisted a trace row (the mcp.call event). + let events = server.storage.get_trace("run_spine_test").unwrap(); + assert!( + events.iter().any(|e| e.kind() == "mcp.call"), + "an mcp.call event must be persisted under the runId (spine hop 2)" + ); + + // 3. The run roll-up exists with the right entry tool. + let run = server + .storage + .get_agent_run("run_spine_test") + .unwrap() + .expect("run summary persisted"); + assert_eq!(run.first_tool.as_deref(), Some("memory_health")); + + // 4. The MCP resource resolves the same runId (spine hop 3). + let read = make_request( + "resources/read", + Some(serde_json::json!({ "uri": "vestige://trace/run_spine_test" })), + ); + let read_resp = server.handle_request(read).await.unwrap(); + let read_result = read_resp.result.expect("resource read ok"); + let text = read_result["contents"][0]["text"] + .as_str() + .expect("resource text"); + assert!( + text.contains("run_spine_test") && text.contains("mcp.call"), + "vestige://trace/{{runId}} must return the run's events" + ); + } + + /// Trace events must be broadcast to a live WebSocket subscriber, not just + /// persisted. This guards the spine hop from SQLite → WebSocket → pulse. + #[tokio::test] + async fn test_trace_event_is_broadcast_to_subscriber() { + let (storage, _dir) = test_storage().await; + let cognitive = Arc::new(Mutex::new(CognitiveEngine::new())); + let (event_tx, mut event_rx) = broadcast::channel(64); + let mut server = McpServer::new_with_events(storage, cognitive, event_tx); + server + .handle_request(make_request("initialize", Some(init_params()))) + .await; + + let call = make_request( + "tools/call", + Some(serde_json::json!({ + "name": "memory_health", + "arguments": { "runId": "run_ws" } + })), + ); + server.handle_request(call).await.unwrap(); + + // Drain the broadcast: at least one TraceEvent for run_ws must arrive. + let mut saw_trace = false; + while let Ok(ev) = event_rx.try_recv() { + if let VestigeEvent::TraceEvent { run_id, .. } = ev { + if run_id == "run_ws" { + saw_trace = true; + } + } + } + assert!( + saw_trace, + "a TraceEvent for the run must be broadcast to subscribers (spine hop: WebSocket)" + ); + } + + /// Risk-gated Memory PRs default: an ordinary tool call opens no PR. + #[tokio::test] + async fn test_no_memory_pr_for_non_write_tool() { + let (mut server, _dir) = test_server().await; + server + .handle_request(make_request("initialize", Some(init_params()))) + .await; + let call = make_request( + "tools/call", + Some(serde_json::json!({ + "name": "memory_health", + "arguments": { "runId": "run_no_pr" } + })), + ); + server.handle_request(call).await.unwrap(); + assert_eq!( + server.storage.count_pending_memory_prs().unwrap(), + 0, + "a read-only tool must never open a Memory PR" + ); + } } diff --git a/crates/vestige-mcp/src/trace_recorder.rs b/crates/vestige-mcp/src/trace_recorder.rs new file mode 100644 index 0000000..f4ee931 --- /dev/null +++ b/crates/vestige-mcp/src/trace_recorder.rs @@ -0,0 +1,704 @@ +//! # Trace Recorder — the live black-box wiring +//! +//! Bridges an MCP `tools/call` to the persisted black box. For each call the +//! recorder: +//! +//! 1. derives a stable `runId` (client-supplied `runId`/`run_id` arg if present, +//! else a fresh `run_` UUID), +//! 2. records an `mcp.call` event with a **hash** of the args (never the raw +//! args, so traces can't leak prompt contents or secrets), +//! 3. after the tool returns, inspects the result JSON and records the +//! downstream events the agent experienced — `memory.retrieve` (with +//! per-id activation), `memory.suppress` (with reason), `sanhedrin.veto`, +//! `dream.patch`, +//! 4. persists every event to `agent_traces` and broadcasts it over the +//! dashboard event channel so the Black Box tab updates live. +//! +//! The recorder is best-effort: a persistence error never fails the tool call. + +use std::collections::BTreeMap; +use std::sync::Arc; + +use chrono::Utc; +use serde_json::Value; +use tokio::sync::broadcast; + +use crate::dashboard::events::VestigeEvent; +use vestige_core::{ + MemoryTraceEvent, Receipt, Storage, SuppressReason, SuppressedReceiptEntry, WriteSource, +}; + +/// Tools that write to memory and are therefore subject to risk-gated review. +fn is_write_tool(tool: &str) -> bool { + matches!( + tool, + "smart_ingest" | "ingest" | "session_checkpoint" | "memory" + ) +} + +/// Risk-gate the writes in a tool result. For each write the tool just made, +/// build a [`vestige_core::WriteContext`], classify it under the active +/// [`vestige_core::ReviewMode`], and — if risky — quarantine the just-written +/// node (suppress it so it is not used for retrieval until reviewed) and open a +/// [`vestige_core::MemoryPr`]. Normal writes are left untouched: they auto-land, +/// and they already got a receipt. +/// +/// Returns the list of opened-PR summaries (id, kind, title, signals) so the +/// caller can annotate the tool response and emit `MemoryPrOpened` events. +pub fn gate_writes( + storage: &Arc, + event_tx: Option<&broadcast::Sender>, + run_id: &str, + tool: &str, + result: &serde_json::Value, + mode: vestige_core::ReviewMode, +) -> Vec { + use vestige_core::{ + classify_write, MemoryPr, MemoryPrKind, MemoryPrStatus, RiskClass, WriteContext, + }; + + if !is_write_tool(tool) { + return Vec::new(); + } + + let mut opened = Vec::new(); + + // Collect each (id, decision) write the tool reported. + let writes = extract_writes(result); + for (id, decision) in writes { + // Pull the just-written node to inspect its real content/type/tags. + let node = match storage.get_node(&id) { + Ok(Some(n)) => n, + _ => continue, + }; + + // A decision of supersede/replace/merge means the write overwrote an + // existing memory — the strongest risk signal. Look up the trust of the + // memory it superseded so the gate can weigh it. + let (supersedes, merges) = match decision.as_str() { + "supersede" | "replace" => (true, false), + "merge" => (false, true), + _ => (false, false), + }; + // If this superseded something, treat the contradiction as against a + // high-trust memory when the *new* node's own retention is high (the + // pipeline only supersedes when confident). This keeps the gate honest + // without a second DB round-trip per write. + let contradicts_trust = if supersedes { + Some(node.retention_strength.max(0.7)) + } else { + None + }; + + let ctx = WriteContext { + source: Some(WriteSource::Agent), + node_type: node.node_type.clone(), + content: node.content.clone(), + tags: node.tags.clone(), + contradicts_trust, + supersedes, + merges, + ..Default::default() + }; + + let (class, signals) = classify_write(&ctx, mode); + if class != RiskClass::Review { + continue; + } + + // Quarantine the just-written node: suppress it so it is held out of + // retrieval until the PR is decided. Best-effort. + let _ = storage.suppress_memory(&id); + + let kind = match decision.as_str() { + "supersede" | "replace" => MemoryPrKind::MemorySuperseded, + "merge" => MemoryPrKind::DreamConsolidation, + _ if contradicts_trust.is_some() => MemoryPrKind::ContradictionDetected, + _ => MemoryPrKind::NewFact, + }; + let title = format!( + "{}: \"{}\"", + pr_kind_phrase(kind), + node.content.chars().take(80).collect::() + ); + let pr = MemoryPr { + id: format!("pr_{}", uuid::Uuid::new_v4().simple()), + kind, + status: MemoryPrStatus::Pending, + title: title.clone(), + diff: serde_json::json!({ + "decision": decision, + "node": { + "id": node.id, + "nodeType": node.node_type, + "content": node.content, + "tags": node.tags, + }, + }), + signals: signals.clone(), + subject_id: Some(id.clone()), + run_id: Some(run_id.to_string()), + created_at: Utc::now().to_rfc3339(), + decided_at: None, + decision: None, + }; + + if let Err(e) = storage.save_memory_pr(&pr) { + tracing::warn!("memory PR save failed: {e}"); + continue; + } + + if let Some(tx) = event_tx { + let _ = tx.send(VestigeEvent::MemoryPrOpened { + id: pr.id.clone(), + kind: kind.as_str().to_string(), + title, + signal_count: signals.len(), + run_id: Some(run_id.to_string()), + timestamp: Utc::now(), + }); + } + + opened.push(serde_json::json!({ + "id": pr.id, + "kind": kind.as_str(), + "title": pr.title, + "signals": signals, + "subjectId": id, + })); + } + + opened +} + +fn pr_kind_phrase(kind: vestige_core::MemoryPrKind) -> &'static str { + use vestige_core::MemoryPrKind::*; + match kind { + NewFact => "New fact pending review", + StrengthenedFact => "Strengthened fact", + ContradictionDetected => "Contradiction with existing memory", + MemorySuperseded => "Supersede existing memory", + EdgeAdded => "New edge", + NodeDecayed => "Decayed node", + DreamConsolidation => "Consolidation proposal", + } +} + +/// Tools whose output warrants a retrieval receipt. +fn is_retrieval_tool(tool: &str) -> bool { + matches!( + tool, + "deep_reference" | "cross_reference" | "search" | "explore_connections" + ) +} + +/// Build a [`Receipt`] from a retrieval tool's response JSON, persist it, and +/// return it as JSON ready to attach to that response. Reuses exactly the data +/// the tool already computed (retrieved ids + trust, suppressed ids + reason, +/// the activation path) — so the receipt is the auditable "nutrition label" for +/// the answer and costs nothing extra to produce. +/// +/// Returns `None` for non-retrieval tools or empty results. Best-effort +/// persistence: a storage error is logged, the receipt is still returned. +pub fn build_and_save_receipt( + storage: &Arc, + run_id: &str, + tool: &str, + result: &serde_json::Value, +) -> Option { + if !is_retrieval_tool(tool) { + return None; + } + + let (retrieved, activation) = extract_retrieved(result); + if retrieved.is_empty() { + return None; + } + let trust_scores: Vec = retrieved + .iter() + .map(|id| activation.get(id).copied().unwrap_or(0.0)) + .collect(); + + let suppressed: Vec = extract_suppressed(result) + .into_iter() + .map(|(id, reason)| SuppressedReceiptEntry::new(id, reason)) + .collect(); + + // The activation path: the run's reasoning chain if present, else a simple + // best-first chain of the retrieved ids. + let activation_path = result + .get("reasoning") + .and_then(|v| v.as_str()) + .map(|s| vec![s.to_string()]) + .unwrap_or_else(|| { + if retrieved.len() > 1 { + vec![retrieved.join(" -> ")] + } else { + Vec::new() + } + }); + + let query = result.get("query").and_then(|v| v.as_str()); + + let receipt = Receipt::build( + Utc::now(), + run_id, + retrieved, + suppressed, + activation_path, + &trust_scores, + Vec::new(), + ); + if let Err(e) = storage.save_receipt(&receipt, Some(run_id), Some(tool), query) { + tracing::warn!("receipt save failed: {e}"); + } + Some(serde_json::to_value(&receipt).unwrap_or(serde_json::Value::Null)) +} + +/// Derive the run id for a tool call. Honours a client-supplied `runId` / +/// `run_id` argument (so an agent can correlate a whole session's calls); +/// otherwise mints a fresh one. +pub fn run_id_for(args: &Option) -> String { + if let Some(a) = args { + for key in ["runId", "run_id"] { + if let Some(s) = a.get(key).and_then(|v| v.as_str()) + && !s.is_empty() + { + return s.to_string(); + } + } + } + format!("run_{}", uuid::Uuid::new_v4().simple()) +} + +/// A 64-bit FNV-1a hex fingerprint of the tool arguments — the +/// privacy-preserving stand-in stored on `mcp.call` events. We only need a +/// stable, collision-resistant-enough identifier for "same args → same hash" +/// in the trace, not a cryptographic digest, so a dependency-free FNV-1a keeps +/// the crate lean. +pub fn hash_args(args: &Option) -> String { + let bytes = match args { + Some(v) => serde_json::to_vec(v).unwrap_or_default(), + None => Vec::new(), + }; + const FNV_OFFSET: u64 = 0xcbf2_9ce4_8422_2325; + const FNV_PRIME: u64 = 0x0000_0100_0000_01b3; + let mut hash = FNV_OFFSET; + for b in &bytes { + hash ^= u64::from(*b); + hash = hash.wrapping_mul(FNV_PRIME); + } + format!("{:016x}", hash) +} + +/// Persist one trace event and broadcast it to the dashboard. Best-effort: +/// storage failures are logged, never propagated. +pub fn record( + storage: &Arc, + event_tx: Option<&broadcast::Sender>, + event: MemoryTraceEvent, +) { + let event = event.with_at(Utc::now().timestamp_millis()); + let seq = match storage.append_trace_event(&event) { + Ok(seq) => seq, + Err(e) => { + tracing::warn!("trace append failed: {e}"); + return; + } + }; + if let Some(tx) = event_tx { + let _ = tx.send(VestigeEvent::TraceEvent { + run_id: event.run_id().to_string(), + seq, + event, + timestamp: Utc::now(), + }); + } +} + +/// Record the opening `mcp.call` event for a tool invocation. +pub fn record_call( + storage: &Arc, + event_tx: Option<&broadcast::Sender>, + run_id: &str, + tool: &str, + args: &Option, +) { + record( + storage, + event_tx, + MemoryTraceEvent::McpCall { + run_id: run_id.to_string(), + tool: tool.to_string(), + args_hash: hash_args(args), + at: 0, + }, + ); +} + +/// Inspect a successful tool result and record the downstream memory events the +/// agent experienced (retrieve / suppress / veto / dream). Tool-output shapes +/// are matched leniently so this stays robust as tools evolve. +pub fn record_result( + storage: &Arc, + event_tx: Option<&broadcast::Sender>, + run_id: &str, + tool: &str, + result: &Value, +) { + // --- memory.retrieve: ids + per-id activation --- + let (ids, activation) = extract_retrieved(result); + if !ids.is_empty() { + record( + storage, + event_tx, + MemoryTraceEvent::MemoryRetrieve { + run_id: run_id.to_string(), + ids, + activation, + at: 0, + }, + ); + } + + // --- memory.suppress: each suppressed id + reason --- + for (id, reason) in extract_suppressed(result) { + record( + storage, + event_tx, + MemoryTraceEvent::MemorySuppress { + run_id: run_id.to_string(), + id, + reason, + at: 0, + }, + ); + } + + // --- memory.write: writes performed by ingest-like tools --- + for (id, decision) in extract_writes(result) { + record( + storage, + event_tx, + MemoryTraceEvent::MemoryWrite { + run_id: run_id.to_string(), + id, + diff: serde_json::json!({ "decision": decision }), + source: WriteSource::Agent, + at: 0, + }, + ); + } + + // --- contradiction.detected: each contradiction pair the agent faced --- + for (ids, winner_id, detail) in extract_contradictions(result) { + record( + storage, + event_tx, + MemoryTraceEvent::ContradictionDetected { + run_id: run_id.to_string(), + ids, + winner_id, + detail, + at: 0, + }, + ); + } + + // --- sanhedrin.veto: a blocked claim --- + if let Some((claim, evidence_ids, confidence)) = extract_veto(result) { + record( + storage, + event_tx, + MemoryTraceEvent::SanhedrinVeto { + run_id: run_id.to_string(), + claim, + evidence_ids, + confidence, + at: 0, + }, + ); + } + + // --- dream.patch: consolidation proposals --- + let proposal_ids = extract_dream_proposals(result, tool); + if !proposal_ids.is_empty() { + record( + storage, + event_tx, + MemoryTraceEvent::DreamPatch { + run_id: run_id.to_string(), + proposal_ids, + at: 0, + }, + ); + } +} + +/// Pull retrieved memory ids + their activation/score from a search-like or +/// deep_reference-like result. +fn extract_retrieved(result: &Value) -> (Vec, BTreeMap) { + let mut ids = Vec::new(); + let mut activation = BTreeMap::new(); + + // search_unified: { results: [{ id, score|activation, ... }] } + if let Some(arr) = result.get("results").and_then(|r| r.as_array()) { + for item in arr { + if let Some(id) = item.get("id").and_then(|v| v.as_str()) { + ids.push(id.to_string()); + let act = item + .get("activation") + .or_else(|| item.get("score")) + .and_then(|v| v.as_f64()); + if let Some(a) = act { + activation.insert(id.to_string(), a); + } + } + } + } + + // deep_reference: { evidence: [{ id, trust, ... }], recommended: { memory_id } } + if ids.is_empty() + && let Some(arr) = result.get("evidence").and_then(|r| r.as_array()) + { + for item in arr { + if let Some(id) = item.get("id").and_then(|v| v.as_str()) { + ids.push(id.to_string()); + if let Some(t) = item.get("trust").and_then(|v| v.as_f64()) { + activation.insert(id.to_string(), t); + } + } + } + } + + (ids, activation) +} + +/// Pull suppressed entries from a result. Recognises both the deep_reference +/// `superseded`/`contradictions` shapes and the explicit receipt `suppressed` +/// list `[{ id, reason }]`. +fn extract_suppressed(result: &Value) -> Vec<(String, SuppressReason)> { + let mut out = Vec::new(); + + if let Some(arr) = result + .get("receipt") + .and_then(|r| r.get("suppressed")) + .and_then(|s| s.as_array()) + { + for item in arr { + if let Some(id) = item.get("id").and_then(|v| v.as_str()) { + let reason = item + .get("reason") + .and_then(|v| v.as_str()) + .map(parse_suppress_reason) + .unwrap_or(SuppressReason::LowTrust); + out.push((id.to_string(), reason)); + } + } + } + + // deep_reference surfaces superseded ids directly. + if let Some(arr) = result.get("superseded").and_then(|s| s.as_array()) { + for item in arr { + let id = item + .get("id") + .and_then(|v| v.as_str()) + .or_else(|| item.as_str()); + if let Some(id) = id { + out.push((id.to_string(), SuppressReason::Contradicted)); + } + } + } + + out +} + +fn parse_suppress_reason(s: &str) -> SuppressReason { + match s { + "low_trust" => SuppressReason::LowTrust, + "decayed" => SuppressReason::Decayed, + "contradicted" => SuppressReason::Contradicted, + "privacy" => SuppressReason::Privacy, + "competition" => SuppressReason::Competition, + _ => SuppressReason::LowTrust, + } +} + +/// Pull writes from an ingest-like result (single `decision`+`nodeId` or a +/// `results` batch). +fn extract_writes(result: &Value) -> Vec<(String, String)> { + let mut out = Vec::new(); + let push = |out: &mut Vec<(String, String)>, item: &Value| { + let decision = item.get("decision").and_then(|v| v.as_str()); + let id = item + .get("nodeId") + .or_else(|| item.get("id")) + .and_then(|v| v.as_str()); + if let (Some(d), Some(id)) = (decision, id) { + out.push((id.to_string(), d.to_string())); + } + }; + push(&mut out, result); + if let Some(arr) = result.get("results").and_then(|r| r.as_array()) { + for item in arr { + push(&mut out, item); + } + } + out +} + +/// Pull contradiction pairs from a deep_reference result. Each entry is +/// `{ stronger: {id, ...}, weaker: {id, ...}, topic_overlap }`; the `stronger` +/// memory is the winner the agent trusted. +fn extract_contradictions(result: &Value) -> Vec<(Vec, Option, String)> { + let mut out = Vec::new(); + let Some(arr) = result.get("contradictions").and_then(|c| c.as_array()) else { + return out; + }; + for item in arr { + let stronger = item + .get("stronger") + .and_then(|s| s.get("id")) + .and_then(|v| v.as_str()); + let weaker = item + .get("weaker") + .and_then(|s| s.get("id")) + .and_then(|v| v.as_str()); + let (Some(s), Some(w)) = (stronger, weaker) else { + continue; + }; + let detail = format!( + "Contradiction: kept {s} over {w}{}", + item.get("topic_overlap") + .and_then(|v| v.as_f64()) + .map(|o| format!(" (topic overlap {:.0}%)", o * 100.0)) + .unwrap_or_default() + ); + out.push(( + vec![s.to_string(), w.to_string()], + Some(s.to_string()), + detail, + )); + } + out +} + +/// Pull a Sanhedrin-style veto, if the result carries one. +fn extract_veto(result: &Value) -> Option<(String, Vec, f64)> { + let veto = result.get("veto").or_else(|| result.get("sanhedrin"))?; + let claim = veto + .get("claim") + .and_then(|v| v.as_str()) + .unwrap_or("") + .to_string(); + if claim.is_empty() { + return None; + } + let evidence_ids = veto + .get("evidenceIds") + .or_else(|| veto.get("evidence_ids")) + .and_then(|v| v.as_array()) + .map(|arr| { + arr.iter() + .filter_map(|v| v.as_str().map(String::from)) + .collect() + }) + .unwrap_or_default(); + let confidence = veto.get("confidence").and_then(|v| v.as_f64()).unwrap_or(0.0); + Some((claim, evidence_ids, confidence)) +} + +/// Pull dream consolidation proposal ids from a dream tool result. +fn extract_dream_proposals(result: &Value, tool: &str) -> Vec { + if tool != "dream" && tool != "consolidate" { + return Vec::new(); + } + let mut out = Vec::new(); + for key in ["proposalIds", "proposals", "insights", "connections"] { + if let Some(arr) = result.get(key).and_then(|v| v.as_array()) { + for item in arr { + if let Some(id) = item + .get("id") + .and_then(|v| v.as_str()) + .or_else(|| item.as_str()) + { + out.push(id.to_string()); + } + } + } + } + out +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn run_id_honours_client_supplied() { + let args = Some(serde_json::json!({ "runId": "run_session_7" })); + assert_eq!(run_id_for(&args), "run_session_7"); + } + + #[test] + fn run_id_mints_when_absent() { + let id = run_id_for(&None); + assert!(id.starts_with("run_")); + assert!(id.len() > 10); + } + + #[test] + fn hash_is_stable_and_hides_content() { + let args = Some(serde_json::json!({ "query": "my secret prompt" })); + let h1 = hash_args(&args); + let h2 = hash_args(&args); + assert_eq!(h1, h2); + assert!(!h1.contains("secret")); + assert_eq!(h1.len(), 16); + } + + #[test] + fn extract_retrieved_from_search_shape() { + let r = serde_json::json!({ + "results": [ + { "id": "m1", "score": 0.9 }, + { "id": "m2", "activation": 0.4 } + ] + }); + let (ids, act) = extract_retrieved(&r); + assert_eq!(ids, vec!["m1", "m2"]); + assert_eq!(act["m1"], 0.9); + assert_eq!(act["m2"], 0.4); + } + + #[test] + fn extract_retrieved_from_deep_reference_shape() { + let r = serde_json::json!({ + "evidence": [ { "id": "e1", "trust": 0.7 } ] + }); + let (ids, act) = extract_retrieved(&r); + assert_eq!(ids, vec!["e1"]); + assert_eq!(act["e1"], 0.7); + } + + #[test] + fn extract_suppressed_from_receipt_and_superseded() { + let r = serde_json::json!({ + "receipt": { "suppressed": [ { "id": "s1", "reason": "contradicted" } ] }, + "superseded": [ { "id": "s2" } ] + }); + let out = extract_suppressed(&r); + assert!(out.contains(&("s1".to_string(), SuppressReason::Contradicted))); + assert!(out.contains(&("s2".to_string(), SuppressReason::Contradicted))); + } + + #[test] + fn extract_writes_single_and_batch() { + let single = serde_json::json!({ "decision": "create", "nodeId": "n1" }); + assert_eq!(extract_writes(&single), vec![("n1".into(), "create".into())]); + let batch = serde_json::json!({ + "results": [ { "decision": "update", "id": "n2" } ] + }); + assert_eq!(extract_writes(&batch), vec![("n2".into(), "update".into())]); + } +}