From fb250207a3c90223ce62688bb6d9840d3400fba8 Mon Sep 17 00:00:00 2001 From: Sam Valladares Date: Fri, 1 May 2026 05:48:28 -0500 Subject: [PATCH] Fix embedding model upgrade consolidation Fixes #51 --- CHANGELOG.md | 1 + README.md | 2 +- crates/vestige-core/src/storage/sqlite.rs | 251 ++++++++++++---------- 3 files changed, 143 insertions(+), 111 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b370c3c..53dddd2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed - **Dream connection persistence cap** — dense single-domain dreams now persist every connection discovered in that run instead of losing everything beyond the old 1,000-entry live buffer. The live dreamer buffer now keeps up to 200,000 high-scoring recent connections, and the MCP `dream` tool exposes `min_similarity` for corpus-specific tuning. +- **Embedding-model upgrade repair** — `vestige consolidate` now re-embeds every missing or active-model-mismatched memory in one pass, so v1/v2 mixed stores are no longer left partially unreachable after only the first 100 legacy embeddings are regenerated. ## [2.1.1] - 2026-05-01 — "Portable Sync" diff --git a/README.md b/README.md index ff34566..334c3b6 100644 --- a/README.md +++ b/README.md @@ -26,7 +26,7 @@ v2.1.1 focuses on the biggest post-launch ask: move memories between machines wi - **Exact portable archives.** `vestige portable-export` / `vestige portable-import` preserve IDs, FSRS state, graph edges, suppression state, audit rows, and embedding blobs for Vestige-to-Vestige device transfer. - **Sync-safe merge storage.** `vestige portable-import --merge` and `vestige sync ` merge non-empty databases, apply delete tombstones, keep newer local memories, rebuild FTS, and push through a pluggable portable-sync backend. v2.1.1 ships the file backend for Dropbox, iCloud, Syncthing, Git, and shared folders. -- **Qwen3 embeddings.** Build with `qwen3-embeddings`, set `VESTIGE_EMBEDDING_MODEL=qwen3-0.6b`, and run `vestige consolidate` to re-embed existing memories. +- **Qwen3 embeddings.** Build with `qwen3-embeddings`, set `VESTIGE_EMBEDDING_MODEL=qwen3-0.6b`, and run `vestige consolidate` to re-embed existing memories. `vestige health` reports mixed-model stores before search quality is affected. - **Model-aware retrieval.** Vestige now avoids comparing Qwen and Nomic vectors in the same search/dedup path. ## What's New in v2.1.0 "Cognitive Sandwich Goes Local" diff --git a/crates/vestige-core/src/storage/sqlite.rs b/crates/vestige-core/src/storage/sqlite.rs index fbe3c7c..ddc17dd 100644 --- a/crates/vestige-core/src/storage/sqlite.rs +++ b/crates/vestige-core/src/storage/sqlite.rs @@ -2409,73 +2409,7 @@ impl Storage { let mut result = EmbeddingResult::default(); let active_model = self.embedding_service.model_name(); - let model_pattern = Self::active_embedding_model_like_pattern(active_model); - - let nodes: Vec<(String, String, Option)> = { - let reader = self - .reader - .lock() - .map_err(|_| StorageError::Init("Reader lock poisoned".into()))?; - if let Some(ids) = node_ids { - let placeholders = ids.iter().map(|_| "?").collect::>().join(","); - let query = format!( - "SELECT kn.id, kn.content, COALESCE(ne.model, kn.embedding_model) AS embedding_model - FROM knowledge_nodes kn - LEFT JOIN node_embeddings ne ON ne.node_id = kn.id - WHERE kn.id IN ({})", - placeholders - ); - - let mut result_nodes = Vec::new(); - { - let mut stmt = reader.prepare(&query)?; - let params: Vec<&dyn rusqlite::ToSql> = - ids.iter().map(|s| s as &dyn rusqlite::ToSql).collect(); - - let rows = stmt.query_map(params.as_slice(), |row| { - Ok(( - row.get::<_, String>(0)?, - row.get::<_, String>(1)?, - row.get::<_, Option>(2)?, - )) - })?; - - for r in rows.flatten() { - result_nodes.push(r); - } - } - result_nodes - } else if force { - let mut stmt = - reader.prepare("SELECT id, content, embedding_model FROM knowledge_nodes")?; - let rows = stmt.query_map([], |row| { - Ok(( - row.get::<_, String>(0)?, - row.get::<_, String>(1)?, - row.get::<_, Option>(2)?, - )) - })?; - rows.filter_map(|r| r.ok()).collect() - } else { - let mut stmt = reader.prepare( - "SELECT kn.id, kn.content, COALESCE(ne.model, kn.embedding_model) AS embedding_model - FROM knowledge_nodes kn - LEFT JOIN node_embeddings ne ON ne.node_id = kn.id - WHERE kn.has_embedding = 0 - OR kn.has_embedding IS NULL - OR ne.node_id IS NULL - OR COALESCE(ne.model, kn.embedding_model, '') NOT LIKE ?1", - )?; - let rows = stmt.query_map(params![model_pattern], |row| { - Ok(( - row.get::<_, String>(0)?, - row.get::<_, String>(1)?, - row.get::<_, Option>(2)?, - )) - })?; - rows.filter_map(|r| r.ok()).collect() - } - }; + let nodes = self.embedding_regeneration_candidates(node_ids, force)?; for (id, content, stored_model) in nodes { if !force { @@ -2491,7 +2425,7 @@ impl Storage { params![&id], |row| Ok((row.get(0)?, row.get(1)?)), ) - .unwrap_or_else(|_| (0, stored_model)); + .unwrap_or((0, stored_model)); if has_emb == 1 && stored_model.as_deref().is_some_and(|model| { @@ -2515,6 +2449,78 @@ impl Storage { Ok(result) } + #[cfg(all(feature = "embeddings", feature = "vector-search"))] + fn embedding_regeneration_candidates( + &self, + node_ids: Option<&[String]>, + force: bool, + ) -> Result)>> { + let reader = self + .reader + .lock() + .map_err(|_| StorageError::Init("Reader lock poisoned".into()))?; + + if let Some(ids) = node_ids { + if ids.is_empty() { + return Ok(Vec::new()); + } + + let placeholders = ids.iter().map(|_| "?").collect::>().join(","); + let query = format!( + "SELECT kn.id, kn.content, COALESCE(ne.model, kn.embedding_model) AS embedding_model + FROM knowledge_nodes kn + LEFT JOIN node_embeddings ne ON ne.node_id = kn.id + WHERE kn.id IN ({})", + placeholders + ); + + let mut stmt = reader.prepare(&query)?; + let params: Vec<&dyn rusqlite::ToSql> = + ids.iter().map(|s| s as &dyn rusqlite::ToSql).collect(); + let rows = stmt.query_map(params.as_slice(), |row| { + Ok(( + row.get::<_, String>(0)?, + row.get::<_, String>(1)?, + row.get::<_, Option>(2)?, + )) + })?; + return Ok(rows.filter_map(|r| r.ok()).collect()); + } + + if force { + let mut stmt = + reader.prepare("SELECT id, content, embedding_model FROM knowledge_nodes")?; + let rows = stmt.query_map([], |row| { + Ok(( + row.get::<_, String>(0)?, + row.get::<_, String>(1)?, + row.get::<_, Option>(2)?, + )) + })?; + return Ok(rows.filter_map(|r| r.ok()).collect()); + } + + let active_model = self.embedding_service.model_name(); + let model_pattern = Self::active_embedding_model_like_pattern(active_model); + let mut stmt = reader.prepare( + "SELECT kn.id, kn.content, COALESCE(ne.model, kn.embedding_model) AS embedding_model + FROM knowledge_nodes kn + LEFT JOIN node_embeddings ne ON ne.node_id = kn.id + WHERE kn.has_embedding = 0 + OR kn.has_embedding IS NULL + OR ne.node_id IS NULL + OR COALESCE(ne.model, kn.embedding_model, '') NOT LIKE ?1", + )?; + let rows = stmt.query_map(params![model_pattern], |row| { + Ok(( + row.get::<_, String>(0)?, + row.get::<_, String>(1)?, + row.get::<_, Option>(2)?, + )) + })?; + Ok(rows.filter_map(|r| r.ok()).collect()) + } + /// Query memories valid at a specific time pub fn query_at_time( &self, @@ -2788,7 +2794,8 @@ impl Storage { } } - // 3. Generate missing embeddings + // 3. Generate missing and model-mismatched embeddings. + // This must drain the whole set so embedder upgrades do not strand v1 corpora. #[cfg(all(feature = "embeddings", feature = "vector-search"))] let embeddings_generated = self.generate_missing_embeddings()?; #[cfg(not(all(feature = "embeddings", feature = "vector-search")))] @@ -3342,7 +3349,7 @@ impl Storage { Ok(Some(optimized_w20)) } - /// Generate missing embeddings + /// Generate all missing or active-model-mismatched embeddings. #[cfg(all(feature = "embeddings", feature = "vector-search"))] fn generate_missing_embeddings(&self) -> Result { if !self.embedding_service.is_ready() @@ -3352,41 +3359,15 @@ impl Storage { return Ok(0); } - let active_model = self.embedding_service.model_name(); - let model_pattern = Self::active_embedding_model_like_pattern(active_model); - - let nodes: Vec<(String, String)> = { - let reader = self - .reader - .lock() - .map_err(|_| StorageError::Init("Reader lock poisoned".into()))?; - reader - .prepare( - "SELECT kn.id, kn.content - FROM knowledge_nodes kn - LEFT JOIN node_embeddings ne ON ne.node_id = kn.id - WHERE kn.has_embedding = 0 - OR kn.has_embedding IS NULL - OR ne.node_id IS NULL - OR COALESCE(ne.model, kn.embedding_model, '') NOT LIKE ?1 - LIMIT 100", - )? - .query_map(params![model_pattern], |row| Ok((row.get(0)?, row.get(1)?)))? - .filter_map(|r| r.ok()) - .collect() - }; - - let mut count = 0i64; - - for (id, content) in nodes { - if let Err(e) = self.generate_embedding_for_node(&id, &content) { - tracing::warn!("Failed to generate embedding for {}: {}", id, e); - } else { - count += 1; - } + let result = self.generate_embeddings(None, false)?; + if result.failed > 0 { + tracing::warn!( + failed = result.failed, + "Some embeddings could not be regenerated during consolidation" + ); } - Ok(count) + Ok(result.successful) } } @@ -4308,7 +4289,7 @@ impl Storage { let mut latest: Option> = None; - if let Ok(entries) = std::fs::read_dir(&backup_dir) { + if let Ok(entries) = std::fs::read_dir(backup_dir) { for entry in entries.flatten() { let name = entry.file_name(); let name_str = name.to_string_lossy(); @@ -4730,12 +4711,11 @@ impl Storage { .as_deref() .and_then(Self::parse_rfc3339_opt), incoming_updated, - ) { - if existing > incoming { - report.conflicts_kept_local += 1; - report.rows_skipped += 1; - continue; - } + ) && existing > incoming + { + report.conflicts_kept_local += 1; + report.rows_skipped += 1; + continue; } let affected = Self::insert_or_replace_row(tx, "knowledge_nodes", table, row)?; @@ -5656,6 +5636,57 @@ mod tests { ); } + #[cfg(all(feature = "embeddings", feature = "vector-search"))] + #[test] + fn test_embedding_regeneration_candidates_include_entire_mismatched_corpus() { + let storage = create_test_storage(); + let stale_model = "all-MiniLM-L6-v2"; + let stale_embedding = Embedding::new(vec![0.0; EMBEDDING_DIMENSIONS]).to_bytes(); + + for i in 0..125 { + let node = storage + .ingest(IngestInput { + content: format!("legacy embedded memory {}", i), + node_type: "fact".to_string(), + ..Default::default() + }) + .unwrap(); + + let writer = storage.writer.lock().unwrap(); + writer + .execute( + "INSERT OR REPLACE INTO node_embeddings + (node_id, embedding, dimensions, model, created_at) + VALUES (?1, ?2, ?3, ?4, ?5)", + rusqlite::params![ + &node.id, + &stale_embedding, + EMBEDDING_DIMENSIONS as i32, + stale_model, + Utc::now().to_rfc3339() + ], + ) + .unwrap(); + writer + .execute( + "UPDATE knowledge_nodes + SET has_embedding = 1, embedding_model = ?2 + WHERE id = ?1", + rusqlite::params![&node.id, stale_model], + ) + .unwrap(); + } + + let stats = storage.get_stats().unwrap(); + assert_eq!(stats.nodes_with_mismatched_embeddings, 125); + assert_eq!(stats.nodes_with_active_embeddings, 0); + + let candidates = storage + .embedding_regeneration_candidates(None, false) + .unwrap(); + assert_eq!(candidates.len(), 125); + } + #[test] fn test_storage_creation() { let storage = create_test_storage();