diff --git a/CHANGELOG.md b/CHANGELOG.md index 490c01e..dec084c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,64 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [2.1.25] - 2026-06-12 — "Merge / Supersede Controls" + +v2.1.25 ships Phase 3: diff-previewed, confidence-gated, reversible, +self-explaining combine/dedupe/supersede on a never-delete (bitemporal) store. +The default is always preview/review — these tools never silently mutate memory. +The differentiator is the reversible operation log: every merge/supersede/undo is +an auditable, reversible event with provenance ("why did these combine?") — a git +reflog for your agent's memory. + +### Added + +- **Seven new MCP tools** for merge/supersede control: + - `merge_candidates` — surface likely duplicate/overlapping clusters with + confidence scores and the signals behind each (Fellegi-Sunter + match/possible/non-match). Read-only. + - `plan_merge` — produce a previewable merge PLAN (a diff of combined + content/tags/provenance) without applying it. + - `plan_supersede` — preview superseding A with B (bitemporal invalidation, + audit-preserving) without applying. + - `apply_plan` — execute a previously-generated plan id; recorded as a + reversible operation. + - `merge_undo` — reverse a prior merge/supersede operation, or list the + reversible operation log (the "memory reflog"). + - `protect` — pin a memory so it can never be auto-merged, superseded, or + garbage-collected. + - `merge_policy` — get/set the per-project Fellegi-Sunter two thresholds + (`match_threshold`, `possible_threshold`) and `auto_apply`. +- **Bitemporal "invalidate, don't delete" supersede** (Graphiti-style): a + superseded memory is kept and stays queryable for audit. It is stamped with + `valid_until = now` and a new `superseded_by` lineage pointer, instead of being + deleted or merely demoted. +- **Reversible operation log** (`merge_operations` table) — every applied + merge/supersede records an undo payload and provenance signals so any operation + can be reversed, including restoring survivor content/tags and clearing the + bitemporal invalidation. +- **Fellegi-Sunter two-threshold scoring** for dedup/merge candidates, combining + embedding cosine similarity with tag and content-token overlap. Borderline + "possible" matches are surfaced for review instead of force-merged. +- **Memory protection / pinning** — `protected` column on `knowledge_nodes`; + protected memories are excluded from auto-merge/supersede/GC paths. +- **Migration V14** adding the `merge_plans` and `merge_operations` tables, the + `protected` and `superseded_by` columns on `knowledge_nodes`, and their + indexes. Idempotent on replay. +- **Docs**: `docs/MERGE_SUPERSEDE.md` describing the design, the bitemporal + model, the two-threshold policy, the reversible operation log, and the tool + surface. + +### Notes + +- All merge/supersede operations are **opt-in and preview-first**. `apply_plan` + requires `confirm=true` for `possible`/`non_match` plans, and only applies + `match` plans without confirmation when `merge_policy.auto_apply` is enabled + (default off). This deliberately avoids the silent-merge / auto-delete / + audit-trail-loss anti-patterns reported against other memory systems. +- The merge policy persists per-project and is also overridable via + `VESTIGE_MERGE_MATCH_THRESHOLD`, `VESTIGE_MERGE_POSSIBLE_THRESHOLD`, and + `VESTIGE_MERGE_AUTO_APPLY` environment variables. + ## [2.1.23] - 2026-05-27 — "Receipt Lock Hardening" v2.1.23 hardens the Sanhedrin launch path so Receipt Lock is portable, diff --git a/Cargo.lock b/Cargo.lock index b9612d3..33fe576 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4629,7 +4629,7 @@ checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" [[package]] name = "vestige-core" -version = "2.1.23" +version = "2.1.25" dependencies = [ "candle-core", "chrono", @@ -4665,7 +4665,7 @@ dependencies = [ [[package]] name = "vestige-mcp" -version = "2.1.23" +version = "2.1.25" dependencies = [ "anyhow", "axum", diff --git a/Cargo.toml b/Cargo.toml index f120928..1c89455 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,7 +10,7 @@ exclude = [ ] [workspace.package] -version = "2.1.23" +version = "2.1.25" edition = "2024" license = "AGPL-3.0-only" repository = "https://github.com/samvallad33/vestige" diff --git a/apps/dashboard/package.json b/apps/dashboard/package.json index 4023f8d..b35ef9f 100644 --- a/apps/dashboard/package.json +++ b/apps/dashboard/package.json @@ -1,6 +1,6 @@ { "name": "@vestige/dashboard", - "version": "2.1.23", + "version": "2.1.25", "private": true, "type": "module", "scripts": { diff --git a/crates/vestige-core/Cargo.toml b/crates/vestige-core/Cargo.toml index fe89443..c66c369 100644 --- a/crates/vestige-core/Cargo.toml +++ b/crates/vestige-core/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "vestige-core" -version = "2.1.23" +version = "2.1.25" edition = "2024" rust-version = "1.91" authors = ["Vestige Team"] diff --git a/crates/vestige-core/src/advanced/merge_supersede.rs b/crates/vestige-core/src/advanced/merge_supersede.rs new file mode 100644 index 0000000..c10485a --- /dev/null +++ b/crates/vestige-core/src/advanced/merge_supersede.rs @@ -0,0 +1,447 @@ +//! # Merge / Supersede Controls (Phase 3) +//! +//! Diff-previewed, confidence-gated, reversible, self-explaining combine / +//! dedupe / supersede operations on a never-delete (bitemporal) store. +//! +//! This module holds the **pure** logic: candidate scoring, two-threshold +//! classification, and the plan / operation data model. The actual persistence +//! (writing plans, applying them, recording the reversible operation log, and +//! bitemporally invalidating superseded nodes) lives in +//! [`crate::storage`]. Keeping the math here makes it unit-testable without a +//! database. +//! +//! ## Design north star +//! +//! Every combine/dedupe/supersede operation is: +//! +//! - **diff-previewed** — `plan_merge` / `plan_supersede` produce a [`MergePlan`] +//! you can inspect before anything mutates, +//! - **confidence-gated** — a Fellegi-Sunter two-threshold score classifies each +//! candidate as match / possible-match / non-match, +//! - **reversible** — every applied plan records a [`MergeOperation`] with an +//! undo payload (the "git reflog for your agent's memory"), +//! - **self-explaining** — each candidate carries the [`MatchSignals`] that +//! explain *why* the memories combined, +//! - **opt-in, never silent** — the default is preview/review, never auto-mutate, +//! - **audit-preserving** — superseding stamps `valid_until` and keeps the old +//! node queryable (Graphiti-style "invalidate, don't delete"). +//! +//! ## Why Fellegi-Sunter +//! +//! Pure hashing under-merges (misses paraphrases); aggressive LLM merging +//! over-merges and destroys the audit trail. Fellegi-Sunter record linkage uses +//! **two** thresholds to carve the score space into three zones, so the +//! borderline "possible match" cases are surfaced for review instead of being +//! force-decided. We reuse the embedding cosine similarity already in the store +//! plus cheap lexical signals (tag overlap, token Jaccard) as the match weight. + +use serde::{Deserialize, Serialize}; + +// ============================================================================ +// CONSTANTS — the two Fellegi-Sunter thresholds +// ============================================================================ + +/// Above this combined score → automatic-eligible "match". +pub const DEFAULT_MATCH_THRESHOLD: f32 = 0.86; + +/// Between the two thresholds → "possible match", surfaced for review. +/// Below this → "non-match" (never offered). +pub const DEFAULT_POSSIBLE_THRESHOLD: f32 = 0.72; + +/// Weight of embedding cosine similarity in the combined score. +const W_EMBEDDING: f32 = 0.70; +/// Weight of tag overlap (Jaccard) in the combined score. +const W_TAGS: f32 = 0.15; +/// Weight of content token overlap (Jaccard) in the combined score. +const W_TOKENS: f32 = 0.15; + +// ============================================================================ +// CLASSIFICATION +// ============================================================================ + +/// Fellegi-Sunter three-way classification of a candidate pair/cluster. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum MatchClass { + /// Score ≥ match threshold — strong duplicate, auto-merge eligible. + Match, + /// Between thresholds — surfaced for human/agent review, never auto-applied. + Possible, + /// Below the possible threshold — not offered as a candidate. + NonMatch, +} + +impl MatchClass { + /// String label used in tool output and the `classification` column. + pub fn as_str(&self) -> &'static str { + match self { + MatchClass::Match => "match", + MatchClass::Possible => "possible", + MatchClass::NonMatch => "non_match", + } + } +} + +/// Per-merge-policy thresholds. Wired to `vestige.toml` when present, else the +/// defaults above. `auto_apply` gates whether `Match`-class candidates may be +/// applied without an explicit preview step (default: false — never silent). +#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +pub struct MergePolicy { + /// Score ≥ this → `Match`. + pub match_threshold: f32, + /// Score in `[possible_threshold, match_threshold)` → `Possible`. + pub possible_threshold: f32, + /// If true, `Match`-class candidates may be auto-applied. Default false: + /// the product promise is review/preview, not silent mutation. + pub auto_apply: bool, +} + +impl Default for MergePolicy { + fn default() -> Self { + Self { + match_threshold: DEFAULT_MATCH_THRESHOLD, + possible_threshold: DEFAULT_POSSIBLE_THRESHOLD, + auto_apply: false, + } + } +} + +impl MergePolicy { + /// Build a policy, clamping thresholds into `[0,1]` and ensuring + /// `possible_threshold <= match_threshold`. + pub fn new(match_threshold: f32, possible_threshold: f32, auto_apply: bool) -> Self { + let match_threshold = match_threshold.clamp(0.0, 1.0); + let possible_threshold = possible_threshold.clamp(0.0, match_threshold); + Self { + match_threshold, + possible_threshold, + auto_apply, + } + } + + /// Classify a combined match score. + pub fn classify(&self, score: f32) -> MatchClass { + if score >= self.match_threshold { + MatchClass::Match + } else if score >= self.possible_threshold { + MatchClass::Possible + } else { + MatchClass::NonMatch + } + } +} + +// ============================================================================ +// SIGNALS — the self-explaining "why did these combine?" +// ============================================================================ + +/// The individual signals behind a candidate's score. Surfaced verbatim so a +/// user can see *why* two memories were judged duplicates. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct MatchSignals { + /// Cosine similarity of the two embeddings (0–1). + pub embedding_similarity: f32, + /// Jaccard overlap of the two tag sets (0–1). + pub tag_overlap: f32, + /// Jaccard overlap of content tokens (0–1). + pub token_overlap: f32, + /// Combined weighted score that was classified. + pub combined_score: f32, +} + +/// Compute the combined match score and its signal breakdown for a pair. +pub fn score_pair( + embedding_similarity: f32, + a_tags: &[String], + b_tags: &[String], + a_content: &str, + b_content: &str, +) -> MatchSignals { + let tag_overlap = jaccard(&tag_set(a_tags), &tag_set(b_tags)); + let token_overlap = jaccard(&token_set(a_content), &token_set(b_content)); + let combined_score = (W_EMBEDDING * embedding_similarity.clamp(0.0, 1.0) + + W_TAGS * tag_overlap + + W_TOKENS * token_overlap) + .clamp(0.0, 1.0); + MatchSignals { + embedding_similarity: embedding_similarity.clamp(0.0, 1.0), + tag_overlap, + token_overlap, + combined_score, + } +} + +fn tag_set(tags: &[String]) -> std::collections::HashSet { + tags.iter().map(|t| t.to_lowercase()).collect() +} + +fn token_set(content: &str) -> std::collections::HashSet { + content + .split(|c: char| !c.is_alphanumeric()) + .filter(|t| t.len() > 2) + .map(|t| t.to_lowercase()) + .collect() +} + +fn jaccard(a: &std::collections::HashSet, b: &std::collections::HashSet) -> f32 { + if a.is_empty() && b.is_empty() { + return 0.0; + } + let inter = a.intersection(b).count() as f32; + let union = a.union(b).count() as f32; + if union == 0.0 { 0.0 } else { inter / union } +} + +// ============================================================================ +// CANDIDATE +// ============================================================================ + +/// A surfaced merge candidate: a cluster of likely-duplicate memories with the +/// signals and classification that justify offering it. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct MergeCandidate { + /// Node ids in the cluster. The first is the suggested survivor (highest + /// retention). + pub member_ids: Vec, + /// Short content previews, parallel to `member_ids`. + pub previews: Vec, + /// Suggested survivor id (kept after a merge). + pub survivor_id: String, + /// Combined match score for the cluster (min pairwise within the cluster — + /// the weakest link, so a cluster is only as confident as its loosest pair). + pub confidence: f32, + /// Three-way classification under the active policy. + pub classification: MatchClass, + /// Signals for the survivor↔closest-member pair (the explanation). + pub signals: MatchSignals, + /// True if any member is protected (pinned) — blocks auto-merge. + pub has_protected_member: bool, +} + +// ============================================================================ +// PLAN — the previewable diff +// ============================================================================ + +/// What kind of plan this is. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum PlanKind { + /// Combine N memories into one survivor. + Merge, + /// Invalidate A in favour of B (bitemporal, audit-preserving). + Supersede, +} + +impl PlanKind { + pub fn as_str(&self) -> &'static str { + match self { + PlanKind::Merge => "merge", + PlanKind::Supersede => "supersede", + } + } +} + +/// A previewable plan: exactly what *would* change, without changing anything. +/// Persisted to `merge_plans`; consumed by `apply_plan` via its `id`. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct MergePlan { + /// Plan id (UUID). + pub id: String, + /// merge | supersede. + pub kind: PlanKind, + /// Node kept after the operation. + pub survivor_id: String, + /// All node ids involved. + pub member_ids: Vec, + /// Resulting content of the survivor after applying. + pub result_content: String, + /// Resulting tag set of the survivor after applying. + pub result_tags: Vec, + /// Resulting provenance / source string after applying. + pub result_source: Option, + /// For supersede: ids that get bitemporally invalidated (their + /// `valid_until` stamped, kept queryable). For merge: the absorbed ids. + pub invalidated_ids: Vec, + /// Match confidence (0–1) for the plan. + pub confidence: f32, + /// Three-way classification. + pub classification: MatchClass, + /// Signals explaining the plan. + pub signals: MatchSignals, + /// Human-readable explanation of what this plan does. + pub explanation: String, +} + +// ============================================================================ +// OPERATION LOG — the reversible "memory reflog" +// ============================================================================ + +/// A recorded, reversible operation. One row in `merge_operations`. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct MergeOperation { + /// Operation id (UUID). + pub id: String, + /// Plan id this came from (if any). + pub plan_id: Option, + /// merge | supersede | undo. + pub op_type: String, + /// applied | reverted. + pub status: String, + /// When recorded (RFC3339). + pub created_at: String, + /// When reverted (RFC3339), if reverted. + pub reverted_at: Option, + /// For undo ops: the op id being reversed. + pub reverts_op_id: Option, + /// Survivor node id. + pub survivor_id: Option, + /// Node ids touched by the op. + pub affected_ids: Vec, + /// Match confidence. + pub confidence: Option, + /// Human-readable reason. + pub reason: Option, +} + +// ============================================================================ +// MERGE COMPOSITION — pure helpers used by the storage apply path +// ============================================================================ + +/// Compose merged content from an ordered list of (id, content) members. +/// Survivor content leads; each absorbed member is appended with provenance so +/// nothing is silently dropped (anti-pattern: Mem0 #4896 double-store / +/// contradiction loss). +pub fn compose_merged_content(members: &[(String, String)]) -> String { + if members.is_empty() { + return String::new(); + } + let mut out = members[0].1.trim().to_string(); + for (id, content) in &members[1..] { + let c = content.trim(); + if c.is_empty() || out.contains(c) { + continue; + } + out.push_str("\n\n[merged from "); + out.push_str(id); + out.push_str("]\n"); + out.push_str(c); + } + out +} + +/// Union the tag sets of all members, preserving first-seen order. +pub fn compose_merged_tags(member_tags: &[Vec]) -> Vec { + let mut seen = std::collections::HashSet::new(); + let mut out = Vec::new(); + for tags in member_tags { + for t in tags { + if seen.insert(t.to_lowercase()) { + out.push(t.clone()); + } + } + } + out +} + +// ============================================================================ +// TESTS +// ============================================================================ + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn classify_three_zones() { + let policy = MergePolicy::default(); + assert_eq!(policy.classify(0.95), MatchClass::Match); + assert_eq!(policy.classify(0.80), MatchClass::Possible); + assert_eq!(policy.classify(0.50), MatchClass::NonMatch); + // boundaries are inclusive at the lower edge of each higher zone + assert_eq!(policy.classify(DEFAULT_MATCH_THRESHOLD), MatchClass::Match); + assert_eq!( + policy.classify(DEFAULT_POSSIBLE_THRESHOLD), + MatchClass::Possible + ); + } + + #[test] + fn policy_clamps_and_orders() { + // possible above match gets clamped down to match + let p = MergePolicy::new(0.8, 0.95, true); + assert!(p.possible_threshold <= p.match_threshold); + // out-of-range clamps to [0,1] + let p2 = MergePolicy::new(2.0, -1.0, false); + assert_eq!(p2.match_threshold, 1.0); + assert_eq!(p2.possible_threshold, 0.0); + } + + #[test] + fn score_pair_combines_signals() { + let s = score_pair( + 1.0, + &["rust".into(), "async".into()], + &["rust".into(), "async".into()], + "use tokio for async rust", + "use tokio for async rust", + ); + assert!((s.embedding_similarity - 1.0).abs() < 1e-6); + assert!((s.tag_overlap - 1.0).abs() < 1e-6); + assert!(s.token_overlap > 0.9); + assert!(s.combined_score > 0.95); + } + + #[test] + fn score_pair_disjoint_is_low() { + let s = score_pair( + 0.1, + &["a".into()], + &["b".into()], + "completely different topic alpha", + "totally unrelated subject beta", + ); + assert!(s.combined_score < 0.3); + assert_eq!(MergePolicy::default().classify(s.combined_score), MatchClass::NonMatch); + } + + #[test] + fn jaccard_basics() { + let a: std::collections::HashSet = ["x".into(), "y".into()].into_iter().collect(); + let b: std::collections::HashSet = ["y".into(), "z".into()].into_iter().collect(); + assert!((jaccard(&a, &b) - (1.0 / 3.0)).abs() < 1e-6); + let empty: std::collections::HashSet = Default::default(); + assert_eq!(jaccard(&empty, &empty), 0.0); + } + + #[test] + fn compose_merged_content_dedups_and_attributes() { + let members = vec![ + ("a".into(), "Keep this.".into()), + ("b".into(), "Extra detail.".into()), + ("c".into(), "Keep this.".into()), // duplicate of survivor → skipped + ]; + let merged = compose_merged_content(&members); + assert!(merged.starts_with("Keep this.")); + assert!(merged.contains("[merged from b]")); + assert!(merged.contains("Extra detail.")); + // duplicate content not appended twice + assert_eq!(merged.matches("Keep this.").count(), 1); + } + + #[test] + fn compose_merged_tags_unions_in_order() { + let tags = vec![ + vec!["rust".into(), "async".into()], + vec!["async".into(), "tokio".into()], + ]; + let merged = compose_merged_tags(&tags); + assert_eq!(merged, vec!["rust", "async", "tokio"]); + } + + #[test] + fn match_class_labels() { + assert_eq!(MatchClass::Match.as_str(), "match"); + assert_eq!(MatchClass::Possible.as_str(), "possible"); + assert_eq!(MatchClass::NonMatch.as_str(), "non_match"); + } +} diff --git a/crates/vestige-core/src/advanced/mod.rs b/crates/vestige-core/src/advanced/mod.rs index fdbdfe4..0ed9280 100644 --- a/crates/vestige-core/src/advanced/mod.rs +++ b/crates/vestige-core/src/advanced/mod.rs @@ -23,6 +23,7 @@ pub mod cross_project; pub mod dreams; pub mod importance; pub mod intent; +pub mod merge_supersede; pub mod prediction_error; pub mod reconsolidation; pub mod speculative; @@ -61,6 +62,11 @@ pub use dreams::{ }; pub use importance::{ImportanceDecayConfig, ImportanceScore, ImportanceTracker, UsageEvent}; pub use intent::{ActionType, DetectedIntent, IntentDetector, MaintenanceType, UserAction}; +pub use merge_supersede::{ + DEFAULT_MATCH_THRESHOLD, DEFAULT_POSSIBLE_THRESHOLD, MatchClass, MatchSignals, MergeCandidate, + MergeOperation, MergePlan, MergePolicy, PlanKind, compose_merged_content, compose_merged_tags, + score_pair, +}; pub use prediction_error::{ CandidateMemory, CreateReason, EvaluationIntent, GateDecision, GateStats, MergeStrategy, PredictionErrorConfig, PredictionErrorGate, SimilarityResult, SupersedeReason, UpdateType, diff --git a/crates/vestige-core/src/lib.rs b/crates/vestige-core/src/lib.rs index 08ce090..4c50413 100644 --- a/crates/vestige-core/src/lib.rs +++ b/crates/vestige-core/src/lib.rs @@ -225,8 +225,16 @@ pub use advanced::{ MemoryPath, MemoryReplay, MemorySnapshot, + // Merge / Supersede controls (Phase 3) + MatchClass, + MatchSignals, + MergeCandidate, + MergeOperation, + MergePlan, + MergePolicy, MergeStrategy, Modification, + PlanKind, Pattern, PatternType, PredictedMemory, diff --git a/crates/vestige-core/src/storage/migrations.rs b/crates/vestige-core/src/storage/migrations.rs index 2c66a2d..3be941c 100644 --- a/crates/vestige-core/src/storage/migrations.rs +++ b/crates/vestige-core/src/storage/migrations.rs @@ -69,6 +69,11 @@ pub const MIGRATIONS: &[Migration] = &[ description: "v2.1.2 Honest Memory: non-content purge tombstones", up: MIGRATION_V13_UP, }, + Migration { + version: 14, + description: "v2.1.25 Merge/Supersede: reversible operation log, merge plans, bitemporal lineage, protected pins", + up: MIGRATION_V14_UP, + }, ]; /// A database migration @@ -735,6 +740,79 @@ ON deletion_tombstones(deleted_at); UPDATE schema_version SET version = 13, applied_at = datetime('now'); "#; +/// V14: Merge / Supersede controls (Phase 3). +/// +/// Adds the four pieces the merge/supersede feature needs on a never-delete +/// (bitemporal) store: +/// +/// 1. `merge_plans` — previewable, not-yet-applied plans. `plan_merge` and +/// `plan_supersede` write a plan row containing a JSON diff; `apply_plan` +/// consumes it by id. Plans are append-only; status moves +/// pending -> applied / cancelled. +/// 2. `merge_operations` — the reversible operation log (the "memory reflog"). +/// Every applied merge/supersede records one row with a JSON `undo_payload` +/// capturing exactly what changed, so `merge_undo` can reverse it. The +/// `signals` column records WHY the memories combined (provenance), which is +/// the self-explaining differentiator. +/// 3. `knowledge_nodes.protected` — pin flag. A protected memory can never be +/// auto-merged, superseded, or forgotten. +/// 4. `knowledge_nodes.superseded_by` — bitemporal lineage pointer. Superseding +/// A with B does NOT delete A: it stamps A.valid_until = B.valid_from and +/// sets A.superseded_by = B.id, leaving A fully queryable for audit +/// (Graphiti-style invalidate-don't-delete). +// The two `protected` / `superseded_by` ADD COLUMNs (and their indexes) are +// applied separately in `apply_migrations` BEFORE this batch runs, guarded +// against "duplicate column" on replay, since SQLite has no +// `ADD COLUMN IF NOT EXISTS`. The rest of V14 is idempotent (CREATE ... IF NOT +// EXISTS). +const MIGRATION_V14_UP: &str = r#" +CREATE INDEX IF NOT EXISTS idx_nodes_protected ON knowledge_nodes(protected); +CREATE INDEX IF NOT EXISTS idx_nodes_superseded_by ON knowledge_nodes(superseded_by); + +-- Previewable plans (a diff) produced by plan_merge / plan_supersede. +-- `kind` is 'merge' | 'supersede'. `payload` is the full JSON plan/diff. +CREATE TABLE IF NOT EXISTS merge_plans ( + id TEXT PRIMARY KEY, + kind TEXT NOT NULL, + status TEXT NOT NULL DEFAULT 'pending', -- pending | applied | cancelled + created_at TEXT NOT NULL, + applied_at TEXT, + survivor_id TEXT, -- node kept after the op + member_ids TEXT NOT NULL DEFAULT '[]', -- JSON array of all involved node ids + confidence REAL, -- Fellegi-Sunter match score (0-1) + classification TEXT, -- match | possible | non_match + payload TEXT NOT NULL -- full JSON plan/diff +); + +CREATE INDEX IF NOT EXISTS idx_merge_plans_status ON merge_plans(status); +CREATE INDEX IF NOT EXISTS idx_merge_plans_created_at ON merge_plans(created_at); + +-- Reversible operation log — the "git reflog for your agent's memory". +-- One row per applied merge/supersede; `undo_payload` carries everything +-- needed to reverse it, `signals` records why the memories combined. +CREATE TABLE IF NOT EXISTS merge_operations ( + id TEXT PRIMARY KEY, + plan_id TEXT, -- merge_plans.id this came from + op_type TEXT NOT NULL, -- merge | supersede | undo + status TEXT NOT NULL DEFAULT 'applied', -- applied | reverted + created_at TEXT NOT NULL, + reverted_at TEXT, + reverts_op_id TEXT, -- set when op_type = 'undo' + survivor_id TEXT, -- node kept + affected_ids TEXT NOT NULL DEFAULT '[]', -- JSON array of node ids touched + confidence REAL, + signals TEXT, -- JSON: why they combined (provenance) + reason TEXT, -- human-readable explanation + undo_payload TEXT NOT NULL -- JSON snapshot to reverse the op +); + +CREATE INDEX IF NOT EXISTS idx_merge_operations_status ON merge_operations(status); +CREATE INDEX IF NOT EXISTS idx_merge_operations_created_at ON merge_operations(created_at); +CREATE INDEX IF NOT EXISTS idx_merge_operations_survivor ON merge_operations(survivor_id); + +UPDATE schema_version SET version = 14, applied_at = datetime('now'); +"#; + /// Get current schema version from database pub fn get_current_version(conn: &rusqlite::Connection) -> rusqlite::Result { conn.query_row( @@ -745,6 +823,19 @@ pub fn get_current_version(conn: &rusqlite::Connection) -> rusqlite::Result .or(Ok(0)) } +/// Run an `ALTER TABLE ... ADD COLUMN` statement, treating a "duplicate column +/// name" failure as success so migration replay stays idempotent (SQLite has no +/// `ADD COLUMN IF NOT EXISTS`). +fn add_column_if_missing(conn: &rusqlite::Connection, sql: &str) -> rusqlite::Result<()> { + match conn.execute(sql, []) { + Ok(_) => Ok(()), + Err(rusqlite::Error::SqliteFailure(_, Some(msg))) if msg.contains("duplicate column name") => { + Ok(()) + } + Err(e) => Err(e), + } +} + /// Apply pending migrations pub fn apply_migrations(conn: &rusqlite::Connection) -> rusqlite::Result { let current_version = get_current_version(conn)?; @@ -758,6 +849,21 @@ pub fn apply_migrations(conn: &rusqlite::Connection) -> rusqlite::Result { migration.description ); + // V14: add the two bitemporal/protect columns BEFORE the batch (the + // batch's indexes reference them). SQLite lacks + // `ADD COLUMN IF NOT EXISTS`, so swallow the "duplicate column" + // error to stay idempotent on replay. + if migration.version == 14 { + add_column_if_missing( + conn, + "ALTER TABLE knowledge_nodes ADD COLUMN protected INTEGER NOT NULL DEFAULT 0", + )?; + add_column_if_missing( + conn, + "ALTER TABLE knowledge_nodes ADD COLUMN superseded_by TEXT", + )?; + } + // Use execute_batch to handle multi-statement SQL including triggers conn.execute_batch(migration.up)?; @@ -784,17 +890,17 @@ mod tests { /// version after `apply_migrations` runs all migrations end-to-end, and /// neither of the dead tables V11 drops must exist afterwards. #[test] - fn test_apply_migrations_advances_to_v13_and_drops_dead_tables() { + fn test_apply_migrations_advances_to_v14_and_drops_dead_tables() { let conn = rusqlite::Connection::open_in_memory().expect("open in-memory"); // Pre-requisite: schema_version must be bootstrapped by V1. apply_migrations(&conn).expect("apply_migrations succeeds"); - // 1. schema_version advanced to V13 + // 1. schema_version advanced to V14 let version = get_current_version(&conn).expect("read schema_version"); assert_eq!( - version, 13, - "schema_version must be 13 after all migrations" + version, 14, + "schema_version must be 14 after all migrations" ); // 2. knowledge_edges is gone (V11 drops it) @@ -848,6 +954,37 @@ mod tests { deletion_tombstone_rows, 1, "deletion_tombstones table must be created by V13" ); + + // 6. merge_plans + merge_operations exist (V14 creates them) + for table in ["merge_plans", "merge_operations"] { + let rows: i64 = conn + .query_row( + "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name=?1", + [table], + |row| row.get(0), + ) + .expect("query sqlite_master"); + assert_eq!(rows, 1, "{table} table must be created by V14"); + } + + // 7. knowledge_nodes gains `protected` + `superseded_by` (V14) + let node_cols: Vec = { + let mut stmt = conn + .prepare("PRAGMA table_info(knowledge_nodes)") + .expect("prepare table_info"); + stmt.query_map([], |row| row.get::<_, String>(1)) + .expect("query table_info") + .filter_map(|r| r.ok()) + .collect() + }; + assert!( + node_cols.iter().any(|c| c == "protected"), + "knowledge_nodes must have `protected` column after V14" + ); + assert!( + node_cols.iter().any(|c| c == "superseded_by"), + "knowledge_nodes must have `superseded_by` column after V14" + ); } /// V11 must be idempotent on replay — if the tables were already dropped @@ -869,6 +1006,6 @@ mod tests { apply_migrations(&conn).expect("V11 replay must be idempotent"); let version = get_current_version(&conn).expect("read schema_version"); - assert_eq!(version, 13, "schema_version back at 13 after replay"); + assert_eq!(version, 14, "schema_version back at 14 after replay"); } } diff --git a/crates/vestige-core/src/storage/sqlite.rs b/crates/vestige-core/src/storage/sqlite.rs index c4e8f2b..dcd32ad 100644 --- a/crates/vestige-core/src/storage/sqlite.rs +++ b/crates/vestige-core/src/storage/sqlite.rs @@ -6279,6 +6279,988 @@ impl Storage { } Ok(result) } + + // ======================================================================== + // Merge / Supersede controls (Phase 3 — v2.1.25) + // + // Diff-previewed, confidence-gated, reversible, self-explaining + // combine/dedupe/supersede on a never-delete (bitemporal) store. + // Pure scoring/plan/op types live in `advanced::merge_supersede`. + // ======================================================================== + + /// Mark a memory protected (pinned) or unprotected. A protected memory can + /// never be auto-merged, superseded, or garbage-collected. + pub fn set_protected(&self, id: &str, protected: bool) -> Result<()> { + let writer = self + .writer + .lock() + .map_err(|_| StorageError::Init("Writer lock poisoned".into()))?; + let affected = writer.execute( + "UPDATE knowledge_nodes SET protected = ?1 WHERE id = ?2", + params![if protected { 1 } else { 0 }, id], + )?; + if affected == 0 { + return Err(StorageError::NotFound(id.to_string())); + } + Ok(()) + } + + /// Is this memory protected (pinned)? + pub fn is_protected(&self, id: &str) -> Result { + let reader = self + .reader + .lock() + .map_err(|_| StorageError::Init("Reader lock poisoned".into()))?; + let v: Option = reader + .query_row( + "SELECT protected FROM knowledge_nodes WHERE id = ?1", + params![id], + |row| row.get(0), + ) + .optional()?; + match v { + Some(p) => Ok(p != 0), + None => Err(StorageError::NotFound(id.to_string())), + } + } + + /// Read the per-project merge policy (two Fellegi-Sunter thresholds + + /// auto_apply). Persisted in `fsrs_config` so it survives restarts without a + /// new table; falls back to defaults (env-overridable) when unset. + pub fn get_merge_policy(&self) -> Result { + let reader = self + .reader + .lock() + .map_err(|_| StorageError::Init("Reader lock poisoned".into()))?; + let read_key = |key: &str| -> Option { + reader + .query_row( + "SELECT value FROM fsrs_config WHERE key = ?1", + params![key], + |row| row.get::<_, f64>(0), + ) + .optional() + .ok() + .flatten() + }; + let default = crate::advanced::MergePolicy::default(); + let env_f32 = |name: &str, fallback: f32| -> f32 { + std::env::var(name) + .ok() + .and_then(|v| v.parse::().ok()) + .unwrap_or(fallback) + }; + let match_threshold = read_key("merge_match_threshold") + .map(|v| v as f32) + .unwrap_or_else(|| env_f32("VESTIGE_MERGE_MATCH_THRESHOLD", default.match_threshold)); + let possible_threshold = read_key("merge_possible_threshold") + .map(|v| v as f32) + .unwrap_or_else(|| { + env_f32("VESTIGE_MERGE_POSSIBLE_THRESHOLD", default.possible_threshold) + }); + let auto_apply = match read_key("merge_auto_apply") { + Some(v) => v != 0.0, + None => std::env::var("VESTIGE_MERGE_AUTO_APPLY") + .ok() + .map(|v| v == "1" || v.eq_ignore_ascii_case("true")) + .unwrap_or(default.auto_apply), + }; + Ok(crate::advanced::MergePolicy::new( + match_threshold, + possible_threshold, + auto_apply, + )) + } + + /// Persist the per-project merge policy into `fsrs_config`. + pub fn set_merge_policy(&self, policy: crate::advanced::MergePolicy) -> Result<()> { + let writer = self + .writer + .lock() + .map_err(|_| StorageError::Init("Writer lock poisoned".into()))?; + let now = Utc::now().to_rfc3339(); + let put = |key: &str, value: f64| -> Result<()> { + writer.execute( + "INSERT OR REPLACE INTO fsrs_config (key, value, updated_at) VALUES (?1, ?2, ?3)", + params![key, value, now], + )?; + Ok(()) + }; + put("merge_match_threshold", policy.match_threshold as f64)?; + put("merge_possible_threshold", policy.possible_threshold as f64)?; + put( + "merge_auto_apply", + if policy.auto_apply { 1.0 } else { 0.0 }, + )?; + Ok(()) + } + + /// Surface likely duplicate/overlapping memory clusters with confidence + /// scores and the signals behind each (Fellegi-Sunter classified). + /// + /// Only clusters whose weakest pair scores at or above the policy's + /// `possible_threshold` are returned. Protected members are flagged so the + /// caller never auto-merges a pin. + #[cfg(all(feature = "embeddings", feature = "vector-search"))] + pub fn merge_candidates( + &self, + policy: crate::advanced::MergePolicy, + limit: usize, + tag_filter: &[String], + ) -> Result> { + use crate::advanced::{MatchClass, MergeCandidate, score_pair}; + + let all_embeddings = self.get_all_embeddings()?; + if all_embeddings.is_empty() { + return Ok(vec![]); + } + + // Load nodes for metadata. Exclude already-superseded nodes — they are + // historical and must not be re-offered for merge. + let mut node_map: std::collections::HashMap = + std::collections::HashMap::new(); + let superseded: std::collections::HashSet = self.superseded_node_ids()?; + let protected: std::collections::HashSet = self.protected_node_ids()?; + + let mut offset = 0; + loop { + let batch = self.get_all_nodes(500, offset)?; + let n = batch.len(); + for node in batch { + node_map.insert(node.id.clone(), node); + } + if n < 500 { + break; + } + offset += 500; + } + + // Candidate embeddings, filtered by tag and excluding superseded. + let items: Vec<(String, Vec)> = all_embeddings + .into_iter() + .filter(|(id, _)| !superseded.contains(id)) + .filter(|(id, _)| { + if tag_filter.is_empty() { + return true; + } + node_map + .get(id) + .map(|n| tag_filter.iter().any(|t| n.tags.contains(t))) + .unwrap_or(false) + }) + .collect(); + + let n = items.len(); + if n > 2000 { + return Err(StorageError::Init(format!( + "Too many memories to scan ({n} with embeddings). Filter by tags to reduce scope." + ))); + } + + // Union-find clustering over pairs above the possible threshold. + let mut parent: Vec = (0..n).collect(); + fn find(parent: &mut [usize], x: usize) -> usize { + let mut root = x; + while parent[root] != root { + root = parent[root]; + } + let mut cur = x; + while parent[cur] != root { + let next = parent[cur]; + parent[cur] = root; + cur = next; + } + root + } + + // Best pair score per resulting cluster member, for the explanation. + let mut pair_score: std::collections::HashMap<(usize, usize), crate::advanced::MatchSignals> = + std::collections::HashMap::new(); + + for i in 0..n { + for j in (i + 1)..n { + let sim = crate::cosine_similarity(&items[i].1, &items[j].1); + let (a_node, b_node) = (node_map.get(&items[i].0), node_map.get(&items[j].0)); + let signals = score_pair( + sim, + a_node.map(|n| n.tags.as_slice()).unwrap_or(&[]), + b_node.map(|n| n.tags.as_slice()).unwrap_or(&[]), + a_node.map(|n| n.content.as_str()).unwrap_or(""), + b_node.map(|n| n.content.as_str()).unwrap_or(""), + ); + if signals.combined_score >= policy.possible_threshold { + let ri = find(&mut parent, i); + let rj = find(&mut parent, j); + if ri != rj { + parent[ri] = rj; + } + pair_score.insert((i, j), signals); + } + } + } + + // Group indices by root. + let mut clusters: std::collections::HashMap> = + std::collections::HashMap::new(); + for i in 0..n { + let r = find(&mut parent, i); + clusters.entry(r).or_default().push(i); + } + + let mut out: Vec = Vec::new(); + for members in clusters.into_values() { + if members.len() < 2 { + continue; + } + // Cluster confidence = weakest recorded pair (the loosest link). + let mut min_score = 1.0f32; + let mut best_signals: Option = None; + for a in 0..members.len() { + for b in (a + 1)..members.len() { + let key = (members[a].min(members[b]), members[a].max(members[b])); + if let Some(sig) = pair_score.get(&key) { + if sig.combined_score < min_score { + min_score = sig.combined_score; + } + if best_signals + .as_ref() + .map(|s| sig.combined_score > s.combined_score) + .unwrap_or(true) + { + best_signals = Some(sig.clone()); + } + } + } + } + let signals = match best_signals { + Some(s) => s, + None => continue, + }; + + // Survivor = highest retention member. + let mut member_ids: Vec = + members.iter().map(|&idx| items[idx].0.clone()).collect(); + member_ids.sort_by(|a, b| { + let ra = node_map.get(a).map(|n| n.retention_strength).unwrap_or(0.0); + let rb = node_map.get(b).map(|n| n.retention_strength).unwrap_or(0.0); + rb.partial_cmp(&ra).unwrap_or(std::cmp::Ordering::Equal) + }); + let survivor_id = member_ids[0].clone(); + let has_protected_member = member_ids.iter().any(|id| protected.contains(id)); + let previews: Vec = member_ids + .iter() + .map(|id| { + node_map + .get(id) + .map(|n| preview(&n.content, 120)) + .unwrap_or_default() + }) + .collect(); + + let classification = match policy.classify(min_score) { + MatchClass::NonMatch => continue, + c => c, + }; + + out.push(MergeCandidate { + member_ids, + previews, + survivor_id, + confidence: min_score, + classification, + signals, + has_protected_member, + }); + } + + out.sort_by(|a, b| { + b.confidence + .partial_cmp(&a.confidence) + .unwrap_or(std::cmp::Ordering::Equal) + }); + out.truncate(limit); + Ok(out) + } + + /// IDs of nodes that have been bitemporally superseded (kept, but invalid). + pub fn superseded_node_ids(&self) -> Result> { + let reader = self + .reader + .lock() + .map_err(|_| StorageError::Init("Reader lock poisoned".into()))?; + let mut stmt = + reader.prepare("SELECT id FROM knowledge_nodes WHERE superseded_by IS NOT NULL")?; + let rows = stmt.query_map([], |row| row.get::<_, String>(0))?; + let mut set = std::collections::HashSet::new(); + for r in rows { + set.insert(r?); + } + Ok(set) + } + + /// IDs of protected (pinned) nodes. + pub fn protected_node_ids(&self) -> Result> { + let reader = self + .reader + .lock() + .map_err(|_| StorageError::Init("Reader lock poisoned".into()))?; + let mut stmt = reader.prepare("SELECT id FROM knowledge_nodes WHERE protected = 1")?; + let rows = stmt.query_map([], |row| row.get::<_, String>(0))?; + let mut set = std::collections::HashSet::new(); + for r in rows { + set.insert(r?); + } + Ok(set) + } + + /// Build a previewable MERGE plan (a diff) WITHOUT applying it. + /// + /// The survivor is the first id (or highest retention if unspecified). The + /// plan is persisted to `merge_plans` with status `pending` and returned for + /// inspection. Nothing about the nodes changes until `apply_plan`. + #[cfg(all(feature = "embeddings", feature = "vector-search"))] + pub fn plan_merge( + &self, + member_ids: &[String], + survivor_id: Option<&str>, + policy: crate::advanced::MergePolicy, + ) -> Result { + use crate::advanced::{ + MatchClass, PlanKind, compose_merged_content, compose_merged_tags, score_pair, + }; + + if member_ids.len() < 2 { + return Err(StorageError::Init( + "plan_merge needs at least 2 member ids".into(), + )); + } + + let mut nodes: Vec = Vec::new(); + for id in member_ids { + let node = self + .get_node(id)? + .ok_or_else(|| StorageError::NotFound(id.clone()))?; + nodes.push(node); + } + + // Protected nodes can never be absorbed. They may only be the survivor. + let survivor = match survivor_id { + Some(s) => s.to_string(), + None => { + // highest retention + nodes + .iter() + .max_by(|a, b| { + a.retention_strength + .partial_cmp(&b.retention_strength) + .unwrap_or(std::cmp::Ordering::Equal) + }) + .map(|n| n.id.clone()) + .unwrap_or_else(|| member_ids[0].clone()) + } + }; + for node in &nodes { + if node.id != survivor && self.is_protected(&node.id)? { + return Err(StorageError::Init(format!( + "Memory {} is protected and cannot be merged away. Unprotect it first or make it the survivor.", + node.id + ))); + } + } + + // Order: survivor first, then others. + nodes.sort_by_key(|n| if n.id == survivor { 0 } else { 1 }); + + let members: Vec<(String, String)> = nodes + .iter() + .map(|n| (n.id.clone(), n.content.clone())) + .collect(); + let result_content = compose_merged_content(&members); + let result_tags = compose_merged_tags( + &nodes.iter().map(|n| n.tags.clone()).collect::>(), + ); + let result_source = nodes.iter().find(|n| n.id == survivor).and_then(|n| n.source.clone()); + let invalidated_ids: Vec = nodes + .iter() + .filter(|n| n.id != survivor) + .map(|n| n.id.clone()) + .collect(); + + // Confidence = weakest pair survivor↔absorbed. + let survivor_node = nodes.iter().find(|n| n.id == survivor).unwrap(); + let mut min_score = 1.0f32; + let mut best_signals = score_pair( + 1.0, + &survivor_node.tags, + &survivor_node.tags, + &survivor_node.content, + &survivor_node.content, + ); + for node in nodes.iter().filter(|n| n.id != survivor) { + let sim = self.pair_similarity(&survivor, &node.id)?; + let sig = score_pair( + sim, + &survivor_node.tags, + &node.tags, + &survivor_node.content, + &node.content, + ); + if sig.combined_score < min_score { + min_score = sig.combined_score; + best_signals = sig; + } + } + let classification = policy.classify(min_score); + + let plan = crate::advanced::MergePlan { + id: uuid::Uuid::new_v4().to_string(), + kind: PlanKind::Merge, + survivor_id: survivor.clone(), + member_ids: member_ids.to_vec(), + result_content, + result_tags, + result_source, + invalidated_ids, + confidence: min_score, + classification, + signals: best_signals, + explanation: format!( + "Merge {} memories into {survivor} ({}). {} memory(ies) will be bitemporally invalidated (kept for audit, marked superseded_by={survivor}).", + member_ids.len(), + match classification { + MatchClass::Match => "strong duplicate", + MatchClass::Possible => "possible duplicate — review advised", + MatchClass::NonMatch => "weak match — review strongly advised", + }, + member_ids.len() - 1 + ), + }; + + self.persist_plan(&plan)?; + Ok(plan) + } + + /// Build a previewable SUPERSEDE plan: invalidate `old_id` in favour of + /// `new_id` (bitemporal, audit-preserving) WITHOUT applying it. + #[cfg(all(feature = "embeddings", feature = "vector-search"))] + pub fn plan_supersede( + &self, + old_id: &str, + new_id: &str, + policy: crate::advanced::MergePolicy, + ) -> Result { + use crate::advanced::{PlanKind, score_pair}; + + let old = self + .get_node(old_id)? + .ok_or_else(|| StorageError::NotFound(old_id.to_string()))?; + let new = self + .get_node(new_id)? + .ok_or_else(|| StorageError::NotFound(new_id.to_string()))?; + + if self.is_protected(old_id)? { + return Err(StorageError::Init(format!( + "Memory {old_id} is protected and cannot be superseded. Unprotect it first." + ))); + } + + let sim = self.pair_similarity(old_id, new_id)?; + let signals = score_pair(sim, &old.tags, &new.tags, &old.content, &new.content); + let classification = policy.classify(signals.combined_score); + + let plan = crate::advanced::MergePlan { + id: uuid::Uuid::new_v4().to_string(), + kind: PlanKind::Supersede, + survivor_id: new_id.to_string(), + member_ids: vec![old_id.to_string(), new_id.to_string()], + result_content: new.content.clone(), + result_tags: new.tags.clone(), + result_source: new.source.clone(), + invalidated_ids: vec![old_id.to_string()], + confidence: signals.combined_score, + classification, + signals, + explanation: format!( + "Supersede {old_id} with {new_id}. {old_id} is kept and remains queryable for audit, but stamped valid_until=now and superseded_by={new_id} (invalidate, don't delete)." + ), + }; + + self.persist_plan(&plan)?; + Ok(plan) + } + + /// Cosine similarity between two nodes' stored embeddings (0 if missing). + #[cfg(all(feature = "embeddings", feature = "vector-search"))] + fn pair_similarity(&self, a: &str, b: &str) -> Result { + let ea = self.get_node_embedding(a)?; + let eb = self.get_node_embedding(b)?; + match (ea, eb) { + (Some(ea), Some(eb)) => Ok(crate::cosine_similarity(&ea, &eb)), + _ => Ok(0.0), + } + } + + /// Persist a plan row (status pending). Idempotent on plan id. + fn persist_plan(&self, plan: &crate::advanced::MergePlan) -> Result<()> { + let writer = self + .writer + .lock() + .map_err(|_| StorageError::Init("Writer lock poisoned".into()))?; + let payload = serde_json::to_string(plan) + .map_err(|e| StorageError::Init(format!("plan serialize failed: {e}")))?; + let member_ids = serde_json::to_string(&plan.member_ids).unwrap_or_else(|_| "[]".into()); + writer.execute( + "INSERT OR REPLACE INTO merge_plans + (id, kind, status, created_at, applied_at, survivor_id, member_ids, confidence, classification, payload) + VALUES (?1, ?2, 'pending', ?3, NULL, ?4, ?5, ?6, ?7, ?8)", + params![ + plan.id, + plan.kind.as_str(), + Utc::now().to_rfc3339(), + plan.survivor_id, + member_ids, + plan.confidence as f64, + plan.classification.as_str(), + payload, + ], + )?; + Ok(()) + } + + /// Fetch a stored plan by id. + pub fn get_plan(&self, plan_id: &str) -> Result> { + let reader = self + .reader + .lock() + .map_err(|_| StorageError::Init("Reader lock poisoned".into()))?; + let row: Option<(String, String)> = reader + .query_row( + "SELECT status, payload FROM merge_plans WHERE id = ?1", + params![plan_id], + |row| Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?)), + ) + .optional()?; + match row { + Some((_status, payload)) => { + let plan: crate::advanced::MergePlan = serde_json::from_str(&payload) + .map_err(|e| StorageError::Init(format!("plan deserialize failed: {e}")))?; + Ok(Some(plan)) + } + None => Ok(None), + } + } + + /// Plan status string (pending | applied | cancelled), if the plan exists. + pub fn plan_status(&self, plan_id: &str) -> Result> { + let reader = self + .reader + .lock() + .map_err(|_| StorageError::Init("Reader lock poisoned".into()))?; + let status: Option = reader + .query_row( + "SELECT status FROM merge_plans WHERE id = ?1", + params![plan_id], + |row| row.get(0), + ) + .optional()?; + Ok(status) + } + + /// Execute a previously-generated plan by id. Everything it does is recorded + /// as a reversible [`MergeOperation`] in `merge_operations`. Returns the + /// recorded operation id. + /// + /// - **merge**: survivor content/tags are rewritten to the merged result; + /// each absorbed node is bitemporally invalidated (valid_until=now, + /// superseded_by=survivor) and kept queryable. + /// - **supersede**: old node is bitemporally invalidated in favour of new. + /// + /// `auto_apply` must be true in the policy to apply a `Match` plan without an + /// explicit `confirm`; non-`Match` plans always require `confirm=true`. + #[cfg(all(feature = "embeddings", feature = "vector-search"))] + pub fn apply_plan( + &self, + plan_id: &str, + confirm: bool, + ) -> Result { + use crate::advanced::{MatchClass, PlanKind}; + + let plan = self + .get_plan(plan_id)? + .ok_or_else(|| StorageError::NotFound(format!("plan {plan_id}")))?; + + match self.plan_status(plan_id)?.as_deref() { + Some("applied") => { + return Err(StorageError::Init(format!( + "plan {plan_id} was already applied" + ))); + } + Some("cancelled") => { + return Err(StorageError::Init(format!("plan {plan_id} was cancelled"))); + } + _ => {} + } + + // Confirmation gate: only auto-applyable Match plans may skip confirm. + let needs_confirm = !(plan.classification == MatchClass::Match); + if needs_confirm && !confirm { + return Err(StorageError::Init(format!( + "plan {plan_id} is classified '{}' (confidence {:.3}) and requires confirm=true to apply", + plan.classification.as_str(), + plan.confidence + ))); + } + + let now = Utc::now(); + let op_id = uuid::Uuid::new_v4().to_string(); + + // Snapshot everything we need to undo, BEFORE mutating. + let mut undo = serde_json::Map::new(); + undo.insert("plan_id".into(), serde_json::json!(plan_id)); + undo.insert("kind".into(), serde_json::json!(plan.kind.as_str())); + undo.insert("survivor_id".into(), serde_json::json!(plan.survivor_id)); + + match plan.kind { + PlanKind::Merge => { + let survivor = self + .get_node(&plan.survivor_id)? + .ok_or_else(|| StorageError::NotFound(plan.survivor_id.clone()))?; + undo.insert( + "survivor_prev_content".into(), + serde_json::json!(survivor.content), + ); + undo.insert( + "survivor_prev_tags".into(), + serde_json::json!(survivor.tags), + ); + + // Capture prior valid_until / superseded_by of each absorbed node. + let mut absorbed = Vec::new(); + for id in &plan.invalidated_ids { + let (vu, sb) = self.read_bitemporal(id)?; + absorbed.push(serde_json::json!({ + "id": id, + "prev_valid_until": vu, + "prev_superseded_by": sb, + })); + } + undo.insert("absorbed".into(), serde_json::json!(absorbed)); + + // Apply: rewrite survivor, invalidate absorbed. + self.rewrite_survivor( + &plan.survivor_id, + &plan.result_content, + &plan.result_tags, + )?; + for id in &plan.invalidated_ids { + self.invalidate_node(id, &plan.survivor_id, now)?; + } + } + PlanKind::Supersede => { + let old_id = &plan.member_ids[0]; + let (vu, sb) = self.read_bitemporal(old_id)?; + undo.insert( + "absorbed".into(), + serde_json::json!([{ + "id": old_id, + "prev_valid_until": vu, + "prev_superseded_by": sb, + }]), + ); + self.invalidate_node(old_id, &plan.survivor_id, now)?; + } + } + + // Record the reversible operation. + let affected: Vec = { + let mut v = vec![plan.survivor_id.clone()]; + v.extend(plan.invalidated_ids.clone()); + v + }; + let signals = serde_json::to_string(&plan.signals).unwrap_or_else(|_| "{}".into()); + { + let writer = self + .writer + .lock() + .map_err(|_| StorageError::Init("Writer lock poisoned".into()))?; + writer.execute( + "INSERT INTO merge_operations + (id, plan_id, op_type, status, created_at, reverted_at, reverts_op_id, + survivor_id, affected_ids, confidence, signals, reason, undo_payload) + VALUES (?1, ?2, ?3, 'applied', ?4, NULL, NULL, ?5, ?6, ?7, ?8, ?9, ?10)", + params![ + op_id, + plan_id, + plan.kind.as_str(), + now.to_rfc3339(), + plan.survivor_id, + serde_json::to_string(&affected).unwrap_or_else(|_| "[]".into()), + plan.confidence as f64, + signals, + plan.explanation, + serde_json::Value::Object(undo).to_string(), + ], + )?; + writer.execute( + "UPDATE merge_plans SET status = 'applied', applied_at = ?1 WHERE id = ?2", + params![now.to_rfc3339(), plan_id], + )?; + } + + self.read_operation(&op_id)? + .ok_or_else(|| StorageError::Init("operation vanished after insert".into())) + } + + /// Reverse a prior merge/supersede operation by id (the "memory reflog"). + /// Restores survivor content/tags and clears the bitemporal invalidation on + /// every node the operation touched, then records a compensating `undo` op. + #[cfg(all(feature = "embeddings", feature = "vector-search"))] + pub fn merge_undo(&self, op_id: &str) -> Result { + let op = self + .read_operation(op_id)? + .ok_or_else(|| StorageError::NotFound(format!("operation {op_id}")))?; + if op.status == "reverted" { + return Err(StorageError::Init(format!( + "operation {op_id} was already reverted" + ))); + } + if op.op_type == "undo" { + return Err(StorageError::Init( + "cannot undo an undo operation".into(), + )); + } + + let undo: serde_json::Value = { + let reader = self + .reader + .lock() + .map_err(|_| StorageError::Init("Reader lock poisoned".into()))?; + let payload: String = reader.query_row( + "SELECT undo_payload FROM merge_operations WHERE id = ?1", + params![op_id], + |row| row.get(0), + )?; + serde_json::from_str(&payload) + .map_err(|e| StorageError::Init(format!("undo payload parse failed: {e}")))? + }; + + let kind = undo.get("kind").and_then(|v| v.as_str()).unwrap_or(""); + let survivor_id = undo + .get("survivor_id") + .and_then(|v| v.as_str()) + .unwrap_or_default() + .to_string(); + + // Restore survivor content/tags if this was a merge. + if kind == "merge" + && let (Some(content), Some(tags)) = ( + undo.get("survivor_prev_content").and_then(|v| v.as_str()), + undo.get("survivor_prev_tags").and_then(|v| v.as_array()), + ) + { + let tags: Vec = tags + .iter() + .filter_map(|t| t.as_str().map(|s| s.to_string())) + .collect(); + self.rewrite_survivor(&survivor_id, content, &tags)?; + } + + // Clear invalidation on every absorbed node, restoring prior values. + if let Some(absorbed) = undo.get("absorbed").and_then(|v| v.as_array()) { + for entry in absorbed { + let id = entry.get("id").and_then(|v| v.as_str()).unwrap_or_default(); + if id.is_empty() { + continue; + } + let prev_vu = entry.get("prev_valid_until").and_then(|v| v.as_str()); + let prev_sb = entry.get("prev_superseded_by").and_then(|v| v.as_str()); + self.restore_bitemporal(id, prev_vu, prev_sb)?; + } + } + + let now = Utc::now(); + let new_op_id = uuid::Uuid::new_v4().to_string(); + { + let writer = self + .writer + .lock() + .map_err(|_| StorageError::Init("Writer lock poisoned".into()))?; + // Mark original reverted. + writer.execute( + "UPDATE merge_operations SET status = 'reverted', reverted_at = ?1 WHERE id = ?2", + params![now.to_rfc3339(), op_id], + )?; + // Re-open the plan so it could be re-applied if desired. + if let Some(plan_id) = op.plan_id.as_deref() { + writer.execute( + "UPDATE merge_plans SET status = 'pending', applied_at = NULL WHERE id = ?1", + params![plan_id], + )?; + } + // Record compensating undo op. + writer.execute( + "INSERT INTO merge_operations + (id, plan_id, op_type, status, created_at, reverted_at, reverts_op_id, + survivor_id, affected_ids, confidence, signals, reason, undo_payload) + VALUES (?1, ?2, 'undo', 'applied', ?3, NULL, ?4, ?5, ?6, NULL, NULL, ?7, '{}')", + params![ + new_op_id, + op.plan_id, + now.to_rfc3339(), + op_id, + survivor_id, + serde_json::to_string(&op.affected_ids).unwrap_or_else(|_| "[]".into()), + format!("Reverted {} operation {op_id}", op.op_type), + ], + )?; + } + + self.read_operation(&new_op_id)? + .ok_or_else(|| StorageError::Init("undo operation vanished after insert".into())) + } + + /// List recent merge/supersede operations (the reflog), newest first. + pub fn list_merge_operations( + &self, + limit: usize, + ) -> Result> { + let reader = self + .reader + .lock() + .map_err(|_| StorageError::Init("Reader lock poisoned".into()))?; + let mut stmt = reader.prepare( + "SELECT id, plan_id, op_type, status, created_at, reverted_at, reverts_op_id, + survivor_id, affected_ids, confidence, reason + FROM merge_operations ORDER BY created_at DESC LIMIT ?1", + )?; + let rows = stmt.query_map(params![limit as i64], Self::row_to_operation)?; + let mut out = Vec::new(); + for r in rows { + out.push(r?); + } + Ok(out) + } + + /// Read a single operation by id. + fn read_operation(&self, op_id: &str) -> Result> { + let reader = self + .reader + .lock() + .map_err(|_| StorageError::Init("Reader lock poisoned".into()))?; + let op = reader + .query_row( + "SELECT id, plan_id, op_type, status, created_at, reverted_at, reverts_op_id, + survivor_id, affected_ids, confidence, reason + FROM merge_operations WHERE id = ?1", + params![op_id], + Self::row_to_operation, + ) + .optional()?; + Ok(op) + } + + fn row_to_operation( + row: &rusqlite::Row, + ) -> rusqlite::Result { + let affected: String = row.get("affected_ids")?; + let affected_ids: Vec = serde_json::from_str(&affected).unwrap_or_default(); + Ok(crate::advanced::MergeOperation { + id: row.get("id")?, + plan_id: row.get("plan_id").ok().flatten(), + op_type: row.get("op_type")?, + status: row.get("status")?, + created_at: row.get("created_at")?, + reverted_at: row.get("reverted_at").ok().flatten(), + reverts_op_id: row.get("reverts_op_id").ok().flatten(), + survivor_id: row.get("survivor_id").ok().flatten(), + affected_ids, + confidence: row.get::<_, Option>("confidence").ok().flatten().map(|v| v as f32), + reason: row.get("reason").ok().flatten(), + }) + } + + /// Read (valid_until, superseded_by) for a node. + fn read_bitemporal(&self, id: &str) -> Result<(Option, Option)> { + let reader = self + .reader + .lock() + .map_err(|_| StorageError::Init("Reader lock poisoned".into()))?; + let res = reader + .query_row( + "SELECT valid_until, superseded_by FROM knowledge_nodes WHERE id = ?1", + params![id], + |row| { + Ok(( + row.get::<_, Option>(0)?, + row.get::<_, Option>(1)?, + )) + }, + ) + .optional()?; + res.ok_or_else(|| StorageError::NotFound(id.to_string())) + } + + /// Bitemporally invalidate a node: stamp valid_until=now and superseded_by, + /// keeping the row fully queryable (Graphiti-style invalidate, don't delete). + fn invalidate_node(&self, id: &str, superseded_by: &str, now: DateTime) -> Result<()> { + let writer = self + .writer + .lock() + .map_err(|_| StorageError::Init("Writer lock poisoned".into()))?; + writer.execute( + "UPDATE knowledge_nodes + SET valid_until = ?1, superseded_by = ?2, updated_at = ?1 + WHERE id = ?3", + params![now.to_rfc3339(), superseded_by, id], + )?; + Ok(()) + } + + /// Restore a node's bitemporal columns (used by undo). + fn restore_bitemporal( + &self, + id: &str, + valid_until: Option<&str>, + superseded_by: Option<&str>, + ) -> Result<()> { + let writer = self + .writer + .lock() + .map_err(|_| StorageError::Init("Writer lock poisoned".into()))?; + writer.execute( + "UPDATE knowledge_nodes + SET valid_until = ?1, superseded_by = ?2, updated_at = ?3 + WHERE id = ?4", + params![valid_until, superseded_by, Utc::now().to_rfc3339(), id], + )?; + Ok(()) + } + + /// Rewrite a survivor's content and tags (used by merge apply + undo). + /// Content rewrite regenerates the embedding via `update_node_content`. + fn rewrite_survivor(&self, id: &str, content: &str, tags: &[String]) -> Result<()> { + self.update_node_content(id, content)?; + let tags_json = serde_json::to_string(tags).unwrap_or_else(|_| "[]".into()); + let writer = self + .writer + .lock() + .map_err(|_| StorageError::Init("Writer lock poisoned".into()))?; + writer.execute( + "UPDATE knowledge_nodes SET tags = ?1, updated_at = ?2 WHERE id = ?3", + params![tags_json, Utc::now().to_rfc3339(), id], + )?; + Ok(()) + } +} + +/// Truncate `content` to `max` chars on a char boundary, collapsing newlines. +fn preview(content: &str, max: usize) -> String { + let c = content.replace('\n', " "); + if c.len() > max { + format!("{}...", &c[..c.floor_char_boundary(max)]) + } else { + c + } } // ============================================================================ @@ -6288,6 +7270,7 @@ impl Storage { #[cfg(test)] mod tests { use super::*; + use crate::advanced::{MatchClass, MergePolicy}; use tempfile::tempdir; fn create_test_storage() -> Storage { @@ -7370,4 +8353,300 @@ mod tests { .unwrap(); assert_eq!(has_content_column, 0); } + + // ======================================================================== + // Merge / Supersede controls (Phase 3 — v2.1.25) + // + // These exercise the full lifecycle without the live embedding model by + // seeding the `node_embeddings` table directly with the ACTIVE model name, + // so `get_all_embeddings` / `get_node_embedding` accept them. + // ======================================================================== + + /// Ingest a node and seed it with a controllable embedding under the active + /// model so similarity is deterministic in tests. + #[cfg(all(feature = "embeddings", feature = "vector-search"))] + fn seed_node(storage: &Storage, content: &str, tags: &[&str], vector: Vec) -> String { + let node = storage + .ingest(IngestInput { + content: content.to_string(), + node_type: "fact".to_string(), + tags: tags.iter().map(|t| t.to_string()).collect(), + ..Default::default() + }) + .unwrap(); + let bytes = Embedding::new(vector).to_bytes(); + let active = storage.embedding_service.model_name().to_string(); + let writer = storage.writer.lock().unwrap(); + writer + .execute( + "INSERT OR REPLACE INTO node_embeddings + (node_id, embedding, dimensions, model, created_at) + VALUES (?1, ?2, ?3, ?4, ?5)", + rusqlite::params![ + &node.id, + &bytes, + EMBEDDING_DIMENSIONS as i32, + active, + Utc::now().to_rfc3339() + ], + ) + .unwrap(); + writer + .execute( + "UPDATE knowledge_nodes SET has_embedding = 1 WHERE id = ?1", + rusqlite::params![&node.id], + ) + .unwrap(); + node.id + } + + /// A near-unit vector pointing mostly along `axis`, so two nodes sharing an + /// axis are highly similar and nodes on different axes are not. + #[cfg(all(feature = "embeddings", feature = "vector-search"))] + fn axis_vector(axis: usize, jitter: f32) -> Vec { + let mut v = vec![0.0f32; EMBEDDING_DIMENSIONS]; + v[axis % EMBEDDING_DIMENSIONS] = 1.0; + v[(axis + 1) % EMBEDDING_DIMENSIONS] = jitter; + v + } + + #[cfg(all(feature = "embeddings", feature = "vector-search"))] + #[test] + fn test_merge_candidates_threshold_classification() { + let storage = create_test_storage(); + // Two near-identical (same axis) — should be offered as a candidate. + let a = seed_node( + &storage, + "Use tokio runtime for async Rust services", + &["rust", "async"], + axis_vector(3, 0.02), + ); + let b = seed_node( + &storage, + "Use the tokio runtime for async Rust services", + &["rust", "async"], + axis_vector(3, 0.01), + ); + // One unrelated (different axis) — must not join the cluster. + let _c = seed_node( + &storage, + "Prefer postgres for relational data", + &["db"], + axis_vector(200, 0.0), + ); + + let policy = MergePolicy::default(); + let candidates = storage.merge_candidates(policy, 20, &[]).unwrap(); + assert_eq!(candidates.len(), 1, "exactly one duplicate cluster"); + let cluster = &candidates[0]; + assert_eq!(cluster.member_ids.len(), 2); + assert!(cluster.member_ids.contains(&a)); + assert!(cluster.member_ids.contains(&b)); + assert!( + cluster.confidence >= policy.possible_threshold, + "confidence above possible threshold" + ); + assert!(!cluster.has_protected_member); + } + + #[cfg(all(feature = "embeddings", feature = "vector-search"))] + #[test] + fn test_plan_merge_is_preview_only_no_mutation() { + let storage = create_test_storage(); + let a = seed_node(&storage, "Fact A about caching", &["perf"], axis_vector(5, 0.02)); + let b = seed_node( + &storage, + "Fact A about caching, expanded", + &["perf", "cache"], + axis_vector(5, 0.01), + ); + + let plan = storage + .plan_merge(&[a.clone(), b.clone()], None, MergePolicy::default()) + .unwrap(); + + // Plan diff is populated... + assert!(plan.result_content.contains("Fact A about caching")); + assert!(plan.result_tags.contains(&"cache".to_string())); + assert_eq!(plan.invalidated_ids.len(), 1); + + // ...but NOTHING changed: both nodes still valid, content untouched. + let na = storage.get_node(&a).unwrap().unwrap(); + let nb = storage.get_node(&b).unwrap().unwrap(); + assert_eq!(na.content, "Fact A about caching"); + assert_eq!(nb.content, "Fact A about caching, expanded"); + let (vu_a, sb_a) = storage.read_bitemporal(&a).unwrap(); + let (vu_b, sb_b) = storage.read_bitemporal(&b).unwrap(); + assert!(vu_a.is_none() && sb_a.is_none()); + assert!(vu_b.is_none() && sb_b.is_none()); + + // Plan persisted as pending. + assert_eq!(storage.plan_status(&plan.id).unwrap().as_deref(), Some("pending")); + } + + #[cfg(all(feature = "embeddings", feature = "vector-search"))] + #[test] + fn test_apply_then_undo_merge_is_reversible() { + let storage = create_test_storage(); + let survivor = seed_node(&storage, "Keep this canonical note", &["x"], axis_vector(7, 0.02)); + let absorbed = seed_node( + &storage, + "Extra detail to fold in", + &["x", "y"], + axis_vector(7, 0.01), + ); + + let plan = storage + .plan_merge( + &[survivor.clone(), absorbed.clone()], + Some(&survivor), + MergePolicy::default(), + ) + .unwrap(); + let op = storage.apply_plan(&plan.id, true).unwrap(); + assert_eq!(op.op_type, "merge"); + + // After apply: survivor content merged, absorbed bitemporally invalidated + // but STILL QUERYABLE (never deleted). + let surv = storage.get_node(&survivor).unwrap().unwrap(); + assert!(surv.content.contains("Keep this canonical note")); + assert!(surv.content.contains("Extra detail to fold in")); + assert!(surv.tags.contains(&"y".to_string())); + + let (vu, sb) = storage.read_bitemporal(&absorbed).unwrap(); + assert!(vu.is_some(), "absorbed node stamped valid_until"); + assert_eq!(sb.as_deref(), Some(survivor.as_str())); + // Old node is still fully retrievable for audit. + assert!( + storage.get_node(&absorbed).unwrap().is_some(), + "superseded node remains queryable" + ); + assert!(storage.superseded_node_ids().unwrap().contains(&absorbed)); + + // Undo restores everything. + let undo = storage.merge_undo(&op.id).unwrap(); + assert_eq!(undo.op_type, "undo"); + let surv_after = storage.get_node(&survivor).unwrap().unwrap(); + assert_eq!(surv_after.content, "Keep this canonical note"); + let (vu2, sb2) = storage.read_bitemporal(&absorbed).unwrap(); + assert!(vu2.is_none() && sb2.is_none(), "invalidation cleared on undo"); + assert!(!storage.superseded_node_ids().unwrap().contains(&absorbed)); + + // The original op is now marked reverted; double-undo is rejected. + assert!(storage.merge_undo(&op.id).is_err()); + } + + #[cfg(all(feature = "embeddings", feature = "vector-search"))] + #[test] + fn test_supersede_invalidates_old_but_keeps_it_queryable() { + let storage = create_test_storage(); + let old = seed_node(&storage, "LR should be 1e-4", &["ml"], axis_vector(9, 0.02)); + let new = seed_node( + &storage, + "Correction: LR should be 3e-4", + &["ml"], + axis_vector(9, 0.01), + ); + + let plan = storage + .plan_supersede(&old, &new, MergePolicy::default()) + .unwrap(); + // Preview did not mutate. + let (vu0, _) = storage.read_bitemporal(&old).unwrap(); + assert!(vu0.is_none()); + + let op = storage.apply_plan(&plan.id, true).unwrap(); + assert_eq!(op.op_type, "supersede"); + + let (vu, sb) = storage.read_bitemporal(&old).unwrap(); + assert!(vu.is_some(), "old stamped valid_until"); + assert_eq!(sb.as_deref(), Some(new.as_str())); + // New node untouched and valid. + let (vu_new, sb_new) = storage.read_bitemporal(&new).unwrap(); + assert!(vu_new.is_none() && sb_new.is_none()); + // Old still queryable for audit (invalidate, don't delete). + let old_node = storage.get_node(&old).unwrap().unwrap(); + assert_eq!(old_node.content, "LR should be 1e-4"); + + // And reversible. + storage.merge_undo(&op.id).unwrap(); + let (vu_r, sb_r) = storage.read_bitemporal(&old).unwrap(); + assert!(vu_r.is_none() && sb_r.is_none()); + } + + #[cfg(all(feature = "embeddings", feature = "vector-search"))] + #[test] + fn test_protect_blocks_merge_away() { + let storage = create_test_storage(); + let pinned = seed_node(&storage, "Load-bearing fact", &["pin"], axis_vector(11, 0.02)); + let other = seed_node( + &storage, + "Load-bearing fact restated", + &["pin"], + axis_vector(11, 0.01), + ); + storage.set_protected(&pinned, true).unwrap(); + assert!(storage.is_protected(&pinned).unwrap()); + + // Protected node may not be merged AWAY (survivor=other). + let err = storage.plan_merge(&[other.clone(), pinned.clone()], Some(&other), MergePolicy::default()); + assert!(err.is_err(), "merging a protected node away must fail"); + + // But it CAN be the survivor. + let ok = storage.plan_merge( + &[pinned.clone(), other.clone()], + Some(&pinned), + MergePolicy::default(), + ); + assert!(ok.is_ok(), "protected node can be the survivor"); + + // Supersede of a protected node is also blocked. + assert!( + storage.plan_supersede(&pinned, &other, MergePolicy::default()).is_err(), + "superseding a protected node must fail" + ); + + // merge_candidates flags the protected member. + let cands = storage.merge_candidates(MergePolicy::default(), 20, &[]).unwrap(); + assert!(cands.iter().all(|c| c.has_protected_member)); + } + + #[cfg(all(feature = "embeddings", feature = "vector-search"))] + #[test] + fn test_apply_requires_confirm_for_low_confidence() { + let storage = create_test_storage(); + // Tighten thresholds so a moderate pair lands in 'possible' (needs confirm). + let strict = MergePolicy::new(0.99, 0.5, false); + storage.set_merge_policy(strict).unwrap(); + + let a = seed_node(&storage, "Topic alpha note", &["t"], axis_vector(13, 0.30)); + let b = seed_node(&storage, "Topic alpha aside", &["t"], axis_vector(13, 0.60)); + let plan = storage.plan_merge(&[a, b], None, storage.get_merge_policy().unwrap()).unwrap(); + assert_ne!(plan.classification, MatchClass::Match); + + // Without confirm => rejected. + assert!(storage.apply_plan(&plan.id, false).is_err()); + // With confirm => applied. + assert!(storage.apply_plan(&plan.id, true).is_ok()); + // Re-applying an applied plan => rejected. + assert!(storage.apply_plan(&plan.id, true).is_err()); + } + + #[cfg(all(feature = "embeddings", feature = "vector-search"))] + #[test] + fn test_merge_policy_roundtrip_persists() { + let storage = create_test_storage(); + let p = MergePolicy::new(0.9, 0.6, true); + storage.set_merge_policy(p).unwrap(); + let got = storage.get_merge_policy().unwrap(); + assert!((got.match_threshold - 0.9).abs() < 1e-6); + assert!((got.possible_threshold - 0.6).abs() < 1e-6); + assert!(got.auto_apply); + } + + #[test] + fn test_set_protected_unknown_node_errors() { + let storage = create_test_storage(); + assert!(storage.set_protected("does-not-exist", true).is_err()); + } } diff --git a/crates/vestige-mcp/Cargo.toml b/crates/vestige-mcp/Cargo.toml index 6485504..f265ec5 100644 --- a/crates/vestige-mcp/Cargo.toml +++ b/crates/vestige-mcp/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "vestige-mcp" -version = "2.1.23" +version = "2.1.25" edition = "2024" description = "Cognitive memory MCP server for AI agents - FSRS-6, spreading activation, synaptic tagging, 3D dashboard, and 130 years of memory research" authors = ["samvallad33"] @@ -51,7 +51,7 @@ path = "src/bin/cli.rs" # Only `bundled-sqlite` is always on. `embeddings` and `vector-search` are # toggled via vestige-mcp's own feature flags below so `--no-default-features` # actually works (previously hardcoded here, which silently defeated the flag). -vestige-core = { version = "2.1.23", path = "../vestige-core", default-features = false, features = ["bundled-sqlite"] } +vestige-core = { version = "2.1.25", path = "../vestige-core", default-features = false, features = ["bundled-sqlite"] } # ============================================================================ # MCP Server Dependencies diff --git a/crates/vestige-mcp/src/server.rs b/crates/vestige-mcp/src/server.rs index a409ff5..890739b 100644 --- a/crates/vestige-mcp/src/server.rs +++ b/crates/vestige-mcp/src/server.rs @@ -328,6 +328,52 @@ impl McpServer { ..Default::default() }, // ================================================================ + // MERGE / SUPERSEDE CONTROLS (v2.1.25 — Phase 3) + // Diff-previewed, confidence-gated, reversible, never silent. + // ================================================================ + ToolDescription { + name: "merge_candidates".to_string(), + description: Some("Surface likely duplicate/overlapping memory clusters with confidence scores and the signals behind each (Fellegi-Sunter match/possible/non-match). Read-only — nothing is changed.".to_string()), + input_schema: tools::merge::merge_candidates_schema(), + ..Default::default() + }, + ToolDescription { + name: "plan_merge".to_string(), + description: Some("Produce a previewable MERGE plan (a diff: combined content/tags/provenance) for 2+ memories WITHOUT applying it. Returns a plan_id for apply_plan. Protected members block the merge.".to_string()), + input_schema: tools::merge::plan_merge_schema(), + ..Default::default() + }, + ToolDescription { + name: "plan_supersede".to_string(), + description: Some("Preview superseding memory A with B — bitemporal invalidation (stamps valid_until, keeps A queryable for audit) WITHOUT applying. Returns a plan_id for apply_plan.".to_string()), + input_schema: tools::merge::plan_supersede_schema(), + ..Default::default() + }, + ToolDescription { + name: "apply_plan".to_string(), + description: Some("Execute a previously-generated merge/supersede plan by id. Recorded as a reversible operation. Old memories are invalidated (never deleted). 'possible'/'non_match' plans require confirm=true.".to_string()), + input_schema: tools::merge::apply_plan_schema(), + ..Default::default() + }, + ToolDescription { + name: "merge_undo".to_string(), + description: Some("Reverse a prior merge/supersede operation (the 'git reflog for your agent's memory'). With no operation_id, lists the reversible operation log so you can pick one.".to_string()), + input_schema: tools::merge::merge_undo_schema(), + ..Default::default() + }, + ToolDescription { + name: "protect".to_string(), + description: Some("Pin a memory so it can never be auto-merged, superseded, or garbage-collected. Pass protected=false to unpin.".to_string()), + input_schema: tools::merge::protect_schema(), + ..Default::default() + }, + ToolDescription { + name: "merge_policy".to_string(), + description: Some("Get or set the per-project merge policy: the two Fellegi-Sunter thresholds (match_threshold, possible_threshold) and auto_apply. No args returns the current policy.".to_string()), + input_schema: tools::merge::merge_policy_schema(), + ..Default::default() + }, + // ================================================================ // COGNITIVE TOOLS (v1.5+) // ================================================================ ToolDescription { @@ -887,6 +933,14 @@ impl McpServer { } "find_duplicates" => tools::dedup::execute(&self.storage, request.arguments).await, + // ================================================================ + // MERGE / SUPERSEDE CONTROLS (v2.1.25 — Phase 3) + // ================================================================ + "merge_candidates" | "plan_merge" | "plan_supersede" | "apply_plan" | "merge_undo" + | "protect" | "merge_policy" => { + tools::merge::execute(&self.storage, request.name.as_str(), request.arguments).await + } + // ================================================================ // COGNITIVE TOOLS (v1.5+) // ================================================================ @@ -1686,8 +1740,10 @@ mod tests { let result = response.result.unwrap(); let tools = result["tools"].as_array().unwrap(); - // v2.1.21: 25 tools (includes first-class contradictions surface) - assert_eq!(tools.len(), 25, "Expected exactly 25 tools in v2.1.21"); + // v2.1.25: 32 tools (25 from v2.1.21 + 7 Phase 3 merge/supersede tools: + // merge_candidates, plan_merge, plan_supersede, apply_plan, merge_undo, + // protect, merge_policy) + assert_eq!(tools.len(), 32, "Expected exactly 32 tools in v2.1.25"); let tool_names: Vec<&str> = tools.iter().map(|t| t["name"].as_str().unwrap()).collect(); @@ -1741,6 +1797,15 @@ mod tests { assert!(tool_names.contains(&"importance_score")); assert!(tool_names.contains(&"find_duplicates")); + // Merge / Supersede controls (v2.1.25 — Phase 3) + assert!(tool_names.contains(&"merge_candidates")); + assert!(tool_names.contains(&"plan_merge")); + assert!(tool_names.contains(&"plan_supersede")); + assert!(tool_names.contains(&"apply_plan")); + assert!(tool_names.contains(&"merge_undo")); + assert!(tool_names.contains(&"protect")); + assert!(tool_names.contains(&"merge_policy")); + // Cognitive tools (v1.5) assert!(tool_names.contains(&"dream")); assert!(tool_names.contains(&"explore_connections")); diff --git a/crates/vestige-mcp/src/tools/merge.rs b/crates/vestige-mcp/src/tools/merge.rs new file mode 100644 index 0000000..f836f5f --- /dev/null +++ b/crates/vestige-mcp/src/tools/merge.rs @@ -0,0 +1,530 @@ +//! Merge / Supersede control tools (Phase 3 — v2.1.25) +//! +//! Diff-previewed, confidence-gated, reversible, self-explaining +//! combine/dedupe/supersede on a never-delete (bitemporal) store. The default +//! is always preview/review — these tools never silently mutate memory. +//! +//! Tool surface (each registered as its own MCP tool name, all routed here): +//! +//! - `merge_candidates` — surface likely duplicate clusters with confidence + +//! the signals behind each (Fellegi-Sunter match / possible / non-match). +//! - `plan_merge` — previewable merge PLAN (a diff) without applying it. +//! - `plan_supersede` — preview superseding A with B (bitemporal invalidation, +//! audit-preserving) without applying. +//! - `apply_plan` — execute a previously-generated plan id; recorded as a +//! reversible operation. +//! - `merge_undo` — reverse a prior merge/supersede operation (the reflog). +//! - `protect` — pin a memory so it can never be auto-merged/superseded/forgotten. +//! - `merge_policy` — get/set the two confidence thresholds + auto_apply. +//! +//! The actual logic lives in `vestige_core` (`storage::Storage` + +//! `advanced::merge_supersede`); this layer only validates arguments and shapes +//! JSON. + +use serde_json::{Value, json}; +use std::sync::Arc; +use vestige_core::Storage; + +// ============================================================================ +// SCHEMAS +// ============================================================================ + +/// `merge_candidates` input schema. +pub fn merge_candidates_schema() -> Value { + json!({ + "type": "object", + "properties": { + "limit": { + "type": "integer", + "description": "Max candidate clusters to return (default 20).", + "default": 20, "minimum": 1, "maximum": 100 + }, + "tags": { + "type": "array", + "items": { "type": "string" }, + "description": "Optional: only consider memories with these tags (ANY match)." + } + } + }) +} + +/// `plan_merge` input schema. +pub fn plan_merge_schema() -> Value { + json!({ + "type": "object", + "properties": { + "member_ids": { + "type": "array", + "items": { "type": "string" }, + "description": "IDs of the memories to merge (>= 2). The survivor is kept; the rest are bitemporally invalidated (kept for audit)." + }, + "survivor_id": { + "type": "string", + "description": "Optional: which member to keep. Defaults to the highest-retention member." + } + }, + "required": ["member_ids"] + }) +} + +/// `plan_supersede` input schema. +pub fn plan_supersede_schema() -> Value { + json!({ + "type": "object", + "properties": { + "old_id": { "type": "string", "description": "Memory being superseded (kept, marked invalid)." }, + "new_id": { "type": "string", "description": "Memory that supersedes the old one." } + }, + "required": ["old_id", "new_id"] + }) +} + +/// `apply_plan` input schema. +pub fn apply_plan_schema() -> Value { + json!({ + "type": "object", + "properties": { + "plan_id": { "type": "string", "description": "ID of a plan produced by plan_merge / plan_supersede." }, + "confirm": { + "type": "boolean", + "description": "Required true for 'possible'/'non_match' plans. 'match' plans apply only if the policy has auto_apply=true, else confirm is required too.", + "default": false + } + }, + "required": ["plan_id"] + }) +} + +/// `merge_undo` input schema. +pub fn merge_undo_schema() -> Value { + json!({ + "type": "object", + "properties": { + "operation_id": { + "type": "string", + "description": "ID of the merge/supersede operation to reverse. Omit to list recent operations (the reflog)." + } + } + }) +} + +/// `protect` input schema. +pub fn protect_schema() -> Value { + json!({ + "type": "object", + "properties": { + "id": { "type": "string", "description": "Memory id to protect/unprotect." }, + "protected": { + "type": "boolean", + "description": "true to pin (block auto-merge/supersede/forget), false to unpin. Default true.", + "default": true + } + }, + "required": ["id"] + }) +} + +/// `merge_policy` input schema. +pub fn merge_policy_schema() -> Value { + json!({ + "type": "object", + "properties": { + "match_threshold": { + "type": "number", + "description": "Score >= this => 'match' (auto-merge eligible). 0-1.", + "minimum": 0.0, "maximum": 1.0 + }, + "possible_threshold": { + "type": "number", + "description": "Score in [possible, match) => 'possible' (review). Below => not offered. 0-1.", + "minimum": 0.0, "maximum": 1.0 + }, + "auto_apply": { + "type": "boolean", + "description": "Allow 'match'-class plans to apply without confirm. Default false (review-first)." + } + } + }) +} + +// ============================================================================ +// DISPATCH +// ============================================================================ + +/// Route a merge/supersede tool call by tool name. +pub async fn execute(storage: &Arc, tool: &str, args: Option) -> Result { + match tool { + "merge_candidates" => merge_candidates(storage, args), + "plan_merge" => plan_merge(storage, args), + "plan_supersede" => plan_supersede(storage, args), + "apply_plan" => apply_plan(storage, args), + "merge_undo" => merge_undo(storage, args), + "protect" => protect(storage, args), + "merge_policy" => merge_policy(storage, args), + other => Err(format!("unknown merge tool: {other}")), + } +} + +fn obj(args: &Option) -> serde_json::Map { + args.as_ref() + .and_then(|v| v.as_object().cloned()) + .unwrap_or_default() +} + +// ============================================================================ +// merge_candidates +// ============================================================================ + +fn merge_candidates(storage: &Arc, args: Option) -> Result { + #[cfg(all(feature = "embeddings", feature = "vector-search"))] + { + let a = obj(&args); + let limit = a.get("limit").and_then(|v| v.as_u64()).unwrap_or(20) as usize; + let tags: Vec = a + .get("tags") + .and_then(|v| v.as_array()) + .map(|arr| { + arr.iter() + .filter_map(|t| t.as_str().map(|s| s.to_string())) + .collect() + }) + .unwrap_or_default(); + + let policy = storage.get_merge_policy().map_err(|e| e.to_string())?; + let candidates = storage + .merge_candidates(policy, limit, &tags) + .map_err(|e| e.to_string())?; + + let out: Vec = candidates + .iter() + .map(|c| { + json!({ + "memberIds": c.member_ids, + "previews": c.previews, + "survivorId": c.survivor_id, + "confidence": format!("{:.3}", c.confidence), + "classification": c.classification.as_str(), + "hasProtectedMember": c.has_protected_member, + "signals": { + "embeddingSimilarity": format!("{:.3}", c.signals.embedding_similarity), + "tagOverlap": format!("{:.3}", c.signals.tag_overlap), + "tokenOverlap": format!("{:.3}", c.signals.token_overlap), + "combinedScore": format!("{:.3}", c.signals.combined_score) + }, + "nextStep": if c.has_protected_member { + "A member is protected — unprotect it or pick it as survivor before plan_merge." + } else { + "Call plan_merge with these memberIds to preview the combined result." + } + }) + }) + .collect(); + + let policy = storage.get_merge_policy().map_err(|e| e.to_string())?; + Ok(json!({ + "candidates": out, + "totalCandidates": out.len(), + "policy": { + "matchThreshold": policy.match_threshold, + "possibleThreshold": policy.possible_threshold, + "autoApply": policy.auto_apply + }, + "note": "Nothing was changed. These are review candidates only." + })) + } + #[cfg(not(all(feature = "embeddings", feature = "vector-search")))] + { + let _ = (storage, args); + Ok(json!({ "error": "Embeddings feature not enabled.", "candidates": [] })) + } +} + +// ============================================================================ +// plan_merge +// ============================================================================ + +fn plan_merge(storage: &Arc, args: Option) -> Result { + #[cfg(all(feature = "embeddings", feature = "vector-search"))] + { + let a = obj(&args); + let member_ids: Vec = a + .get("member_ids") + .and_then(|v| v.as_array()) + .map(|arr| { + arr.iter() + .filter_map(|t| t.as_str().map(|s| s.to_string())) + .collect() + }) + .unwrap_or_default(); + if member_ids.len() < 2 { + return Err("member_ids must contain at least 2 ids".into()); + } + let survivor = a.get("survivor_id").and_then(|v| v.as_str()); + let policy = storage.get_merge_policy().map_err(|e| e.to_string())?; + let plan = storage + .plan_merge(&member_ids, survivor, policy) + .map_err(|e| e.to_string())?; + Ok(plan_to_json(&plan, &policy)) + } + #[cfg(not(all(feature = "embeddings", feature = "vector-search")))] + { + let _ = (storage, args); + Err("Embeddings feature not enabled.".into()) + } +} + +// ============================================================================ +// plan_supersede +// ============================================================================ + +fn plan_supersede(storage: &Arc, args: Option) -> Result { + #[cfg(all(feature = "embeddings", feature = "vector-search"))] + { + let a = obj(&args); + let old_id = a + .get("old_id") + .and_then(|v| v.as_str()) + .ok_or("old_id is required")?; + let new_id = a + .get("new_id") + .and_then(|v| v.as_str()) + .ok_or("new_id is required")?; + let policy = storage.get_merge_policy().map_err(|e| e.to_string())?; + let plan = storage + .plan_supersede(old_id, new_id, policy) + .map_err(|e| e.to_string())?; + Ok(plan_to_json(&plan, &policy)) + } + #[cfg(not(all(feature = "embeddings", feature = "vector-search")))] + { + let _ = (storage, args); + Err("Embeddings feature not enabled.".into()) + } +} + +#[cfg(all(feature = "embeddings", feature = "vector-search"))] +fn plan_to_json(plan: &vestige_core::MergePlan, policy: &vestige_core::MergePolicy) -> Value { + let requires_confirm = plan.classification != vestige_core::MatchClass::Match || !policy.auto_apply; + json!({ + "planId": plan.id, + "kind": plan.kind.as_str(), + "survivorId": plan.survivor_id, + "memberIds": plan.member_ids, + "diff": { + "resultContent": plan.result_content, + "resultTags": plan.result_tags, + "resultSource": plan.result_source, + "invalidatedIds": plan.invalidated_ids + }, + "confidence": format!("{:.3}", plan.confidence), + "classification": plan.classification.as_str(), + "signals": { + "embeddingSimilarity": format!("{:.3}", plan.signals.embedding_similarity), + "tagOverlap": format!("{:.3}", plan.signals.tag_overlap), + "tokenOverlap": format!("{:.3}", plan.signals.token_overlap), + "combinedScore": format!("{:.3}", plan.signals.combined_score) + }, + "explanation": plan.explanation, + "requiresConfirm": requires_confirm, + "nextStep": format!( + "Review the diff. To execute: apply_plan with plan_id='{}'{}.", + plan.id, + if requires_confirm { " and confirm=true" } else { "" } + ), + "note": "Nothing was changed. This is a preview plan — apply_plan applies it; merge_undo reverses it." + }) +} + +// ============================================================================ +// apply_plan +// ============================================================================ + +fn apply_plan(storage: &Arc, args: Option) -> Result { + #[cfg(all(feature = "embeddings", feature = "vector-search"))] + { + let a = obj(&args); + let plan_id = a + .get("plan_id") + .and_then(|v| v.as_str()) + .ok_or("plan_id is required")?; + let confirm = a.get("confirm").and_then(|v| v.as_bool()).unwrap_or(false); + let op = storage + .apply_plan(plan_id, confirm) + .map_err(|e| e.to_string())?; + Ok(json!({ + "operationId": op.id, + "opType": op.op_type, + "status": op.status, + "survivorId": op.survivor_id, + "affectedIds": op.affected_ids, + "reason": op.reason, + "appliedAt": op.created_at, + "reversible": true, + "nextStep": format!("To reverse this, call merge_undo with operation_id='{}'.", op.id), + "note": "Old memories were bitemporally invalidated (valid_until stamped), NOT deleted. They remain queryable for audit." + })) + } + #[cfg(not(all(feature = "embeddings", feature = "vector-search")))] + { + let _ = (storage, args); + Err("Embeddings feature not enabled.".into()) + } +} + +// ============================================================================ +// merge_undo (also lists the reflog when no id given) +// ============================================================================ + +fn merge_undo(storage: &Arc, args: Option) -> Result { + #[cfg(all(feature = "embeddings", feature = "vector-search"))] + { + let a = obj(&args); + match a.get("operation_id").and_then(|v| v.as_str()) { + Some(op_id) => { + let op = storage.merge_undo(op_id).map_err(|e| e.to_string())?; + Ok(json!({ + "undoOperationId": op.id, + "revertedOperationId": op.reverts_op_id, + "status": "reverted", + "affectedIds": op.affected_ids, + "reason": op.reason, + "note": "The original operation was reversed: survivor content/tags restored and invalidation cleared. The plan is re-openable." + })) + } + None => { + // No id => return the reflog so the caller can pick one. + let ops = storage.list_merge_operations(20).map_err(|e| e.to_string())?; + let log: Vec = ops + .iter() + .map(|op| { + json!({ + "operationId": op.id, + "opType": op.op_type, + "status": op.status, + "survivorId": op.survivor_id, + "affectedIds": op.affected_ids, + "confidence": op.confidence.map(|c| format!("{:.3}", c)), + "reason": op.reason, + "createdAt": op.created_at, + "revertedAt": op.reverted_at + }) + }) + .collect(); + Ok(json!({ + "operations": log, + "totalOperations": log.len(), + "note": "This is the reversible operation log (the memory reflog). Pass operation_id to reverse one." + })) + } + } + } + #[cfg(not(all(feature = "embeddings", feature = "vector-search")))] + { + let _ = (storage, args); + Err("Embeddings feature not enabled.".into()) + } +} + +// ============================================================================ +// protect +// ============================================================================ + +fn protect(storage: &Arc, args: Option) -> Result { + let a = obj(&args); + let id = a + .get("id") + .and_then(|v| v.as_str()) + .ok_or("id is required")?; + let protected = a.get("protected").and_then(|v| v.as_bool()).unwrap_or(true); + storage + .set_protected(id, protected) + .map_err(|e| e.to_string())?; + Ok(json!({ + "id": id, + "protected": protected, + "note": if protected { + "Memory pinned. It can never be auto-merged, superseded, or garbage-collected until unprotected." + } else { + "Memory unprotected. It is now eligible for merge/supersede/forget again." + } + })) +} + +// ============================================================================ +// merge_policy (get when no args, set otherwise) +// ============================================================================ + +fn merge_policy(storage: &Arc, args: Option) -> Result { + let a = obj(&args); + let current = storage.get_merge_policy().map_err(|e| e.to_string())?; + + let has_update = a.contains_key("match_threshold") + || a.contains_key("possible_threshold") + || a.contains_key("auto_apply"); + + if has_update { + let match_t = a + .get("match_threshold") + .and_then(|v| v.as_f64()) + .map(|v| v as f32) + .unwrap_or(current.match_threshold); + let possible_t = a + .get("possible_threshold") + .and_then(|v| v.as_f64()) + .map(|v| v as f32) + .unwrap_or(current.possible_threshold); + let auto = a + .get("auto_apply") + .and_then(|v| v.as_bool()) + .unwrap_or(current.auto_apply); + let policy = vestige_core::MergePolicy::new(match_t, possible_t, auto); + storage.set_merge_policy(policy).map_err(|e| e.to_string())?; + Ok(json!({ + "updated": true, + "matchThreshold": policy.match_threshold, + "possibleThreshold": policy.possible_threshold, + "autoApply": policy.auto_apply, + "note": "Policy saved. Fellegi-Sunter: score>=match => auto-merge eligible; [possible,match) => review; below => not offered." + })) + } else { + Ok(json!({ + "matchThreshold": current.match_threshold, + "possibleThreshold": current.possible_threshold, + "autoApply": current.auto_apply, + "note": "Two-threshold merge policy. Pass match_threshold / possible_threshold / auto_apply to change it." + })) + } +} + +// ============================================================================ +// TESTS — see tests/merge_supersede_test.rs for full integration coverage. +// ============================================================================ + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn schemas_are_objects() { + for s in [ + merge_candidates_schema(), + plan_merge_schema(), + plan_supersede_schema(), + apply_plan_schema(), + merge_undo_schema(), + protect_schema(), + merge_policy_schema(), + ] { + assert_eq!(s["type"], "object"); + } + } + + #[test] + fn plan_merge_requires_two_ids() { + assert!(plan_merge_schema()["required"] + .as_array() + .unwrap() + .iter() + .any(|v| v == "member_ids")); + } +} diff --git a/crates/vestige-mcp/src/tools/mod.rs b/crates/vestige-mcp/src/tools/mod.rs index 260869e..a2c3e24 100644 --- a/crates/vestige-mcp/src/tools/mod.rs +++ b/crates/vestige-mcp/src/tools/mod.rs @@ -24,6 +24,9 @@ pub mod maintenance; pub mod dedup; pub mod importance; +// v2.1.25: Merge / Supersede controls (Phase 3) +pub mod merge; + // v1.5: Cognitive tools pub mod dream; pub mod explore; diff --git a/docs/MERGE_SUPERSEDE.md b/docs/MERGE_SUPERSEDE.md new file mode 100644 index 0000000..a35fb06 --- /dev/null +++ b/docs/MERGE_SUPERSEDE.md @@ -0,0 +1,152 @@ +# Merge / Supersede Controls (Phase 3) + +> Diff-previewed, confidence-gated, reversible, self-explaining +> combine/dedupe/supersede on a never-delete (bitemporal) store. + +Memory systems accumulate duplicates, near-duplicates, and outdated facts. The +naive fixes are all bad: dumb hashing under-merges (misses paraphrases), +aggressive LLM merging over-merges and destroys the audit trail, and +auto-deleting on contradiction silently loses information. Vestige's Phase 3 +takes the opposite stance: + +- **Opt-in, never silent.** The default is preview/review. Nothing mutates your + memory unless you explicitly apply a plan. +- **Diff-previewed.** `plan_merge` / `plan_supersede` show exactly what *would* + change before anything does. +- **Confidence-gated.** A Fellegi-Sunter two-threshold score classifies each + candidate as `match` / `possible` / `non_match`. +- **Reversible.** Every applied operation is recorded with an undo payload — a + *git reflog for your agent's memory*. +- **Self-explaining.** Each candidate carries the signals that explain *why* two + memories were judged duplicates. +- **Audit-preserving.** Superseding does not delete: it stamps `valid_until` and + keeps the old memory queryable (Graphiti-style "invalidate, don't delete"). + +## The bitemporal model: invalidate, don't delete + +Superseding memory A with memory B does **not** erase A. Instead: + +- `A.valid_until` is stamped with the supersede time. +- `A.superseded_by` is set to `B.id` (a lineage pointer). +- A remains fully queryable for audit. Searches and timelines can still surface + it; it is simply marked as no longer the current truth. + +This reuses the existing `valid_from` / `valid_until` columns on +`knowledge_nodes` (migration V2) plus a new `superseded_by` column (migration +V14). Merges work the same way: the survivor absorbs the others' content, and +each absorbed node is bitemporally invalidated rather than deleted. + +## Fellegi-Sunter two-threshold scoring + +Candidate scoring combines three signals into a weighted score in `[0, 1]`: + +| Signal | Weight | Source | +| ----------------------- | -----: | ------------------------------------------ | +| Embedding cosine sim | 0.70 | stored embeddings (`node_embeddings`) | +| Tag overlap (Jaccard) | 0.15 | `knowledge_nodes.tags` | +| Content token overlap | 0.15 | Jaccard over content tokens (len > 2) | + +The combined score is then classified against **two** thresholds: + +``` +score >= match_threshold => "match" (auto-merge eligible) +possible_threshold <= score => "possible" (surfaced for review) +score < possible_threshold => "non_match" (never offered) +``` + +Defaults: `match_threshold = 0.86`, `possible_threshold = 0.72`. The two-band +design means borderline cases are surfaced for review instead of being +force-decided in either direction. + +A cluster's confidence is the **weakest** pairwise score within it (the loosest +link), so a cluster is only as confident as its least-similar member. + +## The reversible operation log (the "memory reflog") + +Every applied merge/supersede writes one row to `merge_operations`: + +- `op_type` — `merge` | `supersede` | `undo` +- `status` — `applied` | `reverted` +- `survivor_id`, `affected_ids` — what was touched +- `confidence`, `signals` — the score and *why* the memories combined +- `reason` — a human-readable explanation +- `undo_payload` — a JSON snapshot capturing everything needed to reverse it + +`merge_undo` consumes the undo payload to restore the survivor's prior +content/tags and clear the bitemporal invalidation on every affected node, then +records a compensating `undo` operation. Calling `merge_undo` with no +`operation_id` returns the operation log so you can pick one. + +## Memory protection (pinning) + +`protect` sets the `protected` flag on a memory. A protected memory: + +- is never offered for auto-merge (it is flagged in `merge_candidates`), +- cannot be merged *away* (it may only be the survivor of a merge), +- cannot be superseded, +- is excluded from garbage collection. + +Pass `protected: false` to unpin. + +## Tool surface + +| Tool | Mutates? | Purpose | +| ------------------ | :------: | ------------------------------------------------------------------------- | +| `merge_candidates` | No | Surface likely duplicate clusters with confidence + signals. | +| `plan_merge` | No | Preview a merge of 2+ memories (a diff). Returns a `plan_id`. | +| `plan_supersede` | No | Preview superseding A with B (bitemporal). Returns a `plan_id`. | +| `apply_plan` | **Yes** | Execute a plan by id; recorded as a reversible operation. | +| `merge_undo` | **Yes** | Reverse an operation, or list the operation log when given no id. | +| `protect` | **Yes** | Pin / unpin a memory so it can never be auto-merged/superseded/forgotten. | +| `merge_policy` | **Yes** | Get/set the two thresholds + `auto_apply`. | + +### Typical flow + +```text +1. merge_candidates -> review clusters + confidence + signals +2. plan_merge { member_ids: [...] } -> inspect the diff, get plan_id +3. apply_plan { plan_id, confirm } -> apply; get operation_id (reversible) +4. merge_undo { operation_id } -> reverse if it was wrong +``` + +`apply_plan` requires `confirm: true` for `possible` / `non_match` plans. A +`match` plan applies without `confirm` only when the policy has +`auto_apply: true` (default `false`). + +## Configuration + +The merge policy persists per project (stored in `fsrs_config`). It can also be +overridden via environment variables: + +| Variable | Meaning | +| ----------------------------------- | ------------------------------------ | +| `VESTIGE_MERGE_MATCH_THRESHOLD` | Score ≥ this ⇒ `match`. | +| `VESTIGE_MERGE_POSSIBLE_THRESHOLD` | Score ≥ this ⇒ at least `possible`. | +| `VESTIGE_MERGE_AUTO_APPLY` | `1`/`true` to allow auto-apply. | + +A persisted policy (set via `merge_policy`) takes precedence over the +environment, which takes precedence over the built-in defaults. When +`vestige.toml` configuration lands, the policy will read from there as well. + +## Schema (migration V14) + +- `knowledge_nodes.protected INTEGER NOT NULL DEFAULT 0` +- `knowledge_nodes.superseded_by TEXT` +- `merge_plans(id, kind, status, created_at, applied_at, survivor_id, + member_ids, confidence, classification, payload)` +- `merge_operations(id, plan_id, op_type, status, created_at, reverted_at, + reverts_op_id, survivor_id, affected_ids, confidence, signals, reason, + undo_payload)` + +The two `ALTER TABLE ... ADD COLUMN` statements are applied with duplicate-column +guards so the migration is idempotent on replay; the rest of V14 uses +`CREATE ... IF NOT EXISTS`. + +## Anti-patterns this design avoids + +- **Silently double-storing contradictions.** Merge composition attributes and + de-duplicates content instead of blindly concatenating or dropping it. +- **Auto-deleting on contradiction.** Supersede invalidates bitemporally; the + old memory is retained and queryable. +- **Trading away the audit trail for auto-merge convenience.** Every operation is + logged and reversible, with provenance for why memories combined. diff --git a/package.json b/package.json index 9d759a6..20bb78a 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "vestige", - "version": "2.1.23", + "version": "2.1.25", "private": true, "description": "Cognitive memory for AI - MCP server with FSRS-6 spaced repetition", "author": "Sam Valladares", diff --git a/packages/vestige-init/package.json b/packages/vestige-init/package.json index dce6675..cb9b4f7 100644 --- a/packages/vestige-init/package.json +++ b/packages/vestige-init/package.json @@ -1,6 +1,6 @@ { "name": "@vestige/init", - "version": "2.1.23", + "version": "2.1.25", "description": "Configure Vestige local memory for MCP-compatible AI agents", "bin": { "vestige-init": "bin/init.js" diff --git a/packages/vestige-mcp-npm/package.json b/packages/vestige-mcp-npm/package.json index 1ff860f..a7e7829 100644 --- a/packages/vestige-mcp-npm/package.json +++ b/packages/vestige-mcp-npm/package.json @@ -1,6 +1,6 @@ { "name": "vestige-mcp-server", - "version": "2.1.23", + "version": "2.1.25", "mcpName": "io.github.samvallad33/vestige", "description": "Vestige MCP Server — local cognitive memory for MCP-compatible AI agents", "bin": { diff --git a/server.json b/server.json index e11c5a4..2b9a927 100644 --- a/server.json +++ b/server.json @@ -7,12 +7,12 @@ "url": "https://github.com/samvallad33/vestige", "source": "github" }, - "version": "2.1.23", + "version": "2.1.25", "packages": [ { "registryType": "npm", "identifier": "vestige-mcp-server", - "version": "2.1.23", + "version": "2.1.25", "transport": { "type": "stdio" }