fix(writes): tolerate benign drift, defer sidecar-covered (OCC fence = fresh manifest pin)

The shared pre-stage precondition rejected any strict write (update / delete
/ schema apply) whenever a table's Lance HEAD differed from the manifest pin
the caller captured. But `HEAD > pin` is ambiguous: it can be benign
content-preserving drift that never published (compaction, a recovery
restore, an old-binary optimize, an external compact_files) — which carries
no recovery sidecar and is safe to write over — or a real in-flight partial
write a recovery sidecar covers and the open-time sweep will roll back. The
old check failed both with a stale-view 409, so schema apply (and strict
mutation) could not run on any write-active graph that had ever compacted or
recovered, and the original +1-per-retry loop followed from it.

Replace it with `Omnigraph::ensure_writable_or_defer`, which makes the OCC
fence the *current* manifest pin (re-read fresh on the conflict path), not the
caller's possibly-stale snapshot pin:

- HEAD == caller pin: fresh, no drift -> proceed (fast path, no extra read).
- caller pin != current pin: the caller is stale -> ExpectedVersionMismatch
  here, before any staged commit or sidecar, so a stale handle still fails
  loudly and leaves no residue (the prior early-reject behavior is preserved).
- caller pin == current pin, HEAD > pin, no sidecar: benign drift -> proceed;
  the writer's commit + publisher CAS reconcile the manifest.
- caller pin == current pin, HEAD > pin, a (foreign) sidecar pins the table:
  defer with an actionable 'reopen to recover' error.
- HEAD < current pin: manifest leads durable Lance state -> loud invariant
  violation.
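
The cases above reduce to a small pure decision function. What follows is a
minimal sketch, not the code in this commit: `Verdict` and `classify` are
hypothetical names, and `foreign_sidecar` stands in for the result of
`sidecar_pins_table`. The real `ensure_writable_or_defer` re-reads the current
pin only after the fast path misses; the sketch takes it as an argument for
simplicity.

```rust
/// Hypothetical outcome type; the real function returns `Result<()>`.
#[derive(Debug, PartialEq, Eq)]
enum Verdict {
    /// Safe to write: no drift, or benign drift with no sidecar.
    Proceed,
    /// Caller's snapshot pin differs from the current manifest pin.
    StaleCaller,
    /// Manifest pin is ahead of durable Lance HEAD: invariant violation.
    ManifestLeadsHead,
    /// A foreign recovery sidecar covers the drift: reopen to recover.
    DeferToRecovery,
}

fn classify(head: u64, caller_pin: u64, current_pin: u64, foreign_sidecar: bool) -> Verdict {
    // Fast path: HEAD equals the caller's pin, so nothing drifted.
    if head == caller_pin {
        return Verdict::Proceed;
    }
    // The caller's view is stale relative to the live manifest.
    if caller_pin != current_pin {
        return Verdict::StaleCaller;
    }
    // The caller is fresh, so any remaining difference is HEAD vs. manifest.
    if head < current_pin {
        return Verdict::ManifestLeadsHead;
    }
    // HEAD is ahead of the pin: the sidecar decides benign vs. partial write.
    if foreign_sidecar {
        Verdict::DeferToRecovery
    } else {
        Verdict::Proceed
    }
}
```

For example, with pin 7 and HEAD 9, `classify(9, 7, 7, false)` proceeds
(benign drift), while `classify(9, 7, 7, true)` defers to recovery.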

Load-bearing details:

- The OCC fence that staging records is now the manifest pin threaded out of
  `open_for_mutation_on_branch` (4th tuple element), not `ds.version()`. With
  benign drift tolerated, recording the drifted HEAD would just move the
  rejection to the post-queue strict check, as a spurious 409.
- `sidecar_pins_table` takes `exclude_operation_id`: schema apply writes its
  own recovery sidecar before its per-table head re-checks, so it must skip
  that sidecar or it would defer against itself.
- The delete path's `initial_version` (reopen-for-mutation) stays `ds.version()`
  — it is a HEAD-vs-HEAD self-consistency check, not an OCC fence.
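
To illustrate the first point, here is a minimal sketch of why the recorded
fence matters once benign drift is tolerated. `post_queue_check`, `Fence`, and
`recorded_fence` are hypothetical stand-ins; the only behavior assumed from
this commit is that the post-queue strict check compares the recorded value
against the freshly re-read manifest pin.

```rust
/// Hypothetical stand-in for the post-queue strict check: the value staging
/// recorded must equal the freshly re-read manifest pin.
fn post_queue_check(recorded: u64, fresh_pin: u64) -> Result<(), String> {
    if recorded == fresh_pin {
        Ok(())
    } else {
        Err(format!(
            "expected version {recorded}, but manifest pin is {fresh_pin}"
        ))
    }
}

/// Which value staging records as the OCC fence.
enum Fence {
    /// Old behavior: the Lance HEAD (`ds.version()`).
    LanceHead,
    /// New behavior: the manifest pin threaded out of the open call.
    ManifestPin,
}

fn recorded_fence(fence: Fence, head: u64, manifest_pin: u64) -> u64 {
    match fence {
        Fence::LanceHead => head,
        Fence::ManifestPin => manifest_pin,
    }
}
```

With a manifest pin of 7 and a compacted HEAD of 9, recording
`Fence::LanceHead` fails the check (9 vs. 7) even though the pre-stage
precondition let the write through; recording `Fence::ManifestPin` passes.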

Only the pre-stage check changes; the post-queue strict checks and the
publisher CAS (manifest-vs-manifest) are unchanged.
Ragnor Comerford 2026-06-08 11:04:10 +02:00
parent d599a8522a
commit d69b15d975
8 changed files with 219 additions and 27 deletions

View file

@ -34,9 +34,9 @@ pub(crate) use namespace::open_table_head_for_write;
use namespace::{branch_manifest_namespace, staged_table_namespace};
use publisher::{GraphNamespacePublisher, ManifestBatchPublisher};
pub(crate) use recovery::{
- RecoveryMode, RecoverySidecar, RecoverySidecarHandle, SidecarKind, SidecarTablePin,
- SidecarTableRegistration, SidecarTombstone, delete_sidecar, has_schema_apply_sidecar,
- list_sidecars, new_sidecar, recover_manifest_drift, write_sidecar,
+ RecoveryMode, RecoverySidecarHandle, SidecarKind, SidecarTablePin, SidecarTableRegistration,
+ SidecarTombstone, delete_sidecar, has_schema_apply_sidecar, list_sidecars, new_sidecar,
+ recover_manifest_drift, sidecar_pins_table, write_sidecar,
};
pub use state::SubTableEntry;
#[cfg(test)]

View file

@ -379,6 +379,65 @@ pub(crate) async fn list_sidecars(
Ok(out)
}
/// True if any pending recovery sidecar pins `(table_key, branch)` — i.e. a
/// failed multi-table write left partial state that the open-time sweep will
/// roll back. Consumer-write preconditions use this to DEFER on sidecar-covered
/// drift while tolerating benign orphaned drift (compaction / a recovery
/// `restore` / an old-binary optimize / external `compact_files`), which is
/// content-preserving and carries NO sidecar.
///
/// Branch match normalizes `None` (main/default) and `Some("main")` as equal.
/// Checks both `RecoverySidecar.tables` (pins) AND `additional_registrations` —
/// a SchemaApply sidecar pins a not-yet-registered added/renamed-target table
/// only in `additional_registrations`.
///
/// `exclude_operation_id` skips the caller's OWN in-flight sidecar: a writer
/// that has already written its sidecar (schema apply writes one before its
/// per-table head re-checks) must not treat that sidecar as foreign and defer
/// against itself. Mutation/load writers pass `None` (they check before writing
/// any sidecar, so every sidecar found is foreign). A foreign sidecar means a
/// PRIOR op crashed mid-flight on this still-open handle; the open-time sweep
/// has not yet run for it.
///
/// Single-coordinator note: this reads `__recovery/` at precondition time. The
/// only writer of sidecars on a graph is the local process, and the open-time
/// sweep resolves them before request handlers run, so there is no
/// read-then-write race under the single-coordinator model (see
/// `recover_manifest_drift`). A future multi-coordinator deployment would need
/// queue coordination here.
pub(crate) async fn sidecar_pins_table(
root_uri: &str,
storage: &dyn StorageAdapter,
table_key: &str,
branch: Option<&str>,
exclude_operation_id: Option<&str>,
) -> Result<bool> {
fn norm(b: Option<&str>) -> Option<&str> {
match b {
None | Some("main") => None,
other => other,
}
}
let want = norm(branch);
for sidecar in list_sidecars(root_uri, storage).await? {
if exclude_operation_id == Some(sidecar.operation_id.as_str()) {
continue;
}
let pinned = sidecar
.tables
.iter()
.any(|p| p.table_key == table_key && norm(p.table_branch.as_deref()) == want)
|| sidecar
.additional_registrations
.iter()
.any(|r| r.table_key == table_key && norm(r.table_branch.as_deref()) == want);
if pinned {
return Ok(true);
}
}
Ok(false)
}
/// Parse a sidecar body, enforcing the schema-version refusal policy.
/// Exposed separately so unit tests can exercise the parse path without
/// going through storage.
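
The matching rule in `sidecar_pins_table` can be exercised without storage.
The sketch below is a simplified, synchronous model under stated assumptions:
the `Sidecar` struct and `pins_table` are hypothetical, and the single
`covers` list merges what the real sidecar keeps in `tables` and
`additional_registrations`. Only the branch normalization and the
`exclude_operation_id` skip mirror the function above.

```rust
/// Simplified, hypothetical sidecar shape: one coverage list instead of the
/// separate `tables` and `additional_registrations` fields.
struct Sidecar {
    operation_id: String,
    /// `(table_key, branch)` pairs this sidecar covers.
    covers: Vec<(String, Option<String>)>,
}

/// `None` and `Some("main")` both name the default branch.
fn norm(branch: Option<&str>) -> Option<&str> {
    match branch {
        None | Some("main") => None,
        other => other,
    }
}

/// True if any sidecar other than the caller's own covers `(table_key, branch)`.
fn pins_table(
    sidecars: &[Sidecar],
    table_key: &str,
    branch: Option<&str>,
    exclude_operation_id: Option<&str>,
) -> bool {
    let want = norm(branch);
    sidecars
        .iter()
        // Skip the caller's own in-flight sidecar, if it has written one.
        .filter(|s| exclude_operation_id != Some(s.operation_id.as_str()))
        .any(|s| {
            s.covers
                .iter()
                .any(|(key, b)| key.as_str() == table_key && norm(b.as_deref()) == want)
        })
}
```

A sidecar registered with branch `None` matches a lookup for `Some("main")`,
and passing the sidecar's own `operation_id` as `exclude_operation_id` makes
the same lookup return `false`.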

View file

@ -628,6 +628,109 @@ impl Omnigraph {
&self.root_uri
}
/// Pre-stage write precondition for strict writers (Update / Delete /
/// SchemaRewrite / schema-apply index rebuild). Decides whether the writer
/// may proceed against a table whose Lance HEAD may sit ahead of the
/// manifest pin it captured in its snapshot.
///
/// `expected_version` is the manifest pin from the caller's snapshot, which
/// may be STALE (another writer published since the caller opened). The OCC
/// fence must therefore be the *current* manifest pin, re-read fresh here on
/// the conflict path. `Lance HEAD > caller pin` is ambiguous between a stale
/// handle and genuine drift, and only the fresh pin disambiguates:
///
/// - `HEAD == caller pin` — fresh and in sync (the current pin is squeezed
/// between them); proceed without an extra manifest read.
/// - `caller pin != current pin` — the caller's pre-write view is stale (or
/// inconsistent) relative to the live manifest: a normal optimistic-
/// concurrency conflict. Reject with `ExpectedVersionMismatch` here, BEFORE
/// any staged commit or recovery sidecar, so the client refreshes and
/// retries and no residue is left. (Tolerating this would let a stale write
/// advance Lance HEAD and write a sidecar before failing the publisher CAS.)
/// - `caller pin == current pin`, `HEAD == current pin` — no drift; proceed.
/// - `caller pin == current pin`, `HEAD > current pin` — genuine drift, the
/// caller is fresh w.r.t. the manifest but durable Lance HEAD is ahead:
/// - no sidecar pins the table — benign content-preserving drift
/// (compaction / a recovery `restore` / an old-binary optimize / an
/// external `compact_files`), which never published and leaves no
/// sidecar. Proceed: the writer's commit + the publisher CAS reconcile
/// the manifest.
/// - a sidecar pins the table — a real in-flight partial write the
/// open-time sweep will roll back. Defer; never write onto state
/// recovery may revert.
/// - `HEAD < current pin` — the manifest cannot lead durable Lance state
/// under the commit protocol; a hard, loud invariant violation.
///
/// `exclude_operation_id` is the caller's OWN in-flight sidecar id, if it
/// has already written one (schema apply does, before its per-table head
/// re-checks); that sidecar is skipped so the writer does not defer against
/// itself. Mutation/load writers check before writing any sidecar and pass
/// `None`.
///
/// Tolerating uncovered drift is correct only because such drift is always
/// content-preserving (see [`crate::db::manifest::sidecar_pins_table`]); a
/// future non-content-preserving HEAD advance with no sidecar would have to
/// register one or this would become a silent-data-loss vector.
pub(crate) async fn ensure_writable_or_defer(
&self,
ds: &Dataset,
table_key: &str,
branch: Option<&str>,
expected_version: u64,
exclude_operation_id: Option<&str>,
) -> Result<()> {
let head = ds.version().version;
if head == expected_version {
// Caller's pinned version matches durable Lance HEAD: fresh and no
// drift (the live pin is squeezed between them). Fast path.
return Ok(());
}
// head != caller pin. Re-read the CURRENT manifest pin fresh — the
// caller's snapshot may be stale — to tell a stale handle (normal OCC
// conflict) apart from genuine Lance-HEAD-vs-manifest drift.
let coordinator = self.open_coordinator_for_branch(branch).await?;
let current_pin = coordinator
.snapshot()
.entry(table_key)
.map(|entry| entry.table_version)
.ok_or_else(|| OmniError::manifest(format!("no manifest entry for {table_key}")))?;
if expected_version != current_pin {
// Stale (or inconsistent) pre-write view vs. the live manifest.
// Reject early — before any staged commit or sidecar — so the client
// refreshes and retries with no residue left behind.
return Err(OmniError::manifest_expected_version_mismatch(
table_key,
expected_version,
current_pin,
));
}
if head < current_pin {
return Err(OmniError::manifest_internal(format!(
"manifest pin for '{table_key}' leads durable Lance HEAD \
(pin={current_pin}, head={head}); the manifest cannot be ahead \
of committed Lance state under the commit protocol",
)));
}
// caller is fresh (expected == current_pin) and head > current_pin:
// genuine drift. Benign (no sidecar) vs. a partial write (sidecar).
if crate::db::manifest::sidecar_pins_table(
self.root_uri(),
self.storage_adapter(),
table_key,
branch,
exclude_operation_id,
)
.await?
{
return Err(OmniError::manifest_conflict(format!(
"'{table_key}' has a pending recovery sidecar (Lance head={head} \
is ahead of manifest pin={current_pin}); reopen the graph \
to run the recovery sweep before writing",
)));
}
Ok(())
}
pub(crate) async fn open_coordinator_for_branch(
&self,
branch: Option<&str>,
@ -1349,7 +1452,7 @@ impl Omnigraph {
&self,
table_key: &str,
op_kind: crate::db::MutationOpKind,
- ) -> Result<(Dataset, String, Option<String>)> {
+ ) -> Result<(Dataset, String, Option<String>, u64)> {
table_ops::open_for_mutation(self, table_key, op_kind).await
}
@ -1358,7 +1461,7 @@ impl Omnigraph {
branch: Option<&str>,
table_key: &str,
op_kind: crate::db::MutationOpKind,
- ) -> Result<(Dataset, String, Option<String>)> {
+ ) -> Result<(Dataset, String, Option<String>, u64)> {
table_ops::open_for_mutation_on_branch(self, branch, table_key, op_kind).await
}
@ -2131,7 +2234,7 @@ edge WorksAt: Person -> Company
}
async fn seed_person_row(db: &mut Omnigraph, name: &str, age: Option<i32>) {
- let (mut ds, full_path, table_branch) = db
+ let (mut ds, full_path, table_branch, _manifest_pin) = db
.open_for_mutation("node:Person", crate::db::MutationOpKind::Insert)
.await
.unwrap();

View file

@ -467,6 +467,13 @@ where
)
};
// This apply's own in-flight sidecar id (if one was written above). The
// per-table head re-checks below exclude it so they tolerate benign drift
// without deferring against the apply's own recovery coverage.
let recovery_operation_id = recovery_handle
.as_ref()
.map(|handle| handle.operation_id.clone());
for table_key in &added_tables {
let table_path = table_path_for_table_key(table_key)?;
let dataset_uri = db.table_store.dataset_uri(&table_path);
@ -495,7 +502,8 @@ where
source_table_key
))
})?;
- ensure_snapshot_entry_head_matches(db, source_entry).await?;
+ ensure_snapshot_entry_head_matches(db, source_entry, recovery_operation_id.as_deref())
+ .await?;
let source_ds = snapshot.open(source_table_key).await?;
let current_catalog = db.catalog();
let batch = batch_for_schema_apply_rewrite(
@ -541,7 +549,7 @@ where
table_key
))
})?;
- ensure_snapshot_entry_head_matches(db, entry).await?;
+ ensure_snapshot_entry_head_matches(db, entry, recovery_operation_id.as_deref()).await?;
let source_ds = snapshot.open(table_key).await?;
let current_catalog = db.catalog();
let batch = batch_for_schema_apply_rewrite(
@ -610,14 +618,14 @@ where
table_key
))
})?;
- ensure_snapshot_entry_head_matches(db, entry).await?;
+ ensure_snapshot_entry_head_matches(db, entry, recovery_operation_id.as_deref()).await?;
let dataset_uri = db.table_store.dataset_uri(&entry.table_path);
let mut ds = db
.table_store
.open_dataset_head_for_write(table_key, &dataset_uri, entry.table_branch.as_deref())
.await?;
- db.table_store
- .ensure_expected_version(&ds, table_key, entry.table_version)?;
+ // No redundant strict re-check here: `ensure_snapshot_entry_head_matches`
+ // above already applied the tolerant precondition for this entry.
db.build_indices_on_dataset_for_catalog(&desired_catalog, table_key, &mut ds)
.await?;
let state = db.table_store.table_state(&dataset_uri, &ds).await?;
@ -868,6 +876,7 @@ pub(super) async fn ensure_schema_apply_not_locked(db: &Omnigraph, operation: &s
pub(super) async fn ensure_snapshot_entry_head_matches(
db: &Omnigraph,
entry: &SubTableEntry,
exclude_operation_id: Option<&str>,
) -> Result<()> {
let dataset_uri = db.table_store.dataset_uri(&entry.table_path);
let ds = db
@ -878,8 +887,20 @@ pub(super) async fn ensure_snapshot_entry_head_matches(
entry.table_branch.as_deref(),
)
.await?;
- db.table_store
- .ensure_expected_version(&ds, &entry.table_key, entry.table_version)
+ // Tolerate benign content-preserving drift (Lance HEAD ahead of the manifest
+ // pin with no recovery sidecar) — schema apply reads the source at the pinned
+ // version and rewrites onto HEAD, which is safe precisely because uncovered
+ // drift preserves content. A FOREIGN sidecar-covered drift defers to recovery;
+ // `exclude_operation_id` skips schema apply's OWN already-written sidecar so
+ // these per-table re-checks don't defer against the in-flight apply itself.
+ db.ensure_writable_or_defer(
+ &ds,
+ &entry.table_key,
+ entry.table_branch.as_deref(),
+ entry.table_version,
+ exclude_operation_id,
+ )
+ .await
}
pub(super) async fn batch_for_schema_apply_rewrite(

View file

@ -404,7 +404,7 @@ pub(super) async fn open_for_mutation(
db: &Omnigraph,
table_key: &str,
op_kind: crate::db::MutationOpKind,
- ) -> Result<(Dataset, String, Option<String>)> {
+ ) -> Result<(Dataset, String, Option<String>, u64)> {
let current_branch = db
.coordinator
.read()
@ -425,7 +425,7 @@ pub(super) async fn open_for_mutation_on_branch(
branch: Option<&str>,
table_key: &str,
op_kind: crate::db::MutationOpKind,
- ) -> Result<(Dataset, String, Option<String>)> {
+ ) -> Result<(Dataset, String, Option<String>, u64)> {
db.ensure_schema_apply_not_locked("write").await?;
let resolved = db.resolved_branch_target(branch).await?;
let entry = resolved
@ -433,6 +433,10 @@ pub(super) async fn open_for_mutation_on_branch(
.entry(table_key)
.ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?;
let full_path = format!("{}/{}", db.root_uri, entry.table_path);
// The manifest pin (`entry.table_version`) is the OCC fence threaded back to
// the caller — staging records it as `expected_version`, never the Lance
// HEAD, so benign drift tolerated below doesn't relocate into a spurious
// post-queue 409.
match resolved.branch.as_deref() {
None => {
let ds = db
@ -440,10 +444,10 @@ pub(super) async fn open_for_mutation_on_branch(
.open_dataset_head_for_write(table_key, &full_path, None)
.await?;
if op_kind.strict_pre_stage_version_check() {
- db.table_store
- .ensure_expected_version(&ds, table_key, entry.table_version)?;
+ db.ensure_writable_or_defer(&ds, table_key, None, entry.table_version, None)
+ .await?;
}
- Ok((ds, full_path, None))
+ Ok((ds, full_path, None, entry.table_version))
}
Some(active_branch) => {
let (ds, table_branch) = open_owned_dataset_for_branch_write(
@ -456,7 +460,7 @@ pub(super) async fn open_for_mutation_on_branch(
op_kind,
)
.await?;
- Ok((ds, full_path, table_branch))
+ Ok((ds, full_path, table_branch, entry.table_version))
}
}
}
@ -477,8 +481,8 @@ pub(super) async fn open_owned_dataset_for_branch_write(
.open_dataset_head_for_write(table_key, full_path, Some(active_branch))
.await?;
if op_kind.strict_pre_stage_version_check() {
- db.table_store
- .ensure_expected_version(&ds, table_key, entry_version)?;
+ db.ensure_writable_or_defer(&ds, table_key, Some(active_branch), entry_version, None)
+ .await?;
}
Ok((ds, Some(active_branch.to_string())))
}

View file

@ -950,7 +950,7 @@ async fn publish_rewritten_merge_table(
// source onto target). The inline `delete_where` later in this
// function operates on rows the rewrite chose to remove, not
// user-facing predicates, so Merge is the correct policy here.
- let (ds, full_path, table_branch) = target_db
+ let (ds, full_path, table_branch, _manifest_pin) = target_db
.open_for_mutation(table_key, crate::db::MutationOpKind::Merge)
.await?;
let mut current_ds = ds;

View file

@ -620,10 +620,15 @@ async fn open_table_for_mutation(
.await?;
return Ok((ds, path.full_path.clone(), path.table_branch.clone()));
}
- let (ds, full_path, table_branch) = db
+ let (ds, full_path, table_branch, manifest_pin) = db
.open_for_mutation_on_branch(branch, table_key, op_kind)
.await?;
- let expected_version = ds.version().version;
+ // The OCC fence staging records is the manifest pin, NOT the Lance HEAD
+ // (`ds.version()`). With benign drift now tolerated at the pre-stage check,
+ // capturing HEAD here would relocate the rejection into a spurious 409 at
+ // the post-queue strict check (`StagedMutation::commit_all`), which compares
+ // this value against the freshly re-read pin.
+ let expected_version = manifest_pin;
staging.ensure_path(
table_key,
full_path.clone(),

View file

@ -418,7 +418,7 @@ async fn load_jsonl_reader<R: BufRead>(
// accumulator. Overwrite → concurrent inline-commit (legacy path).
if use_staging {
for (type_name, table_key, batch, loaded_count) in prepared_nodes {
- let (ds, full_path, table_branch) = db
+ let (ds, full_path, table_branch, _manifest_pin) = db
.open_for_mutation_on_branch(branch, &table_key, load_op_kind)
.await?;
let expected_version = ds.version().version;
@ -528,7 +528,7 @@ async fn load_jsonl_reader<R: BufRead>(
// Phase 2e: write every edge type. Same dispatch as Phase 2b.
if use_staging {
for (edge_name, table_key, batch, loaded_count) in prepared_edges {
- let (ds, full_path, table_branch) = db
+ let (ds, full_path, table_branch, _manifest_pin) = db
.open_for_mutation_on_branch(branch, &table_key, load_op_kind)
.await?;
let expected_version = ds.version().version;
@ -1208,7 +1208,7 @@ async fn write_batch_to_dataset(
LoadMode::Merge => crate::db::MutationOpKind::Merge,
LoadMode::Overwrite => crate::db::MutationOpKind::SchemaRewrite,
};
- let (mut ds, full_path, table_branch) = db
+ let (mut ds, full_path, table_branch, _manifest_pin) = db
.open_for_mutation_on_branch(branch, table_key, op_kind)
.await?;
let table_store = db.table_store();