fix(engine): optimize survives a cross-process write race on the same table (#297)

* test(engine): cross-process optimize-vs-write race — RED

Two regression tests for the prod bug: a direct `optimize` process racing a
served write on the same table fails, because the in-process write queue does
not serialize across processes and the data-table optimize path has no retry.

- optimize_survives_concurrent_insert_advancing_manifest: a concurrent insert
  advances the manifest while optimize is paused between compact and publish;
  optimize's equality-CAS publish then fails "expected X but current Y".
- optimize_survives_concurrent_delete_before_compaction: a concurrent delete
  commits before optimize compacts; Lance rebases the compaction past it
  cleanly, so optimize again fails the publish CAS (the genuine Lance
  Rewrite-vs-Rewrite overlap is rarer and shares the internal path's retry).

Both fail today with ExpectedVersionMismatch. Adds the `optimize.before_compact`
failpoint seam + a wait_for_sidecar helper; serializes the optimize failpoint
tests (shared failpoint name). The fix lands next.

* fix(engine): optimize survives a cross-process write race on the same table

The data-table optimize path trusted the in-process write queue and skipped a
retry, so a CLI `optimize` racing a served write (separate processes = separate
queues) failed: either the Lance Rewrite lost ("preempted by concurrent Update")
or the manifest publish lost the strict equality CAS ("expected X but current Y").

Unify both compaction paths on the internal path's reopen+replan shape, with a
two-level retry that matches the two failure points:

- Outer loop (reopen+replan): a genuine Lance Rewrite-vs-Update/Delete same-
  fragment conflict means our compaction did not commit — reopen at the new HEAD
  and re-plan. Lance rebases the common disjoint case (a concurrent insert/delete
  on other fragments) for free, so this fires only on real overlap.
- Inner loop (Phase C, monotonic publish): the manifest advanced between our
  compaction and our publish. The compaction is already committed at Lance HEAD N,
  so we must NOT reopen (that trips the HEAD>manifest drift guard on our own work).
  Re-read the current manifest version C: if C >= N the manifest already includes
  our compaction (versions are linear) — no-op; else fast-forward to N. Monotonic,
  not the strict equality CAS that manufactured the conflict.

The Phase-A sidecar is written once and reused across reopen attempts (every
Phase-B commit is content-preserving, so recovery rolls the observed HEAD forward
or safely rolls the compaction back). The in-process queue is kept — it is now an
in-process contention reducer, not the cross-process correctness guard. Shares the
COMPACTION_RETRY_BUDGET constant + is_retryable_lance_conflict with the internal
path; adds is_retryable_manifest_conflict for the publish loop. No writer_epoch.

Turns the prior commit's two race tests green.

* docs(rfc-013): two-op-class principle + the found+fixed optimize-vs-write race

§6.6 records the maintenance vs logical op-class distinction (maintenance commutes
→ Lance rebase + reopen/replan + monotonic manifest fast-forward, no writer_epoch;
logical → strict cross-process OCC + epoch) and the prod optimize-vs-served-write
race that motivated it, now landed. Adds the matching mechanic row to §4.2.

* fix(engine): retry must not misclassify optimize's own HEAD drift

Review catch on the cross-process optimize fix: the outer retry loop re-ran the
`lance_head > manifest` drift guard every iteration. After a partial Phase-B commit
(the auto_cleanup strip or compaction commits, then a later op hits a retryable
conflict), the reopened attempt saw HEAD ahead of the manifest — from OUR own
sidecar-covered work, not an external writer — and deleted the sidecar + returned
`skipped_for_drift`, stranding uncovered drift that then needs `repair`.

Track `head_advanced` (did one of our Phase-B ops already commit). The drift guard
now fires only when `!head_advanced` (genuine pre-existing external drift); once we
have advanced HEAD, a reopened HEAD>manifest is our work that the monotonic publish
fast-forwards. The no-op early-return likewise publishes prior committed work instead
of dropping it when `head_advanced`.

Regression test `optimize_retry_does_not_misclassify_own_head_drift` injects one
retryable reindex conflict after the compaction commits (new `optimize.inject_
reindex_conflict` seam); red→green verified by negative control (reverting the gate
reproduces `skipped_for_drift: Some(DriftNeedsRepair)`).

Also de-flake `optimize_survives_concurrent_insert_advancing_manifest`: pause at
`before_compact` (not post-compact) so the concurrent insert lands while HEAD==
manifest — otherwise it could race optimize's committed-but-unpublished compaction
and hit the write-path "HEAD ahead of manifest" guard.

* fix(engine): optimize publish converges on retry-budget exhaustion

Review catch (greptile): the monotonic Phase-C publish loop returned an error on its
final iteration's retryable manifest conflict, even though that conflict can itself
mean a concurrent writer published a version that already includes our (content-
preserving) compaction — i.e. the postcondition ("the manifest reflects our
compaction") is already met. Recovery covered it (no data loss), but the operator
saw a spurious error and had to re-run.

Restructure the loop to re-read `current` on every retryable conflict and, on budget
exhaustion, do a final `current >= state.version` convergence check before surfacing
the error — the §6.6 "postcondition is the state, not winning the CAS" principle.
Factor the repeated current-version read into `current_manifest_version`.
This commit is contained in:
Ragnor Comerford 2026-06-22 13:05:28 +02:00 committed by GitHub
parent 5cfae9acc1
commit 6d4606a830
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 603 additions and 188 deletions

View file

@ -359,205 +359,306 @@ async fn optimize_one_table(
.acquire_many(&[(table_key.clone(), None)])
.await;
// `compact_files` is a Lance-only maintenance API that needs `&mut Dataset`.
// The `TableStorage` trait deliberately does not surface it (the staged-write
// invariant covers writes; compaction is a separate concern). Unwrap the
// opaque `SnapshotHandle` via `into_dataset()` (`pub(crate)`, gated to the
// maintenance path).
let handle = db
.storage()
.open_dataset_head_for_write(&table_key, &full_path, None)
.await?;
let mut ds = handle.into_dataset();
// Survive a CROSS-PROCESS race (a CLI `optimize` vs the served server): the
// in-process write queue above serializes only same-process writers, so we also
// retry. Two failure modes, two retry levels:
// * Outer loop — a genuine Lance `Rewrite`-vs-`Update/Delete` same-fragment
// conflict (compaction did NOT commit). Reopen at the new HEAD and re-plan,
// exactly as the internal-table path does. (Lance rebases the common disjoint
// case — a concurrent insert/delete on other fragments — for free, so this
// fires only on real overlap.)
// * Inner loop (Phase C) — the manifest advanced under us between our
// compaction and our publish. The compaction IS committed at Lance HEAD, so
// we must NOT reopen (that would trip the HEAD>manifest drift guard on our
// own work); instead re-read the current manifest version and either no-op
// (the manifest already moved past our version — being linear, it descends
// from and includes our compaction) or fast-forward to it. Monotonic, never
// the strict equality CAS that manufactured the bug.
//
// The Phase-A sidecar is written ONCE on the first productive attempt and reused
// across reopen attempts: every Phase-B commit is content-preserving, so a crash
// mid-retry leaves the table readable and recovery either rolls the observed HEAD
// forward (pin still matches the manifest) or safely rolls the compaction back.
let mut sidecar: Option<crate::db::manifest::RecoverySidecarHandle> = None;
// CAS baseline: the table's current manifest version, read under the queue
// (in-memory coordinator snapshot, no storage I/O — stable for this section).
let expected_version = db
.fresh_snapshot_for_branch(None)
.await?
.entry(&table_key)
.map(|e| e.table_version)
.ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
// Tracks whether one of OUR Phase-B ops (auto-cleanup strip / compact / reindex)
// already committed and advanced Lance HEAD past the manifest in a prior attempt.
// Once true, a reopened `lance_head > manifest` is our own sidecar-covered work,
// NOT external drift — so the drift guard and the no-op early-return must not treat
// it as such (that would drop our committed work as uncovered drift).
let mut head_advanced = false;
let lance_head_version = ds.version().version;
if lance_head_version < expected_version {
return Err(OmniError::manifest_internal(format!(
"table '{}' Lance HEAD version {} is behind manifest version {}",
table_key, lance_head_version, expected_version
)));
}
if lance_head_version > expected_version {
tracing::warn!(
target: "omnigraph::optimize",
table = %table_key,
manifest_version = expected_version,
lance_head_version,
"skipping compaction: Lance HEAD is ahead of the manifest; run `omnigraph repair` \
to classify and publish covered maintenance drift explicitly",
);
return Ok(TableOptimizeStats::skipped_for_drift(
table_key,
expected_version,
lance_head_version,
));
}
// Outer loop: open → plan → Phase B, reopening + re-planning on a retryable
// Lance conflict. Breaks with the committed snapshot once Phase B succeeds.
let mut attempt: u32 = 0;
let (snapshot, metrics, pending_indexes, committed) = loop {
attempt += 1;
// Precise "will it compact?" check — `plan_compaction` also accounts for
// deletion materialization (which can rewrite even a single fragment).
let options = CompactionOptions::default();
let plan = plan_compaction(&ds, &options)
.await
.map_err(|e| OmniError::Lance(e.to_string()))?;
let will_compact = plan.num_tasks() > 0;
// Even when there is nothing to compact, the table may still have index
// work: rows appended since the index was built (e.g. via `ingest --mode
// merge`) are scanned unindexed until folded in (needs_reindex), OR a
// declared `@index` was never built — schema apply records the intent but
// defers the physical build (iss-848), so optimize is the operator-facing
// reconciler that materializes it (needs_index_create). Any of the three is
// enough to enter the publish path. If NONE, this table is a no-op and must
// NOT be pinned in a sidecar — a zero-commit pin classifies NoMovement on
// recovery and forces an all-or-nothing rollback of sibling tables'
// legitimate work. Uncovered pre-existing manifest/HEAD drift is skipped
// above and goes through explicit repair, so this only runs on a healthy
// table under the per-table queue + sidecar.
let needs_reindex = TableStore::has_unindexed_fragments(&ds).await?;
// needs_index_work_* checks "a declared index is missing AND row_count > 0",
// so empty tables stay no-ops (never pinned). It re-reads the head under the
// queue we already hold, so it is consistent with `ds`.
let needs_index_create = if let Some(type_name) = table_key.strip_prefix("node:") {
super::table_ops::needs_index_work_node(db, type_name, &table_key, &full_path, None).await?
} else {
super::table_ops::needs_index_work_edge(db, &table_key, &full_path, None).await?
};
if !will_compact && !needs_reindex && !needs_index_create {
return Ok(TableOptimizeStats::compacted(
table_key,
&CompactionMetrics::default(),
false,
));
}
// `compact_files` is a Lance-only maintenance API that needs `&mut Dataset`.
// The `TableStorage` trait deliberately does not surface it; unwrap the
// opaque `SnapshotHandle` via `into_dataset()` (gated to the maintenance path).
let mut ds = db
.storage()
.open_dataset_head_for_write(&table_key, &full_path, None)
.await?
.into_dataset();
// Phase A: recovery sidecar BEFORE any HEAD-advancing op (compaction or
// index optimize), so a crash before the manifest publish rolls forward on
// next open.
let sidecar = crate::db::manifest::new_sidecar(
crate::db::manifest::SidecarKind::Optimize,
None,
// optimize is system-attributed (no `optimize_as` actor API today).
None,
vec![crate::db::manifest::SidecarTablePin {
table_key: table_key.clone(),
table_path: full_path.clone(),
expected_version,
// Lower bound — compaction commits N≥1 versions (reserve + rewrite);
// the classifier loose-matches SidecarKind::Optimize.
post_commit_pin: expected_version + 1,
// Optimize uses the loose match (drift is derived state), not
// BranchMerge's Phase-B confirmation — left None.
confirmed_version: None,
table_branch: None,
}],
);
let handle =
crate::db::manifest::write_sidecar(db.root_uri(), db.storage_adapter(), &sidecar).await?;
// CAS baseline: the table's current manifest version, re-read each attempt
// (a reopen means the manifest may have advanced).
let expected_version = db
.fresh_snapshot_for_branch(None)
.await?
.entry(&table_key)
.map(|e| e.table_version)
.ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
// Phase B: compaction (if any) then incremental index optimize — both
// advance Lance HEAD inside the sidecar window. `compact_files` rewrites
// fragments and drops them from existing index segments' coverage;
// `optimize_indices` folds the rewritten and any previously-unindexed
// fragments back in (Lance's incremental merge, not a full retrain). This
// is the same compact -> optimize_indices sequencing LanceDB's `optimize()`
// uses. `optimize_indices` is an inline-commit residual: lance-6.0.1
// exposes no uncommitted variant, so like `compact_files` it commits
// directly and relies on the sidecar for recovery.
// Capture the baseline BEFORE the auto-cleanup scrub below, so that if the
// scrub is the only thing that commits, `committed` is still true and Phase C
// publishes the advanced HEAD (no uncovered HEAD>manifest drift).
let version_before = ds.version().version;
let lance_head_version = ds.version().version;
if lance_head_version < expected_version {
return Err(OmniError::manifest_internal(format!(
"table '{}' Lance HEAD version {} is behind manifest version {}",
table_key, lance_head_version, expected_version
)));
}
if !head_advanced && lance_head_version > expected_version {
// Pre-existing EXTERNAL uncovered drift (we have not advanced HEAD yet) —
// go through explicit repair. Once `head_advanced` is set, a reopened
// `lance_head > manifest` is our own prior Phase-B commit (sidecar-covered)
// that the publish below fast-forwards, NOT external drift, so this guard is
// skipped on those retries.
if let Some(h) = sidecar.take() {
let _ = crate::db::manifest::delete_sidecar(&h, db.storage_adapter()).await;
}
tracing::warn!(
target: "omnigraph::optimize",
table = %table_key,
manifest_version = expected_version,
lance_head_version,
"skipping compaction: Lance HEAD is ahead of the manifest; run `omnigraph repair` \
to classify and publish covered maintenance drift explicitly",
);
return Ok(TableOptimizeStats::skipped_for_drift(
table_key.clone(),
expected_version,
lance_head_version,
));
}
// Keep optimize non-destructive on upgraded graphs (same guarantee the
// internal-table path makes — see `clear_stale_auto_cleanup_config`).
// `compact_files` / `optimize_indices` commit with a default `CommitConfig`
// (`skip_auto_cleanup = false`) and expose no skip override, so on a graph
// created by a pre-v7 binary (auto_cleanup ON) those commits would fire
// Lance's version-GC hook and prune `__manifest`-pinned data-table versions.
// Strip the stale config first. We hold the per-table queue, so no concurrent
// writer can race this (no retry loop needed, unlike the internal-table path);
// any commit it makes is content-preserving and covered by the Optimize
// sidecar's loose `post_commit_pin` like the other Phase-B commits.
clear_stale_auto_cleanup_config(&mut ds)
.await
.map_err(|e| OmniError::Lance(e.to_string()))?;
let metrics: CompactionMetrics = if will_compact {
compact_files(&mut ds, options, None)
// Precise "will it compact?" check — `plan_compaction` also accounts for
// deletion materialization (which can rewrite even a single fragment).
let options = CompactionOptions::default();
let plan = plan_compaction(&ds, &options)
.await
.map_err(|e| OmniError::Lance(e.to_string()))?
} else {
CompactionMetrics::default()
.map_err(|e| OmniError::Lance(e.to_string()))?;
let will_compact = plan.num_tasks() > 0;
// Even with nothing to compact, the table may still have index work
// (needs_reindex: rows appended since the index was built; needs_index_create:
// a declared `@index` whose physical build schema apply deferred, iss-848).
// Any of the three enters the publish path. If NONE, this is a no-op and must
// NOT be pinned in a sidecar (a zero-commit pin classifies NoMovement on
// recovery and rolls back siblings).
let needs_reindex = TableStore::has_unindexed_fragments(&ds).await?;
let needs_index_create = if let Some(type_name) = table_key.strip_prefix("node:") {
super::table_ops::needs_index_work_node(db, type_name, &table_key, &full_path, None)
.await?
} else {
super::table_ops::needs_index_work_edge(db, &table_key, &full_path, None).await?
};
if !will_compact && !needs_reindex && !needs_index_create {
if head_advanced {
// Nothing left to compact, but a prior attempt already advanced HEAD
// (e.g. the strip committed, then compaction conflicted, and the reopen
// is now already compacted). Publish that committed work instead of
// dropping it as uncovered drift.
break (
crate::storage_layer::SnapshotHandle::new(ds),
CompactionMetrics::default(),
Vec::new(),
true,
);
}
if let Some(h) = sidecar.take() {
let _ = crate::db::manifest::delete_sidecar(&h, db.storage_adapter()).await;
}
return Ok(TableOptimizeStats::compacted(
table_key.clone(),
&CompactionMetrics::default(),
false,
));
}
// Phase A: recovery sidecar BEFORE any HEAD-advancing op, written once and
// reused across reopen attempts.
if sidecar.is_none() {
let sc = crate::db::manifest::new_sidecar(
crate::db::manifest::SidecarKind::Optimize,
None,
// optimize is system-attributed (no `optimize_as` actor API today).
None,
vec![crate::db::manifest::SidecarTablePin {
table_key: table_key.clone(),
table_path: full_path.clone(),
expected_version,
// Lower bound — compaction commits N≥1 versions (reserve + rewrite);
// the classifier loose-matches SidecarKind::Optimize.
post_commit_pin: expected_version + 1,
confirmed_version: None,
table_branch: None,
}],
);
sidecar = Some(
crate::db::manifest::write_sidecar(db.root_uri(), db.storage_adapter(), &sc).await?,
);
}
// Test seam: a concurrent (cross-process) writer can interleave here, before
// any Phase-B commit lands, to exercise the reopen+replan path.
crate::failpoints::maybe_fail("optimize.before_compact")?;
// Phase B: scrub stale auto_cleanup (keeps optimize non-destructive on a
// graph upgraded from a pre-v7 binary whose `compact_files`/`optimize_indices`
// commits would otherwise fire Lance's auto-cleanup GC hook), compact,
// incremental reindex, then materialize declared-but-missing indexes. Each is
// an inline-commit residual covered by the sidecar. A retryable Lance conflict
// here means a concurrent writer preempted an overlapping fragment → reopen at
// the new HEAD and re-plan. Baseline captured BEFORE the scrub so that if the
// scrub is the only commit, `committed` still triggers the Phase-C publish.
let version_before = ds.version().version;
match clear_stale_auto_cleanup_config(&mut ds).await {
// `true` ⇒ the strip committed and advanced HEAD past the manifest.
Ok(stripped) => head_advanced |= stripped,
Err(e) if attempt < COMPACTION_RETRY_BUDGET && is_retryable_lance_conflict(&e) => {
continue;
}
Err(e) => return Err(OmniError::Lance(e.to_string())),
}
let metrics: CompactionMetrics = if will_compact {
match compact_files(&mut ds, options, None).await {
Ok(m) => {
head_advanced = true;
m
}
Err(e) if attempt < COMPACTION_RETRY_BUDGET && is_retryable_lance_conflict(&e) => {
continue;
}
Err(e) => return Err(OmniError::Lance(e.to_string())),
}
} else {
CompactionMetrics::default()
};
// Test seam: inject one retryable reindex conflict AFTER compaction has
// committed (so HEAD is already ahead of the manifest from our own work),
// exercising the own-HEAD (not external) drift classification on the next
// reopened attempt.
if crate::failpoints::maybe_fail("optimize.inject_reindex_conflict").is_err()
&& attempt < COMPACTION_RETRY_BUDGET
{
continue;
}
match ds.optimize_indices(&OptimizeOptions::default()).await {
Ok(()) => {}
Err(e) if attempt < COMPACTION_RETRY_BUDGET && is_retryable_lance_conflict(&e) => {
continue;
}
Err(e) => {
return Err(OmniError::Lance(format!("optimize_indices on {}: {}", table_key, e)));
}
}
let catalog = db.catalog();
let mut snapshot = crate::storage_layer::SnapshotHandle::new(ds);
let pending_indexes: Vec<super::PendingIndex> =
super::table_ops::build_indices_on_dataset_for_catalog(
db,
&catalog,
&table_key,
&mut snapshot,
)
.await?;
// optimize_indices / index build may also have committed (folded fragments,
// built a deferred index). Any HEAD advance this attempt counts too.
let version_after = snapshot.dataset().version().version;
head_advanced |= version_after != version_before;
break (snapshot, metrics, pending_indexes, head_advanced);
};
ds.optimize_indices(&OptimizeOptions::default())
.await
.map_err(|e| OmniError::Lance(format!("optimize_indices on {}: {}", table_key, e)))?;
// Materialize any declared-but-missing index over the just-compacted layout,
// reusing the build chokepoint (idempotent: skips existing indexes; fault-
// isolates an untrainable vector column into `pending` rather than failing).
// Run it UNCONDITIONALLY now that we are past the no-op gate — not only when
// `needs_index_create`. A table can enter this path for compaction or
// reindex while its sole missing index is an untrainable Vector column
// (which `needs_index_work_*` does not count as buildable work); calling the
// build here is what surfaces that column in `pending_indexes`, so optimize
// can't compact a table yet silently drop the deferred-index signal.
// Idempotent + cheap when there is nothing to build. Vector index creation
// is an inline-commit residual; the Optimize sidecar's loose post_commit_pin
// covers the extra commits.
let catalog = db.catalog();
let mut snapshot = crate::storage_layer::SnapshotHandle::new(ds);
let pending_indexes: Vec<super::PendingIndex> =
super::table_ops::build_indices_on_dataset_for_catalog(
db,
&catalog,
&table_key,
&mut snapshot,
)
.await?;
let version_after = snapshot.dataset().version().version;
let committed = version_after != version_before;
// Pin the per-writer Phase B → Phase C residual for optimize: Lance HEAD has
// advanced but the manifest publish below hasn't run.
// Pin the per-writer Phase B → Phase C residual: Lance HEAD has advanced but the
// manifest publish below hasn't run.
crate::failpoints::maybe_fail("optimize.post_phase_b_pre_manifest_commit")?;
// Phase C: publish the compacted version to the manifest (one CAS commit,
// expected = the version observed under the queue). On failure the sidecar
// is intentionally left for the open-time recovery sweep to roll forward.
// Phase C: monotonic fast-forward publish. The compaction is committed at Lance
// HEAD `N`; publish a manifest pointer that includes it. If a concurrent writer
// already advanced the manifest to ≥ N (it built on our compaction), there is
// nothing to do. Otherwise advance to N; a concurrent advance during this window
// is a retryable manifest conflict — re-read the current version and re-evaluate
// (NOT a reopen: the compaction is already committed).
if committed {
let state = db.storage().table_state(&full_path, &snapshot).await?;
let update = crate::db::SubTableUpdate {
table_key: table_key.clone(),
table_version: state.version,
table_branch: None,
row_count: state.row_count,
version_metadata: state.version_metadata,
};
let mut expected = std::collections::HashMap::new();
expected.insert(table_key.clone(), expected_version);
db.coordinator
.write()
.await
.commit_updates_with_actor_with_expected(&[update], &expected, None)
.await?;
let mut published = false;
let mut last_conflict: Option<OmniError> = None;
for _ in 0..COMPACTION_RETRY_BUDGET {
let current = current_manifest_version(db, &table_key).await?;
if current >= state.version {
// The manifest already points at a version that includes our
// compaction (Lance versions are linear). Nothing to publish.
published = true;
break;
}
let update = crate::db::SubTableUpdate {
table_key: table_key.clone(),
table_version: state.version,
table_branch: None,
row_count: state.row_count,
version_metadata: state.version_metadata.clone(),
};
let mut expected = std::collections::HashMap::new();
expected.insert(table_key.clone(), current);
match db
.coordinator
.write()
.await
.commit_updates_with_actor_with_expected(&[update], &expected, None)
.await
{
Ok(_) => {
published = true;
break;
}
// A retryable manifest conflict means the manifest moved under us —
// loop and re-read `current` (the top check converges if it now
// already includes our compaction). Record it for the exhaustion path.
Err(e) if is_retryable_manifest_conflict(&e) => last_conflict = Some(e),
// Leave the sidecar for the open-time recovery sweep to roll forward.
Err(e) => return Err(e),
}
}
if !published {
// Budget exhausted under sustained contention. The final conflict may
// itself mean a concurrent writer published a version that already
// includes our (content-preserving) compaction — the postcondition is
// "the manifest reflects our compaction," not "we won the CAS" — so
// re-check before surfacing an error (§6.6).
let current = current_manifest_version(db, &table_key).await?;
if current < state.version {
return Err(last_conflict.unwrap_or_else(|| {
OmniError::manifest_conflict(format!(
"optimize publish of {table_key} exhausted {COMPACTION_RETRY_BUDGET} \
retries against concurrent writers"
))
}));
}
}
}
// Phase D: delete the sidecar (best-effort; recovery resolves a leftover).
if let Err(err) = crate::db::manifest::delete_sidecar(&handle, db.storage_adapter()).await {
tracing::warn!(
error = %err,
operation_id = handle.operation_id.as_str(),
"optimize recovery sidecar cleanup failed; next open's recovery sweep will resolve it"
);
if let Some(h) = sidecar.take() {
if let Err(err) = crate::db::manifest::delete_sidecar(&h, db.storage_adapter()).await {
tracing::warn!(
error = %err,
operation_id = h.operation_id.as_str(),
"optimize recovery sidecar cleanup failed; next open's recovery sweep will resolve it"
);
}
}
let mut stat = TableOptimizeStats::compacted(table_key, &metrics, committed);
@ -567,7 +668,7 @@ async fn optimize_one_table(
/// Bound on the app-level retry of an internal-table compaction against a
/// concurrent live writer (see [`is_retryable_lance_conflict`]).
const INTERNAL_COMPACTION_RETRY_BUDGET: u32 = 5;
const COMPACTION_RETRY_BUDGET: u32 = 5;
/// A Lance commit error that means "a concurrent writer preempted us; reload the
/// dataset and rerun." `compact_files` commits via `commit_compaction` ->
@ -588,6 +689,29 @@ fn is_retryable_lance_conflict(err: &lance::Error) -> bool {
)
}
/// A manifest publish conflict that optimize's monotonic Phase-C loop re-evaluates
/// (re-read the current version, then no-op or fast-forward). Both shapes that reach
/// here are `Conflict`-kind and mean "the manifest moved under us; reconsider," never
/// a lost update: the typed `ExpectedVersionMismatch` (a concurrent writer advanced
/// the table) and the publisher's exhausted row-level CAS (`manifest_conflict`).
fn is_retryable_manifest_conflict(err: &OmniError) -> bool {
matches!(
err,
OmniError::Manifest(m) if m.kind == crate::error::ManifestErrorKind::Conflict
)
}
/// The table's current manifest version on `main` (0 if absent), read fresh. Used by
/// optimize's monotonic publish loop to decide no-op (`current >= N`) vs fast-forward.
async fn current_manifest_version(db: &Omnigraph, table_key: &str) -> Result<u64> {
Ok(db
.fresh_snapshot_for_branch(None)
.await?
.entry(table_key)
.map(|e| e.table_version)
.unwrap_or(0))
}
/// Remove any stored `lance.auto_cleanup.*` config from a table so compaction
/// stays **non-destructive by construction**. Used by both the internal-table
/// path ([`compact_internal_table`]) and the data-table path
@ -666,7 +790,7 @@ async fn compact_internal_table(
// so optimize would otherwise fail spuriously on a live graph. On a retryable
// conflict we re-open at the new HEAD and rerun — the canonical Lance-consumer
// pattern. Each attempt opens fresh because the conflict means the version moved.
for attempt in 0..INTERNAL_COMPACTION_RETRY_BUDGET {
for attempt in 0..COMPACTION_RETRY_BUDGET {
let handle = db
.storage()
.open_dataset_head_for_write(table_key, &uri, None)
@ -678,7 +802,7 @@ async fn compact_internal_table(
let cleared_config = match clear_stale_auto_cleanup_config(&mut ds).await {
Ok(cleared) => cleared,
Err(e) => {
if attempt + 1 < INTERNAL_COMPACTION_RETRY_BUDGET && is_retryable_lance_conflict(&e)
if attempt + 1 < COMPACTION_RETRY_BUDGET && is_retryable_lance_conflict(&e)
{
continue;
}
@ -718,7 +842,7 @@ async fn compact_internal_table(
));
}
Err(e)
if attempt + 1 < INTERNAL_COMPACTION_RETRY_BUDGET
if attempt + 1 < COMPACTION_RETRY_BUDGET
&& is_retryable_lance_conflict(&e) =>
{
continue;
@ -727,7 +851,7 @@ async fn compact_internal_table(
}
}
Err(OmniError::manifest_conflict(format!(
"internal-table compaction of {table_key} exhausted {INTERNAL_COMPACTION_RETRY_BUDGET} \
"internal-table compaction of {table_key} exhausted {COMPACTION_RETRY_BUDGET} \
retries against concurrent writers"
)))
}

View file

@ -3089,6 +3089,7 @@ edge WorksAt: Person -> Company
/// forward on next open so the manifest tracks the Lance HEAD — and the healed
/// table must then accept a schema apply (the original bug's victim).
#[tokio::test]
#[serial(optimize)]
async fn optimize_phase_b_failure_recovered_on_next_open() {
let _scenario = FailScenario::setup();
let dir = tempfile::tempdir().unwrap();
@ -3178,6 +3179,242 @@ async fn optimize_phase_b_failure_recovered_on_next_open() {
.expect("schema apply after optimize recovery must succeed");
}
/// Cross-process race (the prod bug): a served write advances the manifest on the
/// same table while a SEPARATE `optimize` process is paused between its compaction
/// and its manifest publish. The in-process write queue does NOT serialize across
/// processes, so optimize's equality-CAS publish (expected = its pre-compaction
/// version) finds the manifest already advanced. optimize must CONVERGE — the
/// concurrent write built on top of the compacted HEAD, so the compaction is
/// already reflected — not fail with "expected X but current Y". RED before the
/// monotonic-publish fix.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
#[serial(optimize)]
async fn optimize_survives_concurrent_insert_advancing_manifest() {
let _scenario = FailScenario::setup();
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap().to_string();
{
let db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap();
for (name, age) in [("alice", 30), ("bob", 31), ("carol", 32), ("dave", 33)] {
db.mutate(
"main",
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", name)], &[("$age", age)]),
)
.await
.unwrap();
}
}
// Pause optimize BEFORE it compacts, so the concurrent insert lands while
// HEAD == manifest (no in-flight optimize drift for the writer to trip on); the
// insert advances the manifest, then optimize compacts on top and must converge
// its publish over the advanced manifest rather than fail the equality CAS.
let failpoint = ScopedFailPoint::new("optimize.before_compact", "pause");
let uri_opt = uri.clone();
let optimize = tokio::spawn(async move {
let db = Omnigraph::open(&uri_opt).await.unwrap();
db.optimize().await
});
// Wait until optimize reaches the pause (its Optimize sidecar is on disk).
assert!(
wait_for_sidecar(dir.path()).await,
"optimize never reached the pre-compact pause",
);
// Concurrent insert on the SAME table via a SEPARATE handle (= separate
// in-process write queue = a different process) advances the manifest.
{
let db_b = Omnigraph::open(&uri).await.unwrap();
db_b.mutate(
"main",
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", "eve")], &[("$age", 34)]),
)
.await
.unwrap();
}
drop(failpoint); // release optimize
let result = tokio::time::timeout(std::time::Duration::from_secs(20), optimize)
.await
.expect("optimize task hung")
.unwrap();
result.expect("optimize must survive a concurrent same-table write (cross-process)");
// No lost write: 4 seed + eve all present; graph remains re-optimizable.
let db = Omnigraph::open(&uri).await.unwrap();
assert_eq!(
helpers::count_rows(&db, "node:Person").await,
5,
"concurrent insert must not be lost",
);
db.optimize()
.await
.expect("graph must remain healthy / re-optimizable");
}
/// Cross-process race: a served DELETE commits on the same table while a SEPARATE
/// `optimize` process is parked just before its compaction. Lance rebases the
/// compaction past the delete cleanly (so this surfaces as a manifest-CAS mismatch
/// at publish, not a Lance `Rewrite` conflict — the genuine `Rewrite`-vs-`Rewrite`
/// overlap is the rarer many-fragment/concurrent-compaction case, covered by the
/// shared `is_retryable_lance_conflict` retry the internal-table path already
/// exercises). optimize must converge its publish over the advanced manifest and
/// preserve the delete. RED before the fix.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
#[serial(optimize)]
async fn optimize_survives_concurrent_delete_before_compaction() {
let _scenario = FailScenario::setup();
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap().to_string();
{
let db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap();
for (name, age) in [("alice", 30), ("bob", 31), ("carol", 32), ("dave", 33)] {
db.mutate(
"main",
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", name)], &[("$age", age)]),
)
.await
.unwrap();
}
}
// Pause optimize BEFORE its compaction commits.
let failpoint = ScopedFailPoint::new("optimize.before_compact", "pause");
let uri_opt = uri.clone();
let optimize = tokio::spawn(async move {
let db = Omnigraph::open(&uri_opt).await.unwrap();
db.optimize().await
});
assert!(
wait_for_sidecar(dir.path()).await,
"optimize never reached the pre-compact pause",
);
// Concurrent DELETE of an existing row writes a deletion vector onto the
// fragment optimize is about to compact → optimize's Rewrite overlap-conflicts
// at the Lance level ("Rewrite … preempted by concurrent Delete/Update").
{
let db_b = Omnigraph::open(&uri).await.unwrap();
db_b.mutate(
"main",
MUTATION_QUERIES,
"remove_person",
&mixed_params(&[("$name", "alice")], &[]),
)
.await
.unwrap();
}
drop(failpoint); // release optimize
let result = tokio::time::timeout(std::time::Duration::from_secs(20), optimize)
.await
.expect("optimize task hung")
.unwrap();
result.expect("optimize must reopen+replan past a concurrent overlapping delete");
// No lost write: alice's delete persisted (3 rows); graph remains re-optimizable.
let db = Omnigraph::open(&uri).await.unwrap();
assert_eq!(
helpers::count_rows(&db, "node:Person").await,
3,
"the concurrent delete must persist (alice removed)",
);
db.optimize()
.await
.expect("graph must remain healthy / re-optimizable");
}
/// Regression: the outer compaction retry loop must NOT misclassify optimize's OWN
/// committed Phase-B work as external drift. Attempt 1 compacts (HEAD → V+1); if a
/// LATER Phase-B op (reindex) then hits a retryable conflict, the reopened attempt
/// sees Lance HEAD ahead of the manifest — from OUR compaction, not an external
/// writer. The drift guard must skip it (we hold the sidecar) and converge, not
/// delete the sidecar and return `skipped_for_drift` (which would strand uncovered
/// drift). Reproduced by injecting one retryable reindex conflict after the compact.
#[tokio::test]
#[serial(optimize)]
async fn optimize_retry_does_not_misclassify_own_head_drift() {
let _scenario = FailScenario::setup();
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap().to_string();
{
let db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap();
for (name, age) in [("alice", 30), ("bob", 31), ("carol", 32), ("dave", 33)] {
db.mutate(
"main",
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", name)], &[("$age", age)]),
)
.await
.unwrap();
}
}
// Inject exactly one retryable reindex conflict: attempt 1 compacts (HEAD+1) then
// "conflicts" on reindex → retry; attempt 2 reopens with HEAD ahead of the manifest
// from our own compaction — the misclassification trigger.
let _failpoint = ScopedFailPoint::new("optimize.inject_reindex_conflict", "1*return");
let db = Omnigraph::open(&uri).await.unwrap();
let stats = db
.optimize()
.await
.expect("optimize must converge, not misclassify its own HEAD drift");
let person = stats
.iter()
.find(|s| s.table_key == "node:Person")
.expect("node:Person stat present");
assert!(
person.skipped.is_none(),
"node:Person must converge, not skipped_for_drift: {:?}",
person.skipped,
);
// No uncovered drift stranded: a follow-up optimize is clean and all rows read.
let stats2 = db.optimize().await.unwrap();
let person2 = stats2
.iter()
.find(|s| s.table_key == "node:Person")
.unwrap();
assert!(
person2.skipped.is_none(),
"follow-up optimize must be clean (no stranded drift): {:?}",
person2.skipped,
);
assert_eq!(helpers::count_rows(&db, "node:Person").await, 4);
}
/// Poll until `optimize` has written its recovery sidecar (i.e. reached Phase B
/// and is about to / has compacted), signalling it is parked at its failpoint.
async fn wait_for_sidecar(root: &std::path::Path) -> bool {
let recovery_dir = root.join("__recovery");
for _ in 0..1000 {
if recovery_dir.exists()
&& std::fs::read_dir(&recovery_dir)
.map(|d| d.count() > 0)
.unwrap_or(false)
{
return true;
}
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
}
false
}
#[tokio::test]
#[serial(branch_merge_phase_b)]
async fn branch_merge_phase_b_failure_recovered_on_next_open() {