Fix embedding model upgrade consolidation
Some checks are pending
CI / Test (macos-latest) (push) Waiting to run
CI / Test (ubuntu-latest) (push) Waiting to run
CI / Release Build (aarch64-apple-darwin) (push) Blocked by required conditions
CI / Release Build (x86_64-unknown-linux-gnu) (push) Blocked by required conditions
CI / Release Build (x86_64-apple-darwin) (push) Blocked by required conditions
Test Suite / Unit Tests (push) Waiting to run
Test Suite / MCP E2E Tests (push) Waiting to run
Test Suite / User Journey Tests (push) Blocked by required conditions
Test Suite / Dashboard Build (push) Waiting to run
Test Suite / Code Coverage (push) Waiting to run

Fixes #51
This commit is contained in:
Sam Valladares 2026-05-01 05:48:28 -05:00
parent 4e9e11ac0b
commit fb250207a3
3 changed files with 143 additions and 111 deletions

View file

@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed
- **Dream connection persistence cap** — dense single-domain dreams now persist every connection discovered in that run instead of losing everything beyond the old 1,000-entry live buffer. The live dreamer buffer now keeps up to 200,000 high-scoring recent connections, and the MCP `dream` tool exposes `min_similarity` for corpus-specific tuning.
- **Embedding-model upgrade repair**`vestige consolidate` now re-embeds every missing or active-model-mismatched memory in one pass, so v1/v2 mixed stores are no longer left partially unreachable after only the first 100 legacy embeddings are regenerated.
## [2.1.1] - 2026-05-01 — "Portable Sync"

View file

@ -26,7 +26,7 @@ v2.1.1 focuses on the biggest post-launch ask: move memories between machines wi
- **Exact portable archives.** `vestige portable-export` / `vestige portable-import` preserve IDs, FSRS state, graph edges, suppression state, audit rows, and embedding blobs for Vestige-to-Vestige device transfer.
- **Sync-safe merge storage.** `vestige portable-import --merge` and `vestige sync <archive>` merge non-empty databases, apply delete tombstones, keep newer local memories, rebuild FTS, and push through a pluggable portable-sync backend. v2.1.1 ships the file backend for Dropbox, iCloud, Syncthing, Git, and shared folders.
- **Qwen3 embeddings.** Build with `qwen3-embeddings`, set `VESTIGE_EMBEDDING_MODEL=qwen3-0.6b`, and run `vestige consolidate` to re-embed existing memories.
- **Qwen3 embeddings.** Build with `qwen3-embeddings`, set `VESTIGE_EMBEDDING_MODEL=qwen3-0.6b`, and run `vestige consolidate` to re-embed existing memories. `vestige health` reports mixed-model stores before search quality is affected.
- **Model-aware retrieval.** Vestige now avoids comparing Qwen and Nomic vectors in the same search/dedup path.
## What's New in v2.1.0 "Cognitive Sandwich Goes Local"

View file

@ -2409,73 +2409,7 @@ impl Storage {
let mut result = EmbeddingResult::default();
let active_model = self.embedding_service.model_name();
let model_pattern = Self::active_embedding_model_like_pattern(active_model);
let nodes: Vec<(String, String, Option<String>)> = {
let reader = self
.reader
.lock()
.map_err(|_| StorageError::Init("Reader lock poisoned".into()))?;
if let Some(ids) = node_ids {
let placeholders = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
let query = format!(
"SELECT kn.id, kn.content, COALESCE(ne.model, kn.embedding_model) AS embedding_model
FROM knowledge_nodes kn
LEFT JOIN node_embeddings ne ON ne.node_id = kn.id
WHERE kn.id IN ({})",
placeholders
);
let mut result_nodes = Vec::new();
{
let mut stmt = reader.prepare(&query)?;
let params: Vec<&dyn rusqlite::ToSql> =
ids.iter().map(|s| s as &dyn rusqlite::ToSql).collect();
let rows = stmt.query_map(params.as_slice(), |row| {
Ok((
row.get::<_, String>(0)?,
row.get::<_, String>(1)?,
row.get::<_, Option<String>>(2)?,
))
})?;
for r in rows.flatten() {
result_nodes.push(r);
}
}
result_nodes
} else if force {
let mut stmt =
reader.prepare("SELECT id, content, embedding_model FROM knowledge_nodes")?;
let rows = stmt.query_map([], |row| {
Ok((
row.get::<_, String>(0)?,
row.get::<_, String>(1)?,
row.get::<_, Option<String>>(2)?,
))
})?;
rows.filter_map(|r| r.ok()).collect()
} else {
let mut stmt = reader.prepare(
"SELECT kn.id, kn.content, COALESCE(ne.model, kn.embedding_model) AS embedding_model
FROM knowledge_nodes kn
LEFT JOIN node_embeddings ne ON ne.node_id = kn.id
WHERE kn.has_embedding = 0
OR kn.has_embedding IS NULL
OR ne.node_id IS NULL
OR COALESCE(ne.model, kn.embedding_model, '') NOT LIKE ?1",
)?;
let rows = stmt.query_map(params![model_pattern], |row| {
Ok((
row.get::<_, String>(0)?,
row.get::<_, String>(1)?,
row.get::<_, Option<String>>(2)?,
))
})?;
rows.filter_map(|r| r.ok()).collect()
}
};
let nodes = self.embedding_regeneration_candidates(node_ids, force)?;
for (id, content, stored_model) in nodes {
if !force {
@ -2491,7 +2425,7 @@ impl Storage {
params![&id],
|row| Ok((row.get(0)?, row.get(1)?)),
)
.unwrap_or_else(|_| (0, stored_model));
.unwrap_or((0, stored_model));
if has_emb == 1
&& stored_model.as_deref().is_some_and(|model| {
@ -2515,6 +2449,78 @@ impl Storage {
Ok(result)
}
#[cfg(all(feature = "embeddings", feature = "vector-search"))]
fn embedding_regeneration_candidates(
&self,
node_ids: Option<&[String]>,
force: bool,
) -> Result<Vec<(String, String, Option<String>)>> {
let reader = self
.reader
.lock()
.map_err(|_| StorageError::Init("Reader lock poisoned".into()))?;
if let Some(ids) = node_ids {
if ids.is_empty() {
return Ok(Vec::new());
}
let placeholders = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
let query = format!(
"SELECT kn.id, kn.content, COALESCE(ne.model, kn.embedding_model) AS embedding_model
FROM knowledge_nodes kn
LEFT JOIN node_embeddings ne ON ne.node_id = kn.id
WHERE kn.id IN ({})",
placeholders
);
let mut stmt = reader.prepare(&query)?;
let params: Vec<&dyn rusqlite::ToSql> =
ids.iter().map(|s| s as &dyn rusqlite::ToSql).collect();
let rows = stmt.query_map(params.as_slice(), |row| {
Ok((
row.get::<_, String>(0)?,
row.get::<_, String>(1)?,
row.get::<_, Option<String>>(2)?,
))
})?;
return Ok(rows.filter_map(|r| r.ok()).collect());
}
if force {
let mut stmt =
reader.prepare("SELECT id, content, embedding_model FROM knowledge_nodes")?;
let rows = stmt.query_map([], |row| {
Ok((
row.get::<_, String>(0)?,
row.get::<_, String>(1)?,
row.get::<_, Option<String>>(2)?,
))
})?;
return Ok(rows.filter_map(|r| r.ok()).collect());
}
let active_model = self.embedding_service.model_name();
let model_pattern = Self::active_embedding_model_like_pattern(active_model);
let mut stmt = reader.prepare(
"SELECT kn.id, kn.content, COALESCE(ne.model, kn.embedding_model) AS embedding_model
FROM knowledge_nodes kn
LEFT JOIN node_embeddings ne ON ne.node_id = kn.id
WHERE kn.has_embedding = 0
OR kn.has_embedding IS NULL
OR ne.node_id IS NULL
OR COALESCE(ne.model, kn.embedding_model, '') NOT LIKE ?1",
)?;
let rows = stmt.query_map(params![model_pattern], |row| {
Ok((
row.get::<_, String>(0)?,
row.get::<_, String>(1)?,
row.get::<_, Option<String>>(2)?,
))
})?;
Ok(rows.filter_map(|r| r.ok()).collect())
}
/// Query memories valid at a specific time
pub fn query_at_time(
&self,
@ -2788,7 +2794,8 @@ impl Storage {
}
}
// 3. Generate missing embeddings
// 3. Generate missing and model-mismatched embeddings.
// This must drain the whole set so embedder upgrades do not strand v1 corpora.
#[cfg(all(feature = "embeddings", feature = "vector-search"))]
let embeddings_generated = self.generate_missing_embeddings()?;
#[cfg(not(all(feature = "embeddings", feature = "vector-search")))]
@ -3342,7 +3349,7 @@ impl Storage {
Ok(Some(optimized_w20))
}
/// Generate missing embeddings
/// Generate all missing or active-model-mismatched embeddings.
#[cfg(all(feature = "embeddings", feature = "vector-search"))]
fn generate_missing_embeddings(&self) -> Result<i64> {
if !self.embedding_service.is_ready()
@ -3352,41 +3359,15 @@ impl Storage {
return Ok(0);
}
let active_model = self.embedding_service.model_name();
let model_pattern = Self::active_embedding_model_like_pattern(active_model);
let nodes: Vec<(String, String)> = {
let reader = self
.reader
.lock()
.map_err(|_| StorageError::Init("Reader lock poisoned".into()))?;
reader
.prepare(
"SELECT kn.id, kn.content
FROM knowledge_nodes kn
LEFT JOIN node_embeddings ne ON ne.node_id = kn.id
WHERE kn.has_embedding = 0
OR kn.has_embedding IS NULL
OR ne.node_id IS NULL
OR COALESCE(ne.model, kn.embedding_model, '') NOT LIKE ?1
LIMIT 100",
)?
.query_map(params![model_pattern], |row| Ok((row.get(0)?, row.get(1)?)))?
.filter_map(|r| r.ok())
.collect()
};
let mut count = 0i64;
for (id, content) in nodes {
if let Err(e) = self.generate_embedding_for_node(&id, &content) {
tracing::warn!("Failed to generate embedding for {}: {}", id, e);
} else {
count += 1;
}
let result = self.generate_embeddings(None, false)?;
if result.failed > 0 {
tracing::warn!(
failed = result.failed,
"Some embeddings could not be regenerated during consolidation"
);
}
Ok(count)
Ok(result.successful)
}
}
@ -4308,7 +4289,7 @@ impl Storage {
let mut latest: Option<DateTime<Utc>> = None;
if let Ok(entries) = std::fs::read_dir(&backup_dir) {
if let Ok(entries) = std::fs::read_dir(backup_dir) {
for entry in entries.flatten() {
let name = entry.file_name();
let name_str = name.to_string_lossy();
@ -4730,12 +4711,11 @@ impl Storage {
.as_deref()
.and_then(Self::parse_rfc3339_opt),
incoming_updated,
) {
if existing > incoming {
report.conflicts_kept_local += 1;
report.rows_skipped += 1;
continue;
}
) && existing > incoming
{
report.conflicts_kept_local += 1;
report.rows_skipped += 1;
continue;
}
let affected = Self::insert_or_replace_row(tx, "knowledge_nodes", table, row)?;
@ -5656,6 +5636,57 @@ mod tests {
);
}
#[cfg(all(feature = "embeddings", feature = "vector-search"))]
#[test]
fn test_embedding_regeneration_candidates_include_entire_mismatched_corpus() {
let storage = create_test_storage();
let stale_model = "all-MiniLM-L6-v2";
let stale_embedding = Embedding::new(vec![0.0; EMBEDDING_DIMENSIONS]).to_bytes();
for i in 0..125 {
let node = storage
.ingest(IngestInput {
content: format!("legacy embedded memory {}", i),
node_type: "fact".to_string(),
..Default::default()
})
.unwrap();
let writer = storage.writer.lock().unwrap();
writer
.execute(
"INSERT OR REPLACE INTO node_embeddings
(node_id, embedding, dimensions, model, created_at)
VALUES (?1, ?2, ?3, ?4, ?5)",
rusqlite::params![
&node.id,
&stale_embedding,
EMBEDDING_DIMENSIONS as i32,
stale_model,
Utc::now().to_rfc3339()
],
)
.unwrap();
writer
.execute(
"UPDATE knowledge_nodes
SET has_embedding = 1, embedding_model = ?2
WHERE id = ?1",
rusqlite::params![&node.id, stale_model],
)
.unwrap();
}
let stats = storage.get_stats().unwrap();
assert_eq!(stats.nodes_with_mismatched_embeddings, 125);
assert_eq!(stats.nodes_with_active_embeddings, 0);
let candidates = storage
.embedding_regeneration_candidates(None, false)
.unwrap();
assert_eq!(candidates.len(), 125);
}
#[test]
fn test_storage_creation() {
let storage = create_test_storage();