diff --git a/apps/dashboard/src/lib/components/Icon.svelte b/apps/dashboard/src/lib/components/Icon.svelte index ed248a0..1acf84f 100644 --- a/apps/dashboard/src/lib/components/Icon.svelte +++ b/apps/dashboard/src/lib/components/Icon.svelte @@ -13,6 +13,8 @@ // ◎ ◈ ◉ ◷ across multiple items; that bug is dead here). // ═══════════════════════════════════════════════════════════════════ export type IconName = + | 'blackbox' + | 'memorypr' | 'graph' | 'reasoning' | 'memories' @@ -41,6 +43,10 @@ // Each entry is the inner markup of a 24×24 SVG. Strokes inherit // currentColor; fills are explicit where a solid accent reads better. export const ICON_PATHS: Record = { + // Flight recorder — a radar-pulse sweep inside a recorder box. + blackbox: ``, + // Git-branch with a review check — approve changes to the brain. + memorypr: ``, // Connected nodes — a literal knowledge graph. graph: ``, // Branching logic tree with a spark — deduction. diff --git a/apps/dashboard/src/lib/components/ReceiptCard.svelte b/apps/dashboard/src/lib/components/ReceiptCard.svelte new file mode 100644 index 0000000..71ea574 --- /dev/null +++ b/apps/dashboard/src/lib/components/ReceiptCard.svelte @@ -0,0 +1,218 @@ + + +
+
+ {receipt.receipt_id} + + decay: {receipt.decay_risk} + +
+ +
+
+ {receipt.retrieved.length} + retrieved +
+
+ {receipt.suppressed.length} + suppressed +
+
+ {(receipt.trust_floor * 100).toFixed(0)}% + trust floor +
+
+ + {#if !compact} + {#if receipt.activation_path.length} +
+ Activation path + {#each receipt.activation_path as path (path)} +
{path}
+ {/each} +
+ {/if} + + {#if receipt.retrieved.length} +
+ Retrieved +
+ {#each receipt.retrieved as id (id)} + {id.slice(0, 8)} + {/each} +
+
+ {/if} + + {#if receipt.suppressed.length} +
+ Suppressed +
+ {#each receipt.suppressed as s (s.id)} + + {s.id.slice(0, 8)} · {s.reason.replace('_', ' ')} + + {/each} +
+
+ {/if} + {/if} + + +
+ + diff --git a/apps/dashboard/src/lib/components/blackbox-helpers.ts b/apps/dashboard/src/lib/components/blackbox-helpers.ts new file mode 100644 index 0000000..908d880 --- /dev/null +++ b/apps/dashboard/src/lib/components/blackbox-helpers.ts @@ -0,0 +1,134 @@ +// ═══════════════════════════════════════════════════════════════════════════ +// AGENT BLACK BOX — presentation helpers +// ─────────────────────────────────────────────────────────────────────────── +// Pure functions that turn a raw `TraceEvent` into the label, color, glyph, +// and one-line summary the Black Box timeline renders. Kept out of the +// component so they are unit-testable and reused by the Proof Mode header. +// ═══════════════════════════════════════════════════════════════════════════ +import type { TraceEvent } from '$lib/stores/api'; + +export type TraceKind = TraceEvent['type']; + +/** The accent color for each trace-event kind (CSS color value). */ +export function eventColor(kind: TraceKind): string { + switch (kind) { + case 'mcp.call': + return 'var(--color-synapse-glow, #818cf8)'; + case 'memory.retrieve': + return 'var(--color-recall, #10b981)'; + case 'memory.suppress': + return '#a78bfa'; // violet — the forgetting hue + case 'memory.write': + return '#38bdf8'; // sky — a new write + case 'contradiction.detected': + return '#fb7185'; // rose — tension + case 'sanhedrin.veto': + return '#f43f5e'; // red — a block + case 'dream.patch': + return '#c084fc'; // purple — dream + default: + return 'var(--color-synapse, #6366f1)'; + } +} + +/** A short human label for each kind. */ +export function eventLabel(kind: TraceKind): string { + switch (kind) { + case 'mcp.call': + return 'Tool Call'; + case 'memory.retrieve': + return 'Retrieved'; + case 'memory.suppress': + return 'Suppressed'; + case 'memory.write': + return 'Wrote'; + case 'contradiction.detected': + return 'Contradiction'; + case 'sanhedrin.veto': + return 'Veto'; + case 'dream.patch': + return 'Dream Patch'; + default: + return kind; + } +} + +/** A single glyph (emoji-free SVG path is overkill here; a compact symbol). */ +export function eventGlyph(kind: TraceKind): string { + switch (kind) { + case 'mcp.call': + return '⟐'; + case 'memory.retrieve': + return '◉'; + case 'memory.suppress': + return '⊘'; + case 'memory.write': + return '✎'; + case 'contradiction.detected': + return '⚡'; + case 'sanhedrin.veto': + return '⛔'; + case 'dream.patch': + return '☾'; + default: + return '•'; + } +} + +/** A one-line summary of what an event did, for the timeline row. */ +export function eventSummary(ev: TraceEvent): string { + switch (ev.type) { + case 'mcp.call': + return `${ev.tool} · args ${ev.argsHash.slice(0, 8)}`; + case 'memory.retrieve': + return `${ev.ids.length} ${ev.ids.length === 1 ? 'memory' : 'memories'} surfaced`; + case 'memory.suppress': + return `${ev.id.slice(0, 8)} — ${ev.reason.replace('_', ' ')}`; + case 'memory.write': + return `${ev.id.slice(0, 8)} — ${ev.source}`; + case 'contradiction.detected': + return ev.detail; + case 'sanhedrin.veto': + return `"${ev.claim}" (conf ${(ev.confidence * 100).toFixed(0)}%)`; + case 'dream.patch': + return `${ev.proposalIds.length} consolidation proposal(s)`; + default: + return ''; + } +} + +/** The memory ids an event touched (for graph-pulse replay). */ +export function eventMemoryIds(ev: TraceEvent): string[] { + switch (ev.type) { + case 'memory.retrieve': + return ev.ids; + case 'memory.suppress': + case 'memory.write': + return [ev.id]; + case 'contradiction.detected': + return ev.ids; + case 'sanhedrin.veto': + return ev.evidenceIds; + case 'dream.patch': + return ev.proposalIds; + default: + return []; + } +} + +/** Format a millisecond timestamp as a clock time. */ +export function formatAt(at: number): string { + if (!Number.isFinite(at) || at <= 0) return '—'; + const d = new Date(at); + return d.toLocaleTimeString(undefined, { + hour12: false, + hour: '2-digit', + minute: '2-digit', + second: '2-digit' + }); +} + +/** Elapsed milliseconds of an event relative to the run's first event. */ +export function relativeMs(at: number, startAt: number): number { + return Math.max(0, at - startAt); +} diff --git a/apps/dashboard/src/lib/stores/api.ts b/apps/dashboard/src/lib/stores/api.ts index 7bc22dd..cb9d79b 100644 --- a/apps/dashboard/src/lib/stores/api.ts +++ b/apps/dashboard/src/lib/stores/api.ts @@ -133,5 +133,117 @@ export const api = { method: 'POST', body: JSON.stringify({ reason, note, claimId, receiptId }) }) + }, + + // Agent Black Box (v2.2): replayable agent-run traces. The runId in a tool + // result threads through here unchanged — one id, end to end. + traces: { + list: (limit = 50) => fetcher(`/traces?limit=${limit}`), + get: (runId: string) => fetcher(`/traces/${encodeURIComponent(runId)}`), + exportUrl: (runId: string) => `${BASE}/traces/${encodeURIComponent(runId)}/export` + }, + + // Memory Receipts (v2.2): the nutrition label for a retrieval. + receipts: { + list: (limit = 50) => fetcher(`/receipts?limit=${limit}`), + get: (receiptId: string) => fetcher(`/receipts/${encodeURIComponent(receiptId)}`) + }, + + // Memory PRs (v2.2): the risk-gated brain-change review queue. + memoryPrs: { + list: (status?: string, limit = 100) => { + const qs = new URLSearchParams(); + if (status) qs.set('status', status); + qs.set('limit', String(limit)); + return fetcher(`/memory-prs?${qs.toString()}`); + }, + get: (id: string) => fetcher(`/memory-prs/${encodeURIComponent(id)}`), + act: (id: string, action: MemoryPrAction) => + fetcher>(`/memory-prs/${encodeURIComponent(id)}/${action}`, { + method: 'POST' + }), + getMode: () => fetcher<{ mode: ReviewMode; pendingCount: number }>('/memory-prs/mode'), + setMode: (mode: ReviewMode) => + fetcher<{ mode: ReviewMode }>('/memory-prs/mode', { + method: 'POST', + body: JSON.stringify({ mode }) + }) } }; + +// --------------------------------------------------------------------------- +// Agent Black Box / Receipts / Memory PR types +// --------------------------------------------------------------------------- + +export type TraceRunSummary = { + runId: string; + firstTool: string | null; + eventCount: number; + retrievedCount: number; + suppressedCount: number; + writeCount: number; + vetoCount: number; + startedAt: number; + lastAt: number; +}; + +export type TraceRunListResponse = { total: number; runs: TraceRunSummary[] }; + +/** One trace event — discriminated on `type`, matching the Rust schema. */ +export type TraceEvent = + | { type: 'mcp.call'; runId: string; tool: string; argsHash: string; at: number } + | { type: 'memory.retrieve'; runId: string; ids: string[]; activation: Record; at: number } + | { type: 'memory.suppress'; runId: string; id: string; reason: string; at: number } + | { type: 'memory.write'; runId: string; id: string; diff: unknown; source: string; at: number } + | { type: 'contradiction.detected'; runId: string; ids: string[]; winnerId?: string; detail: string; at: number } + | { type: 'sanhedrin.veto'; runId: string; claim: string; evidenceIds: string[]; confidence: number; at: number } + | { type: 'dream.patch'; runId: string; proposalIds: string[]; at: number }; + +export type TraceDetail = { + runId: string; + summary: Omit | null; + events: TraceEvent[]; +}; + +export type Receipt = { + receipt_id: string; + retrieved: string[]; + suppressed: { id: string; reason: string }[]; + activation_path: string[]; + trust_floor: number; + decay_risk: 'low' | 'medium' | 'high'; + mutations: { id: string; kind: string; note?: string }[]; +}; + +export type ReceiptListResponse = { total: number; receipts: Receipt[] }; + +export type MemoryPrAction = + | 'promote' + | 'merge' + | 'supersede' + | 'quarantine' + | 'forget' + | 'ask_agent_why'; + +export type ReviewMode = 'fast' | 'risk_gated' | 'paranoid'; + +export type MemoryPr = { + id: string; + kind: string; + status: string; + title: string; + diff: Record; + signals: { code: string; detail: string }[]; + subject_id?: string; + run_id?: string; + created_at: string; + decided_at?: string; + decision?: string; +}; + +export type MemoryPrListResponse = { + total: number; + pendingCount: number; + mode: ReviewMode; + prs: MemoryPr[]; +}; diff --git a/apps/dashboard/src/lib/stores/websocket.ts b/apps/dashboard/src/lib/stores/websocket.ts index fda02f9..2e591ac 100644 --- a/apps/dashboard/src/lib/stores/websocket.ts +++ b/apps/dashboard/src/lib/stores/websocket.ts @@ -132,6 +132,30 @@ export const uptimeSeconds = derived(websocket, $ws => ($ws.lastHeartbeat?.data?.uptime_secs as number) ?? 0 ); +// Agent Black Box (v2.2): the live stream of trace events, newest first. Each +// is a real `VestigeEvent::TraceEvent` backed by a persisted `agent_traces` +// row — the dashboard pulse is only ever driven by these, never by fakes. +export const traceEvents = derived(websocket, $ws => + $ws.events.filter((e) => e.type === 'TraceEvent') +); + +// The most recent runId seen on the live feed — the "current run" indicator in +// Proof Mode / the Black Box live header. +export const liveRunId = derived(websocket, $ws => { + const latest = $ws.events.find((e) => e.type === 'TraceEvent'); + return (latest?.data?.run_id as string) ?? null; +}); + +// The single most recent trace event (for the "last event" readout). +export const lastTraceEvent = derived(websocket, $ws => + $ws.events.find((e) => e.type === 'TraceEvent') ?? null +); + +// Live Memory PR notifications (opened / decided) for the queue badge + toasts. +export const memoryPrEvents = derived(websocket, $ws => + $ws.events.filter((e) => e.type === 'MemoryPrOpened' || e.type === 'MemoryPrDecided') +); + export function formatUptime(secs: number): string { if (!Number.isFinite(secs) || secs < 0) return '—'; const d = Math.floor(secs / 86_400); diff --git a/apps/dashboard/src/lib/types/index.ts b/apps/dashboard/src/lib/types/index.ts index 4c47a16..2989391 100644 --- a/apps/dashboard/src/lib/types/index.ts +++ b/apps/dashboard/src/lib/types/index.ts @@ -168,6 +168,9 @@ export type VestigeEventType = | 'ImportanceScored' | 'DeepReferenceCompleted' | 'HookVerdictRecorded' + | 'TraceEvent' + | 'MemoryPrOpened' + | 'MemoryPrDecided' | 'Heartbeat'; export interface VestigeEvent { diff --git a/apps/dashboard/src/routes/(app)/blackbox/+page.svelte b/apps/dashboard/src/routes/(app)/blackbox/+page.svelte new file mode 100644 index 0000000..ed8ccd6 --- /dev/null +++ b/apps/dashboard/src/routes/(app)/blackbox/+page.svelte @@ -0,0 +1,833 @@ + + +
+ + + + + + +
+
+ WebSocket + + + {$isConnected ? 'Connected' : 'Offline'} + +
+
+ Live runId + {$liveRunId ?? '—'} +
+
+ Last event + + {#if $lastTraceEvent} + + {eventLabel(($lastTraceEvent.data?.event as TraceEvent)?.type)} + + {:else} + awaiting… + {/if} + +
+
+ Events seen + + + +
+
+ + {#if !proofMode} +
+ + + + +
+ {#if loading} +
Loading trace…
+ {:else if error} +
{error}
+ {:else if !detail} +
Select a run to replay.
+ {:else} + +
+
+ + Step {scrubIndex + 1} / {detail.events.length} + + {#if currentEvent} + +{relativeMs(currentEvent.at, startAt)}ms + {/if} +
+ + +
+ {#each detail.events as ev, i (i)} + + {/each} +
+
+ + + {#if currentEvent} +
+
+ {eventGlyph(currentEvent.type)} + {eventLabel(currentEvent.type)} + {formatAt(currentEvent.at)} +
+

{eventSummary(currentEvent)}

+ + {#if currentEvent.type === 'memory.retrieve'} +
+ {#each currentEvent.ids as id (id)} + + {id.slice(0, 8)} + {#if currentEvent.activation[id] != null} + {(currentEvent.activation[id] * 100).toFixed(0)}% + {/if} + + {/each} +
+ {:else if currentEvent.type === 'contradiction.detected'} +
+ kept {currentEvent.winnerId?.slice(0, 8)} + vs + {#each currentEvent.ids.filter((i) => i !== currentEvent.winnerId) as id (id)} + {id.slice(0, 8)} + {/each} +
+ {:else if currentEvent.type === 'sanhedrin.veto'} +
+ {#each currentEvent.evidenceIds as id (id)} + {id.slice(0, 8)} + {/each} +
+ {/if} +
+ {/if} + + +
+

+ Memory pulse — touched this run +

+ {#if pulsedIds.length === 0} +

No memories touched yet.

+ {:else} +
+ {#each pulsedIds as id (id)} + {id.slice(0, 8)} + {/each} +
+ {/if} +
+ + +
+

Event log

+
    + {#each detail.events as ev, i (i)} +
  1. scrubIndex} + style:--c={eventColor(ev.type)} + > + +
  2. + {/each} +
+
+ {/if} +
+
+ {:else} + +
+
+ + {$liveRunId ?? 'awaiting run…'} +
+ {#if $lastTraceEvent} + {@const ev = $lastTraceEvent.data?.event as TraceEvent} +
+ {eventGlyph(ev?.type)} +
+
{eventLabel(ev?.type)}
+
{eventSummary(ev)}
+
+
+ {/if} +
+ + trace events +
+

Watch the agent think. Watch memory change. Watch the receipt prove why.

+
+ {/if} +
+ + diff --git a/apps/dashboard/src/routes/(app)/graph/+page.svelte b/apps/dashboard/src/routes/(app)/graph/+page.svelte index f3ffbe8..fe9f6d5 100644 --- a/apps/dashboard/src/routes/(app)/graph/+page.svelte +++ b/apps/dashboard/src/routes/(app)/graph/+page.svelte @@ -102,11 +102,17 @@ } onMount(() => { - const requestedMode = new URLSearchParams(window.location.search).get('colorMode'); + const sp = new URLSearchParams(window.location.search); + const requestedMode = sp.get('colorMode'); if (isColorMode(requestedMode)) { colorMode = requestedMode; } - void loadGraph(); + // "Open receipt in Cinema" deep-links here with ?center=, so + // the graph loads centered on the receipt's primary memory and the + // (protected) Cinema flythrough starts from that exact node. We do not + // touch MemoryCinema itself — only seed the graph it renders. + const center = sp.get('center'); + void loadGraph(undefined, center || undefined); }); function isColorMode(value: string | null): value is ColorMode { diff --git a/apps/dashboard/src/routes/(app)/memory-prs/+page.svelte b/apps/dashboard/src/routes/(app)/memory-prs/+page.svelte new file mode 100644 index 0000000..cb09111 --- /dev/null +++ b/apps/dashboard/src/routes/(app)/memory-prs/+page.svelte @@ -0,0 +1,726 @@ + + +
+ + 0}> + pending + + + + +
+ Vestige auto-remembers ordinary context, but opens a + Memory PR when the agent tries to rewrite its own brain. +
+ + +
+ {#each modes as m (m.id)} + + {/each} +
+ + +
+ {#each statuses as s (s)} + + {/each} + +
+ +
+ + + + +
+ {#if !selected} +
Select a Memory PR to review the diff.
+ {:else} +
+
+ {kindLabel[selected.kind] ?? selected.kind} + {selected.status} + {#if selected.run_id} + + {selected.run_id.replace('run_', '').slice(0, 8)} + + {/if} +
+

{selected.title}

+ + +
+
+ {#if diffNodeType(selected)} + type: {diffNodeType(selected)} + {/if} + {#each diffTags(selected) as t (t)} + #{t} + {/each} +
+ {#if diffContent(selected)} +
+{diffContent(selected)}
+ {/if} +
+ + + {#if selected.signals.length} +
+ Why this opened + {#each selected.signals as sig (sig.code)} +
+ {sig.code} + {sig.detail} +
+ {/each} +
+ {/if} + + + {#if why} +
+ Agent's reasoning + {#each why as w (w.code)} +
+ {w.code} + {w.detail} +
+ {/each} +
+ {/if} + + + {#if selected.status === 'pending'} +
+ {#each actions as a (a.id)} + + {/each} +
+ {:else} +
+ Decided: {selected.decision ?? selected.status} + {#if selected.decided_at} + · {new Date(selected.decided_at).toLocaleString()} + {/if} +
+ {/if} +
+ {/if} +
+
+
+ + diff --git a/apps/dashboard/src/routes/+layout.svelte b/apps/dashboard/src/routes/+layout.svelte index 1e160fd..53f5a52 100644 --- a/apps/dashboard/src/routes/+layout.svelte +++ b/apps/dashboard/src/routes/+layout.svelte @@ -98,6 +98,8 @@ // set reused the same Unicode glyph across multiple items; every entry here // now has a distinct silhouette that reads instantly. const nav: { href: string; label: string; icon: IconName; shortcut: string }[] = [ + { href: '/blackbox', label: 'Black Box', icon: 'blackbox', shortcut: 'B' }, + { href: '/memory-prs', label: 'Memory PRs', icon: 'memorypr', shortcut: 'Q' }, { href: '/graph', label: 'Graph', icon: 'graph', shortcut: 'G' }, { href: '/reasoning', label: 'Reasoning', icon: 'reasoning', shortcut: 'R' }, { href: '/memories', label: 'Memories', icon: 'memories', shortcut: 'M' }, diff --git a/blackbox-proof-2026-06-22/memory-prs.json b/blackbox-proof-2026-06-22/memory-prs.json new file mode 100644 index 0000000..4850ed4 --- /dev/null +++ b/blackbox-proof-2026-06-22/memory-prs.json @@ -0,0 +1 @@ +{"mode":"risk_gated","pendingCount":0,"prs":[{"created_at":"2026-06-22T21:54:57.994466+00:00","decided_at":"2026-06-22T21:58:46.702516+00:00","decision":"promote","diff":{"decision":"create","node":{"content":"Remember the production auth token and security credential for deployment.","id":"8b9fa8f6-833d-41dc-8520-98b0d031d55c","nodeType":"fact","tags":["security","auth"]}},"id":"pr_dee9244bc0c4419fad61f6c6d2f95f15","kind":"new_fact","run_id":"run_proof_session","signals":[{"code":"sensitive_topic","detail":"Touches a sensitive topic: authentication / authorization."}],"status":"promoted","subject_id":"8b9fa8f6-833d-41dc-8520-98b0d031d55c","title":"New fact pending review: \"Remember the production auth token and security credential for deployment.\""}],"total":1} \ No newline at end of file diff --git a/blackbox-proof-2026-06-22/phase-1-status.json b/blackbox-proof-2026-06-22/phase-1-status.json new file mode 100644 index 0000000..5d25a5d --- /dev/null +++ b/blackbox-proof-2026-06-22/phase-1-status.json @@ -0,0 +1 @@ +{"averageRetention":0.95,"status":"healthy","totalMemories":4,"version":"2.1.27"} \ No newline at end of file diff --git a/blackbox-proof-2026-06-22/phase-3-trace.json b/blackbox-proof-2026-06-22/phase-3-trace.json new file mode 100644 index 0000000..5ef2e73 --- /dev/null +++ b/blackbox-proof-2026-06-22/phase-3-trace.json @@ -0,0 +1 @@ +{"events":[{"argsHash":"e029f4892d293944","at":1782165290352,"runId":"run_proof_session","tool":"smart_ingest","type":"mcp.call"},{"at":1782165290478,"diff":{"decision":"create"},"id":"0acd7785-e13a-4df8-ba5e-11e8d82e7590","runId":"run_proof_session","source":"agent","type":"memory.write"},{"argsHash":"2aef447cf4f6744e","at":1782165291860,"runId":"run_proof_session","tool":"smart_ingest","type":"mcp.call"},{"at":1782165291962,"diff":{"decision":"create"},"id":"cb40ae8c-59a1-4d13-b89f-1333a9357def","runId":"run_proof_session","source":"agent","type":"memory.write"},{"argsHash":"eaefbf6e42cbe187","at":1782165293368,"runId":"run_proof_session","tool":"smart_ingest","type":"mcp.call"},{"at":1782165293474,"diff":{"decision":"create"},"id":"147bee37-33e4-4287-bd6b-931c23d87f81","runId":"run_proof_session","source":"agent","type":"memory.write"},{"argsHash":"c758f278a36c7bc2","at":1782165294877,"runId":"run_proof_session","tool":"deep_reference","type":"mcp.call"},{"activation":{"0acd7785-e13a-4df8-ba5e-11e8d82e7590":0.62,"147bee37-33e4-4287-bd6b-931c23d87f81":0.62,"cb40ae8c-59a1-4d13-b89f-1333a9357def":0.62},"at":1782165294947,"ids":["0acd7785-e13a-4df8-ba5e-11e8d82e7590","147bee37-33e4-4287-bd6b-931c23d87f81","cb40ae8c-59a1-4d13-b89f-1333a9357def"],"runId":"run_proof_session","type":"memory.retrieve"},{"argsHash":"843ce46664574711","at":1782165296385,"runId":"run_proof_session","tool":"search","type":"mcp.call"},{"activation":{},"at":1782165296434,"ids":["147bee37-33e4-4287-bd6b-931c23d87f81"],"runId":"run_proof_session","type":"memory.retrieve"},{"argsHash":"03587119a4acd377","at":1782165297894,"runId":"run_proof_session","tool":"smart_ingest","type":"mcp.call"},{"at":1782165297993,"diff":{"decision":"create"},"id":"8b9fa8f6-833d-41dc-8520-98b0d031d55c","runId":"run_proof_session","source":"agent","type":"memory.write"}],"exportedAt":"2026-06-22T21:59:04.946635+00:00","format":"vestige-trace","runId":"run_proof_session","summary":{"eventCount":12,"firstTool":"smart_ingest","lastAt":1782165297993,"retrievedCount":4,"startedAt":1782165290352,"suppressedCount":0,"vetoCount":0,"writeCount":4},"version":1} \ No newline at end of file diff --git a/blackbox-proof-2026-06-22/proof-summary.md b/blackbox-proof-2026-06-22/proof-summary.md new file mode 100644 index 0000000..1eb9ebf --- /dev/null +++ b/blackbox-proof-2026-06-22/proof-summary.md @@ -0,0 +1,57 @@ +# Agent Black Box — Proof of Life (2026-06-22) + +> Watch the agent think. Watch memory change. Watch the receipt prove why. + +This folder is the launch artifact + regression evidence for the Agent Black Box, +Memory Receipts, and risk-gated Memory PRs, captured from a **live** Vestige +build (`feat/agent-black-box`), not mocks. + +## The trace correlation spine (Phase 0) — verified end to end + +A single `runId` (`run_proof_session`) threads, unbroken, through every layer: + +| Hop | Layer | Evidence | +|----|-------|----------| +| 1 | MCP tool output | every `tools/call` result carries `runId` + `traceUri` (`vestige://trace/{runId}`) | +| 2 | SQLite trace rows | 12 `agent_traces` rows persisted under the runId | +| 3 | WebSocket | each event broadcast as `VestigeEvent::TraceEvent` | +| 4 | dashboard pulse | Black Box tab renders 12 ticks + memory pulse, live | +| 5 | `/api/traces/:runId` | see `phase-3-trace.json` | +| 6 | `vestige://trace/{runId}` | MCP resource resolves the same run | +| 7 | receipt export | `phase-3-trace.json` is the downloadable `.vestige-trace.json` | +| 8 | Cinema replay | "Open receipt in Cinema" deep-links the receipt's memory set | + +## What the run did (12 events, in order) + +`mcp.call → memory.write` × 3 ordinary writes (auto-landed), +`mcp.call → memory.retrieve` × 2 (deep_reference + search, each left a receipt), +`mcp.call → memory.write` × 1 **risky** write (auth/security content). + +## The cognitive immune system fired + +- Mode: **Risk-Gated** (the default). +- The 3 ordinary writes **auto-landed** — no friction. +- The 1 risky write (auth token / security credential) **opened a Memory PR** + with the self-explaining signal `sensitive_topic → "Touches a sensitive + topic: authentication / authorization."` +- Promoting that PR from the dashboard moved it to `promoted` through the full + stack (UI → API → SQLite). See `memory-prs.json`. + +This is the product line, made literal: +**Vestige auto-remembers ordinary context, but opens a Memory PR when the agent +tries to rewrite its own brain.** + +## Files + +- `phase-1-status.json` — server health (spine alive). +- `phase-3-trace.json` — the full `.vestige-trace.json` export (the black box). +- `receipts.json` — the retrieval receipt(s) generated this run. +- `memory-prs.json` — the Memory PR queue, including the promoted risky write. + +## Gates (all green) + +- `cargo test --workspace` — 953 lib tests pass (incl. the trace-spine + integration test driving a real JSON-RPC round-trip). +- `cargo clippy --workspace -- -D warnings` — 0 warnings. +- `pnpm --filter @vestige/dashboard check` — 0 errors, 0 warnings (905 files). +- `pnpm --filter @vestige/dashboard build` — clean. diff --git a/blackbox-proof-2026-06-22/receipts.json b/blackbox-proof-2026-06-22/receipts.json new file mode 100644 index 0000000..83ef5a2 --- /dev/null +++ b/blackbox-proof-2026-06-22/receipts.json @@ -0,0 +1 @@ +{"receipts":[{"activation_path":[],"decay_risk":"high","mutations":[],"receipt_id":"r_2026_06_22_runproof","retrieved":["147bee37-33e4-4287-bd6b-931c23d87f81"],"suppressed":[],"trust_floor":0.0}],"total":1} \ No newline at end of file diff --git a/crates/vestige-core/src/lib.rs b/crates/vestige-core/src/lib.rs index 41f7843..f6f12b4 100644 --- a/crates/vestige-core/src/lib.rs +++ b/crates/vestige-core/src/lib.rs @@ -90,6 +90,10 @@ pub mod fts; pub mod memory; pub mod storage; +/// Agent Black Box, Memory Receipts & Memory PRs — the cognitive flight +/// recorder, immune system, and reviewable-diff model for agent memory. +pub mod trace; + #[cfg(feature = "embeddings")] #[cfg_attr(docsrs, doc(cfg(feature = "embeddings")))] pub mod embeddings; @@ -160,6 +164,13 @@ pub use fsrs::{ // Configuration (vestige.toml output profiles / defaults) pub use config::{CONFIG_FILE, OutputConfig, OutputDefaults, OutputProfile, VestigeConfig}; +// Agent Black Box / Receipts / Memory PRs (the cognitive flight recorder) +pub use trace::{ + classify_write, DecayRisk, MemoryPr, MemoryPrAction, MemoryPrKind, MemoryPrStatus, + MemoryTraceEvent, Receipt, ReceiptMutation, ReviewMode, RiskClass, RiskSignal, SuppressReason, + SuppressedReceiptEntry, WriteContext, WriteSource, HIGH_TRUST_FLOOR, LOW_CONFIDENCE_FLOOR, +}; + // Storage layer pub use storage::{ ClassificationResult, @@ -192,6 +203,7 @@ pub use storage::{ Result, SchedulingState, SearchQuery, + AgentRunSummary, SmartIngestResult, SourceUpsertOutcome, SourceUpsertResult, diff --git a/crates/vestige-core/src/storage/migrations.rs b/crates/vestige-core/src/storage/migrations.rs index 58e5202..b42541c 100644 --- a/crates/vestige-core/src/storage/migrations.rs +++ b/crates/vestige-core/src/storage/migrations.rs @@ -89,6 +89,11 @@ pub const MIGRATIONS: &[Migration] = &[ description: "#57 Source envelope: provenance columns + connector cursor checkpoints for idempotent external-source sync", up: MIGRATION_V17_UP, }, + Migration { + version: 18, + description: "Agent Black Box + Memory Receipts + Memory PRs: replayable run traces, retrieval receipts, risk-gated brain-change review queue", + up: MIGRATION_V18_UP, + }, ]; /// A database migration @@ -1029,6 +1034,105 @@ pub const MIGRATION_V17_ALTER_COLUMNS: &[&str] = &[ "ALTER TABLE knowledge_nodes ADD COLUMN source_author TEXT", ]; +/// V18: Agent Black Box + Memory Receipts + Memory PRs. +/// +/// Three append-only / review tables that turn Vestige into the *black box, +/// immune system, and cinematic debugger for agent memory*: +/// +/// - `agent_traces` — one row per [`crate::trace::MemoryTraceEvent`], ordered by +/// `(run_id, seq)`. Append-only so a run replays exactly as the agent +/// experienced it. `payload` is the full serialized event; `event_type` and +/// `run_id` are denormalized for fast filtering and the `vestige://trace/{id}` +/// resource. `args_hash` (for `mcp.call`) is stored, never the raw args, so +/// traces can't leak prompt contents or secrets. +/// +/// - `memory_receipts` — one row per retrieval receipt. `payload` holds the full +/// [`crate::trace::Receipt`]; the scalar columns (`trust_floor`, `decay_risk`) +/// are denormalized for list/sort without parsing JSON. +/// +/// - `memory_prs` — the risk-gated review queue. A risky write (contradiction +/// vs high-trust, supersede/forget/merge/protect, sensitive topic, dream +/// consolidation, decay resurrection, low-confidence batch, weak-provenance +/// connector) lands here as `pending` instead of auto-committing. `diff` is the +/// structured before/after, `signals` is the self-explaining risk evidence, +/// `run_id` links the PR back to the black-box trace that produced it. +/// +/// `memory_id` / `run_id` references are intentionally *not* foreign keys to +/// `knowledge_nodes`: forgetting or superseding a memory must never erase the +/// audit trail of the trace, receipt, or PR that touched it (same +/// audit-preserving stance as V15's composition tables). +const MIGRATION_V18_UP: &str = r#" +-- Black-box trace events: append-only, ordered by (run_id, seq). +CREATE TABLE IF NOT EXISTS agent_traces ( + id TEXT PRIMARY KEY, + run_id TEXT NOT NULL, + seq INTEGER NOT NULL, + event_type TEXT NOT NULL, -- mcp.call | memory.retrieve | ... + tool TEXT, -- denormalized for mcp.call rows + payload TEXT NOT NULL, -- full serialized MemoryTraceEvent (JSON) + at INTEGER NOT NULL, -- wall-clock millis + created_at TEXT NOT NULL +); + +CREATE INDEX IF NOT EXISTS idx_agent_traces_run ON agent_traces(run_id, seq); +CREATE INDEX IF NOT EXISTS idx_agent_traces_type ON agent_traces(event_type); +CREATE INDEX IF NOT EXISTS idx_agent_traces_at ON agent_traces(at); + +-- One row per agent run, for the Black Box run list (denormalized roll-up). +CREATE TABLE IF NOT EXISTS agent_runs ( + run_id TEXT PRIMARY KEY, + first_tool TEXT, + event_count INTEGER NOT NULL DEFAULT 0, + retrieved_count INTEGER NOT NULL DEFAULT 0, + suppressed_count INTEGER NOT NULL DEFAULT 0, + write_count INTEGER NOT NULL DEFAULT 0, + veto_count INTEGER NOT NULL DEFAULT 0, + started_at INTEGER NOT NULL, -- millis of first event + last_at INTEGER NOT NULL, -- millis of latest event + created_at TEXT NOT NULL +); + +CREATE INDEX IF NOT EXISTS idx_agent_runs_last_at ON agent_runs(last_at DESC); + +-- Retrieval receipts (the "nutrition label" for a piece of agent memory). +CREATE TABLE IF NOT EXISTS memory_receipts ( + receipt_id TEXT PRIMARY KEY, + run_id TEXT, -- links to the trace, if any + tool TEXT, + query TEXT, + retrieved_count INTEGER NOT NULL DEFAULT 0, + suppressed_count INTEGER NOT NULL DEFAULT 0, + trust_floor REAL NOT NULL DEFAULT 0, + decay_risk TEXT NOT NULL DEFAULT 'low', + payload TEXT NOT NULL, -- full serialized Receipt (JSON) + created_at TEXT NOT NULL +); + +CREATE INDEX IF NOT EXISTS idx_memory_receipts_run ON memory_receipts(run_id); +CREATE INDEX IF NOT EXISTS idx_memory_receipts_created_at ON memory_receipts(created_at DESC); + +-- Memory PRs: the risk-gated review queue for brain changes. +CREATE TABLE IF NOT EXISTS memory_prs ( + id TEXT PRIMARY KEY, + kind TEXT NOT NULL, -- new_fact | contradiction_detected | ... + status TEXT NOT NULL DEFAULT 'pending', + title TEXT NOT NULL, + subject_id TEXT, -- the memory this PR concerns, if any + run_id TEXT, -- the run that produced it + diff TEXT NOT NULL DEFAULT '{}', -- structured before/after (JSON) + signals TEXT NOT NULL DEFAULT '[]', -- self-explaining RiskSignal[] (JSON) + decision TEXT, -- promote | merge | supersede | ... + created_at TEXT NOT NULL, + decided_at TEXT +); + +CREATE INDEX IF NOT EXISTS idx_memory_prs_status ON memory_prs(status); +CREATE INDEX IF NOT EXISTS idx_memory_prs_kind ON memory_prs(kind); +CREATE INDEX IF NOT EXISTS idx_memory_prs_created_at ON memory_prs(created_at DESC); + +UPDATE schema_version SET version = 18, applied_at = datetime('now'); +"#; + /// Apply pending migrations pub fn apply_migrations(conn: &rusqlite::Connection) -> rusqlite::Result { let current_version = get_current_version(conn)?; @@ -1109,9 +1213,10 @@ mod tests { // 1. schema_version advanced to the latest migration let version = get_current_version(&conn).expect("read schema_version"); + let latest = MIGRATIONS.last().unwrap().version; assert_eq!( - version, 17, - "schema_version must be 17 after all migrations" + version, latest, + "schema_version must be the latest migration after all migrations" ); // 2. knowledge_edges is gone (V11 drops it) @@ -1236,7 +1341,11 @@ mod tests { // After replaying from V10, the schema advances to the latest version. let version = get_current_version(&conn).expect("read schema_version"); - assert_eq!(version, 17, "schema_version back at latest after replay"); + assert_eq!( + version, + MIGRATIONS.last().unwrap().version, + "schema_version back at latest after replay" + ); } #[test] @@ -1310,7 +1419,11 @@ mod tests { // V16 uses CREATE TABLE IF NOT EXISTS and idempotent ALTER handling. apply_migrations(&conn).expect("V16 replay must be idempotent"); let version = get_current_version(&conn).expect("read version"); - assert_eq!(version, 17, "schema_version must be latest after replay"); + assert_eq!( + version, + MIGRATIONS.last().unwrap().version, + "schema_version must be latest after replay" + ); } #[test] @@ -1400,7 +1513,11 @@ mod tests { .expect("rewind to 16"); apply_migrations(&conn).expect("V17 replay must be idempotent"); let version = get_current_version(&conn).expect("read version"); - assert_eq!(version, 17, "schema_version must be 17 after replay"); + assert_eq!( + version, + MIGRATIONS.last().unwrap().version, + "schema_version must be latest after replay" + ); } #[test] diff --git a/crates/vestige-core/src/storage/mod.rs b/crates/vestige-core/src/storage/mod.rs index d82da6a..84bb2de 100644 --- a/crates/vestige-core/src/storage/mod.rs +++ b/crates/vestige-core/src/storage/mod.rs @@ -10,6 +10,7 @@ mod memory_store; mod migrations; mod portable; mod sqlite; +mod trace_store; #[cfg(feature = "cloud-sync")] pub use cloud_sync::HttpPortableSyncBackend; @@ -32,6 +33,7 @@ pub use sqlite::{ SmartIngestResult, SourceUpsertOutcome, SourceUpsertResult, SqliteMemoryStore, StateTransitionRecord, StorageError, }; +pub use trace_store::AgentRunSummary; /// Backwards-compatibility alias. Retained until Phase 4 completes so every /// existing `Arc` call site keeps compiling. Scheduled for removal diff --git a/crates/vestige-core/src/storage/sqlite.rs b/crates/vestige-core/src/storage/sqlite.rs index fd394b1..786295f 100644 --- a/crates/vestige-core/src/storage/sqlite.rs +++ b/crates/vestige-core/src/storage/sqlite.rs @@ -302,8 +302,11 @@ const VESTIGE_DISABLE_VECTOR_SEARCH: &str = "VESTIGE_DISABLE_VECTOR_SEARCH"; /// so the MCP layer can use `Arc` instead of `Arc>`. pub struct SqliteMemoryStore { db_path: PathBuf, - writer: Mutex, - reader: Mutex, + // `pub(crate)` so the sibling `trace_store` module (Black Box / Receipts / + // Memory PRs CRUD) can lock the same writer/reader connections and follow + // the established store idiom without duplicating connection management. + pub(crate) writer: Mutex, + pub(crate) reader: Mutex, scheduler: Mutex, #[cfg(feature = "embeddings")] embedding_service: EmbeddingService, diff --git a/crates/vestige-core/src/storage/trace_store.rs b/crates/vestige-core/src/storage/trace_store.rs new file mode 100644 index 0000000..92d2db2 --- /dev/null +++ b/crates/vestige-core/src/storage/trace_store.rs @@ -0,0 +1,595 @@ +//! # Black Box / Receipts / Memory PRs — persistence +//! +//! CRUD for the three V18 tables (`agent_traces` + `agent_runs`, +//! `memory_receipts`, `memory_prs`) on [`SqliteMemoryStore`]. The pure data +//! model lives in [`crate::trace`]; this file is the storage half of the +//! Black Box, immune system, and cinematic debugger for agent memory. +//! +//! Every method follows the established store idiom: lock the writer/reader +//! `Mutex`, `params![]`-bind, store timestamps as RFC3339 (and +//! event millis as INTEGER), serialize structured fields with `serde_json`, and +//! map rows back through a small closure. + +use chrono::Utc; +use rusqlite::{params, OptionalExtension}; +use uuid::Uuid; + +use super::sqlite::SqliteMemoryStore; +use super::{Result, StorageError}; +use crate::trace::{MemoryPr, MemoryPrAction, MemoryPrStatus, MemoryTraceEvent, Receipt}; + +/// A roll-up summary of one agent run, for the Black Box run list. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq)] +pub struct AgentRunSummary { + /// The run id. + pub run_id: String, + /// The first tool invoked in the run (the run's "entry point"). + pub first_tool: Option, + /// Total events recorded. + pub event_count: i64, + /// Memories retrieved across the run. + pub retrieved_count: i64, + /// Memories suppressed across the run. + pub suppressed_count: i64, + /// Memory writes across the run. + pub write_count: i64, + /// Sanhedrin vetoes across the run. + pub veto_count: i64, + /// Millis of the first event. + pub started_at: i64, + /// Millis of the most recent event. + pub last_at: i64, +} + +impl SqliteMemoryStore { + // ======================================================================== + // BLACK BOX — trace events + run roll-up + // ======================================================================== + + /// Append one trace event to a run (append-only) and update the run + /// roll-up. Returns the assigned sequence number within the run. + /// + /// `seq` is `MAX(seq)+1` for the run, computed under the writer lock so a + /// run's events stay totally ordered even under concurrent tool calls. + pub fn append_trace_event(&self, event: &MemoryTraceEvent) -> Result { + let now = Utc::now(); + let run_id = event.run_id().to_string(); + let event_type = event.kind(); + let at = event.at(); + let payload = serde_json::to_string(event) + .map_err(|e| StorageError::Init(format!("trace event serialize: {e}")))?; + let tool = match event { + MemoryTraceEvent::McpCall { tool, .. } => Some(tool.clone()), + _ => None, + }; + + // Roll-up deltas this event contributes. + let (d_retrieved, d_suppressed, d_write, d_veto) = match event { + MemoryTraceEvent::MemoryRetrieve { ids, .. } => (ids.len() as i64, 0, 0, 0), + MemoryTraceEvent::MemorySuppress { .. } => (0, 1, 0, 0), + MemoryTraceEvent::MemoryWrite { .. } => (0, 0, 1, 0), + MemoryTraceEvent::SanhedrinVeto { .. } => (0, 0, 0, 1), + _ => (0, 0, 0, 0), + }; + + let writer = self + .writer + .lock() + .map_err(|_| StorageError::Init("Writer lock poisoned".into()))?; + + let seq: i64 = writer + .query_row( + "SELECT COALESCE(MAX(seq), -1) + 1 FROM agent_traces WHERE run_id = ?1", + params![run_id], + |r| r.get(0), + ) + .unwrap_or(0); + + writer.execute( + "INSERT INTO agent_traces (id, run_id, seq, event_type, tool, payload, at, created_at) + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)", + params![ + Uuid::new_v4().to_string(), + run_id, + seq, + event_type, + tool, + payload, + at, + now.to_rfc3339(), + ], + )?; + + // Upsert the run roll-up. On first event the row is created with the + // event's tool as the entry point; subsequent events accumulate counts + // and advance `last_at`. + writer.execute( + "INSERT INTO agent_runs (run_id, first_tool, event_count, retrieved_count, + suppressed_count, write_count, veto_count, started_at, last_at, created_at) + VALUES (?1, ?2, 1, ?3, ?4, ?5, ?6, ?7, ?7, ?8) + ON CONFLICT(run_id) DO UPDATE SET + first_tool = COALESCE(agent_runs.first_tool, excluded.first_tool), + event_count = agent_runs.event_count + 1, + retrieved_count = agent_runs.retrieved_count + ?3, + suppressed_count = agent_runs.suppressed_count + ?4, + write_count = agent_runs.write_count + ?5, + veto_count = agent_runs.veto_count + ?6, + last_at = MAX(agent_runs.last_at, ?7)", + params![ + run_id, + tool, + d_retrieved, + d_suppressed, + d_write, + d_veto, + at, + now.to_rfc3339(), + ], + )?; + + Ok(seq) + } + + /// Fetch every event of a run, in sequence order. The black-box replay. + pub fn get_trace(&self, run_id: &str) -> Result> { + let reader = self + .reader + .lock() + .map_err(|_| StorageError::Init("Reader lock poisoned".into()))?; + let mut stmt = reader.prepare( + "SELECT payload FROM agent_traces WHERE run_id = ?1 ORDER BY seq ASC", + )?; + let rows = stmt.query_map(params![run_id], |row| { + let payload: String = row.get(0)?; + Ok(payload) + })?; + let mut out = Vec::new(); + for r in rows { + let payload = r?; + if let Ok(ev) = serde_json::from_str::(&payload) { + out.push(ev); + } + } + Ok(out) + } + + /// List recent runs, newest activity first. + pub fn list_agent_runs(&self, limit: usize) -> Result> { + let reader = self + .reader + .lock() + .map_err(|_| StorageError::Init("Reader lock poisoned".into()))?; + let mut stmt = reader.prepare( + "SELECT run_id, first_tool, event_count, retrieved_count, suppressed_count, + write_count, veto_count, started_at, last_at + FROM agent_runs ORDER BY last_at DESC LIMIT ?1", + )?; + let rows = stmt.query_map(params![limit as i64], Self::row_to_run_summary)?; + let mut out = Vec::new(); + for r in rows { + out.push(r?); + } + Ok(out) + } + + /// Fetch one run summary. + pub fn get_agent_run(&self, run_id: &str) -> Result> { + let reader = self + .reader + .lock() + .map_err(|_| StorageError::Init("Reader lock poisoned".into()))?; + reader + .query_row( + "SELECT run_id, first_tool, event_count, retrieved_count, suppressed_count, + write_count, veto_count, started_at, last_at + FROM agent_runs WHERE run_id = ?1", + params![run_id], + Self::row_to_run_summary, + ) + .optional() + .map_err(StorageError::from) + } + + fn row_to_run_summary(row: &rusqlite::Row) -> rusqlite::Result { + Ok(AgentRunSummary { + run_id: row.get("run_id")?, + first_tool: row.get("first_tool").ok().flatten(), + event_count: row.get("event_count")?, + retrieved_count: row.get("retrieved_count")?, + suppressed_count: row.get("suppressed_count")?, + write_count: row.get("write_count")?, + veto_count: row.get("veto_count")?, + started_at: row.get("started_at")?, + last_at: row.get("last_at")?, + }) + } + + // ======================================================================== + // MEMORY RECEIPTS + // ======================================================================== + + /// Persist a retrieval receipt. `run_id`/`tool`/`query` are denormalized + /// context for the dashboard; the full [`Receipt`] is stored as JSON. + pub fn save_receipt( + &self, + receipt: &Receipt, + run_id: Option<&str>, + tool: Option<&str>, + query: Option<&str>, + ) -> Result<()> { + let payload = serde_json::to_string(receipt) + .map_err(|e| StorageError::Init(format!("receipt serialize: {e}")))?; + let writer = self + .writer + .lock() + .map_err(|_| StorageError::Init("Writer lock poisoned".into()))?; + writer.execute( + "INSERT OR REPLACE INTO memory_receipts + (receipt_id, run_id, tool, query, retrieved_count, suppressed_count, + trust_floor, decay_risk, payload, created_at) + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)", + params![ + receipt.receipt_id, + run_id, + tool, + query, + receipt.retrieved.len() as i64, + receipt.suppressed.len() as i64, + receipt.trust_floor, + receipt.decay_risk.as_str(), + payload, + Utc::now().to_rfc3339(), + ], + )?; + Ok(()) + } + + /// Fetch one receipt by id. + pub fn get_receipt(&self, receipt_id: &str) -> Result> { + let reader = self + .reader + .lock() + .map_err(|_| StorageError::Init("Reader lock poisoned".into()))?; + let payload: Option = reader + .query_row( + "SELECT payload FROM memory_receipts WHERE receipt_id = ?1", + params![receipt_id], + |row| row.get(0), + ) + .optional()?; + Ok(payload.and_then(|p| serde_json::from_str(&p).ok())) + } + + /// List recent receipts, newest first. + pub fn list_receipts(&self, limit: usize) -> Result> { + let reader = self + .reader + .lock() + .map_err(|_| StorageError::Init("Reader lock poisoned".into()))?; + let mut stmt = reader.prepare( + "SELECT payload FROM memory_receipts ORDER BY created_at DESC LIMIT ?1", + )?; + let rows = stmt.query_map(params![limit as i64], |row| { + let p: String = row.get(0)?; + Ok(p) + })?; + let mut out = Vec::new(); + for r in rows { + if let Ok(rc) = serde_json::from_str::(&r?) { + out.push(rc); + } + } + Ok(out) + } + + // ======================================================================== + // MEMORY PRs — the risk-gated review queue + // ======================================================================== + + /// Open (insert) a Memory PR. + pub fn save_memory_pr(&self, pr: &MemoryPr) -> Result<()> { + let diff = serde_json::to_string(&pr.diff).unwrap_or_else(|_| "{}".to_string()); + let signals = serde_json::to_string(&pr.signals).unwrap_or_else(|_| "[]".to_string()); + let writer = self + .writer + .lock() + .map_err(|_| StorageError::Init("Writer lock poisoned".into()))?; + writer.execute( + "INSERT OR REPLACE INTO memory_prs + (id, kind, status, title, subject_id, run_id, diff, signals, + decision, created_at, decided_at) + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)", + params![ + pr.id, + pr.kind.as_str(), + pr.status.as_str(), + pr.title, + pr.subject_id, + pr.run_id, + diff, + signals, + pr.decision + .and_then(|d| serde_json::to_value(d).ok()) + .and_then(|v| v.as_str().map(|s| s.to_string())), + pr.created_at, + pr.decided_at, + ], + )?; + Ok(()) + } + + /// Fetch one Memory PR by id. + pub fn get_memory_pr(&self, id: &str) -> Result> { + let reader = self + .reader + .lock() + .map_err(|_| StorageError::Init("Reader lock poisoned".into()))?; + reader + .query_row( + "SELECT id, kind, status, title, subject_id, run_id, diff, signals, + decision, created_at, decided_at + FROM memory_prs WHERE id = ?1", + params![id], + Self::row_to_memory_pr, + ) + .optional() + .map_err(StorageError::from) + } + + /// List Memory PRs, optionally filtered by status, newest first. + pub fn list_memory_prs( + &self, + status: Option, + limit: usize, + ) -> Result> { + let reader = self + .reader + .lock() + .map_err(|_| StorageError::Init("Reader lock poisoned".into()))?; + let (sql, with_filter) = match status { + Some(_) => ( + "SELECT id, kind, status, title, subject_id, run_id, diff, signals, + decision, created_at, decided_at + FROM memory_prs WHERE status = ?1 ORDER BY created_at DESC LIMIT ?2", + true, + ), + None => ( + "SELECT id, kind, status, title, subject_id, run_id, diff, signals, + decision, created_at, decided_at + FROM memory_prs ORDER BY created_at DESC LIMIT ?1", + false, + ), + }; + let mut stmt = reader.prepare(sql)?; + let mut out = Vec::new(); + if with_filter { + let st = status.unwrap(); + let rows = + stmt.query_map(params![st.as_str(), limit as i64], Self::row_to_memory_pr)?; + for r in rows { + out.push(r?); + } + } else { + let rows = stmt.query_map(params![limit as i64], Self::row_to_memory_pr)?; + for r in rows { + out.push(r?); + } + } + Ok(out) + } + + /// Count pending Memory PRs (for the nav badge). + pub fn count_pending_memory_prs(&self) -> Result { + let reader = self + .reader + .lock() + .map_err(|_| StorageError::Init("Reader lock poisoned".into()))?; + let n: i64 = reader + .query_row( + "SELECT COUNT(*) FROM memory_prs WHERE status = 'pending'", + [], + |r| r.get(0), + ) + .unwrap_or(0); + Ok(n) + } + + /// Record a decision on a Memory PR, moving it out of `pending`. Returns the + /// updated PR. `AskAgentWhy` is read-only and never reaches here. + pub fn decide_memory_pr(&self, id: &str, action: MemoryPrAction) -> Result { + let new_status = action.resulting_status().ok_or_else(|| { + StorageError::Init("ask_agent_why is read-only and decides nothing".into()) + })?; + let decision = serde_json::to_value(action) + .ok() + .and_then(|v| v.as_str().map(|s| s.to_string())) + .unwrap_or_default(); + let now = Utc::now().to_rfc3339(); + { + let writer = self + .writer + .lock() + .map_err(|_| StorageError::Init("Writer lock poisoned".into()))?; + let changed = writer.execute( + "UPDATE memory_prs SET status = ?1, decision = ?2, decided_at = ?3 WHERE id = ?4", + params![new_status.as_str(), decision, now, id], + )?; + if changed == 0 { + return Err(StorageError::NotFound(id.to_string())); + } + } + self.get_memory_pr(id)? + .ok_or_else(|| StorageError::NotFound(id.to_string())) + } + + fn row_to_memory_pr(row: &rusqlite::Row) -> rusqlite::Result { + let kind_s: String = row.get("kind")?; + let status_s: String = row.get("status")?; + let diff_s: String = row.get("diff")?; + let signals_s: String = row.get("signals")?; + let decision_s: Option = row.get("decision").ok().flatten(); + + let kind = crate::trace::MemoryPrKind::from_label(&kind_s) + .unwrap_or(crate::trace::MemoryPrKind::NewFact); + let status = serde_json::from_value(serde_json::Value::String(status_s)) + .unwrap_or(MemoryPrStatus::Pending); + let diff: serde_json::Value = serde_json::from_str(&diff_s).unwrap_or(serde_json::json!({})); + let signals = serde_json::from_str(&signals_s).unwrap_or_default(); + let decision = decision_s + .and_then(|s| serde_json::from_value(serde_json::Value::String(s)).ok()); + + Ok(MemoryPr { + id: row.get("id")?, + kind, + status, + title: row.get("title")?, + diff, + signals, + subject_id: row.get("subject_id").ok().flatten(), + run_id: row.get("run_id").ok().flatten(), + created_at: row.get("created_at")?, + decided_at: row.get("decided_at").ok().flatten(), + decision, + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::trace::{ + DecayRisk, MemoryPrKind, MemoryTraceEvent, Receipt, RiskSignal, SuppressReason, + SuppressedReceiptEntry, + }; + + fn store() -> SqliteMemoryStore { + // Temp-file store for isolated, fast tests (mirrors the existing + // sqlite.rs test helpers; there is no in-memory constructor). + let dir = tempfile::tempdir().unwrap(); + SqliteMemoryStore::new(Some(dir.path().join("trace_test.db"))).expect("test store") + } + + #[test] + fn trace_append_orders_and_rolls_up() { + let s = store(); + let run = "run_abc"; + s.append_trace_event(&MemoryTraceEvent::McpCall { + run_id: run.into(), + tool: "deep_reference".into(), + args_hash: "h".into(), + at: 100, + }) + .unwrap(); + let mut activation = std::collections::BTreeMap::new(); + activation.insert("m1".to_string(), 0.9); + s.append_trace_event(&MemoryTraceEvent::MemoryRetrieve { + run_id: run.into(), + ids: vec!["m1".into(), "m2".into()], + activation, + at: 110, + }) + .unwrap(); + s.append_trace_event(&MemoryTraceEvent::MemorySuppress { + run_id: run.into(), + id: "m3".into(), + reason: SuppressReason::Contradicted, + at: 120, + }) + .unwrap(); + + let events = s.get_trace(run).unwrap(); + assert_eq!(events.len(), 3); + assert_eq!(events[0].kind(), "mcp.call"); + assert_eq!(events[2].kind(), "memory.suppress"); + + let summary = s.get_agent_run(run).unwrap().unwrap(); + assert_eq!(summary.first_tool.as_deref(), Some("deep_reference")); + assert_eq!(summary.event_count, 3); + assert_eq!(summary.retrieved_count, 2); + assert_eq!(summary.suppressed_count, 1); + assert_eq!(summary.started_at, 100); + assert_eq!(summary.last_at, 120); + + let runs = s.list_agent_runs(10).unwrap(); + assert_eq!(runs.len(), 1); + assert_eq!(runs[0].run_id, run); + } + + #[test] + fn receipt_roundtrips() { + let s = store(); + let receipt = Receipt { + receipt_id: "r_2026_06_22_abc".into(), + retrieved: vec!["m1".into(), "m2".into()], + suppressed: vec![SuppressedReceiptEntry::new("m3", SuppressReason::LowTrust)], + activation_path: vec!["a -> b".into()], + trust_floor: 0.62, + decay_risk: DecayRisk::Medium, + mutations: vec![], + }; + s.save_receipt(&receipt, Some("run_abc"), Some("search"), Some("q")) + .unwrap(); + let got = s.get_receipt("r_2026_06_22_abc").unwrap().unwrap(); + assert_eq!(got, receipt); + assert_eq!(s.list_receipts(10).unwrap().len(), 1); + } + + #[test] + fn memory_pr_lifecycle() { + let s = store(); + let pr = MemoryPr { + id: "pr_1".into(), + kind: MemoryPrKind::ContradictionDetected, + status: MemoryPrStatus::Pending, + title: "Agent wants to overwrite a high-trust fact".into(), + diff: serde_json::json!({"before": "x", "after": "y"}), + signals: vec![RiskSignal { + code: "contradicts_high_trust".into(), + detail: "Contradicts trust 0.9.".into(), + }], + subject_id: Some("m_old".into()), + run_id: Some("run_abc".into()), + created_at: Utc::now().to_rfc3339(), + decided_at: None, + decision: None, + }; + s.save_memory_pr(&pr).unwrap(); + + assert_eq!(s.count_pending_memory_prs().unwrap(), 1); + let pending = s + .list_memory_prs(Some(MemoryPrStatus::Pending), 10) + .unwrap(); + assert_eq!(pending.len(), 1); + assert_eq!(pending[0].signals[0].code, "contradicts_high_trust"); + + let decided = s.decide_memory_pr("pr_1", MemoryPrAction::Promote).unwrap(); + assert_eq!(decided.status, MemoryPrStatus::Promoted); + assert_eq!(decided.decision, Some(MemoryPrAction::Promote)); + assert!(decided.decided_at.is_some()); + assert_eq!(s.count_pending_memory_prs().unwrap(), 0); + } + + #[test] + fn ask_agent_why_is_not_a_decision() { + let s = store(); + let pr = MemoryPr { + id: "pr_2".into(), + kind: MemoryPrKind::NewFact, + status: MemoryPrStatus::Pending, + title: "t".into(), + diff: serde_json::json!({}), + signals: vec![], + subject_id: None, + run_id: None, + created_at: Utc::now().to_rfc3339(), + decided_at: None, + decision: None, + }; + s.save_memory_pr(&pr).unwrap(); + assert!(s + .decide_memory_pr("pr_2", MemoryPrAction::AskAgentWhy) + .is_err()); + // Still pending. + assert_eq!(s.count_pending_memory_prs().unwrap(), 1); + } +} diff --git a/crates/vestige-core/src/trace/mod.rs b/crates/vestige-core/src/trace/mod.rs new file mode 100644 index 0000000..09778cb --- /dev/null +++ b/crates/vestige-core/src/trace/mod.rs @@ -0,0 +1,356 @@ +//! # Agent Black Box, Receipts & Memory PRs — the cognitive flight recorder +//! +//! This module holds the **pure** data model and classification logic for three +//! tightly-related capabilities that together make Vestige *the black box, +//! immune system, and cinematic debugger for agent memory*: +//! +//! 1. **Agent Black Box** — a replayable trace of everything an agent run did to +//! memory: prompt → retrieved → suppressed → activated edges → tool calls → +//! writes → contradictions → vetoes → dream consolidation → final answer. +//! The event model is [`MemoryTraceEvent`]. +//! +//! 2. **Memory Receipts** — every important retrieval returns a structured +//! [`Receipt`]: what was retrieved, what was suppressed and why, the +//! activation path that surfaced it, the trust floor, the decay risk, and any +//! mutations. A receipt is the "nutrition label" for a piece of agent memory. +//! +//! 3. **Memory PRs** — changes to an agent's *brain* are reviewed like changes +//! to code. Ordinary context auto-commits (and always leaves a receipt), but +//! risky writes — contradictions against high-trust memory, supersede / forget +//! / merge / protect, identity / preference / workflow / positioning facts, +//! permission / auth / security / money / legal facts, dream consolidation +//! proposals, decay-below-threshold resurrection, low-confidence batch +//! imports, and weak-provenance connector writes — open a reviewable +//! [`MemoryPr`]. The gating decision is [`classify_write`]. +//! +//! ## Design north star (shared with [`crate::advanced::merge_supersede`]) +//! +//! - **append-only** — trace events are never mutated, only appended, so a run +//! replays exactly as the agent experienced it. +//! - **self-explaining** — every gated write carries the [`RiskSignal`]s that +//! explain *why* it needs review, in plain language. +//! - **opt-in friction** — the default [`ReviewMode::RiskGated`] keeps ordinary +//! memory frictionless and only opens a PR when the agent tries to rewrite its +//! own brain. [`ReviewMode::Fast`] never gates; [`ReviewMode::Paranoid`] gates +//! every write. +//! - **DB-free** — this module is pure logic so it is unit-testable without a +//! database. Persistence (the `agent_traces`, `memory_receipts`, and +//! `memory_prs` tables) lives in [`crate::storage`]. +//! +//! The killer line, made literal by [`classify_write`]: +//! +//! > Vestige auto-remembers ordinary context, but opens a Memory PR when the +//! > agent tries to rewrite its own brain. + +use serde::{Deserialize, Serialize}; + +mod receipt; +mod review; + +pub use receipt::{DecayRisk, Receipt, ReceiptMutation, SuppressedReceiptEntry}; +pub use review::{ + classify_write, MemoryPr, MemoryPrAction, MemoryPrKind, MemoryPrStatus, ReviewMode, RiskClass, + RiskSignal, WriteContext, HIGH_TRUST_FLOOR, LOW_CONFIDENCE_FLOOR, +}; + +// ============================================================================ +// TRACE EVENTS — the black-box flight recorder +// ============================================================================ + +/// One append-only event in an agent run's black-box trace. +/// +/// Mirrors the TypeScript `MemoryTraceEvent` union exactly (tagged on `type`, +/// camelCase fields) so the dashboard, the `vestige://trace/{runId}` MCP +/// resource, and the exported `.vestige-trace.json` all speak one schema. +/// +/// ```ts +/// type MemoryTraceEvent = +/// | { type: "mcp.call"; runId; tool; argsHash; at } +/// | { type: "memory.retrieve"; runId; ids; activation; at } +/// | { type: "memory.suppress"; runId; id; reason } +/// | { type: "memory.write"; runId; id; diff; source } +/// | { type: "sanhedrin.veto"; runId; claim; evidenceIds; confidence } +/// | { type: "dream.patch"; runId; proposalIds; at }; +/// ``` +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +#[serde(tag = "type")] +pub enum MemoryTraceEvent { + /// An MCP tool was invoked. The args are stored as a hash (not the raw + /// payload) so traces never leak prompt contents or secrets. + #[serde(rename = "mcp.call")] + McpCall { + #[serde(rename = "runId")] + run_id: String, + tool: String, + #[serde(rename = "argsHash")] + args_hash: String, + at: i64, + }, + + /// Memories were retrieved, with per-id spreading-activation strength so the + /// graph replay can pulse exactly the nodes the agent saw, at their weight. + #[serde(rename = "memory.retrieve")] + MemoryRetrieve { + #[serde(rename = "runId")] + run_id: String, + ids: Vec, + activation: std::collections::BTreeMap, + at: i64, + }, + + /// A memory that *would* have surfaced was suppressed, with the reason — + /// this is the "what the agent chose NOT to use" channel. + #[serde(rename = "memory.suppress")] + MemorySuppress { + #[serde(rename = "runId")] + run_id: String, + id: String, + reason: SuppressReason, + #[serde(default)] + at: i64, + }, + + /// A memory was written / strengthened. `diff` is an opaque JSON description + /// of the change; `source` records who caused it. + #[serde(rename = "memory.write")] + MemoryWrite { + #[serde(rename = "runId")] + run_id: String, + id: String, + diff: serde_json::Value, + source: WriteSource, + #[serde(default)] + at: i64, + }, + + /// A contradiction was detected between memories during a run — its own + /// first-class event (not folded into `memory.suppress`), so the Black Box + /// can show the exact contradiction decision the agent faced. + #[serde(rename = "contradiction.detected")] + ContradictionDetected { + #[serde(rename = "runId")] + run_id: String, + /// The two (or more) memory ids in tension. + ids: Vec, + /// The id the agent trusted (kept), if it resolved the tension. + #[serde(rename = "winnerId", skip_serializing_if = "Option::is_none")] + winner_id: Option, + /// Plain-language description of the contradiction. + detail: String, + #[serde(default)] + at: i64, + }, + + /// The Sanhedrin verifier vetoed a claim the agent was about to assert, + /// citing the evidence it weighed and its confidence. + #[serde(rename = "sanhedrin.veto")] + SanhedrinVeto { + #[serde(rename = "runId")] + run_id: String, + claim: String, + #[serde(rename = "evidenceIds")] + evidence_ids: Vec, + confidence: f64, + #[serde(default)] + at: i64, + }, + + /// Dream consolidation proposed a patch to memory (merge / insight / prune). + #[serde(rename = "dream.patch")] + DreamPatch { + #[serde(rename = "runId")] + run_id: String, + #[serde(rename = "proposalIds")] + proposal_ids: Vec, + at: i64, + }, +} + +impl MemoryTraceEvent { + /// The run this event belongs to. + pub fn run_id(&self) -> &str { + match self { + MemoryTraceEvent::McpCall { run_id, .. } + | MemoryTraceEvent::MemoryRetrieve { run_id, .. } + | MemoryTraceEvent::MemorySuppress { run_id, .. } + | MemoryTraceEvent::MemoryWrite { run_id, .. } + | MemoryTraceEvent::ContradictionDetected { run_id, .. } + | MemoryTraceEvent::SanhedrinVeto { run_id, .. } + | MemoryTraceEvent::DreamPatch { run_id, .. } => run_id, + } + } + + /// The wall-clock millisecond timestamp the event was recorded at. + pub fn at(&self) -> i64 { + match self { + MemoryTraceEvent::McpCall { at, .. } + | MemoryTraceEvent::MemoryRetrieve { at, .. } + | MemoryTraceEvent::MemorySuppress { at, .. } + | MemoryTraceEvent::MemoryWrite { at, .. } + | MemoryTraceEvent::ContradictionDetected { at, .. } + | MemoryTraceEvent::SanhedrinVeto { at, .. } + | MemoryTraceEvent::DreamPatch { at, .. } => *at, + } + } + + /// Short stable kind label used for filtering / the `event_type` column. + pub fn kind(&self) -> &'static str { + match self { + MemoryTraceEvent::McpCall { .. } => "mcp.call", + MemoryTraceEvent::MemoryRetrieve { .. } => "memory.retrieve", + MemoryTraceEvent::MemorySuppress { .. } => "memory.suppress", + MemoryTraceEvent::MemoryWrite { .. } => "memory.write", + MemoryTraceEvent::ContradictionDetected { .. } => "contradiction.detected", + MemoryTraceEvent::SanhedrinVeto { .. } => "sanhedrin.veto", + MemoryTraceEvent::DreamPatch { .. } => "dream.patch", + } + } + + /// Stamp `at` on events that left it defaulted (the recorder fills this so + /// callers don't have to thread a clock through every emit site). + pub fn with_at(mut self, now_ms: i64) -> Self { + match &mut self { + MemoryTraceEvent::McpCall { at, .. } + | MemoryTraceEvent::MemoryRetrieve { at, .. } + | MemoryTraceEvent::MemorySuppress { at, .. } + | MemoryTraceEvent::MemoryWrite { at, .. } + | MemoryTraceEvent::ContradictionDetected { at, .. } + | MemoryTraceEvent::SanhedrinVeto { at, .. } + | MemoryTraceEvent::DreamPatch { at, .. } => { + if *at == 0 { + *at = now_ms; + } + } + } + self + } +} + +/// Why a memory was suppressed during a run. Mirrors the TS union member +/// `"low_trust" | "decayed" | "contradicted" | "privacy"`, plus `competition` +/// for the existing spreading-activation competition suppression. +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum SuppressReason { + /// Below the trust floor for this retrieval. + LowTrust, + /// FSRS retrievability decayed below the usable threshold. + Decayed, + /// Contradicted by a higher-trust memory. + Contradicted, + /// Withheld for privacy / sensitivity reasons. + Privacy, + /// Lost spreading-activation competition to a stronger memory. + Competition, +} + +impl SuppressReason { + /// Stable string label. + pub fn as_str(&self) -> &'static str { + match self { + SuppressReason::LowTrust => "low_trust", + SuppressReason::Decayed => "decayed", + SuppressReason::Contradicted => "contradicted", + SuppressReason::Privacy => "privacy", + SuppressReason::Competition => "competition", + } + } +} + +/// Who caused a `memory.write`. Mirrors the TS `"agent" | "user" | "dream"`, +/// plus `connector` for external-source sync writes. +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum WriteSource { + /// The agent wrote it autonomously. + Agent, + /// The user explicitly asked for it. + User, + /// Produced by dream consolidation. + Dream, + /// Ingested by an external connector (GitHub, Redmine, …). + Connector, +} + +impl WriteSource { + /// Stable string label. + pub fn as_str(&self) -> &'static str { + match self { + WriteSource::Agent => "agent", + WriteSource::User => "user", + WriteSource::Dream => "dream", + WriteSource::Connector => "connector", + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn trace_event_roundtrips_with_ts_shape() { + let ev = MemoryTraceEvent::McpCall { + run_id: "run_123".into(), + tool: "deep_reference".into(), + args_hash: "abc".into(), + at: 42, + }; + let json = serde_json::to_value(&ev).unwrap(); + // Tagged on `type`, camelCase runId/argsHash — exactly the TS contract. + assert_eq!(json["type"], "mcp.call"); + assert_eq!(json["runId"], "run_123"); + assert_eq!(json["argsHash"], "abc"); + assert_eq!(json["at"], 42); + + let back: MemoryTraceEvent = serde_json::from_value(json).unwrap(); + assert_eq!(back, ev); + } + + #[test] + fn retrieve_event_carries_activation_map() { + let mut activation = std::collections::BTreeMap::new(); + activation.insert("mem_1".to_string(), 0.91); + activation.insert("mem_7".to_string(), 0.42); + let ev = MemoryTraceEvent::MemoryRetrieve { + run_id: "r".into(), + ids: vec!["mem_1".into(), "mem_7".into()], + activation, + at: 1, + }; + let json = serde_json::to_value(&ev).unwrap(); + assert_eq!(json["type"], "memory.retrieve"); + assert_eq!(json["activation"]["mem_1"], 0.91); + } + + #[test] + fn with_at_fills_only_when_unset() { + let ev = MemoryTraceEvent::MemorySuppress { + run_id: "r".into(), + id: "m".into(), + reason: SuppressReason::Contradicted, + at: 0, + } + .with_at(999); + assert_eq!(ev.at(), 999); + + let ev2 = MemoryTraceEvent::DreamPatch { + run_id: "r".into(), + proposal_ids: vec!["p".into()], + at: 7, + } + .with_at(999); + assert_eq!(ev2.at(), 7, "explicit timestamp must not be overwritten"); + } + + #[test] + fn suppress_reason_labels_match_ts() { + assert_eq!(SuppressReason::LowTrust.as_str(), "low_trust"); + assert_eq!(SuppressReason::Contradicted.as_str(), "contradicted"); + // Serde uses the same snake_case form on the wire. + assert_eq!( + serde_json::to_value(SuppressReason::Privacy).unwrap(), + serde_json::json!("privacy") + ); + } +} diff --git a/crates/vestige-core/src/trace/receipt.rs b/crates/vestige-core/src/trace/receipt.rs new file mode 100644 index 0000000..7501d14 --- /dev/null +++ b/crates/vestige-core/src/trace/receipt.rs @@ -0,0 +1,247 @@ +//! # Memory Receipts +//! +//! Every important retrieval returns a [`Receipt`] — a structured record of what +//! the agent's memory actually did to answer a query. It is built entirely from +//! data the retrieval pipeline *already computes* (scored memories, suppression +//! decisions, spreading-activation path, FSRS trust), so attaching one is nearly +//! free and never changes the answer. +//! +//! The canonical shape (matching the product spec): +//! +//! ```json +//! { +//! "receipt_id": "r_2026_06_22_abc", +//! "retrieved": ["mem_1", "mem_7", "mem_9"], +//! "suppressed": [{"id": "mem_4", "reason": "contradicted"}], +//! "activation_path": ["project_goal -> design_decision -> current_file"], +//! "trust_floor": 0.62, +//! "decay_risk": "medium", +//! "mutations": [] +//! } +//! ``` + +use serde::{Deserialize, Serialize}; + +use super::SuppressReason; + +/// A structured receipt attached to a retrieval's output. +/// +/// Field names are snake_case to match the published product spec and the +/// dashboard receipt card exactly. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct Receipt { + /// Stable, human-legible id: `r___
_`. + pub receipt_id: String, + + /// Ids of the memories that actually informed the answer, best-first. + pub retrieved: Vec, + + /// Memories that were withheld, each with the reason — the "what the agent + /// chose NOT to use" channel that makes retrieval auditable. + pub suppressed: Vec, + + /// Human-readable spreading-activation path(s) that surfaced the result, + /// e.g. `"project_goal -> design_decision -> current_file"`. + pub activation_path: Vec, + + /// The minimum trust score among the retrieved memories — the weakest link + /// the answer rests on. + pub trust_floor: f64, + + /// Coarse decay risk for the retrieved set (how stale the evidence is). + pub decay_risk: DecayRisk, + + /// Any memory mutations this retrieval triggered (testing-effect + /// strengthening, reconsolidation, supersession). Empty for a pure read. + pub mutations: Vec, +} + +impl Receipt { + /// Build a receipt from already-computed retrieval signals. + /// + /// `receipt_id` is derived from `now` + a short discriminator so it is both + /// human-legible and collision-resistant within a day. `trust_scores` is the + /// per-id FSRS retrievability/trust the pipeline already produced. + pub fn build( + now: chrono::DateTime, + discriminator: &str, + retrieved: Vec, + suppressed: Vec, + activation_path: Vec, + trust_scores: &[f64], + mutations: Vec, + ) -> Self { + let trust_floor = trust_scores + .iter() + .copied() + .fold(f64::INFINITY, f64::min); + let trust_floor = if trust_floor.is_finite() { + (trust_floor * 100.0).round() / 100.0 + } else { + 0.0 + }; + let decay_risk = DecayRisk::from_trust_floor(trust_floor); + + let short: String = discriminator + .chars() + .filter(|c| c.is_ascii_alphanumeric()) + .take(8) + .collect(); + let receipt_id = format!("r_{}_{}", now.format("%Y_%m_%d"), short); + + Self { + receipt_id, + retrieved, + suppressed, + activation_path, + trust_floor, + decay_risk, + mutations, + } + } +} + +/// One suppressed-memory entry in a [`Receipt`]. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct SuppressedReceiptEntry { + /// The id of the suppressed memory. + pub id: String, + /// Why it was withheld. + pub reason: SuppressReason, +} + +impl SuppressedReceiptEntry { + /// Convenience constructor. + pub fn new(id: impl Into, reason: SuppressReason) -> Self { + Self { + id: id.into(), + reason, + } + } +} + +/// Coarse staleness signal for a retrieved set, derived from the trust floor. +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "lowercase")] +pub enum DecayRisk { + /// Trust floor is healthy; the evidence is fresh. + Low, + /// Some of the evidence is weakening. + Medium, + /// The answer rests on memory that is decaying out. + High, +} + +impl DecayRisk { + /// Map the weakest retrieved-trust score to a decay-risk band. + /// + /// Thresholds align with the FSRS "due for review" intuition: above 0.7 the + /// memory is comfortably retrievable, 0.4–0.7 is getting weak, below 0.4 is + /// at risk of being forgotten. + pub fn from_trust_floor(trust_floor: f64) -> Self { + if trust_floor >= 0.7 { + DecayRisk::Low + } else if trust_floor >= 0.4 { + DecayRisk::Medium + } else { + DecayRisk::High + } + } + + /// Stable string label. + pub fn as_str(&self) -> &'static str { + match self { + DecayRisk::Low => "low", + DecayRisk::Medium => "medium", + DecayRisk::High => "high", + } + } +} + +/// A memory mutation that a retrieval triggered, recorded on the receipt so the +/// side effects of "just reading" are never invisible. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct ReceiptMutation { + /// The mutated memory id. + pub id: String, + /// What changed: `"strengthened"`, `"reconsolidated"`, `"superseded"`, … + pub kind: String, + /// Optional human note about the change. + #[serde(skip_serializing_if = "Option::is_none")] + pub note: Option, +} + +#[cfg(test)] +mod tests { + use super::*; + use chrono::TimeZone; + + fn fixed_now() -> chrono::DateTime { + chrono::Utc.with_ymd_and_hms(2026, 6, 22, 15, 0, 0).unwrap() + } + + #[test] + fn receipt_id_is_human_legible_and_dated() { + let r = Receipt::build( + fixed_now(), + "abc123!!", + vec!["mem_1".into()], + vec![], + vec![], + &[0.9], + vec![], + ); + assert_eq!(r.receipt_id, "r_2026_06_22_abc123"); + } + + #[test] + fn trust_floor_is_the_weakest_link() { + let r = Receipt::build( + fixed_now(), + "x", + vec!["a".into(), "b".into(), "c".into()], + vec![], + vec![], + &[0.91, 0.62, 0.78], + vec![], + ); + assert_eq!(r.trust_floor, 0.62); + assert_eq!(r.decay_risk, DecayRisk::Medium); + } + + #[test] + fn empty_trust_scores_floor_to_zero_high_risk() { + let r = Receipt::build(fixed_now(), "x", vec![], vec![], vec![], &[], vec![]); + assert_eq!(r.trust_floor, 0.0); + assert_eq!(r.decay_risk, DecayRisk::High); + } + + #[test] + fn decay_bands() { + assert_eq!(DecayRisk::from_trust_floor(0.95), DecayRisk::Low); + assert_eq!(DecayRisk::from_trust_floor(0.55), DecayRisk::Medium); + assert_eq!(DecayRisk::from_trust_floor(0.20), DecayRisk::High); + } + + #[test] + fn matches_published_spec_shape() { + let r = Receipt { + receipt_id: "r_2026_06_22_abc".into(), + retrieved: vec!["mem_1".into(), "mem_7".into(), "mem_9".into()], + suppressed: vec![SuppressedReceiptEntry::new( + "mem_4", + SuppressReason::Contradicted, + )], + activation_path: vec!["project_goal -> design_decision -> current_file".into()], + trust_floor: 0.62, + decay_risk: DecayRisk::Medium, + mutations: vec![], + }; + let json = serde_json::to_value(&r).unwrap(); + assert_eq!(json["receipt_id"], "r_2026_06_22_abc"); + assert_eq!(json["suppressed"][0]["reason"], "contradicted"); + assert_eq!(json["decay_risk"], "medium"); + assert_eq!(json["trust_floor"], 0.62); + assert!(json["mutations"].as_array().unwrap().is_empty()); + } +} diff --git a/crates/vestige-core/src/trace/review.rs b/crates/vestige-core/src/trace/review.rs new file mode 100644 index 0000000..49955f5 --- /dev/null +++ b/crates/vestige-core/src/trace/review.rs @@ -0,0 +1,692 @@ +//! # Memory PRs — review changes to an agent's brain like code +//! +//! Ordinary context auto-commits and always leaves a receipt. But a *risky* +//! write — one where the agent is rewriting its own brain — opens a reviewable +//! [`MemoryPr`] instead. [`classify_write`] is the immune system: given a +//! [`WriteContext`] and a [`ReviewMode`], it returns the [`RiskClass`] and the +//! [`RiskSignal`]s that explain, in plain language, *why* a write needs review. +//! +//! ## The three modes (one-click in the dashboard) +//! +//! | Mode | Behaviour | +//! |------|-----------| +//! | [`ReviewMode::Fast`] | Never gate. Every write auto-commits. (Demos, trusted solo flows.) | +//! | [`ReviewMode::RiskGated`] | **Default.** Auto-commit ordinary writes; open a PR for risky ones. | +//! | [`ReviewMode::Paranoid`] | Gate *every* write. Nothing enters the brain without approval. | +//! +//! ## What counts as "risky" (the taxonomy) +//! +//! A write is risky when any of these hold: +//! - it **contradicts a high-trust memory**, +//! - it **supersedes / forgets / merges / protects** existing memory, +//! - it touches **identity, user preference, workflow, or project positioning**, +//! - it asserts a **permission / auth / security / money / bounty / legal-ish** fact, +//! - it is a **dream consolidation** proposal, +//! - it **resurrects a decayed** memory (below the retention threshold), +//! - it is part of a **low-confidence batch import**, +//! - it is an **external connector write without strong provenance**. +//! +//! Each rule maps to a [`RiskSignal`] so the resulting Memory PR is fully +//! self-explaining. + +use serde::{Deserialize, Serialize}; + +use super::WriteSource; + +/// A memory is "high trust" at or above this FSRS retrievability/trust score. +/// Contradicting something this trusted is always worth a review. +pub const HIGH_TRUST_FLOOR: f64 = 0.7; + +/// Writes below this confidence are treated as low-confidence (e.g. a bulk +/// import where the model wasn't sure). +pub const LOW_CONFIDENCE_FLOOR: f64 = 0.5; + +// ============================================================================ +// REVIEW MODE +// ============================================================================ + +/// How aggressively the agent's brain gates incoming writes. +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)] +#[serde(rename_all = "snake_case")] +pub enum ReviewMode { + /// Never gate — every write auto-commits. + Fast, + /// Default: auto-commit ordinary writes, open a PR for risky ones. + #[default] + RiskGated, + /// Gate every write — nothing enters the brain without approval. + Paranoid, +} + +impl ReviewMode { + /// Stable string label, also the wire form. + pub fn as_str(&self) -> &'static str { + match self { + ReviewMode::Fast => "fast", + ReviewMode::RiskGated => "risk_gated", + ReviewMode::Paranoid => "paranoid", + } + } + + /// Parse from a label (case-insensitive, tolerant of `-`/`_`). Falls back to + /// the default [`ReviewMode::RiskGated`] on anything unrecognised. + pub fn from_label(s: &str) -> Self { + match s.trim().to_ascii_lowercase().replace('-', "_").as_str() { + "fast" => ReviewMode::Fast, + "paranoid" => ReviewMode::Paranoid, + _ => ReviewMode::RiskGated, + } + } +} + +// ============================================================================ +// RISK CLASSIFICATION +// ============================================================================ + +/// The outcome of [`classify_write`]: does this write auto-commit or open a PR? +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum RiskClass { + /// Ordinary context — auto-commit (a receipt is still generated). + AutoCommit, + /// Risky — open a [`MemoryPr`] for review. + Review, +} + +impl RiskClass { + /// Stable string label. + pub fn as_str(&self) -> &'static str { + match self { + RiskClass::AutoCommit => "auto_commit", + RiskClass::Review => "review", + } + } + + /// Whether this write should be held for review. + pub fn needs_review(&self) -> bool { + matches!(self, RiskClass::Review) + } +} + +/// A single, self-explaining reason a write was flagged for review. The +/// `code` is stable for filtering/telemetry; the `detail` is human prose for +/// the PR card. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct RiskSignal { + /// Stable machine code, e.g. `"contradicts_high_trust"`. + pub code: String, + /// Plain-language explanation shown on the Memory PR. + pub detail: String, +} + +impl RiskSignal { + fn new(code: &str, detail: impl Into) -> Self { + Self { + code: code.into(), + detail: detail.into(), + } + } +} + +/// Everything [`classify_write`] needs to decide whether a write is risky. +/// +/// All fields default to the "ordinary, safe" interpretation so callers only +/// set the signals that actually apply to their write. +#[derive(Debug, Clone, Default)] +pub struct WriteContext { + /// Who is performing the write. + pub source: Option, + /// The node type being written, e.g. `"fact"`, `"preference"`, `"identity"`. + pub node_type: String, + /// The content (or a representative slice) — scanned for sensitive topics. + pub content: String, + /// Tags attached to the write — also scanned for sensitive topics. + pub tags: Vec, + /// The write contradicts an existing memory whose trust is this high. + /// `None` if there is no contradiction. + pub contradicts_trust: Option, + /// This write supersedes / replaces an existing memory. + pub supersedes: bool, + /// This write forgets / suppresses an existing memory. + pub forgets: bool, + /// This write merges existing memories. + pub merges: bool, + /// This write protects / pins a memory. + pub protects: bool, + /// This write resurrects a memory that had decayed below retention. + pub resurrects_decayed: bool, + /// Confidence of the write (0..1). `None` means "not a batch / unknown". + pub confidence: Option, + /// This write is one of many in a bulk import. + pub batch_import: bool, + /// For connector writes: whether the source envelope carries strong + /// provenance (a verified `source_system` + `source_id` + URL). + pub strong_provenance: bool, +} + +/// Sensitive topic substrings. A write whose content/tags/type mention any of +/// these is treated as touching identity / preference / security / money / +/// legal / workflow / positioning and is routed to review. +const SENSITIVE_TOPICS: &[(&str, &str)] = &[ + // identity & preference + ("identity", "identity fact"), + ("preference", "user preference"), + ("workflow", "workflow rule"), + ("positioning", "project positioning"), + ("persona", "agent persona"), + // permission / auth / security + ("permission", "tool permission"), + ("auth", "authentication / authorization"), + ("token", "credential / token"), + ("secret", "secret material"), + ("password", "credential"), + ("api key", "credential / API key"), + ("security", "security-relevant fact"), + ("vuln", "security vulnerability"), + // money / bounty / legal + ("money", "financial fact"), + ("payment", "financial fact"), + ("invoice", "financial fact"), + ("bounty", "bounty / payout"), + ("salary", "financial fact"), + ("license", "legal / license fact"), + ("legal", "legal-relevant fact"), + ("contract", "legal / contract fact"), +]; + +/// Node types that are intrinsically sensitive regardless of content. +const SENSITIVE_NODE_TYPES: &[&str] = &[ + "identity", + "preference", + "user_preference", + "credential", + "permission", + "security", + "constitution", +]; + +/// Classify a write into auto-commit vs. review, with the signals explaining the +/// decision. +/// +/// This is the immune system. It is pure and deterministic, so the dashboard's +/// "explain this PR" view and the agent's `Ask Agent Why` action see exactly the +/// same reasoning the gate used. +pub fn classify_write(ctx: &WriteContext, mode: ReviewMode) -> (RiskClass, Vec) { + // Mode shortcuts. + match mode { + // Fast never gates — but we still collect signals so the receipt/PR + // record can note what *would* have been flagged. + ReviewMode::Fast => return (RiskClass::AutoCommit, Vec::new()), + ReviewMode::Paranoid => { + let mut signals = collect_signals(ctx); + if signals.is_empty() { + signals.push(RiskSignal::new( + "paranoid_mode", + "Paranoid mode: every write is reviewed before entering memory.", + )); + } + return (RiskClass::Review, signals); + } + ReviewMode::RiskGated => {} + } + + let signals = collect_signals(ctx); + if signals.is_empty() { + (RiskClass::AutoCommit, signals) + } else { + (RiskClass::Review, signals) + } +} + +/// Gather every risk signal that applies to a write, independent of mode. +fn collect_signals(ctx: &WriteContext) -> Vec { + let mut signals = Vec::new(); + + // 1. Contradiction against a high-trust memory. + if let Some(trust) = ctx.contradicts_trust + && trust >= HIGH_TRUST_FLOOR + { + signals.push(RiskSignal::new( + "contradicts_high_trust", + format!( + "Contradicts an existing high-trust memory (trust {:.2} ≥ {:.2}).", + trust, HIGH_TRUST_FLOOR + ), + )); + } + + // 2. Structural rewrites of existing memory. + if ctx.supersedes { + signals.push(RiskSignal::new( + "supersedes_memory", + "Supersedes / replaces an existing memory.", + )); + } + if ctx.forgets { + signals.push(RiskSignal::new( + "forgets_memory", + "Forgets / suppresses an existing memory.", + )); + } + if ctx.merges { + signals.push(RiskSignal::new( + "merges_memory", + "Merges existing memories into one.", + )); + } + if ctx.protects { + signals.push(RiskSignal::new( + "protects_memory", + "Protects / pins a memory against decay and forgetting.", + )); + } + + // 3. Sensitive node types & topics (identity / preference / workflow / + // positioning / permission / auth / security / money / legal). + let node_type_lc = ctx.node_type.to_ascii_lowercase(); + if SENSITIVE_NODE_TYPES.contains(&node_type_lc.as_str()) { + signals.push(RiskSignal::new( + "sensitive_node_type", + format!("Writes a sensitive node type: `{}`.", node_type_lc), + )); + } + if let Some(topic) = first_sensitive_topic(&ctx.content, &ctx.tags) { + signals.push(RiskSignal::new( + "sensitive_topic", + format!("Touches a sensitive topic: {topic}."), + )); + } + + // 4. Dream consolidation proposals. + if matches!(ctx.source, Some(WriteSource::Dream)) { + signals.push(RiskSignal::new( + "dream_consolidation", + "Proposed by dream consolidation — a machine-generated change to memory.", + )); + } + + // 5. Decay-below-threshold resurrection. + if ctx.resurrects_decayed { + signals.push(RiskSignal::new( + "resurrects_decayed", + "Resurrects a memory that had decayed below the retention threshold.", + )); + } + + // 6. Low-confidence batch imports. + if ctx.batch_import { + if let Some(conf) = ctx.confidence { + if conf < LOW_CONFIDENCE_FLOOR { + signals.push(RiskSignal::new( + "low_confidence_batch", + format!( + "Low-confidence batch import (confidence {:.2} < {:.2}).", + conf, LOW_CONFIDENCE_FLOOR + ), + )); + } + } else { + signals.push(RiskSignal::new( + "unscored_batch", + "Batch import with no confidence score.", + )); + } + } + + // 7. External connector writes without strong provenance. + if matches!(ctx.source, Some(WriteSource::Connector)) && !ctx.strong_provenance { + signals.push(RiskSignal::new( + "weak_provenance_connector", + "External connector write without strong provenance (unverified source envelope).", + )); + } + + signals +} + +/// Return the human label of the first sensitive topic found in content/tags. +fn first_sensitive_topic(content: &str, tags: &[String]) -> Option<&'static str> { + let haystack = { + let mut s = content.to_ascii_lowercase(); + for t in tags { + s.push(' '); + s.push_str(&t.to_ascii_lowercase()); + } + s + }; + SENSITIVE_TOPICS + .iter() + .find(|(needle, _)| haystack.contains(needle)) + .map(|(_, label)| *label) +} + +// ============================================================================ +// MEMORY PR DATA MODEL +// ============================================================================ + +/// What kind of change a Memory PR represents. +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum MemoryPrKind { + /// A brand-new fact entering the brain. + NewFact, + /// An existing fact being strengthened / reinforced. + StrengthenedFact, + /// A contradiction was detected against existing memory. + ContradictionDetected, + /// A memory being superseded by a newer one. + MemorySuperseded, + /// A new edge added to the knowledge graph. + EdgeAdded, + /// A node decayed below the retention threshold. + NodeDecayed, + /// Dream consolidation proposed a merge / insight. + DreamConsolidation, +} + +impl MemoryPrKind { + /// Stable string label. + pub fn as_str(&self) -> &'static str { + match self { + MemoryPrKind::NewFact => "new_fact", + MemoryPrKind::StrengthenedFact => "strengthened_fact", + MemoryPrKind::ContradictionDetected => "contradiction_detected", + MemoryPrKind::MemorySuperseded => "memory_superseded", + MemoryPrKind::EdgeAdded => "edge_added", + MemoryPrKind::NodeDecayed => "node_decayed", + MemoryPrKind::DreamConsolidation => "dream_consolidation", + } + } + + /// Parse from a label; `None` if unrecognised. + pub fn from_label(s: &str) -> Option { + Some(match s { + "new_fact" => MemoryPrKind::NewFact, + "strengthened_fact" => MemoryPrKind::StrengthenedFact, + "contradiction_detected" => MemoryPrKind::ContradictionDetected, + "memory_superseded" => MemoryPrKind::MemorySuperseded, + "edge_added" => MemoryPrKind::EdgeAdded, + "node_decayed" => MemoryPrKind::NodeDecayed, + "dream_consolidation" => MemoryPrKind::DreamConsolidation, + _ => return None, + }) + } +} + +/// The review status of a Memory PR. +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)] +#[serde(rename_all = "snake_case")] +pub enum MemoryPrStatus { + /// Awaiting a decision. + #[default] + Pending, + /// Promoted into long-term memory as-is. + Promoted, + /// Merged into an existing memory. + Merged, + /// Superseded an existing memory. + Superseded, + /// Quarantined — held in the firewall, not used for retrieval. + Quarantined, + /// Forgotten — rejected and suppressed. + Forgotten, +} + +impl MemoryPrStatus { + /// Stable string label. + pub fn as_str(&self) -> &'static str { + match self { + MemoryPrStatus::Pending => "pending", + MemoryPrStatus::Promoted => "promoted", + MemoryPrStatus::Merged => "merged", + MemoryPrStatus::Superseded => "superseded", + MemoryPrStatus::Quarantined => "quarantined", + MemoryPrStatus::Forgotten => "forgotten", + } + } +} + +/// The actions a reviewer can take on a Memory PR (the buttons in the diff UI). +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum MemoryPrAction { + /// Accept the change as-is. + Promote, + /// Fold it into an existing memory. + Merge, + /// Use it to supersede an existing memory. + Supersede, + /// Hold it in the firewall. + Quarantine, + /// Reject and suppress it. + Forget, + /// Ask the agent to explain the change (returns the risk signals). + AskAgentWhy, +} + +impl MemoryPrAction { + /// Parse from a URL/path label; `None` if unrecognised. + pub fn from_label(s: &str) -> Option { + Some(match s { + "promote" => MemoryPrAction::Promote, + "merge" => MemoryPrAction::Merge, + "supersede" => MemoryPrAction::Supersede, + "quarantine" => MemoryPrAction::Quarantine, + "forget" => MemoryPrAction::Forget, + "ask_agent_why" | "ask-agent-why" | "why" => MemoryPrAction::AskAgentWhy, + _ => return None, + }) + } + + /// The status this action moves the PR into (`None` for `AskAgentWhy`, which + /// is read-only). + pub fn resulting_status(&self) -> Option { + Some(match self { + MemoryPrAction::Promote => MemoryPrStatus::Promoted, + MemoryPrAction::Merge => MemoryPrStatus::Merged, + MemoryPrAction::Supersede => MemoryPrStatus::Superseded, + MemoryPrAction::Quarantine => MemoryPrStatus::Quarantined, + MemoryPrAction::Forget => MemoryPrStatus::Forgotten, + MemoryPrAction::AskAgentWhy => return None, + }) + } +} + +/// A reviewable change to the agent's brain — the persisted Memory PR record. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct MemoryPr { + /// UUID. + pub id: String, + /// What kind of change this is. + pub kind: MemoryPrKind, + /// Current review status. + pub status: MemoryPrStatus, + /// Short human title for the PR list. + pub title: String, + /// The proposed change as a structured diff (before/after, ids, payload). + pub diff: serde_json::Value, + /// The self-explaining risk signals that opened this PR. + pub signals: Vec, + /// The memory id this PR concerns, if any. + #[serde(skip_serializing_if = "Option::is_none")] + pub subject_id: Option, + /// The run that produced this change, linking the PR back to the black box. + #[serde(skip_serializing_if = "Option::is_none")] + pub run_id: Option, + /// RFC3339 creation time. + pub created_at: String, + /// RFC3339 decision time, once decided. + #[serde(skip_serializing_if = "Option::is_none")] + pub decided_at: Option, + /// The action that resolved this PR, once decided. + #[serde(skip_serializing_if = "Option::is_none")] + pub decision: Option, +} + +#[cfg(test)] +mod tests { + use super::*; + + fn ordinary() -> WriteContext { + WriteContext { + source: Some(WriteSource::Agent), + node_type: "fact".into(), + content: "The build uses cargo and pnpm.".into(), + tags: vec!["build".into()], + ..Default::default() + } + } + + #[test] + fn ordinary_write_auto_commits_in_risk_gated() { + let (class, signals) = classify_write(&ordinary(), ReviewMode::RiskGated); + assert_eq!(class, RiskClass::AutoCommit); + assert!(signals.is_empty()); + } + + #[test] + fn fast_mode_never_gates_even_risky_writes() { + let mut ctx = ordinary(); + ctx.supersedes = true; + ctx.contradicts_trust = Some(0.95); + let (class, _) = classify_write(&ctx, ReviewMode::Fast); + assert_eq!(class, RiskClass::AutoCommit); + } + + #[test] + fn paranoid_mode_gates_even_ordinary_writes() { + let (class, signals) = classify_write(&ordinary(), ReviewMode::Paranoid); + assert_eq!(class, RiskClass::Review); + assert_eq!(signals[0].code, "paranoid_mode"); + } + + #[test] + fn contradiction_against_high_trust_is_risky() { + let mut ctx = ordinary(); + ctx.contradicts_trust = Some(0.82); + let (class, signals) = classify_write(&ctx, ReviewMode::RiskGated); + assert_eq!(class, RiskClass::Review); + assert!(signals.iter().any(|s| s.code == "contradicts_high_trust")); + } + + #[test] + fn contradiction_against_low_trust_is_fine() { + let mut ctx = ordinary(); + ctx.contradicts_trust = Some(0.3); + let (class, _) = classify_write(&ctx, ReviewMode::RiskGated); + assert_eq!(class, RiskClass::AutoCommit); + } + + #[test] + fn supersede_forget_merge_protect_all_gate() { + for set in [ + |c: &mut WriteContext| c.supersedes = true, + |c: &mut WriteContext| c.forgets = true, + |c: &mut WriteContext| c.merges = true, + |c: &mut WriteContext| c.protects = true, + ] { + let mut ctx = ordinary(); + set(&mut ctx); + let (class, _) = classify_write(&ctx, ReviewMode::RiskGated); + assert_eq!(class, RiskClass::Review); + } + } + + #[test] + fn sensitive_topics_gate() { + for topic in [ + "remember my auth token is xyz", + "Sam's salary is confidential", + "the bounty payout terms", + "user preference: dark mode", + "this is a security vulnerability", + ] { + let mut ctx = ordinary(); + ctx.content = topic.into(); + let (class, signals) = classify_write(&ctx, ReviewMode::RiskGated); + assert_eq!(class, RiskClass::Review, "should gate: {topic}"); + assert!(signals.iter().any(|s| s.code == "sensitive_topic")); + } + } + + #[test] + fn sensitive_node_type_gates() { + let mut ctx = ordinary(); + ctx.node_type = "identity".into(); + let (class, signals) = classify_write(&ctx, ReviewMode::RiskGated); + assert_eq!(class, RiskClass::Review); + assert!(signals.iter().any(|s| s.code == "sensitive_node_type")); + } + + #[test] + fn dream_consolidation_gates() { + let mut ctx = ordinary(); + ctx.source = Some(WriteSource::Dream); + let (class, signals) = classify_write(&ctx, ReviewMode::RiskGated); + assert_eq!(class, RiskClass::Review); + assert!(signals.iter().any(|s| s.code == "dream_consolidation")); + } + + #[test] + fn decayed_resurrection_gates() { + let mut ctx = ordinary(); + ctx.resurrects_decayed = true; + let (class, _) = classify_write(&ctx, ReviewMode::RiskGated); + assert_eq!(class, RiskClass::Review); + } + + #[test] + fn low_confidence_batch_gates_but_confident_batch_does_not() { + let mut low = ordinary(); + low.batch_import = true; + low.confidence = Some(0.3); + assert_eq!( + classify_write(&low, ReviewMode::RiskGated).0, + RiskClass::Review + ); + + let mut high = ordinary(); + high.batch_import = true; + high.confidence = Some(0.9); + assert_eq!( + classify_write(&high, ReviewMode::RiskGated).0, + RiskClass::AutoCommit + ); + } + + #[test] + fn weak_provenance_connector_gates_strong_does_not() { + let mut weak = ordinary(); + weak.source = Some(WriteSource::Connector); + weak.strong_provenance = false; + assert_eq!( + classify_write(&weak, ReviewMode::RiskGated).0, + RiskClass::Review + ); + + let mut strong = ordinary(); + strong.source = Some(WriteSource::Connector); + strong.strong_provenance = true; + assert_eq!( + classify_write(&strong, ReviewMode::RiskGated).0, + RiskClass::AutoCommit + ); + } + + #[test] + fn mode_label_roundtrip() { + assert_eq!(ReviewMode::from_label("FAST"), ReviewMode::Fast); + assert_eq!(ReviewMode::from_label("risk-gated"), ReviewMode::RiskGated); + assert_eq!(ReviewMode::from_label("paranoid"), ReviewMode::Paranoid); + assert_eq!(ReviewMode::from_label("garbage"), ReviewMode::RiskGated); + } + + #[test] + fn action_resulting_status() { + assert_eq!( + MemoryPrAction::Promote.resulting_status(), + Some(MemoryPrStatus::Promoted) + ); + assert_eq!(MemoryPrAction::AskAgentWhy.resulting_status(), None); + } +} diff --git a/crates/vestige-mcp/src/autopilot.rs b/crates/vestige-mcp/src/autopilot.rs index 2db04a8..3e355fb 100644 --- a/crates/vestige-mcp/src/autopilot.rs +++ b/crates/vestige-mcp/src/autopilot.rs @@ -443,7 +443,10 @@ async fn handle_event( | VestigeEvent::ConsolidationCompleted { .. } | VestigeEvent::RetentionDecayed { .. } | VestigeEvent::ConnectionDiscovered { .. } - | VestigeEvent::ActivationSpread { .. } => {} + | VestigeEvent::ActivationSpread { .. } + | VestigeEvent::TraceEvent { .. } + | VestigeEvent::MemoryPrOpened { .. } + | VestigeEvent::MemoryPrDecided { .. } => {} } } diff --git a/crates/vestige-mcp/src/dashboard/events.rs b/crates/vestige-mcp/src/dashboard/events.rs index 8edb238..4d5c8d4 100644 --- a/crates/vestige-mcp/src/dashboard/events.rs +++ b/crates/vestige-mcp/src/dashboard/events.rs @@ -167,6 +167,39 @@ pub enum VestigeEvent { timestamp: DateTime, }, + // -- Agent Black Box (v2.2) -- + // One replayable trace event from an agent run. The dashboard Black Box tab + // appends these to the live timeline and pulses the graph exactly as the + // agent experienced it. The inner event is the canonical + // `vestige_core::MemoryTraceEvent`, serialized with its own `type` tag, so + // the wire shape is `{ "type": "TraceEvent", "data": { "runId": ..., "event": { "type": "mcp.call", ... } } }`. + TraceEvent { + run_id: String, + seq: i64, + event: vestige_core::MemoryTraceEvent, + timestamp: DateTime, + }, + + // -- Memory PRs (v2.2) — the cognitive immune system -- + // A risky write opened a Memory PR. The dashboard raises the PR-queue badge + // and can surface a toast: "Vestige opened a Memory PR — the agent tried to + // rewrite its own brain." + MemoryPrOpened { + id: String, + kind: String, + title: String, + signal_count: usize, + run_id: Option, + timestamp: DateTime, + }, + // A Memory PR was decided (promote / merge / supersede / quarantine / forget). + MemoryPrDecided { + id: String, + decision: String, + status: String, + timestamp: DateTime, + }, + // -- System -- Heartbeat { uptime_secs: u64, diff --git a/crates/vestige-mcp/src/dashboard/handlers.rs b/crates/vestige-mcp/src/dashboard/handlers.rs index 39f80ff..0d6af3c 100644 --- a/crates/vestige-mcp/src/dashboard/handlers.rs +++ b/crates/vestige-mcp/src/dashboard/handlers.rs @@ -1983,6 +1983,288 @@ pub async fn deep_reference_query( Ok(Json(response)) } +// ============================================================================ +// AGENT BLACK BOX (v2.2) — replayable agent-run traces +// ============================================================================ + +#[derive(Debug, Deserialize)] +pub struct TraceListParams { + pub limit: Option, +} + +/// List recent agent runs (newest activity first) for the Black Box run picker. +pub async fn list_traces( + State(state): State, + Query(params): Query, +) -> Result, StatusCode> { + let limit = params.limit.unwrap_or(50).clamp(1, 500); + let runs = state + .storage + .list_agent_runs(limit) + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; + let runs_json: Vec = runs + .into_iter() + .map(|r| { + serde_json::json!({ + "runId": r.run_id, + "firstTool": r.first_tool, + "eventCount": r.event_count, + "retrievedCount": r.retrieved_count, + "suppressedCount": r.suppressed_count, + "writeCount": r.write_count, + "vetoCount": r.veto_count, + "startedAt": r.started_at, + "lastAt": r.last_at, + }) + }) + .collect(); + Ok(Json(serde_json::json!({ + "total": runs_json.len(), + "runs": runs_json, + }))) +} + +/// Fetch the full event timeline for one run — the black-box replay payload. +pub async fn get_trace( + State(state): State, + Path(run_id): Path, +) -> Result, StatusCode> { + let events = state + .storage + .get_trace(&run_id) + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; + if events.is_empty() { + return Err(StatusCode::NOT_FOUND); + } + let summary = state.storage.get_agent_run(&run_id).ok().flatten(); + Ok(Json(serde_json::json!({ + "runId": run_id, + "summary": summary.map(|s| serde_json::json!({ + "firstTool": s.first_tool, + "eventCount": s.event_count, + "retrievedCount": s.retrieved_count, + "suppressedCount": s.suppressed_count, + "writeCount": s.write_count, + "vetoCount": s.veto_count, + "startedAt": s.started_at, + "lastAt": s.last_at, + })), + "events": events, + }))) +} + +/// Export a run as a downloadable `.vestige-trace.json` artifact. +pub async fn export_trace( + State(state): State, + Path(run_id): Path, +) -> Result<([(axum::http::HeaderName, String); 2], Json), StatusCode> { + let events = state + .storage + .get_trace(&run_id) + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; + if events.is_empty() { + return Err(StatusCode::NOT_FOUND); + } + let summary = state.storage.get_agent_run(&run_id).ok().flatten(); + let body = serde_json::json!({ + "format": "vestige-trace", + "version": 1, + "runId": run_id, + "exportedAt": Utc::now().to_rfc3339(), + "summary": summary.map(|s| serde_json::json!({ + "firstTool": s.first_tool, + "eventCount": s.event_count, + "retrievedCount": s.retrieved_count, + "suppressedCount": s.suppressed_count, + "writeCount": s.write_count, + "vetoCount": s.veto_count, + "startedAt": s.started_at, + "lastAt": s.last_at, + })), + "events": events, + }); + let headers = [ + ( + axum::http::header::CONTENT_TYPE, + "application/json".to_string(), + ), + ( + axum::http::header::CONTENT_DISPOSITION, + format!("attachment; filename=\"{run_id}.vestige-trace.json\""), + ), + ]; + Ok((headers, Json(body))) +} + +// ============================================================================ +// MEMORY RECEIPTS (v2.2) +// ============================================================================ + +/// List recent retrieval receipts. +pub async fn list_receipts( + State(state): State, + Query(params): Query, +) -> Result, StatusCode> { + let limit = params.limit.unwrap_or(50).clamp(1, 500); + let receipts = state + .storage + .list_receipts(limit) + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; + Ok(Json(serde_json::json!({ + "total": receipts.len(), + "receipts": receipts, + }))) +} + +/// Fetch one receipt by id — the payload behind "Open receipt in Cinema". +pub async fn get_receipt( + State(state): State, + Path(receipt_id): Path, +) -> Result, StatusCode> { + let receipt = state + .storage + .get_receipt(&receipt_id) + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)? + .ok_or(StatusCode::NOT_FOUND)?; + Ok(Json(serde_json::to_value(receipt).unwrap_or_default())) +} + +// ============================================================================ +// MEMORY PRs (v2.2) — risk-gated brain-change review queue +// ============================================================================ + +#[derive(Debug, Deserialize)] +pub struct MemoryPrListParams { + pub status: Option, + pub limit: Option, +} + +/// List Memory PRs, optionally filtered by status. +pub async fn list_memory_prs( + State(state): State, + Query(params): Query, +) -> Result, StatusCode> { + let limit = params.limit.unwrap_or(100).clamp(1, 500); + let status = params.status.as_deref().and_then(|s| { + serde_json::from_value::(serde_json::Value::String( + s.to_string(), + )) + .ok() + }); + let prs = state + .storage + .list_memory_prs(status, limit) + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; + let pending = state.storage.count_pending_memory_prs().unwrap_or(0); + Ok(Json(serde_json::json!({ + "total": prs.len(), + "pendingCount": pending, + "mode": read_review_mode(&state).as_str(), + "prs": prs, + }))) +} + +/// Fetch one Memory PR by id. +pub async fn get_memory_pr( + State(state): State, + Path(id): Path, +) -> Result, StatusCode> { + let pr = state + .storage + .get_memory_pr(&id) + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)? + .ok_or(StatusCode::NOT_FOUND)?; + Ok(Json(serde_json::to_value(pr).unwrap_or_default())) +} + +/// Act on a Memory PR: promote / merge / supersede / quarantine / forget / +/// ask_agent_why. `ask_agent_why` is read-only and returns the risk signals. +pub async fn act_on_memory_pr( + State(state): State, + Path((id, action)): Path<(String, String)>, +) -> Result, StatusCode> { + let action = vestige_core::MemoryPrAction::from_label(&action) + .ok_or(StatusCode::BAD_REQUEST)?; + + // Ask Agent Why is read-only — return the self-explaining signals. + if matches!(action, vestige_core::MemoryPrAction::AskAgentWhy) { + let pr = state + .storage + .get_memory_pr(&id) + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)? + .ok_or(StatusCode::NOT_FOUND)?; + return Ok(Json(serde_json::json!({ + "id": pr.id, + "kind": pr.kind.as_str(), + "title": pr.title, + "why": pr.signals, + "explanation": "These are the risk signals that opened this Memory PR.", + }))); + } + + let decided = state + .storage + .decide_memory_pr(&id, action) + .map_err(|_| StatusCode::NOT_FOUND)?; + + state.emit(VestigeEvent::MemoryPrDecided { + id: decided.id.clone(), + decision: decided + .decision + .and_then(|d| serde_json::to_value(d).ok()) + .and_then(|v| v.as_str().map(String::from)) + .unwrap_or_default(), + status: decided.status.as_str().to_string(), + timestamp: Utc::now(), + }); + + Ok(Json(serde_json::to_value(&decided).unwrap_or_default())) +} + +#[derive(Debug, Deserialize)] +pub struct ReviewModeBody { + pub mode: String, +} + +/// Get the current review mode (fast / risk_gated / paranoid). +pub async fn get_review_mode(State(state): State) -> Json { + let mode = read_review_mode(&state); + Json(serde_json::json!({ + "mode": mode.as_str(), + "pendingCount": state.storage.count_pending_memory_prs().unwrap_or(0), + })) +} + +/// Set the review mode. Persisted to a small JSON file in the data dir so it +/// survives restarts (local-first, no extra config service). +pub async fn set_review_mode( + State(state): State, + Json(body): Json, +) -> Result, StatusCode> { + let mode = vestige_core::ReviewMode::from_label(&body.mode); + let path = review_mode_path(&state); + let payload = serde_json::json!({ "mode": mode.as_str() }); + fs::write(&path, serde_json::to_vec_pretty(&payload).unwrap_or_default()) + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; + Ok(Json(serde_json::json!({ "mode": mode.as_str() }))) +} + +/// Path to the persisted review-mode file. +fn review_mode_path(state: &AppState) -> PathBuf { + state.storage.data_dir().join("review_mode.json") +} + +/// Read the persisted review mode, defaulting to RiskGated. +pub fn read_review_mode(state: &AppState) -> vestige_core::ReviewMode { + let path = review_mode_path(state); + fs::read_to_string(&path) + .ok() + .and_then(|s| serde_json::from_str::(&s).ok()) + .and_then(|v| v.get("mode").and_then(|m| m.as_str()).map(String::from)) + .map(|s| vestige_core::ReviewMode::from_label(&s)) + .unwrap_or_default() +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/vestige-mcp/src/dashboard/mod.rs b/crates/vestige-mcp/src/dashboard/mod.rs index e6336cb..55c9662 100644 --- a/crates/vestige-mcp/src/dashboard/mod.rs +++ b/crates/vestige-mcp/src/dashboard/mod.rs @@ -183,6 +183,28 @@ fn build_router_inner(state: AppState, port: u16) -> (Router, AppState) { get(handlers::get_sanhedrin_telemetry), ) .route("/api/sanhedrin/appeal", post(handlers::appeal_sanhedrin)) + // ============================================================ + // AGENT BLACK BOX (v2.2) — replayable agent-run traces + // ============================================================ + .route("/api/traces", get(handlers::list_traces)) + .route("/api/traces/{run_id}", get(handlers::get_trace)) + .route("/api/traces/{run_id}/export", get(handlers::export_trace)) + // ============================================================ + // MEMORY RECEIPTS (v2.2) — the nutrition label for a retrieval + // ============================================================ + .route("/api/receipts", get(handlers::list_receipts)) + .route("/api/receipts/{receipt_id}", get(handlers::get_receipt)) + // ============================================================ + // MEMORY PRs (v2.2) — risk-gated brain-change review queue + // ============================================================ + .route("/api/memory-prs", get(handlers::list_memory_prs)) + .route("/api/memory-prs/{id}", get(handlers::get_memory_pr)) + .route( + "/api/memory-prs/{id}/{action}", + post(handlers::act_on_memory_pr), + ) + .route("/api/memory-prs/mode", get(handlers::get_review_mode)) + .route("/api/memory-prs/mode", post(handlers::set_review_mode)) .layer( ServiceBuilder::new() .concurrency_limit(50) diff --git a/crates/vestige-mcp/src/lib.rs b/crates/vestige-mcp/src/lib.rs index 8784409..1df4fc5 100644 --- a/crates/vestige-mcp/src/lib.rs +++ b/crates/vestige-mcp/src/lib.rs @@ -9,3 +9,4 @@ pub mod protocol; pub mod resources; pub mod server; pub mod tools; +pub mod trace_recorder; diff --git a/crates/vestige-mcp/src/resources/mod.rs b/crates/vestige-mcp/src/resources/mod.rs index e021c06..63b728f 100644 --- a/crates/vestige-mcp/src/resources/mod.rs +++ b/crates/vestige-mcp/src/resources/mod.rs @@ -4,3 +4,4 @@ pub mod codebase; pub mod memory; +pub mod trace; diff --git a/crates/vestige-mcp/src/resources/trace.rs b/crates/vestige-mcp/src/resources/trace.rs new file mode 100644 index 0000000..e6f0e35 --- /dev/null +++ b/crates/vestige-mcp/src/resources/trace.rs @@ -0,0 +1,103 @@ +//! Agent Black Box Resources +//! +//! `trace://` URI scheme — exposes replayable agent-run traces as MCP resources +//! so a coding agent can read its *own* black box back. This closes the trace +//! correlation spine on the MCP side: the same `runId` an agent received in a +//! tool result's `traceUri` resolves here to the full event timeline. +//! +//! - `trace://{runId}` — the full ordered event log for a run. +//! - `trace://{runId}/summary` — just the roll-up counts. +//! - `trace://runs` — recent runs (the run picker). +//! - `trace://latest` — the most recently active run's full trace. + +use std::sync::Arc; + +use vestige_core::Storage; + +/// Read a `trace://` resource. +pub async fn read(storage: &Arc, uri: &str) -> Result { + let path = uri.strip_prefix("trace://").unwrap_or(""); + let (path, _query) = match path.split_once('?') { + Some((p, q)) => (p, Some(q)), + None => (path, None), + }; + + match path { + "" | "runs" => read_runs(storage).await, + "latest" => read_latest(storage).await, + other => { + if let Some(run_id) = other.strip_suffix("/summary") { + read_summary(storage, run_id).await + } else { + read_run(storage, other).await + } + } + } +} + +async fn read_runs(storage: &Arc) -> Result { + let runs = storage.list_agent_runs(50).map_err(|e| e.to_string())?; + let json: Vec<_> = runs + .into_iter() + .map(|r| { + serde_json::json!({ + "runId": r.run_id, + "firstTool": r.first_tool, + "eventCount": r.event_count, + "retrievedCount": r.retrieved_count, + "suppressedCount": r.suppressed_count, + "writeCount": r.write_count, + "vetoCount": r.veto_count, + "startedAt": r.started_at, + "lastAt": r.last_at, + }) + }) + .collect(); + serde_json::to_string_pretty(&serde_json::json!({ "runs": json })) + .map_err(|e| e.to_string()) +} + +async fn read_latest(storage: &Arc) -> Result { + let runs = storage.list_agent_runs(1).map_err(|e| e.to_string())?; + let run = runs + .into_iter() + .next() + .ok_or_else(|| "No agent runs recorded yet".to_string())?; + read_run(storage, &run.run_id).await +} + +async fn read_run(storage: &Arc, run_id: &str) -> Result { + let events = storage.get_trace(run_id).map_err(|e| e.to_string())?; + if events.is_empty() { + return Err(format!("No trace found for run: {run_id}")); + } + let summary = storage.get_agent_run(run_id).ok().flatten(); + let body = serde_json::json!({ + "runId": run_id, + "summary": summary.map(summary_json), + "events": events, + }); + serde_json::to_string_pretty(&body).map_err(|e| e.to_string()) +} + +async fn read_summary(storage: &Arc, run_id: &str) -> Result { + let summary = storage + .get_agent_run(run_id) + .map_err(|e| e.to_string())? + .ok_or_else(|| format!("No run: {run_id}"))?; + serde_json::to_string_pretty(&summary_json(summary)).map_err(|e| e.to_string()) +} + +fn summary_json(s: vestige_core::AgentRunSummary) -> serde_json::Value { + serde_json::json!({ + "runId": s.run_id, + "firstTool": s.first_tool, + "eventCount": s.event_count, + "retrievedCount": s.retrieved_count, + "suppressedCount": s.suppressed_count, + "writeCount": s.write_count, + "vetoCount": s.veto_count, + "startedAt": s.started_at, + "lastAt": s.last_at, + }) +} diff --git a/crates/vestige-mcp/src/server.rs b/crates/vestige-mcp/src/server.rs index 6b919fa..d6ca897 100644 --- a/crates/vestige-mcp/src/server.rs +++ b/crates/vestige-mcp/src/server.rs @@ -129,6 +129,23 @@ impl McpServer { } } + /// Read the active Memory PR review mode from `/review_mode.json`, + /// defaulting to `RiskGated`. Shared shape with the dashboard handler so the + /// MCP write path and the UI agree on the mode. + fn review_mode(&self) -> vestige_core::ReviewMode { + let path = self.storage.data_dir().join("review_mode.json"); + std::fs::read_to_string(&path) + .ok() + .and_then(|s| serde_json::from_str::(&s).ok()) + .and_then(|v| { + v.get("mode") + .and_then(|m| m.as_str()) + .map(|s| s.to_string()) + }) + .map(|s| vestige_core::ReviewMode::from_label(&s)) + .unwrap_or_default() + } + /// Handle an incoming JSON-RPC request pub async fn handle_request(&mut self, request: JsonRpcRequest) -> Option { debug!("Handling request: {}", request.method); @@ -240,8 +257,8 @@ impl McpServer { /// Handle tools/list request async fn handle_tools_list(&self) -> Result { - // v2.1.21: 25 tools (verified by the `tools.len() == 25` assertion in the - // handle_tools_list test below — the `suppress` tool landed in v2.0.5). + // v2.1.27: 34 tools (verified by the `tools.len() == 34` assertion in the + // handle_tools_list test below). // Deprecated tools still work via redirects in handle_tools_call. let mut tools = vec![ // ================================================================ @@ -503,7 +520,7 @@ impl McpServer { // Per-tool caps below are sized at ~2× observed peak with growth // headroom; max permitted by Anthropic is 500_000. Only the four // empirically-measured high-payload tools carry the annotation today; - // the remaining 21 tools deliberately do NOT (cargo-cult prevention — + // the remaining 30 tools deliberately do NOT (cargo-cult prevention — // annotating a small-payload tool dilutes the signal). // // Other tools that COULD plausibly grow into the annotated set with @@ -563,6 +580,21 @@ impl McpServer { None }; + // ================================================================ + // AGENT BLACK BOX (v2.2) + // Open/continue a run for this call and record the opening `mcp.call` + // event (args are hashed, never stored raw). Downstream memory events + // are recorded from the result after dispatch. + // ================================================================ + let run_id = crate::trace_recorder::run_id_for(&request.arguments); + crate::trace_recorder::record_call( + &self.storage, + self.event_tx.as_ref(), + &run_id, + &request.name, + &request.arguments, + ); + let result = match request.name.as_str() { // ================================================================ // UNIFIED TOOLS (v1.1+) - Preferred API @@ -1083,10 +1115,77 @@ impl McpServer { // ================================================================ if let Ok(ref content) = result { self.emit_tool_event(&request.name, &saved_args, content); + // Black Box: record the downstream memory events (retrieve / + // suppress / write / veto / dream) the agent experienced. + crate::trace_recorder::record_result( + &self.storage, + self.event_tx.as_ref(), + &run_id, + &request.name, + content, + ); } + // ================================================================ + // RISK-GATED MEMORY PRs (v2.2) — the cognitive immune system + // Normal writes auto-land; risky writes (contradiction vs high-trust, + // supersede/forget/merge, sensitive topics, …) are quarantined and a + // Memory PR is opened. Computed here so the gate stays centralized and + // tools remain untouched. + // ================================================================ + let opened_prs = if let Ok(ref content) = result { + crate::trace_recorder::gate_writes( + &self.storage, + self.event_tx.as_ref(), + &run_id, + &request.name, + content, + self.review_mode(), + ) + } else { + Vec::new() + }; + let response = match result { - Ok(content) => { + Ok(mut content) => { + // ============================================================ + // TRACE SPINE (Phase 0) + // Stamp the runId + a pointer to the full trace onto the tool + // output itself. This is the first hop of the correlation + // chain: the same runId now appears in the tool result, the + // SQLite trace rows, the WebSocket events, /api/traces/{runId}, + // and vestige://trace/{runId}. One id, end to end. + // ============================================================ + // Memory Receipt: for retrieval tools, build + persist a + // receipt from what the tool already computed and attach it. + // Done before the runId stamp so the receipt's own suppressed + // list is part of the same payload the agent reads. + let receipt = + crate::trace_recorder::build_and_save_receipt(&self.storage, &run_id, &request.name, &content); + if let Some(obj) = content.as_object_mut() { + obj.insert("runId".to_string(), serde_json::json!(run_id)); + obj.insert( + "traceUri".to_string(), + serde_json::json!(format!("vestige://trace/{run_id}")), + ); + if let Some(r) = receipt { + obj.insert("receipt".to_string(), r); + } + // Surface opened Memory PRs so the agent learns its risky + // write is held for review, not silently committed. + if !opened_prs.is_empty() { + obj.insert( + "memoryPrsOpened".to_string(), + serde_json::json!(opened_prs), + ); + obj.insert( + "memoryPrNotice".to_string(), + serde_json::json!( + "Vestige opened a Memory PR — this write touches the agent's own brain and is held for review. See the Memory PRs queue." + ), + ); + } + } let call_result = CallToolResult { content: vec![crate::protocol::messages::ToolResultContent { content_type: "text".to_string(), @@ -1228,6 +1327,27 @@ impl McpServer { description: Some("Intentions that have been triggered or are overdue".to_string()), mime_type: Some("application/json".to_string()), }, + // Agent Black Box (v2.2) — replayable agent-run traces. Individual + // runs are read via the templated `vestige://trace/{runId}` (or + // `trace://{runId}`) URI; these concrete entries list the runs and + // the latest trace so a client can discover them. + ResourceDescription { + uri: "trace://runs".to_string(), + name: "Agent Runs (Black Box)".to_string(), + description: Some( + "Recent agent runs. Read vestige://trace/{runId} for a full replayable trace." + .to_string(), + ), + mime_type: Some("application/json".to_string()), + }, + ResourceDescription { + uri: "trace://latest".to_string(), + name: "Latest Agent Trace".to_string(), + description: Some( + "The most recently active agent run's full black-box trace.".to_string(), + ), + mime_type: Some("application/json".to_string()), + }, ]; let result = ListResourcesResult { resources }; @@ -1250,7 +1370,17 @@ impl McpServer { // OpenCode and other MCP clients may send "vestige/memory://recent" // but we register resources as "memory://recent" let normalized_uri = uri.strip_prefix("vestige/").unwrap_or(uri); - let content = if normalized_uri.starts_with("memory://") { + // The trace resource is specced as `vestige://trace/{runId}`. Accept + // both that form and the bare `trace://{runId}` scheme, normalizing the + // former to the latter so the resource module sees one shape. + let trace_uri = normalized_uri + .strip_prefix("vestige://trace/") + .map(|rest| format!("trace://{rest}")); + let content = if let Some(ref tu) = trace_uri { + resources::trace::read(&self.storage, tu).await + } else if normalized_uri.starts_with("trace://") { + resources::trace::read(&self.storage, normalized_uri).await + } else if normalized_uri.starts_with("memory://") { resources::memory::read(&self.storage, normalized_uri).await } else if normalized_uri.starts_with("codebase://") { resources::codebase::read(&self.storage, normalized_uri).await @@ -1820,9 +1950,9 @@ mod tests { let result = response.result.unwrap(); let tools = result["tools"].as_array().unwrap(); - // 34 tools: 25 from v2.1.21 + 7 Phase 3 merge/supersede tools - // (merge_candidates, plan_merge, plan_supersede, apply_plan, merge_undo, - // protect, merge_policy, composed_graph) + 1 connector tool (source_sync, #57). + // 34 tools in v2.1.27: the unified memory surface, Phase 3 + // merge/supersede controls, ComposedGraph, and the #57 source_sync + // connector tool. assert_eq!(tools.len(), 34, "Expected exactly 34 tools"); let tool_names: Vec<&str> = tools.iter().map(|t| t["name"].as_str().unwrap()).collect(); @@ -2248,4 +2378,133 @@ mod tests { "search tool has un-renamed `meta` key (regression — serde rename broke)" ); } + + // ======================================================================== + // TRACE SPINE (Phase 0) — one runId, end to end + // ======================================================================== + + /// Every tools/call must stamp a runId + a trace pointer onto its output, + /// persist an `mcp.call` trace row under that same runId, and that runId + /// must resolve through the `vestige://trace/{runId}` resource. This is the + /// load-bearing correlation guarantee. + #[tokio::test] + async fn test_trace_spine_runid_end_to_end() { + let (mut server, _dir) = test_server().await; + server + .handle_request(make_request("initialize", Some(init_params()))) + .await; + + // A client-supplied runId must be honoured so a whole session + // correlates under one id. + let call = make_request( + "tools/call", + Some(serde_json::json!({ + "name": "memory_health", + "arguments": { "runId": "run_spine_test" } + })), + ); + let response = server.handle_request(call).await.unwrap(); + let result = response.result.expect("tools/call ok"); + + // 1. The tool output itself carries the runId + trace pointer. + let structured = &result["structuredContent"]; + assert_eq!( + structured["runId"].as_str(), + Some("run_spine_test"), + "tool output must echo the runId (spine hop 1)" + ); + assert_eq!( + structured["traceUri"].as_str(), + Some("vestige://trace/run_spine_test"), + "tool output must carry the trace resource pointer" + ); + + // 2. The same runId persisted a trace row (the mcp.call event). + let events = server.storage.get_trace("run_spine_test").unwrap(); + assert!( + events.iter().any(|e| e.kind() == "mcp.call"), + "an mcp.call event must be persisted under the runId (spine hop 2)" + ); + + // 3. The run roll-up exists with the right entry tool. + let run = server + .storage + .get_agent_run("run_spine_test") + .unwrap() + .expect("run summary persisted"); + assert_eq!(run.first_tool.as_deref(), Some("memory_health")); + + // 4. The MCP resource resolves the same runId (spine hop 3). + let read = make_request( + "resources/read", + Some(serde_json::json!({ "uri": "vestige://trace/run_spine_test" })), + ); + let read_resp = server.handle_request(read).await.unwrap(); + let read_result = read_resp.result.expect("resource read ok"); + let text = read_result["contents"][0]["text"] + .as_str() + .expect("resource text"); + assert!( + text.contains("run_spine_test") && text.contains("mcp.call"), + "vestige://trace/{{runId}} must return the run's events" + ); + } + + /// Trace events must be broadcast to a live WebSocket subscriber, not just + /// persisted. This guards the spine hop from SQLite → WebSocket → pulse. + #[tokio::test] + async fn test_trace_event_is_broadcast_to_subscriber() { + let (storage, _dir) = test_storage().await; + let cognitive = Arc::new(Mutex::new(CognitiveEngine::new())); + let (event_tx, mut event_rx) = broadcast::channel(64); + let mut server = McpServer::new_with_events(storage, cognitive, event_tx); + server + .handle_request(make_request("initialize", Some(init_params()))) + .await; + + let call = make_request( + "tools/call", + Some(serde_json::json!({ + "name": "memory_health", + "arguments": { "runId": "run_ws" } + })), + ); + server.handle_request(call).await.unwrap(); + + // Drain the broadcast: at least one TraceEvent for run_ws must arrive. + let mut saw_trace = false; + while let Ok(ev) = event_rx.try_recv() { + if let VestigeEvent::TraceEvent { run_id, .. } = ev { + if run_id == "run_ws" { + saw_trace = true; + } + } + } + assert!( + saw_trace, + "a TraceEvent for the run must be broadcast to subscribers (spine hop: WebSocket)" + ); + } + + /// Risk-gated Memory PRs default: an ordinary tool call opens no PR. + #[tokio::test] + async fn test_no_memory_pr_for_non_write_tool() { + let (mut server, _dir) = test_server().await; + server + .handle_request(make_request("initialize", Some(init_params()))) + .await; + let call = make_request( + "tools/call", + Some(serde_json::json!({ + "name": "memory_health", + "arguments": { "runId": "run_no_pr" } + })), + ); + server.handle_request(call).await.unwrap(); + assert_eq!( + server.storage.count_pending_memory_prs().unwrap(), + 0, + "a read-only tool must never open a Memory PR" + ); + } } diff --git a/crates/vestige-mcp/src/trace_recorder.rs b/crates/vestige-mcp/src/trace_recorder.rs new file mode 100644 index 0000000..f4ee931 --- /dev/null +++ b/crates/vestige-mcp/src/trace_recorder.rs @@ -0,0 +1,704 @@ +//! # Trace Recorder — the live black-box wiring +//! +//! Bridges an MCP `tools/call` to the persisted black box. For each call the +//! recorder: +//! +//! 1. derives a stable `runId` (client-supplied `runId`/`run_id` arg if present, +//! else a fresh `run_` UUID), +//! 2. records an `mcp.call` event with a **hash** of the args (never the raw +//! args, so traces can't leak prompt contents or secrets), +//! 3. after the tool returns, inspects the result JSON and records the +//! downstream events the agent experienced — `memory.retrieve` (with +//! per-id activation), `memory.suppress` (with reason), `sanhedrin.veto`, +//! `dream.patch`, +//! 4. persists every event to `agent_traces` and broadcasts it over the +//! dashboard event channel so the Black Box tab updates live. +//! +//! The recorder is best-effort: a persistence error never fails the tool call. + +use std::collections::BTreeMap; +use std::sync::Arc; + +use chrono::Utc; +use serde_json::Value; +use tokio::sync::broadcast; + +use crate::dashboard::events::VestigeEvent; +use vestige_core::{ + MemoryTraceEvent, Receipt, Storage, SuppressReason, SuppressedReceiptEntry, WriteSource, +}; + +/// Tools that write to memory and are therefore subject to risk-gated review. +fn is_write_tool(tool: &str) -> bool { + matches!( + tool, + "smart_ingest" | "ingest" | "session_checkpoint" | "memory" + ) +} + +/// Risk-gate the writes in a tool result. For each write the tool just made, +/// build a [`vestige_core::WriteContext`], classify it under the active +/// [`vestige_core::ReviewMode`], and — if risky — quarantine the just-written +/// node (suppress it so it is not used for retrieval until reviewed) and open a +/// [`vestige_core::MemoryPr`]. Normal writes are left untouched: they auto-land, +/// and they already got a receipt. +/// +/// Returns the list of opened-PR summaries (id, kind, title, signals) so the +/// caller can annotate the tool response and emit `MemoryPrOpened` events. +pub fn gate_writes( + storage: &Arc, + event_tx: Option<&broadcast::Sender>, + run_id: &str, + tool: &str, + result: &serde_json::Value, + mode: vestige_core::ReviewMode, +) -> Vec { + use vestige_core::{ + classify_write, MemoryPr, MemoryPrKind, MemoryPrStatus, RiskClass, WriteContext, + }; + + if !is_write_tool(tool) { + return Vec::new(); + } + + let mut opened = Vec::new(); + + // Collect each (id, decision) write the tool reported. + let writes = extract_writes(result); + for (id, decision) in writes { + // Pull the just-written node to inspect its real content/type/tags. + let node = match storage.get_node(&id) { + Ok(Some(n)) => n, + _ => continue, + }; + + // A decision of supersede/replace/merge means the write overwrote an + // existing memory — the strongest risk signal. Look up the trust of the + // memory it superseded so the gate can weigh it. + let (supersedes, merges) = match decision.as_str() { + "supersede" | "replace" => (true, false), + "merge" => (false, true), + _ => (false, false), + }; + // If this superseded something, treat the contradiction as against a + // high-trust memory when the *new* node's own retention is high (the + // pipeline only supersedes when confident). This keeps the gate honest + // without a second DB round-trip per write. + let contradicts_trust = if supersedes { + Some(node.retention_strength.max(0.7)) + } else { + None + }; + + let ctx = WriteContext { + source: Some(WriteSource::Agent), + node_type: node.node_type.clone(), + content: node.content.clone(), + tags: node.tags.clone(), + contradicts_trust, + supersedes, + merges, + ..Default::default() + }; + + let (class, signals) = classify_write(&ctx, mode); + if class != RiskClass::Review { + continue; + } + + // Quarantine the just-written node: suppress it so it is held out of + // retrieval until the PR is decided. Best-effort. + let _ = storage.suppress_memory(&id); + + let kind = match decision.as_str() { + "supersede" | "replace" => MemoryPrKind::MemorySuperseded, + "merge" => MemoryPrKind::DreamConsolidation, + _ if contradicts_trust.is_some() => MemoryPrKind::ContradictionDetected, + _ => MemoryPrKind::NewFact, + }; + let title = format!( + "{}: \"{}\"", + pr_kind_phrase(kind), + node.content.chars().take(80).collect::() + ); + let pr = MemoryPr { + id: format!("pr_{}", uuid::Uuid::new_v4().simple()), + kind, + status: MemoryPrStatus::Pending, + title: title.clone(), + diff: serde_json::json!({ + "decision": decision, + "node": { + "id": node.id, + "nodeType": node.node_type, + "content": node.content, + "tags": node.tags, + }, + }), + signals: signals.clone(), + subject_id: Some(id.clone()), + run_id: Some(run_id.to_string()), + created_at: Utc::now().to_rfc3339(), + decided_at: None, + decision: None, + }; + + if let Err(e) = storage.save_memory_pr(&pr) { + tracing::warn!("memory PR save failed: {e}"); + continue; + } + + if let Some(tx) = event_tx { + let _ = tx.send(VestigeEvent::MemoryPrOpened { + id: pr.id.clone(), + kind: kind.as_str().to_string(), + title, + signal_count: signals.len(), + run_id: Some(run_id.to_string()), + timestamp: Utc::now(), + }); + } + + opened.push(serde_json::json!({ + "id": pr.id, + "kind": kind.as_str(), + "title": pr.title, + "signals": signals, + "subjectId": id, + })); + } + + opened +} + +fn pr_kind_phrase(kind: vestige_core::MemoryPrKind) -> &'static str { + use vestige_core::MemoryPrKind::*; + match kind { + NewFact => "New fact pending review", + StrengthenedFact => "Strengthened fact", + ContradictionDetected => "Contradiction with existing memory", + MemorySuperseded => "Supersede existing memory", + EdgeAdded => "New edge", + NodeDecayed => "Decayed node", + DreamConsolidation => "Consolidation proposal", + } +} + +/// Tools whose output warrants a retrieval receipt. +fn is_retrieval_tool(tool: &str) -> bool { + matches!( + tool, + "deep_reference" | "cross_reference" | "search" | "explore_connections" + ) +} + +/// Build a [`Receipt`] from a retrieval tool's response JSON, persist it, and +/// return it as JSON ready to attach to that response. Reuses exactly the data +/// the tool already computed (retrieved ids + trust, suppressed ids + reason, +/// the activation path) — so the receipt is the auditable "nutrition label" for +/// the answer and costs nothing extra to produce. +/// +/// Returns `None` for non-retrieval tools or empty results. Best-effort +/// persistence: a storage error is logged, the receipt is still returned. +pub fn build_and_save_receipt( + storage: &Arc, + run_id: &str, + tool: &str, + result: &serde_json::Value, +) -> Option { + if !is_retrieval_tool(tool) { + return None; + } + + let (retrieved, activation) = extract_retrieved(result); + if retrieved.is_empty() { + return None; + } + let trust_scores: Vec = retrieved + .iter() + .map(|id| activation.get(id).copied().unwrap_or(0.0)) + .collect(); + + let suppressed: Vec = extract_suppressed(result) + .into_iter() + .map(|(id, reason)| SuppressedReceiptEntry::new(id, reason)) + .collect(); + + // The activation path: the run's reasoning chain if present, else a simple + // best-first chain of the retrieved ids. + let activation_path = result + .get("reasoning") + .and_then(|v| v.as_str()) + .map(|s| vec![s.to_string()]) + .unwrap_or_else(|| { + if retrieved.len() > 1 { + vec![retrieved.join(" -> ")] + } else { + Vec::new() + } + }); + + let query = result.get("query").and_then(|v| v.as_str()); + + let receipt = Receipt::build( + Utc::now(), + run_id, + retrieved, + suppressed, + activation_path, + &trust_scores, + Vec::new(), + ); + if let Err(e) = storage.save_receipt(&receipt, Some(run_id), Some(tool), query) { + tracing::warn!("receipt save failed: {e}"); + } + Some(serde_json::to_value(&receipt).unwrap_or(serde_json::Value::Null)) +} + +/// Derive the run id for a tool call. Honours a client-supplied `runId` / +/// `run_id` argument (so an agent can correlate a whole session's calls); +/// otherwise mints a fresh one. +pub fn run_id_for(args: &Option) -> String { + if let Some(a) = args { + for key in ["runId", "run_id"] { + if let Some(s) = a.get(key).and_then(|v| v.as_str()) + && !s.is_empty() + { + return s.to_string(); + } + } + } + format!("run_{}", uuid::Uuid::new_v4().simple()) +} + +/// A 64-bit FNV-1a hex fingerprint of the tool arguments — the +/// privacy-preserving stand-in stored on `mcp.call` events. We only need a +/// stable, collision-resistant-enough identifier for "same args → same hash" +/// in the trace, not a cryptographic digest, so a dependency-free FNV-1a keeps +/// the crate lean. +pub fn hash_args(args: &Option) -> String { + let bytes = match args { + Some(v) => serde_json::to_vec(v).unwrap_or_default(), + None => Vec::new(), + }; + const FNV_OFFSET: u64 = 0xcbf2_9ce4_8422_2325; + const FNV_PRIME: u64 = 0x0000_0100_0000_01b3; + let mut hash = FNV_OFFSET; + for b in &bytes { + hash ^= u64::from(*b); + hash = hash.wrapping_mul(FNV_PRIME); + } + format!("{:016x}", hash) +} + +/// Persist one trace event and broadcast it to the dashboard. Best-effort: +/// storage failures are logged, never propagated. +pub fn record( + storage: &Arc, + event_tx: Option<&broadcast::Sender>, + event: MemoryTraceEvent, +) { + let event = event.with_at(Utc::now().timestamp_millis()); + let seq = match storage.append_trace_event(&event) { + Ok(seq) => seq, + Err(e) => { + tracing::warn!("trace append failed: {e}"); + return; + } + }; + if let Some(tx) = event_tx { + let _ = tx.send(VestigeEvent::TraceEvent { + run_id: event.run_id().to_string(), + seq, + event, + timestamp: Utc::now(), + }); + } +} + +/// Record the opening `mcp.call` event for a tool invocation. +pub fn record_call( + storage: &Arc, + event_tx: Option<&broadcast::Sender>, + run_id: &str, + tool: &str, + args: &Option, +) { + record( + storage, + event_tx, + MemoryTraceEvent::McpCall { + run_id: run_id.to_string(), + tool: tool.to_string(), + args_hash: hash_args(args), + at: 0, + }, + ); +} + +/// Inspect a successful tool result and record the downstream memory events the +/// agent experienced (retrieve / suppress / veto / dream). Tool-output shapes +/// are matched leniently so this stays robust as tools evolve. +pub fn record_result( + storage: &Arc, + event_tx: Option<&broadcast::Sender>, + run_id: &str, + tool: &str, + result: &Value, +) { + // --- memory.retrieve: ids + per-id activation --- + let (ids, activation) = extract_retrieved(result); + if !ids.is_empty() { + record( + storage, + event_tx, + MemoryTraceEvent::MemoryRetrieve { + run_id: run_id.to_string(), + ids, + activation, + at: 0, + }, + ); + } + + // --- memory.suppress: each suppressed id + reason --- + for (id, reason) in extract_suppressed(result) { + record( + storage, + event_tx, + MemoryTraceEvent::MemorySuppress { + run_id: run_id.to_string(), + id, + reason, + at: 0, + }, + ); + } + + // --- memory.write: writes performed by ingest-like tools --- + for (id, decision) in extract_writes(result) { + record( + storage, + event_tx, + MemoryTraceEvent::MemoryWrite { + run_id: run_id.to_string(), + id, + diff: serde_json::json!({ "decision": decision }), + source: WriteSource::Agent, + at: 0, + }, + ); + } + + // --- contradiction.detected: each contradiction pair the agent faced --- + for (ids, winner_id, detail) in extract_contradictions(result) { + record( + storage, + event_tx, + MemoryTraceEvent::ContradictionDetected { + run_id: run_id.to_string(), + ids, + winner_id, + detail, + at: 0, + }, + ); + } + + // --- sanhedrin.veto: a blocked claim --- + if let Some((claim, evidence_ids, confidence)) = extract_veto(result) { + record( + storage, + event_tx, + MemoryTraceEvent::SanhedrinVeto { + run_id: run_id.to_string(), + claim, + evidence_ids, + confidence, + at: 0, + }, + ); + } + + // --- dream.patch: consolidation proposals --- + let proposal_ids = extract_dream_proposals(result, tool); + if !proposal_ids.is_empty() { + record( + storage, + event_tx, + MemoryTraceEvent::DreamPatch { + run_id: run_id.to_string(), + proposal_ids, + at: 0, + }, + ); + } +} + +/// Pull retrieved memory ids + their activation/score from a search-like or +/// deep_reference-like result. +fn extract_retrieved(result: &Value) -> (Vec, BTreeMap) { + let mut ids = Vec::new(); + let mut activation = BTreeMap::new(); + + // search_unified: { results: [{ id, score|activation, ... }] } + if let Some(arr) = result.get("results").and_then(|r| r.as_array()) { + for item in arr { + if let Some(id) = item.get("id").and_then(|v| v.as_str()) { + ids.push(id.to_string()); + let act = item + .get("activation") + .or_else(|| item.get("score")) + .and_then(|v| v.as_f64()); + if let Some(a) = act { + activation.insert(id.to_string(), a); + } + } + } + } + + // deep_reference: { evidence: [{ id, trust, ... }], recommended: { memory_id } } + if ids.is_empty() + && let Some(arr) = result.get("evidence").and_then(|r| r.as_array()) + { + for item in arr { + if let Some(id) = item.get("id").and_then(|v| v.as_str()) { + ids.push(id.to_string()); + if let Some(t) = item.get("trust").and_then(|v| v.as_f64()) { + activation.insert(id.to_string(), t); + } + } + } + } + + (ids, activation) +} + +/// Pull suppressed entries from a result. Recognises both the deep_reference +/// `superseded`/`contradictions` shapes and the explicit receipt `suppressed` +/// list `[{ id, reason }]`. +fn extract_suppressed(result: &Value) -> Vec<(String, SuppressReason)> { + let mut out = Vec::new(); + + if let Some(arr) = result + .get("receipt") + .and_then(|r| r.get("suppressed")) + .and_then(|s| s.as_array()) + { + for item in arr { + if let Some(id) = item.get("id").and_then(|v| v.as_str()) { + let reason = item + .get("reason") + .and_then(|v| v.as_str()) + .map(parse_suppress_reason) + .unwrap_or(SuppressReason::LowTrust); + out.push((id.to_string(), reason)); + } + } + } + + // deep_reference surfaces superseded ids directly. + if let Some(arr) = result.get("superseded").and_then(|s| s.as_array()) { + for item in arr { + let id = item + .get("id") + .and_then(|v| v.as_str()) + .or_else(|| item.as_str()); + if let Some(id) = id { + out.push((id.to_string(), SuppressReason::Contradicted)); + } + } + } + + out +} + +fn parse_suppress_reason(s: &str) -> SuppressReason { + match s { + "low_trust" => SuppressReason::LowTrust, + "decayed" => SuppressReason::Decayed, + "contradicted" => SuppressReason::Contradicted, + "privacy" => SuppressReason::Privacy, + "competition" => SuppressReason::Competition, + _ => SuppressReason::LowTrust, + } +} + +/// Pull writes from an ingest-like result (single `decision`+`nodeId` or a +/// `results` batch). +fn extract_writes(result: &Value) -> Vec<(String, String)> { + let mut out = Vec::new(); + let push = |out: &mut Vec<(String, String)>, item: &Value| { + let decision = item.get("decision").and_then(|v| v.as_str()); + let id = item + .get("nodeId") + .or_else(|| item.get("id")) + .and_then(|v| v.as_str()); + if let (Some(d), Some(id)) = (decision, id) { + out.push((id.to_string(), d.to_string())); + } + }; + push(&mut out, result); + if let Some(arr) = result.get("results").and_then(|r| r.as_array()) { + for item in arr { + push(&mut out, item); + } + } + out +} + +/// Pull contradiction pairs from a deep_reference result. Each entry is +/// `{ stronger: {id, ...}, weaker: {id, ...}, topic_overlap }`; the `stronger` +/// memory is the winner the agent trusted. +fn extract_contradictions(result: &Value) -> Vec<(Vec, Option, String)> { + let mut out = Vec::new(); + let Some(arr) = result.get("contradictions").and_then(|c| c.as_array()) else { + return out; + }; + for item in arr { + let stronger = item + .get("stronger") + .and_then(|s| s.get("id")) + .and_then(|v| v.as_str()); + let weaker = item + .get("weaker") + .and_then(|s| s.get("id")) + .and_then(|v| v.as_str()); + let (Some(s), Some(w)) = (stronger, weaker) else { + continue; + }; + let detail = format!( + "Contradiction: kept {s} over {w}{}", + item.get("topic_overlap") + .and_then(|v| v.as_f64()) + .map(|o| format!(" (topic overlap {:.0}%)", o * 100.0)) + .unwrap_or_default() + ); + out.push(( + vec![s.to_string(), w.to_string()], + Some(s.to_string()), + detail, + )); + } + out +} + +/// Pull a Sanhedrin-style veto, if the result carries one. +fn extract_veto(result: &Value) -> Option<(String, Vec, f64)> { + let veto = result.get("veto").or_else(|| result.get("sanhedrin"))?; + let claim = veto + .get("claim") + .and_then(|v| v.as_str()) + .unwrap_or("") + .to_string(); + if claim.is_empty() { + return None; + } + let evidence_ids = veto + .get("evidenceIds") + .or_else(|| veto.get("evidence_ids")) + .and_then(|v| v.as_array()) + .map(|arr| { + arr.iter() + .filter_map(|v| v.as_str().map(String::from)) + .collect() + }) + .unwrap_or_default(); + let confidence = veto.get("confidence").and_then(|v| v.as_f64()).unwrap_or(0.0); + Some((claim, evidence_ids, confidence)) +} + +/// Pull dream consolidation proposal ids from a dream tool result. +fn extract_dream_proposals(result: &Value, tool: &str) -> Vec { + if tool != "dream" && tool != "consolidate" { + return Vec::new(); + } + let mut out = Vec::new(); + for key in ["proposalIds", "proposals", "insights", "connections"] { + if let Some(arr) = result.get(key).and_then(|v| v.as_array()) { + for item in arr { + if let Some(id) = item + .get("id") + .and_then(|v| v.as_str()) + .or_else(|| item.as_str()) + { + out.push(id.to_string()); + } + } + } + } + out +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn run_id_honours_client_supplied() { + let args = Some(serde_json::json!({ "runId": "run_session_7" })); + assert_eq!(run_id_for(&args), "run_session_7"); + } + + #[test] + fn run_id_mints_when_absent() { + let id = run_id_for(&None); + assert!(id.starts_with("run_")); + assert!(id.len() > 10); + } + + #[test] + fn hash_is_stable_and_hides_content() { + let args = Some(serde_json::json!({ "query": "my secret prompt" })); + let h1 = hash_args(&args); + let h2 = hash_args(&args); + assert_eq!(h1, h2); + assert!(!h1.contains("secret")); + assert_eq!(h1.len(), 16); + } + + #[test] + fn extract_retrieved_from_search_shape() { + let r = serde_json::json!({ + "results": [ + { "id": "m1", "score": 0.9 }, + { "id": "m2", "activation": 0.4 } + ] + }); + let (ids, act) = extract_retrieved(&r); + assert_eq!(ids, vec!["m1", "m2"]); + assert_eq!(act["m1"], 0.9); + assert_eq!(act["m2"], 0.4); + } + + #[test] + fn extract_retrieved_from_deep_reference_shape() { + let r = serde_json::json!({ + "evidence": [ { "id": "e1", "trust": 0.7 } ] + }); + let (ids, act) = extract_retrieved(&r); + assert_eq!(ids, vec!["e1"]); + assert_eq!(act["e1"], 0.7); + } + + #[test] + fn extract_suppressed_from_receipt_and_superseded() { + let r = serde_json::json!({ + "receipt": { "suppressed": [ { "id": "s1", "reason": "contradicted" } ] }, + "superseded": [ { "id": "s2" } ] + }); + let out = extract_suppressed(&r); + assert!(out.contains(&("s1".to_string(), SuppressReason::Contradicted))); + assert!(out.contains(&("s2".to_string(), SuppressReason::Contradicted))); + } + + #[test] + fn extract_writes_single_and_batch() { + let single = serde_json::json!({ "decision": "create", "nodeId": "n1" }); + assert_eq!(extract_writes(&single), vec![("n1".into(), "create".into())]); + let batch = serde_json::json!({ + "results": [ { "decision": "update", "id": "n2" } ] + }); + assert_eq!(extract_writes(&batch), vec![("n2".into(), "update".into())]); + } +}