From 6d4606a830a3a6404ac8ee6b72f70eb24e35052f Mon Sep 17 00:00:00 2001 From: Ragnor Comerford Date: Mon, 22 Jun 2026 13:05:28 +0200 Subject: [PATCH] fix(engine): optimize survives a cross-process write race on the same table (#297) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * test(engine): cross-process optimize-vs-write race — RED Two regression tests for the prod bug: a direct `optimize` process racing a served write on the same table fails, because the in-process write queue does not serialize across processes and the data-table optimize path has no retry. - optimize_survives_concurrent_insert_advancing_manifest: a concurrent insert advances the manifest while optimize is paused between compact and publish; optimize's equality-CAS publish then fails "expected X but current Y". - optimize_survives_concurrent_delete_before_compaction: a concurrent delete commits before optimize compacts; Lance rebases the compaction past it cleanly, so optimize again fails the publish CAS (the genuine Lance Rewrite-vs-Rewrite overlap is rarer and shares the internal path's retry). Both fail today with ExpectedVersionMismatch. Adds the `optimize.before_compact` failpoint seam + a wait_for_sidecar helper; serializes the optimize failpoint tests (shared failpoint name). The fix lands next. * fix(engine): optimize survives a cross-process write race on the same table The data-table optimize path trusted the in-process write queue and skipped a retry, so a CLI `optimize` racing a served write (separate processes = separate queues) failed: either the Lance Rewrite lost ("preempted by concurrent Update") or the manifest publish lost the strict equality CAS ("expected X but current Y"). Unify both compaction paths on the internal path's reopen+replan shape, with a two-level retry that matches the two failure points: - Outer loop (reopen+replan): a genuine Lance Rewrite-vs-Update/Delete same- fragment conflict means our compaction did not commit — reopen at the new HEAD and re-plan. Lance rebases the common disjoint case (a concurrent insert/delete on other fragments) for free, so this fires only on real overlap. - Inner loop (Phase C, monotonic publish): the manifest advanced between our compaction and our publish. The compaction is already committed at Lance HEAD N, so we must NOT reopen (that trips the HEAD>manifest drift guard on our own work). Re-read the current manifest version C: if C >= N the manifest already includes our compaction (versions are linear) — no-op; else fast-forward to N. Monotonic, not the strict equality CAS that manufactured the conflict. The Phase-A sidecar is written once and reused across reopen attempts (every Phase-B commit is content-preserving, so recovery rolls the observed HEAD forward or safely rolls the compaction back). The in-process queue is kept — it is now an in-process contention reducer, not the cross-process correctness guard. Shares the COMPACTION_RETRY_BUDGET constant + is_retryable_lance_conflict with the internal path; adds is_retryable_manifest_conflict for the publish loop. No writer_epoch. Turns the prior commit's two race tests green. * docs(rfc-013): two-op-class principle + the found+fixed optimize-vs-write race §6.6 records the maintenance vs logical op-class distinction (maintenance commutes → Lance rebase + reopen/replan + monotonic manifest fast-forward, no writer_epoch; logical → strict cross-process OCC + epoch) and the prod optimize-vs-served-write race that motivated it, now landed. Adds the matching mechanic row to §4.2. * fix(engine): retry must not misclassify optimize's own HEAD drift Review catch on the cross-process optimize fix: the outer retry loop re-ran the `lance_head > manifest` drift guard every iteration. After a partial Phase-B commit (the auto_cleanup strip or compaction commits, then a later op hits a retryable conflict), the reopened attempt saw HEAD ahead of the manifest — from OUR own sidecar-covered work, not an external writer — and deleted the sidecar + returned `skipped_for_drift`, stranding uncovered drift that then needs `repair`. Track `head_advanced` (did one of our Phase-B ops already commit). The drift guard now fires only when `!head_advanced` (genuine pre-existing external drift); once we have advanced HEAD, a reopened HEAD>manifest is our work that the monotonic publish fast-forwards. The no-op early-return likewise publishes prior committed work instead of dropping it when `head_advanced`. Regression test `optimize_retry_does_not_misclassify_own_head_drift` injects one retryable reindex conflict after the compaction commits (new `optimize.inject_ reindex_conflict` seam); red→green verified by negative control (reverting the gate reproduces `skipped_for_drift: Some(DriftNeedsRepair)`). Also de-flake `optimize_survives_concurrent_insert_advancing_manifest`: pause at `before_compact` (not post-compact) so the concurrent insert lands while HEAD== manifest — otherwise it could race optimize's committed-but-unpublished compaction and hit the write-path "HEAD ahead of manifest" guard. * fix(engine): optimize publish converges on retry-budget exhaustion Review catch (greptile): the monotonic Phase-C publish loop returned an error on its final iteration's retryable manifest conflict, even though that conflict can itself mean a concurrent writer published a version that already includes our (content- preserving) compaction — i.e. the postcondition ("the manifest reflects our compaction") is already met. Recovery covered it (no data loss), but the operator saw a spurious error and had to re-run. Restructure the loop to re-read `current` on every retryable conflict and, on budget exhaustion, do a final `current >= state.version` convergence check before surfacing the error — the §6.6 "postcondition is the state, not winning the CAS" principle. Factor the repeated current-version read into `current_manifest_version`. --- crates/omnigraph/src/db/omnigraph/optimize.rs | 500 +++++++++++------- crates/omnigraph/tests/failpoints.rs | 237 +++++++++ docs/dev/rfc-013-write-path-latency.md | 54 ++ 3 files changed, 603 insertions(+), 188 deletions(-) diff --git a/crates/omnigraph/src/db/omnigraph/optimize.rs b/crates/omnigraph/src/db/omnigraph/optimize.rs index 29bf2b6..e3aed3d 100644 --- a/crates/omnigraph/src/db/omnigraph/optimize.rs +++ b/crates/omnigraph/src/db/omnigraph/optimize.rs @@ -359,205 +359,306 @@ async fn optimize_one_table( .acquire_many(&[(table_key.clone(), None)]) .await; - // `compact_files` is a Lance-only maintenance API that needs `&mut Dataset`. - // The `TableStorage` trait deliberately does not surface it (the staged-write - // invariant covers writes; compaction is a separate concern). Unwrap the - // opaque `SnapshotHandle` via `into_dataset()` (`pub(crate)`, gated to the - // maintenance path). - let handle = db - .storage() - .open_dataset_head_for_write(&table_key, &full_path, None) - .await?; - let mut ds = handle.into_dataset(); + // Survive a CROSS-PROCESS race (a CLI `optimize` vs the served server): the + // in-process write queue above serializes only same-process writers, so we also + // retry. Two failure modes, two retry levels: + // * Outer loop — a genuine Lance `Rewrite`-vs-`Update/Delete` same-fragment + // conflict (compaction did NOT commit). Reopen at the new HEAD and re-plan, + // exactly as the internal-table path does. (Lance rebases the common disjoint + // case — a concurrent insert/delete on other fragments — for free, so this + // fires only on real overlap.) + // * Inner loop (Phase C) — the manifest advanced under us between our + // compaction and our publish. The compaction IS committed at Lance HEAD, so + // we must NOT reopen (that would trip the HEAD>manifest drift guard on our + // own work); instead re-read the current manifest version and either no-op + // (the manifest already moved past our version — being linear, it descends + // from and includes our compaction) or fast-forward to it. Monotonic, never + // the strict equality CAS that manufactured the bug. + // + // The Phase-A sidecar is written ONCE on the first productive attempt and reused + // across reopen attempts: every Phase-B commit is content-preserving, so a crash + // mid-retry leaves the table readable and recovery either rolls the observed HEAD + // forward (pin still matches the manifest) or safely rolls the compaction back. + let mut sidecar: Option = None; - // CAS baseline: the table's current manifest version, read under the queue - // (in-memory coordinator snapshot, no storage I/O — stable for this section). - let expected_version = db - .fresh_snapshot_for_branch(None) - .await? - .entry(&table_key) - .map(|e| e.table_version) - .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?; + // Tracks whether one of OUR Phase-B ops (auto-cleanup strip / compact / reindex) + // already committed and advanced Lance HEAD past the manifest in a prior attempt. + // Once true, a reopened `lance_head > manifest` is our own sidecar-covered work, + // NOT external drift — so the drift guard and the no-op early-return must not treat + // it as such (that would drop our committed work as uncovered drift). + let mut head_advanced = false; - let lance_head_version = ds.version().version; - if lance_head_version < expected_version { - return Err(OmniError::manifest_internal(format!( - "table '{}' Lance HEAD version {} is behind manifest version {}", - table_key, lance_head_version, expected_version - ))); - } - if lance_head_version > expected_version { - tracing::warn!( - target: "omnigraph::optimize", - table = %table_key, - manifest_version = expected_version, - lance_head_version, - "skipping compaction: Lance HEAD is ahead of the manifest; run `omnigraph repair` \ - to classify and publish covered maintenance drift explicitly", - ); - return Ok(TableOptimizeStats::skipped_for_drift( - table_key, - expected_version, - lance_head_version, - )); - } + // Outer loop: open → plan → Phase B, reopening + re-planning on a retryable + // Lance conflict. Breaks with the committed snapshot once Phase B succeeds. + let mut attempt: u32 = 0; + let (snapshot, metrics, pending_indexes, committed) = loop { + attempt += 1; - // Precise "will it compact?" check — `plan_compaction` also accounts for - // deletion materialization (which can rewrite even a single fragment). - let options = CompactionOptions::default(); - let plan = plan_compaction(&ds, &options) - .await - .map_err(|e| OmniError::Lance(e.to_string()))?; - let will_compact = plan.num_tasks() > 0; - // Even when there is nothing to compact, the table may still have index - // work: rows appended since the index was built (e.g. via `ingest --mode - // merge`) are scanned unindexed until folded in (needs_reindex), OR a - // declared `@index` was never built — schema apply records the intent but - // defers the physical build (iss-848), so optimize is the operator-facing - // reconciler that materializes it (needs_index_create). Any of the three is - // enough to enter the publish path. If NONE, this table is a no-op and must - // NOT be pinned in a sidecar — a zero-commit pin classifies NoMovement on - // recovery and forces an all-or-nothing rollback of sibling tables' - // legitimate work. Uncovered pre-existing manifest/HEAD drift is skipped - // above and goes through explicit repair, so this only runs on a healthy - // table under the per-table queue + sidecar. - let needs_reindex = TableStore::has_unindexed_fragments(&ds).await?; - // needs_index_work_* checks "a declared index is missing AND row_count > 0", - // so empty tables stay no-ops (never pinned). It re-reads the head under the - // queue we already hold, so it is consistent with `ds`. - let needs_index_create = if let Some(type_name) = table_key.strip_prefix("node:") { - super::table_ops::needs_index_work_node(db, type_name, &table_key, &full_path, None).await? - } else { - super::table_ops::needs_index_work_edge(db, &table_key, &full_path, None).await? - }; - if !will_compact && !needs_reindex && !needs_index_create { - return Ok(TableOptimizeStats::compacted( - table_key, - &CompactionMetrics::default(), - false, - )); - } + // `compact_files` is a Lance-only maintenance API that needs `&mut Dataset`. + // The `TableStorage` trait deliberately does not surface it; unwrap the + // opaque `SnapshotHandle` via `into_dataset()` (gated to the maintenance path). + let mut ds = db + .storage() + .open_dataset_head_for_write(&table_key, &full_path, None) + .await? + .into_dataset(); - // Phase A: recovery sidecar BEFORE any HEAD-advancing op (compaction or - // index optimize), so a crash before the manifest publish rolls forward on - // next open. - let sidecar = crate::db::manifest::new_sidecar( - crate::db::manifest::SidecarKind::Optimize, - None, - // optimize is system-attributed (no `optimize_as` actor API today). - None, - vec![crate::db::manifest::SidecarTablePin { - table_key: table_key.clone(), - table_path: full_path.clone(), - expected_version, - // Lower bound — compaction commits N≥1 versions (reserve + rewrite); - // the classifier loose-matches SidecarKind::Optimize. - post_commit_pin: expected_version + 1, - // Optimize uses the loose match (drift is derived state), not - // BranchMerge's Phase-B confirmation — left None. - confirmed_version: None, - table_branch: None, - }], - ); - let handle = - crate::db::manifest::write_sidecar(db.root_uri(), db.storage_adapter(), &sidecar).await?; + // CAS baseline: the table's current manifest version, re-read each attempt + // (a reopen means the manifest may have advanced). + let expected_version = db + .fresh_snapshot_for_branch(None) + .await? + .entry(&table_key) + .map(|e| e.table_version) + .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?; - // Phase B: compaction (if any) then incremental index optimize — both - // advance Lance HEAD inside the sidecar window. `compact_files` rewrites - // fragments and drops them from existing index segments' coverage; - // `optimize_indices` folds the rewritten and any previously-unindexed - // fragments back in (Lance's incremental merge, not a full retrain). This - // is the same compact -> optimize_indices sequencing LanceDB's `optimize()` - // uses. `optimize_indices` is an inline-commit residual: lance-6.0.1 - // exposes no uncommitted variant, so like `compact_files` it commits - // directly and relies on the sidecar for recovery. - // Capture the baseline BEFORE the auto-cleanup scrub below, so that if the - // scrub is the only thing that commits, `committed` is still true and Phase C - // publishes the advanced HEAD (no uncovered HEAD>manifest drift). - let version_before = ds.version().version; + let lance_head_version = ds.version().version; + if lance_head_version < expected_version { + return Err(OmniError::manifest_internal(format!( + "table '{}' Lance HEAD version {} is behind manifest version {}", + table_key, lance_head_version, expected_version + ))); + } + if !head_advanced && lance_head_version > expected_version { + // Pre-existing EXTERNAL uncovered drift (we have not advanced HEAD yet) — + // go through explicit repair. Once `head_advanced` is set, a reopened + // `lance_head > manifest` is our own prior Phase-B commit (sidecar-covered) + // that the publish below fast-forwards, NOT external drift, so this guard is + // skipped on those retries. + if let Some(h) = sidecar.take() { + let _ = crate::db::manifest::delete_sidecar(&h, db.storage_adapter()).await; + } + tracing::warn!( + target: "omnigraph::optimize", + table = %table_key, + manifest_version = expected_version, + lance_head_version, + "skipping compaction: Lance HEAD is ahead of the manifest; run `omnigraph repair` \ + to classify and publish covered maintenance drift explicitly", + ); + return Ok(TableOptimizeStats::skipped_for_drift( + table_key.clone(), + expected_version, + lance_head_version, + )); + } - // Keep optimize non-destructive on upgraded graphs (same guarantee the - // internal-table path makes — see `clear_stale_auto_cleanup_config`). - // `compact_files` / `optimize_indices` commit with a default `CommitConfig` - // (`skip_auto_cleanup = false`) and expose no skip override, so on a graph - // created by a pre-v7 binary (auto_cleanup ON) those commits would fire - // Lance's version-GC hook and prune `__manifest`-pinned data-table versions. - // Strip the stale config first. We hold the per-table queue, so no concurrent - // writer can race this (no retry loop needed, unlike the internal-table path); - // any commit it makes is content-preserving and covered by the Optimize - // sidecar's loose `post_commit_pin` like the other Phase-B commits. - clear_stale_auto_cleanup_config(&mut ds) - .await - .map_err(|e| OmniError::Lance(e.to_string()))?; - - let metrics: CompactionMetrics = if will_compact { - compact_files(&mut ds, options, None) + // Precise "will it compact?" check — `plan_compaction` also accounts for + // deletion materialization (which can rewrite even a single fragment). + let options = CompactionOptions::default(); + let plan = plan_compaction(&ds, &options) .await - .map_err(|e| OmniError::Lance(e.to_string()))? - } else { - CompactionMetrics::default() + .map_err(|e| OmniError::Lance(e.to_string()))?; + let will_compact = plan.num_tasks() > 0; + // Even with nothing to compact, the table may still have index work + // (needs_reindex: rows appended since the index was built; needs_index_create: + // a declared `@index` whose physical build schema apply deferred, iss-848). + // Any of the three enters the publish path. If NONE, this is a no-op and must + // NOT be pinned in a sidecar (a zero-commit pin classifies NoMovement on + // recovery and rolls back siblings). + let needs_reindex = TableStore::has_unindexed_fragments(&ds).await?; + let needs_index_create = if let Some(type_name) = table_key.strip_prefix("node:") { + super::table_ops::needs_index_work_node(db, type_name, &table_key, &full_path, None) + .await? + } else { + super::table_ops::needs_index_work_edge(db, &table_key, &full_path, None).await? + }; + if !will_compact && !needs_reindex && !needs_index_create { + if head_advanced { + // Nothing left to compact, but a prior attempt already advanced HEAD + // (e.g. the strip committed, then compaction conflicted, and the reopen + // is now already compacted). Publish that committed work instead of + // dropping it as uncovered drift. + break ( + crate::storage_layer::SnapshotHandle::new(ds), + CompactionMetrics::default(), + Vec::new(), + true, + ); + } + if let Some(h) = sidecar.take() { + let _ = crate::db::manifest::delete_sidecar(&h, db.storage_adapter()).await; + } + return Ok(TableOptimizeStats::compacted( + table_key.clone(), + &CompactionMetrics::default(), + false, + )); + } + + // Phase A: recovery sidecar BEFORE any HEAD-advancing op, written once and + // reused across reopen attempts. + if sidecar.is_none() { + let sc = crate::db::manifest::new_sidecar( + crate::db::manifest::SidecarKind::Optimize, + None, + // optimize is system-attributed (no `optimize_as` actor API today). + None, + vec![crate::db::manifest::SidecarTablePin { + table_key: table_key.clone(), + table_path: full_path.clone(), + expected_version, + // Lower bound — compaction commits N≥1 versions (reserve + rewrite); + // the classifier loose-matches SidecarKind::Optimize. + post_commit_pin: expected_version + 1, + confirmed_version: None, + table_branch: None, + }], + ); + sidecar = Some( + crate::db::manifest::write_sidecar(db.root_uri(), db.storage_adapter(), &sc).await?, + ); + } + + // Test seam: a concurrent (cross-process) writer can interleave here, before + // any Phase-B commit lands, to exercise the reopen+replan path. + crate::failpoints::maybe_fail("optimize.before_compact")?; + + // Phase B: scrub stale auto_cleanup (keeps optimize non-destructive on a + // graph upgraded from a pre-v7 binary whose `compact_files`/`optimize_indices` + // commits would otherwise fire Lance's auto-cleanup GC hook), compact, + // incremental reindex, then materialize declared-but-missing indexes. Each is + // an inline-commit residual covered by the sidecar. A retryable Lance conflict + // here means a concurrent writer preempted an overlapping fragment → reopen at + // the new HEAD and re-plan. Baseline captured BEFORE the scrub so that if the + // scrub is the only commit, `committed` still triggers the Phase-C publish. + let version_before = ds.version().version; + match clear_stale_auto_cleanup_config(&mut ds).await { + // `true` ⇒ the strip committed and advanced HEAD past the manifest. + Ok(stripped) => head_advanced |= stripped, + Err(e) if attempt < COMPACTION_RETRY_BUDGET && is_retryable_lance_conflict(&e) => { + continue; + } + Err(e) => return Err(OmniError::Lance(e.to_string())), + } + let metrics: CompactionMetrics = if will_compact { + match compact_files(&mut ds, options, None).await { + Ok(m) => { + head_advanced = true; + m + } + Err(e) if attempt < COMPACTION_RETRY_BUDGET && is_retryable_lance_conflict(&e) => { + continue; + } + Err(e) => return Err(OmniError::Lance(e.to_string())), + } + } else { + CompactionMetrics::default() + }; + // Test seam: inject one retryable reindex conflict AFTER compaction has + // committed (so HEAD is already ahead of the manifest from our own work), + // exercising the own-HEAD (not external) drift classification on the next + // reopened attempt. + if crate::failpoints::maybe_fail("optimize.inject_reindex_conflict").is_err() + && attempt < COMPACTION_RETRY_BUDGET + { + continue; + } + match ds.optimize_indices(&OptimizeOptions::default()).await { + Ok(()) => {} + Err(e) if attempt < COMPACTION_RETRY_BUDGET && is_retryable_lance_conflict(&e) => { + continue; + } + Err(e) => { + return Err(OmniError::Lance(format!("optimize_indices on {}: {}", table_key, e))); + } + } + + let catalog = db.catalog(); + let mut snapshot = crate::storage_layer::SnapshotHandle::new(ds); + let pending_indexes: Vec = + super::table_ops::build_indices_on_dataset_for_catalog( + db, + &catalog, + &table_key, + &mut snapshot, + ) + .await?; + // optimize_indices / index build may also have committed (folded fragments, + // built a deferred index). Any HEAD advance this attempt counts too. + let version_after = snapshot.dataset().version().version; + head_advanced |= version_after != version_before; + + break (snapshot, metrics, pending_indexes, head_advanced); }; - ds.optimize_indices(&OptimizeOptions::default()) - .await - .map_err(|e| OmniError::Lance(format!("optimize_indices on {}: {}", table_key, e)))?; - // Materialize any declared-but-missing index over the just-compacted layout, - // reusing the build chokepoint (idempotent: skips existing indexes; fault- - // isolates an untrainable vector column into `pending` rather than failing). - // Run it UNCONDITIONALLY now that we are past the no-op gate — not only when - // `needs_index_create`. A table can enter this path for compaction or - // reindex while its sole missing index is an untrainable Vector column - // (which `needs_index_work_*` does not count as buildable work); calling the - // build here is what surfaces that column in `pending_indexes`, so optimize - // can't compact a table yet silently drop the deferred-index signal. - // Idempotent + cheap when there is nothing to build. Vector index creation - // is an inline-commit residual; the Optimize sidecar's loose post_commit_pin - // covers the extra commits. - let catalog = db.catalog(); - let mut snapshot = crate::storage_layer::SnapshotHandle::new(ds); - let pending_indexes: Vec = - super::table_ops::build_indices_on_dataset_for_catalog( - db, - &catalog, - &table_key, - &mut snapshot, - ) - .await?; - let version_after = snapshot.dataset().version().version; - let committed = version_after != version_before; - - // Pin the per-writer Phase B → Phase C residual for optimize: Lance HEAD has - // advanced but the manifest publish below hasn't run. + // Pin the per-writer Phase B → Phase C residual: Lance HEAD has advanced but the + // manifest publish below hasn't run. crate::failpoints::maybe_fail("optimize.post_phase_b_pre_manifest_commit")?; - // Phase C: publish the compacted version to the manifest (one CAS commit, - // expected = the version observed under the queue). On failure the sidecar - // is intentionally left for the open-time recovery sweep to roll forward. + // Phase C: monotonic fast-forward publish. The compaction is committed at Lance + // HEAD `N`; publish a manifest pointer that includes it. If a concurrent writer + // already advanced the manifest to ≥ N (it built on our compaction), there is + // nothing to do. Otherwise advance to N; a concurrent advance during this window + // is a retryable manifest conflict — re-read the current version and re-evaluate + // (NOT a reopen: the compaction is already committed). if committed { let state = db.storage().table_state(&full_path, &snapshot).await?; - let update = crate::db::SubTableUpdate { - table_key: table_key.clone(), - table_version: state.version, - table_branch: None, - row_count: state.row_count, - version_metadata: state.version_metadata, - }; - let mut expected = std::collections::HashMap::new(); - expected.insert(table_key.clone(), expected_version); - db.coordinator - .write() - .await - .commit_updates_with_actor_with_expected(&[update], &expected, None) - .await?; + let mut published = false; + let mut last_conflict: Option = None; + for _ in 0..COMPACTION_RETRY_BUDGET { + let current = current_manifest_version(db, &table_key).await?; + if current >= state.version { + // The manifest already points at a version that includes our + // compaction (Lance versions are linear). Nothing to publish. + published = true; + break; + } + let update = crate::db::SubTableUpdate { + table_key: table_key.clone(), + table_version: state.version, + table_branch: None, + row_count: state.row_count, + version_metadata: state.version_metadata.clone(), + }; + let mut expected = std::collections::HashMap::new(); + expected.insert(table_key.clone(), current); + match db + .coordinator + .write() + .await + .commit_updates_with_actor_with_expected(&[update], &expected, None) + .await + { + Ok(_) => { + published = true; + break; + } + // A retryable manifest conflict means the manifest moved under us — + // loop and re-read `current` (the top check converges if it now + // already includes our compaction). Record it for the exhaustion path. + Err(e) if is_retryable_manifest_conflict(&e) => last_conflict = Some(e), + // Leave the sidecar for the open-time recovery sweep to roll forward. + Err(e) => return Err(e), + } + } + if !published { + // Budget exhausted under sustained contention. The final conflict may + // itself mean a concurrent writer published a version that already + // includes our (content-preserving) compaction — the postcondition is + // "the manifest reflects our compaction," not "we won the CAS" — so + // re-check before surfacing an error (§6.6). + let current = current_manifest_version(db, &table_key).await?; + if current < state.version { + return Err(last_conflict.unwrap_or_else(|| { + OmniError::manifest_conflict(format!( + "optimize publish of {table_key} exhausted {COMPACTION_RETRY_BUDGET} \ + retries against concurrent writers" + )) + })); + } + } } // Phase D: delete the sidecar (best-effort; recovery resolves a leftover). - if let Err(err) = crate::db::manifest::delete_sidecar(&handle, db.storage_adapter()).await { - tracing::warn!( - error = %err, - operation_id = handle.operation_id.as_str(), - "optimize recovery sidecar cleanup failed; next open's recovery sweep will resolve it" - ); + if let Some(h) = sidecar.take() { + if let Err(err) = crate::db::manifest::delete_sidecar(&h, db.storage_adapter()).await { + tracing::warn!( + error = %err, + operation_id = h.operation_id.as_str(), + "optimize recovery sidecar cleanup failed; next open's recovery sweep will resolve it" + ); + } } let mut stat = TableOptimizeStats::compacted(table_key, &metrics, committed); @@ -567,7 +668,7 @@ async fn optimize_one_table( /// Bound on the app-level retry of an internal-table compaction against a /// concurrent live writer (see [`is_retryable_lance_conflict`]). -const INTERNAL_COMPACTION_RETRY_BUDGET: u32 = 5; +const COMPACTION_RETRY_BUDGET: u32 = 5; /// A Lance commit error that means "a concurrent writer preempted us; reload the /// dataset and rerun." `compact_files` commits via `commit_compaction` -> @@ -588,6 +689,29 @@ fn is_retryable_lance_conflict(err: &lance::Error) -> bool { ) } +/// A manifest publish conflict that optimize's monotonic Phase-C loop re-evaluates +/// (re-read the current version, then no-op or fast-forward). Both shapes that reach +/// here are `Conflict`-kind and mean "the manifest moved under us; reconsider," never +/// a lost update: the typed `ExpectedVersionMismatch` (a concurrent writer advanced +/// the table) and the publisher's exhausted row-level CAS (`manifest_conflict`). +fn is_retryable_manifest_conflict(err: &OmniError) -> bool { + matches!( + err, + OmniError::Manifest(m) if m.kind == crate::error::ManifestErrorKind::Conflict + ) +} + +/// The table's current manifest version on `main` (0 if absent), read fresh. Used by +/// optimize's monotonic publish loop to decide no-op (`current >= N`) vs fast-forward. +async fn current_manifest_version(db: &Omnigraph, table_key: &str) -> Result { + Ok(db + .fresh_snapshot_for_branch(None) + .await? + .entry(table_key) + .map(|e| e.table_version) + .unwrap_or(0)) +} + /// Remove any stored `lance.auto_cleanup.*` config from a table so compaction /// stays **non-destructive by construction**. Used by both the internal-table /// path ([`compact_internal_table`]) and the data-table path @@ -666,7 +790,7 @@ async fn compact_internal_table( // so optimize would otherwise fail spuriously on a live graph. On a retryable // conflict we re-open at the new HEAD and rerun — the canonical Lance-consumer // pattern. Each attempt opens fresh because the conflict means the version moved. - for attempt in 0..INTERNAL_COMPACTION_RETRY_BUDGET { + for attempt in 0..COMPACTION_RETRY_BUDGET { let handle = db .storage() .open_dataset_head_for_write(table_key, &uri, None) @@ -678,7 +802,7 @@ async fn compact_internal_table( let cleared_config = match clear_stale_auto_cleanup_config(&mut ds).await { Ok(cleared) => cleared, Err(e) => { - if attempt + 1 < INTERNAL_COMPACTION_RETRY_BUDGET && is_retryable_lance_conflict(&e) + if attempt + 1 < COMPACTION_RETRY_BUDGET && is_retryable_lance_conflict(&e) { continue; } @@ -718,7 +842,7 @@ async fn compact_internal_table( )); } Err(e) - if attempt + 1 < INTERNAL_COMPACTION_RETRY_BUDGET + if attempt + 1 < COMPACTION_RETRY_BUDGET && is_retryable_lance_conflict(&e) => { continue; @@ -727,7 +851,7 @@ async fn compact_internal_table( } } Err(OmniError::manifest_conflict(format!( - "internal-table compaction of {table_key} exhausted {INTERNAL_COMPACTION_RETRY_BUDGET} \ + "internal-table compaction of {table_key} exhausted {COMPACTION_RETRY_BUDGET} \ retries against concurrent writers" ))) } diff --git a/crates/omnigraph/tests/failpoints.rs b/crates/omnigraph/tests/failpoints.rs index 9d65bc1..85c056d 100644 --- a/crates/omnigraph/tests/failpoints.rs +++ b/crates/omnigraph/tests/failpoints.rs @@ -3089,6 +3089,7 @@ edge WorksAt: Person -> Company /// forward on next open so the manifest tracks the Lance HEAD — and the healed /// table must then accept a schema apply (the original bug's victim). #[tokio::test] +#[serial(optimize)] async fn optimize_phase_b_failure_recovered_on_next_open() { let _scenario = FailScenario::setup(); let dir = tempfile::tempdir().unwrap(); @@ -3178,6 +3179,242 @@ async fn optimize_phase_b_failure_recovered_on_next_open() { .expect("schema apply after optimize recovery must succeed"); } +/// Cross-process race (the prod bug): a served write advances the manifest on the +/// same table while a SEPARATE `optimize` process is paused between its compaction +/// and its manifest publish. The in-process write queue does NOT serialize across +/// processes, so optimize's equality-CAS publish (expected = its pre-compaction +/// version) finds the manifest already advanced. optimize must CONVERGE — the +/// concurrent write built on top of the compacted HEAD, so the compaction is +/// already reflected — not fail with "expected X but current Y". RED before the +/// monotonic-publish fix. +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +#[serial(optimize)] +async fn optimize_survives_concurrent_insert_advancing_manifest() { + let _scenario = FailScenario::setup(); + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap().to_string(); + + { + let db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap(); + for (name, age) in [("alice", 30), ("bob", 31), ("carol", 32), ("dave", 33)] { + db.mutate( + "main", + MUTATION_QUERIES, + "insert_person", + &mixed_params(&[("$name", name)], &[("$age", age)]), + ) + .await + .unwrap(); + } + } + + // Pause optimize BEFORE it compacts, so the concurrent insert lands while + // HEAD == manifest (no in-flight optimize drift for the writer to trip on); the + // insert advances the manifest, then optimize compacts on top and must converge + // its publish over the advanced manifest rather than fail the equality CAS. + let failpoint = ScopedFailPoint::new("optimize.before_compact", "pause"); + + let uri_opt = uri.clone(); + let optimize = tokio::spawn(async move { + let db = Omnigraph::open(&uri_opt).await.unwrap(); + db.optimize().await + }); + + // Wait until optimize reaches the pause (its Optimize sidecar is on disk). + assert!( + wait_for_sidecar(dir.path()).await, + "optimize never reached the pre-compact pause", + ); + + // Concurrent insert on the SAME table via a SEPARATE handle (= separate + // in-process write queue = a different process) advances the manifest. + { + let db_b = Omnigraph::open(&uri).await.unwrap(); + db_b.mutate( + "main", + MUTATION_QUERIES, + "insert_person", + &mixed_params(&[("$name", "eve")], &[("$age", 34)]), + ) + .await + .unwrap(); + } + + drop(failpoint); // release optimize + let result = tokio::time::timeout(std::time::Duration::from_secs(20), optimize) + .await + .expect("optimize task hung") + .unwrap(); + result.expect("optimize must survive a concurrent same-table write (cross-process)"); + + // No lost write: 4 seed + eve all present; graph remains re-optimizable. + let db = Omnigraph::open(&uri).await.unwrap(); + assert_eq!( + helpers::count_rows(&db, "node:Person").await, + 5, + "concurrent insert must not be lost", + ); + db.optimize() + .await + .expect("graph must remain healthy / re-optimizable"); +} + +/// Cross-process race: a served DELETE commits on the same table while a SEPARATE +/// `optimize` process is parked just before its compaction. Lance rebases the +/// compaction past the delete cleanly (so this surfaces as a manifest-CAS mismatch +/// at publish, not a Lance `Rewrite` conflict — the genuine `Rewrite`-vs-`Rewrite` +/// overlap is the rarer many-fragment/concurrent-compaction case, covered by the +/// shared `is_retryable_lance_conflict` retry the internal-table path already +/// exercises). optimize must converge its publish over the advanced manifest and +/// preserve the delete. RED before the fix. +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +#[serial(optimize)] +async fn optimize_survives_concurrent_delete_before_compaction() { + let _scenario = FailScenario::setup(); + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap().to_string(); + + { + let db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap(); + for (name, age) in [("alice", 30), ("bob", 31), ("carol", 32), ("dave", 33)] { + db.mutate( + "main", + MUTATION_QUERIES, + "insert_person", + &mixed_params(&[("$name", name)], &[("$age", age)]), + ) + .await + .unwrap(); + } + } + + // Pause optimize BEFORE its compaction commits. + let failpoint = ScopedFailPoint::new("optimize.before_compact", "pause"); + + let uri_opt = uri.clone(); + let optimize = tokio::spawn(async move { + let db = Omnigraph::open(&uri_opt).await.unwrap(); + db.optimize().await + }); + + assert!( + wait_for_sidecar(dir.path()).await, + "optimize never reached the pre-compact pause", + ); + + // Concurrent DELETE of an existing row writes a deletion vector onto the + // fragment optimize is about to compact → optimize's Rewrite overlap-conflicts + // at the Lance level ("Rewrite … preempted by concurrent Delete/Update"). + { + let db_b = Omnigraph::open(&uri).await.unwrap(); + db_b.mutate( + "main", + MUTATION_QUERIES, + "remove_person", + &mixed_params(&[("$name", "alice")], &[]), + ) + .await + .unwrap(); + } + + drop(failpoint); // release optimize + let result = tokio::time::timeout(std::time::Duration::from_secs(20), optimize) + .await + .expect("optimize task hung") + .unwrap(); + result.expect("optimize must reopen+replan past a concurrent overlapping delete"); + + // No lost write: alice's delete persisted (3 rows); graph remains re-optimizable. + let db = Omnigraph::open(&uri).await.unwrap(); + assert_eq!( + helpers::count_rows(&db, "node:Person").await, + 3, + "the concurrent delete must persist (alice removed)", + ); + db.optimize() + .await + .expect("graph must remain healthy / re-optimizable"); +} + +/// Regression: the outer compaction retry loop must NOT misclassify optimize's OWN +/// committed Phase-B work as external drift. Attempt 1 compacts (HEAD → V+1); if a +/// LATER Phase-B op (reindex) then hits a retryable conflict, the reopened attempt +/// sees Lance HEAD ahead of the manifest — from OUR compaction, not an external +/// writer. The drift guard must skip it (we hold the sidecar) and converge, not +/// delete the sidecar and return `skipped_for_drift` (which would strand uncovered +/// drift). Reproduced by injecting one retryable reindex conflict after the compact. +#[tokio::test] +#[serial(optimize)] +async fn optimize_retry_does_not_misclassify_own_head_drift() { + let _scenario = FailScenario::setup(); + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap().to_string(); + + { + let db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap(); + for (name, age) in [("alice", 30), ("bob", 31), ("carol", 32), ("dave", 33)] { + db.mutate( + "main", + MUTATION_QUERIES, + "insert_person", + &mixed_params(&[("$name", name)], &[("$age", age)]), + ) + .await + .unwrap(); + } + } + + // Inject exactly one retryable reindex conflict: attempt 1 compacts (HEAD+1) then + // "conflicts" on reindex → retry; attempt 2 reopens with HEAD ahead of the manifest + // from our own compaction — the misclassification trigger. + let _failpoint = ScopedFailPoint::new("optimize.inject_reindex_conflict", "1*return"); + + let db = Omnigraph::open(&uri).await.unwrap(); + let stats = db + .optimize() + .await + .expect("optimize must converge, not misclassify its own HEAD drift"); + let person = stats + .iter() + .find(|s| s.table_key == "node:Person") + .expect("node:Person stat present"); + assert!( + person.skipped.is_none(), + "node:Person must converge, not skipped_for_drift: {:?}", + person.skipped, + ); + + // No uncovered drift stranded: a follow-up optimize is clean and all rows read. + let stats2 = db.optimize().await.unwrap(); + let person2 = stats2 + .iter() + .find(|s| s.table_key == "node:Person") + .unwrap(); + assert!( + person2.skipped.is_none(), + "follow-up optimize must be clean (no stranded drift): {:?}", + person2.skipped, + ); + assert_eq!(helpers::count_rows(&db, "node:Person").await, 4); +} + +/// Poll until `optimize` has written its recovery sidecar (i.e. reached Phase B +/// and is about to / has compacted), signalling it is parked at its failpoint. +async fn wait_for_sidecar(root: &std::path::Path) -> bool { + let recovery_dir = root.join("__recovery"); + for _ in 0..1000 { + if recovery_dir.exists() + && std::fs::read_dir(&recovery_dir) + .map(|d| d.count() > 0) + .unwrap_or(false) + { + return true; + } + tokio::time::sleep(std::time::Duration::from_millis(10)).await; + } + false +} + #[tokio::test] #[serial(branch_merge_phase_b)] async fn branch_merge_phase_b_failure_recovered_on_next_open() { diff --git a/docs/dev/rfc-013-write-path-latency.md b/docs/dev/rfc-013-write-path-latency.md index d955a9d..1954b01 100644 --- a/docs/dev/rfc-013-write-path-latency.md +++ b/docs/dev/rfc-013-write-path-latency.md @@ -579,6 +579,7 @@ The cost contract becomes part of `publish`'s documented API: | Epoch fence | Monotonic `writer_epoch` in `__manifest`, CAS-claimed at writer init, checked on every publish. Fences a whole zombie *writer* deterministically (no TTL); closes the multi-process exposure and the Lance-MTT TTL-lease gap. | SlateDB `FenceableTransactionalObject` **[U]** | | Branch create | Lance `Clone` instead of the per-table fork loop (O(tables)→O(1) sequential). | `iss-691` **[G]** | | Branch delete | Run the per-other-branch safety check and the per-table reclaim loops concurrently (`buffer_unordered`); read branch sets from in-memory coordinator state. | **[S]** | +| Maintenance-class commit (compaction) | Commutative/content-preserving ops do NOT use the logical class's strict OCC: Lance rebases the disjoint case, the app reopens+replans on a real overlap, and the manifest publish is a **monotonic fast-forward** (advance or no-op, never equality CAS) — no `writer_epoch`. The two-op-class rule + the found+fixed optimize-vs-write race: §6.6. | §6.6 **[M]**, **LANDED** | --- @@ -823,6 +824,59 @@ This is the standard WAL-replay + leader-lease shape (confirmed against SlateDB' finding promotes #6/#7 from "nice correctness work" to the load-bearing guard that gates multi-writer topologies — and it is the motivating case for them.** +## 6.6 The two op classes — and a found+fixed maintenance race (LANDED) + +§6.5 is about the **logical** write class. A prod run surfaced the same +process-boundary flaw in the **maintenance** class: a direct CLI `optimize` racing +a served write on the same table **failed** — either the Lance `Rewrite` lost +("preempted by concurrent Update") or the manifest publish lost the strict equality +CAS ("expected 14 but current 15"). Same root cause as §6.5 (the in-process write +queue does not serialize across processes), but the right fix is the **opposite** of +the logical class, because the two classes commute differently: + +| class | examples | commutes? | correct commit model | +|---|---|---|---| +| **maintenance** | compaction (`Rewrite`), `optimize_indices` | **yes** (content-preserving) | Lance native rebase + app reopen/replan on real overlap + **monotonic manifest fast-forward** — no epoch, no read-set | +| **logical mutation** | load / mutate / merge / delete | **no** (lost-update, write-skew) | strict cross-process OCC: read-set + write-set CAS under the `writer_epoch` fence (§6.5, #7) | + +Applying strict OCC + equality-CAS uniformly is the mistake: **too strong for +maintenance** (it manufactures a false conflict against a commutative op — this +bug) and **too weak for logical writes cross-process** (§6.5). The maintenance fix +needs **no `writer_epoch`** — that is the tell that it is a different class. + +**Validated against Lance 7.0.0 source + reproduced [M].** Lance has no compaction +re-plan retry (the semantic `RetryableCommitConflict` escapes `commit_transaction`'s +loop at `io/commit.rs:979`; only the OCC manifest race is retried), so the +application must reopen + re-plan — exactly what the internal-table path already +did. Notably, Lance **rebases the common case for free**: a concurrent +insert/update/delete on *other* fragments is disjoint, so the data-table compaction +commits cleanly and the conflict surfaces only as the manifest fast-forward +(the genuine `Rewrite`-vs-`Rewrite` overlap is the rarer many-fragment / +concurrent-compaction case). + +**Fix (LANDED).** Both compaction paths now share one reopen+replan shape with a +two-level retry: an outer loop reopens+replans on a real Lance overlap conflict; an +inner Phase-C loop makes the manifest publish a **monotonic fast-forward** +(advance to the compacted version `N`, or no-op when the manifest already moved to +`≥ N` — being linear, it descends from and includes the compaction), never the +equality CAS. The `Optimize` recovery sidecar is written once and reused across +attempts (every commit is content-preserving). The in-process queue is kept as an +in-process contention reducer, not the cross-process guard. No `writer_epoch`. +(`db/omnigraph/optimize.rs`; regression tests in `tests/failpoints.rs`: +`optimize_survives_concurrent_insert_advancing_manifest`, +`optimize_survives_concurrent_delete_before_compaction`.) + +**Relationship to step 5 (the unification).** This is the first correct *instance* of +the maintenance-class commit model, not a parallel band-aid: when step 5 collapses the +writers into the single `publish(txn, plan)` authority, it **relocates** this — a +`TableAction::Rewrite` carries the monotonic-fast-forward + reopen/replan commit model +into the unified publisher — rather than reinventing it. What step 5 deletes is the +*location* (the hand-rolled loop in `optimize_one_table`), not the *semantics*; the two +regression tests above are the contract that must stay green across that refactor. It +also makes step 5 easier, not harder: it already unified the two compaction paths onto +one retry shape and drew the op-class line (logical writers keep equality CAS; only +compaction is monotonic), so there is one fewer pattern for the unification to absorb. + --- ## 7. Invariants & deny-list check