diff --git a/Cargo.lock b/Cargo.lock index 3d6af25..3a6b08b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3522,6 +3522,7 @@ dependencies = [ "colored", "directories", "rmcp", + "rusqlite", "serde", "serde_json", "tempfile", diff --git a/crates/vestige-core/Cargo.toml b/crates/vestige-core/Cargo.toml index a5eab35..01fc79b 100644 --- a/crates/vestige-core/Cargo.toml +++ b/crates/vestige-core/Cargo.toml @@ -11,7 +11,15 @@ keywords = ["memory", "spaced-repetition", "fsrs", "embeddings", "knowledge-grap categories = ["science", "database"] [features] -default = ["embeddings", "vector-search"] +default = ["embeddings", "vector-search", "bundled-sqlite"] + +# SQLite backend (default, unencrypted) +bundled-sqlite = ["rusqlite/bundled"] + +# Encrypted SQLite via SQLCipher (mutually exclusive with bundled-sqlite) +# Use: --no-default-features --features encryption,embeddings,vector-search +# Set VESTIGE_ENCRYPTION_KEY env var to enable encryption +encryption = ["rusqlite/bundled-sqlcipher"] # Core embeddings with fastembed (ONNX-based, local inference) embeddings = ["dep:fastembed"] @@ -40,7 +48,8 @@ uuid = { version = "1", features = ["v4", "serde"] } thiserror = "2" # Database - SQLite with FTS5 full-text search and JSON -rusqlite = { version = "0.38", features = ["bundled", "chrono", "serde_json"] } +# Note: "bundled" or "bundled-sqlcipher" added via feature flags above +rusqlite = { version = "0.38", features = ["chrono", "serde_json"] } # Platform-specific directories directories = "6" diff --git a/crates/vestige-core/src/storage/sqlite.rs b/crates/vestige-core/src/storage/sqlite.rs index 336aeec..345ce06 100644 --- a/crates/vestige-core/src/storage/sqlite.rs +++ b/crates/vestige-core/src/storage/sqlite.rs @@ -105,6 +105,16 @@ impl Storage { let conn = Connection::open(&path)?; + // Apply encryption key if SQLCipher is enabled and key is provided + #[cfg(feature = "encryption")] + { + if let Ok(key) = std::env::var("VESTIGE_ENCRYPTION_KEY") { + if !key.is_empty() { + conn.pragma_update(None, "key", &key)?; + } + } + } + // Configure SQLite for performance conn.execute_batch( "PRAGMA journal_mode = WAL; @@ -1431,61 +1441,81 @@ impl Storage { Ok(result) } - /// Apply decay to all memories + /// Apply decay to all memories using batched pagination to avoid OOM. + /// + /// Instead of loading all knowledge_nodes into memory at once, this + /// processes rows in fixed-size batches (BATCH_SIZE = 500) using + /// LIMIT/OFFSET pagination. Each batch runs inside its own transaction + /// for atomicity without holding a giant write-lock. pub fn apply_decay(&mut self) -> Result { const FSRS_DECAY: f64 = 0.5; const FSRS_FACTOR: f64 = 9.0; + const BATCH_SIZE: i64 = 500; let now = Utc::now(); + let mut count = 0i32; + let mut offset = 0i64; - let mut stmt = self.conn.prepare( - "SELECT id, last_accessed, storage_strength, retrieval_strength, - sentiment_magnitude, stability - FROM knowledge_nodes", - )?; + loop { + let batch: Vec<(String, String, f64, f64, f64, f64)> = self + .conn + .prepare( + "SELECT id, last_accessed, storage_strength, retrieval_strength, + sentiment_magnitude, stability + FROM knowledge_nodes + ORDER BY id + LIMIT ?1 OFFSET ?2", + )? + .query_map(params![BATCH_SIZE, offset], |row| { + Ok(( + row.get(0)?, + row.get(1)?, + row.get(2)?, + row.get(3)?, + row.get(4)?, + row.get(5)?, + )) + })? + .filter_map(|r| r.ok()) + .collect(); - let nodes: Vec<(String, String, f64, f64, f64, f64)> = stmt - .query_map([], |row| { - Ok(( - row.get(0)?, - row.get(1)?, - row.get(2)?, - row.get(3)?, - row.get(4)?, - row.get(5)?, - )) - })? - .filter_map(|r| r.ok()) - .collect(); - - let mut count = 0; - - for (id, last_accessed, storage_strength, _, sentiment_mag, stability) in nodes { - let last = DateTime::parse_from_rfc3339(&last_accessed) - .map(|dt| dt.with_timezone(&Utc)) - .unwrap_or(now); - - let days_since = (now - last).num_seconds() as f64 / 86400.0; - - if days_since > 0.0 { - let effective_stability = stability * (1.0 + sentiment_mag * 0.5); - - let new_retrieval = (1.0 + days_since / (FSRS_FACTOR * effective_stability)) - .powf(-1.0 / FSRS_DECAY); - - let new_retention = - (new_retrieval * 0.7) + ((storage_strength / 10.0).min(1.0) * 0.3); - - self.conn.execute( - "UPDATE knowledge_nodes SET - retrieval_strength = ?1, - retention_strength = ?2 - WHERE id = ?3", - params![new_retrieval, new_retention, id], - )?; - - count += 1; + if batch.is_empty() { + break; } + + let batch_len = batch.len() as i64; + + // Use a transaction for the batch + let tx = self.conn.transaction()?; + + for (id, last_accessed, storage_strength, _, sentiment_mag, stability) in &batch { + let last = DateTime::parse_from_rfc3339(last_accessed) + .map(|dt| dt.with_timezone(&Utc)) + .unwrap_or(now); + + let days_since = (now - last).num_seconds() as f64 / 86400.0; + + if days_since > 0.0 { + let effective_stability = stability * (1.0 + sentiment_mag * 0.5); + + let new_retrieval = + (1.0 + days_since / (FSRS_FACTOR * effective_stability)) + .powf(-1.0 / FSRS_DECAY); + + let new_retention = + (new_retrieval * 0.7) + ((storage_strength / 10.0).min(1.0) * 0.3); + + tx.execute( + "UPDATE knowledge_nodes SET retrieval_strength = ?1, retention_strength = ?2 WHERE id = ?3", + params![new_retrieval, new_retention, id], + )?; + + count += 1; + } + } + + tx.commit()?; + offset += batch_len; } Ok(count) diff --git a/crates/vestige-mcp/Cargo.toml b/crates/vestige-mcp/Cargo.toml index f126f59..3e5d37e 100644 --- a/crates/vestige-mcp/Cargo.toml +++ b/crates/vestige-mcp/Cargo.toml @@ -68,5 +68,8 @@ rmcp = "0.14" clap = { version = "4", features = ["derive"] } colored = "3" +# SQLite (for backup WAL checkpoint) +rusqlite = { version = "0.38", features = ["bundled"] } + [dev-dependencies] tempfile = "3" diff --git a/crates/vestige-mcp/src/bin/cli.rs b/crates/vestige-mcp/src/bin/cli.rs index 5136f0d..a32cad6 100644 --- a/crates/vestige-mcp/src/bin/cli.rs +++ b/crates/vestige-mcp/src/bin/cli.rs @@ -2,10 +2,13 @@ //! //! Command-line interface for managing cognitive memory system. +use std::io::{BufWriter, Write}; use std::path::PathBuf; +use chrono::{NaiveDate, Utc}; use clap::{Parser, Subcommand}; use colored::Colorize; +use directories::ProjectDirs; use vestige_core::{IngestInput, Storage}; /// Vestige - Cognitive Memory System CLI @@ -44,6 +47,43 @@ enum Commands { /// Path to backup JSON file file: PathBuf, }, + + /// Create a full backup of the SQLite database + Backup { + /// Output file path for the backup + output: PathBuf, + }, + + /// Export memories in JSON or JSONL format + Export { + /// Output file path + output: PathBuf, + /// Export format: json or jsonl + #[arg(long, default_value = "json")] + format: String, + /// Filter by tags (comma-separated) + #[arg(long)] + tags: Option, + /// Only export memories created after this date (YYYY-MM-DD) + #[arg(long)] + since: Option, + }, + + /// Garbage collect stale memories below retention threshold + Gc { + /// Minimum retention strength to keep (delete below this) + #[arg(long, default_value = "0.1")] + min_retention: f64, + /// Maximum age in days (delete memories older than this AND below retention threshold) + #[arg(long)] + max_age_days: Option, + /// Dry run - show what would be deleted without actually deleting + #[arg(long)] + dry_run: bool, + /// Skip confirmation prompt + #[arg(long)] + yes: bool, + }, } fn main() -> anyhow::Result<()> { @@ -54,6 +94,19 @@ fn main() -> anyhow::Result<()> { Commands::Health => run_health(), Commands::Consolidate => run_consolidate(), Commands::Restore { file } => run_restore(file), + Commands::Backup { output } => run_backup(output), + Commands::Export { + output, + format, + tags, + since, + } => run_export(output, format, tags, since), + Commands::Gc { + min_retention, + max_age_days, + dry_run, + yes, + } => run_gc(min_retention, max_age_days, dry_run, yes), } } @@ -424,6 +477,360 @@ fn run_restore(backup_path: PathBuf) -> anyhow::Result<()> { Ok(()) } +/// Get the default database path +fn get_default_db_path() -> anyhow::Result { + let proj_dirs = ProjectDirs::from("com", "vestige", "core") + .ok_or_else(|| anyhow::anyhow!("Could not determine project directories"))?; + Ok(proj_dirs.data_dir().join("vestige.db")) +} + +/// Fetch all nodes from storage using pagination +fn fetch_all_nodes(storage: &Storage) -> anyhow::Result> { + let mut all_nodes = Vec::new(); + let page_size = 500; + let mut offset = 0; + + loop { + let batch = storage.get_all_nodes(page_size, offset)?; + let batch_len = batch.len(); + all_nodes.extend(batch); + if batch_len < page_size as usize { + break; + } + offset += page_size; + } + + Ok(all_nodes) +} + +/// Run backup command - copies the SQLite database file +fn run_backup(output: PathBuf) -> anyhow::Result<()> { + println!("{}", "=== Vestige Backup ===".cyan().bold()); + println!(); + + let db_path = get_default_db_path()?; + + if !db_path.exists() { + anyhow::bail!("Database not found at: {}", db_path.display()); + } + + // Open storage to flush WAL before copying + println!("Flushing WAL checkpoint..."); + { + let storage = Storage::new(None)?; + // get_stats triggers a read so the connection is active, then drop flushes + let _ = storage.get_stats()?; + } + + // Also flush WAL directly via a separate connection for safety + { + let conn = rusqlite::Connection::open(&db_path)?; + conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);")?; + } + + // Create parent directories if needed + if let Some(parent) = output.parent() { + if !parent.exists() { + std::fs::create_dir_all(parent)?; + } + } + + // Copy the database file + println!("Copying database..."); + println!(" {} {}", "From:".dimmed(), db_path.display()); + println!(" {} {}", "To:".dimmed(), output.display()); + + std::fs::copy(&db_path, &output)?; + + let file_size = std::fs::metadata(&output)?.len(); + let size_display = if file_size >= 1024 * 1024 { + format!("{:.2} MB", file_size as f64 / (1024.0 * 1024.0)) + } else if file_size >= 1024 { + format!("{:.1} KB", file_size as f64 / 1024.0) + } else { + format!("{} bytes", file_size) + }; + + println!(); + println!( + "{}", + format!("Backup complete: {} ({})", output.display(), size_display) + .green() + .bold() + ); + + Ok(()) +} + +/// Run export command - exports memories in JSON or JSONL format +fn run_export( + output: PathBuf, + format: String, + tags: Option, + since: Option, +) -> anyhow::Result<()> { + println!("{}", "=== Vestige Export ===".cyan().bold()); + println!(); + + // Validate format + if format != "json" && format != "jsonl" { + anyhow::bail!("Invalid format '{}'. Must be 'json' or 'jsonl'.", format); + } + + // Parse since date if provided + let since_date = match &since { + Some(date_str) => { + let naive = NaiveDate::parse_from_str(date_str, "%Y-%m-%d") + .map_err(|e| anyhow::anyhow!("Invalid date '{}': {}. Use YYYY-MM-DD format.", date_str, e))?; + Some( + naive + .and_hms_opt(0, 0, 0) + .expect("midnight is always valid") + .and_utc(), + ) + } + None => None, + }; + + // Parse tags filter + let tag_filter: Vec = tags + .as_deref() + .map(|t| t.split(',').map(|s| s.trim().to_string()).filter(|s| !s.is_empty()).collect()) + .unwrap_or_default(); + + let storage = Storage::new(None)?; + let all_nodes = fetch_all_nodes(&storage)?; + + // Apply filters + let filtered: Vec<&vestige_core::KnowledgeNode> = all_nodes + .iter() + .filter(|node| { + // Date filter + if let Some(ref since_dt) = since_date { + if node.created_at < *since_dt { + return false; + } + } + // Tag filter: node must contain ALL specified tags + if !tag_filter.is_empty() { + for tag in &tag_filter { + if !node.tags.iter().any(|t| t == tag) { + return false; + } + } + } + true + }) + .collect(); + + println!("{}: {}", "Format".white().bold(), format); + if !tag_filter.is_empty() { + println!("{}: {}", "Tag filter".white().bold(), tag_filter.join(", ")); + } + if let Some(ref date_str) = since { + println!("{}: {}", "Since".white().bold(), date_str); + } + println!( + "{}: {} / {} total", + "Matching".white().bold(), + filtered.len(), + all_nodes.len() + ); + println!(); + + // Create parent directories if needed + if let Some(parent) = output.parent() { + if !parent.exists() { + std::fs::create_dir_all(parent)?; + } + } + + let file = std::fs::File::create(&output)?; + let mut writer = BufWriter::new(file); + + match format.as_str() { + "json" => { + serde_json::to_writer_pretty(&mut writer, &filtered)?; + writer.write_all(b"\n")?; + } + "jsonl" => { + for node in &filtered { + serde_json::to_writer(&mut writer, node)?; + writer.write_all(b"\n")?; + } + } + _ => unreachable!(), + } + + writer.flush()?; + + let file_size = std::fs::metadata(&output)?.len(); + let size_display = if file_size >= 1024 * 1024 { + format!("{:.2} MB", file_size as f64 / (1024.0 * 1024.0)) + } else if file_size >= 1024 { + format!("{:.1} KB", file_size as f64 / 1024.0) + } else { + format!("{} bytes", file_size) + }; + + println!( + "{}", + format!( + "Exported {} memories to {} ({}, {})", + filtered.len(), + output.display(), + format, + size_display + ) + .green() + .bold() + ); + + Ok(()) +} + +/// Run garbage collection command +fn run_gc( + min_retention: f64, + max_age_days: Option, + dry_run: bool, + yes: bool, +) -> anyhow::Result<()> { + println!("{}", "=== Vestige Garbage Collection ===".cyan().bold()); + println!(); + + let mut storage = Storage::new(None)?; + let all_nodes = fetch_all_nodes(&storage)?; + let now = Utc::now(); + + // Find candidates for deletion + let candidates: Vec<&vestige_core::KnowledgeNode> = all_nodes + .iter() + .filter(|node| { + // Must be below retention threshold + if node.retention_strength >= min_retention { + return false; + } + // If max_age_days specified, must also be older than that + if let Some(max_days) = max_age_days { + let age_days = (now - node.created_at).num_days(); + if age_days < 0 || (age_days as u64) < max_days { + return false; + } + } + true + }) + .collect(); + + println!("{}: {}", "Min retention threshold".white().bold(), min_retention); + if let Some(max_days) = max_age_days { + println!("{}: {} days", "Max age".white().bold(), max_days); + } + println!( + "{}: {} / {} total", + "Candidates for deletion".white().bold(), + candidates.len(), + all_nodes.len() + ); + + if candidates.is_empty() { + println!(); + println!("{}", "No memories match the garbage collection criteria.".green()); + return Ok(()); + } + + // Show sample of what would be deleted + println!(); + println!("{}", "Sample of memories to be removed:".yellow().bold()); + let sample_count = candidates.len().min(10); + for node in candidates.iter().take(sample_count) { + let age_days = (now - node.created_at).num_days(); + println!( + " {} [ret={:.3}, age={}d] {}", + node.id[..8].dimmed(), + node.retention_strength, + age_days, + truncate(&node.content, 60).dimmed() + ); + } + if candidates.len() > sample_count { + println!( + " {} ... and {} more", + "".dimmed(), + candidates.len() - sample_count + ); + } + + if dry_run { + println!(); + println!( + "{}", + format!( + "Dry run: {} memories would be deleted. Re-run without --dry-run to delete.", + candidates.len() + ) + .yellow() + .bold() + ); + return Ok(()); + } + + // Confirmation prompt (unless --yes) + if !yes { + println!(); + print!( + "{} Delete {} memories? This cannot be undone. [y/N] ", + "WARNING:".red().bold(), + candidates.len() + ); + std::io::stdout().flush()?; + + let mut input = String::new(); + std::io::stdin().read_line(&mut input)?; + let input = input.trim().to_lowercase(); + + if input != "y" && input != "yes" { + println!("{}", "Aborted.".yellow()); + return Ok(()); + } + } + + // Perform deletion + let mut deleted = 0; + let mut errors = 0; + let total_candidates = candidates.len(); + + for node in &candidates { + match storage.delete_node(&node.id) { + Ok(true) => deleted += 1, + Ok(false) => errors += 1, // node was already gone + Err(e) => { + eprintln!(" {} Failed to delete {}: {}", "ERR".red(), &node.id[..8], e); + errors += 1; + } + } + } + + println!(); + println!( + "{}", + format!( + "Garbage collection complete: {}/{} memories deleted{}", + deleted, + total_candidates, + if errors > 0 { + format!(" ({} errors)", errors) + } else { + String::new() + } + ) + .green() + .bold() + ); + + Ok(()) +} + /// Truncate a string for display (UTF-8 safe) fn truncate(s: &str, max_chars: usize) -> String { let s = s.replace('\n', " "); diff --git a/crates/vestige-mcp/src/main.rs b/crates/vestige-mcp/src/main.rs index ef16924..4f5ebf4 100644 --- a/crates/vestige-mcp/src/main.rs +++ b/crates/vestige-mcp/src/main.rs @@ -36,7 +36,7 @@ use std::io; use std::path::PathBuf; use std::sync::Arc; use tokio::sync::Mutex; -use tracing::{error, info, Level}; +use tracing::{error, info, warn, Level}; use tracing_subscriber::EnvFilter; // Use vestige-core for the cognitive science engine @@ -156,6 +156,58 @@ async fn main() { } }; + // Spawn background auto-consolidation so FSRS-6 decay scores stay fresh. + // Runs only if the last consolidation was more than 6 hours ago. + { + let storage_clone = storage.clone(); + tokio::spawn(async move { + // Small delay so we don't block server startup / stdio handshake + tokio::time::sleep(std::time::Duration::from_secs(2)).await; + + let mut storage = storage_clone.lock().await; + + // Check whether consolidation is actually needed + let should_run = match storage.get_last_consolidation() { + Ok(Some(last)) => { + let elapsed = chrono::Utc::now() - last; + let stale = elapsed > chrono::Duration::hours(6); + if !stale { + info!( + last_consolidation = %last, + "Skipping auto-consolidation (last run was < 6 hours ago)" + ); + } + stale + } + Ok(None) => { + info!("No previous consolidation found — running first auto-consolidation"); + true + } + Err(e) => { + warn!("Could not read consolidation history: {} — running anyway", e); + true + } + }; + + if should_run { + match storage.run_consolidation() { + Ok(result) => { + info!( + nodes_processed = result.nodes_processed, + decay_applied = result.decay_applied, + embeddings_generated = result.embeddings_generated, + duration_ms = result.duration_ms, + "Auto-consolidation complete" + ); + } + Err(e) => { + warn!("Auto-consolidation failed: {}", e); + } + } + } + }); + } + // Create MCP server let server = McpServer::new(storage); diff --git a/crates/vestige-mcp/src/server.rs b/crates/vestige-mcp/src/server.rs index ba841f5..76f48eb 100644 --- a/crates/vestige-mcp/src/server.rs +++ b/crates/vestige-mcp/src/server.rs @@ -502,6 +502,19 @@ impl McpServer { description: Some("Remembered architectural and design decisions".to_string()), mime_type: Some("application/json".to_string()), }, + // Consolidation resources + ResourceDescription { + uri: "memory://insights".to_string(), + name: "Consolidation Insights".to_string(), + description: Some("Insights generated during memory consolidation".to_string()), + mime_type: Some("application/json".to_string()), + }, + ResourceDescription { + uri: "memory://consolidation-log".to_string(), + name: "Consolidation Log".to_string(), + description: Some("History of memory consolidation runs".to_string()), + mime_type: Some("application/json".to_string()), + }, // Prospective memory resources ResourceDescription { uri: "memory://intentions".to_string(),