From d69b15d9750aea616a688c72ea9d655e1d034388 Mon Sep 17 00:00:00 2001 From: Ragnor Comerford Date: Mon, 8 Jun 2026 11:04:10 +0200 Subject: [PATCH] fix(writes): tolerate benign drift, defer sidecar-covered (OCC fence = fresh manifest pin) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The shared pre-stage precondition rejected any strict write (update / delete / schema apply) whenever a table's Lance HEAD differed from the manifest pin the caller captured. But `HEAD > pin` is ambiguous: it can be benign content-preserving drift that never published (compaction, a recovery restore, an old-binary optimize, an external compact_files) — which carries no recovery sidecar and is safe to write over — or a real in-flight partial write a recovery sidecar covers and the open-time sweep will roll back. The old check failed both with a stale-view 409, so schema apply (and strict mutation) could not run on any write-active graph that had ever compacted or recovered, and the original +1-per-retry loop followed from it. Replace it with `Omnigraph::ensure_writable_or_defer`, which makes the OCC fence the *current* manifest pin (re-read fresh on the conflict path), not the caller's possibly-stale snapshot pin: - HEAD == caller pin: fresh, no drift -> proceed (fast path, no extra read). - caller pin != current pin: the caller is stale -> ExpectedVersionMismatch here, before any staged commit or sidecar, so a stale handle still fails loudly and leaves no residue (the prior early-reject behavior is preserved). - caller pin == current pin, HEAD > pin, no sidecar: benign drift -> proceed; the writer's commit + publisher CAS reconcile the manifest. - caller pin == current pin, HEAD > pin, a (foreign) sidecar pins the table: defer with an actionable 'reopen to recover' error. - HEAD < current pin: manifest leads durable Lance state -> loud invariant. Load-bearing details: - The OCC fence staging records is now the manifest pin threaded out of `open_for_mutation_on_branch` (4th tuple element), not `ds.version()`. With benign drift tolerated, capturing the drifted HEAD would relocate the rejection into a spurious 409 at the post-queue strict check. - `sidecar_pins_table` takes `exclude_operation_id`: schema apply writes its own recovery sidecar before its per-table head re-checks, so it must skip that sidecar or it would defer against itself. - The delete path's `initial_version` (reopen-for-mutation) stays `ds.version()` — it is a HEAD-vs-HEAD self-consistency check, not an OCC fence. Only the pre-stage check changes; the post-queue strict checks and the publisher CAS (manifest-vs-manifest) are unchanged. --- crates/omnigraph/src/db/manifest.rs | 6 +- crates/omnigraph/src/db/manifest/recovery.rs | 59 ++++++++++ crates/omnigraph/src/db/omnigraph.rs | 109 +++++++++++++++++- .../src/db/omnigraph/schema_apply.rs | 35 ++++-- .../omnigraph/src/db/omnigraph/table_ops.rs | 20 ++-- crates/omnigraph/src/exec/merge.rs | 2 +- crates/omnigraph/src/exec/mutation.rs | 9 +- crates/omnigraph/src/loader/mod.rs | 6 +- 8 files changed, 219 insertions(+), 27 deletions(-) diff --git a/crates/omnigraph/src/db/manifest.rs b/crates/omnigraph/src/db/manifest.rs index 5bf1f87..5b4176f 100644 --- a/crates/omnigraph/src/db/manifest.rs +++ b/crates/omnigraph/src/db/manifest.rs @@ -34,9 +34,9 @@ pub(crate) use namespace::open_table_head_for_write; use namespace::{branch_manifest_namespace, staged_table_namespace}; use publisher::{GraphNamespacePublisher, ManifestBatchPublisher}; pub(crate) use recovery::{ - RecoveryMode, RecoverySidecar, RecoverySidecarHandle, SidecarKind, SidecarTablePin, - SidecarTableRegistration, SidecarTombstone, delete_sidecar, has_schema_apply_sidecar, - list_sidecars, new_sidecar, recover_manifest_drift, write_sidecar, + RecoveryMode, RecoverySidecarHandle, SidecarKind, SidecarTablePin, SidecarTableRegistration, + SidecarTombstone, delete_sidecar, has_schema_apply_sidecar, list_sidecars, new_sidecar, + recover_manifest_drift, sidecar_pins_table, write_sidecar, }; pub use state::SubTableEntry; #[cfg(test)] diff --git a/crates/omnigraph/src/db/manifest/recovery.rs b/crates/omnigraph/src/db/manifest/recovery.rs index 3119531..0c974ef 100644 --- a/crates/omnigraph/src/db/manifest/recovery.rs +++ b/crates/omnigraph/src/db/manifest/recovery.rs @@ -379,6 +379,65 @@ pub(crate) async fn list_sidecars( Ok(out) } +/// True if any pending recovery sidecar pins `(table_key, branch)` — i.e. a +/// failed multi-table write left partial state that the open-time sweep will +/// roll back. Consumer-write preconditions use this to DEFER on sidecar-covered +/// drift while tolerating benign orphaned drift (compaction / a recovery +/// `restore` / an old-binary optimize / external `compact_files`), which is +/// content-preserving and carries NO sidecar. +/// +/// Branch match normalizes `None` (main/default) and `Some("main")` as equal. +/// Checks both `RecoverySidecar.tables` (pins) AND `additional_registrations` — +/// a SchemaApply sidecar pins a not-yet-registered added/renamed-target table +/// only in `additional_registrations`. +/// +/// `exclude_operation_id` skips the caller's OWN in-flight sidecar: a writer +/// that has already written its sidecar (schema apply writes one before its +/// per-table head re-checks) must not treat that sidecar as foreign and defer +/// against itself. Mutation/load writers pass `None` (they check before writing +/// any sidecar, so every sidecar found is foreign). A foreign sidecar means a +/// PRIOR op crashed mid-flight on this still-open handle; the open-time sweep +/// has not yet run for it. +/// +/// Single-coordinator note: this reads `__recovery/` at precondition time. The +/// only writer of sidecars on a graph is the local process, and the open-time +/// sweep resolves them before request handlers run, so there is no +/// read-then-write race under the single-coordinator model (see +/// `recover_manifest_drift`). A future multi-coordinator deployment would need +/// queue coordination here. +pub(crate) async fn sidecar_pins_table( + root_uri: &str, + storage: &dyn StorageAdapter, + table_key: &str, + branch: Option<&str>, + exclude_operation_id: Option<&str>, +) -> Result { + fn norm(b: Option<&str>) -> Option<&str> { + match b { + None | Some("main") => None, + other => other, + } + } + let want = norm(branch); + for sidecar in list_sidecars(root_uri, storage).await? { + if exclude_operation_id == Some(sidecar.operation_id.as_str()) { + continue; + } + let pinned = sidecar + .tables + .iter() + .any(|p| p.table_key == table_key && norm(p.table_branch.as_deref()) == want) + || sidecar + .additional_registrations + .iter() + .any(|r| r.table_key == table_key && norm(r.table_branch.as_deref()) == want); + if pinned { + return Ok(true); + } + } + Ok(false) +} + /// Parse a sidecar body, enforcing the schema-version refusal policy. /// Exposed separately so unit tests can exercise the parse path without /// going through storage. diff --git a/crates/omnigraph/src/db/omnigraph.rs b/crates/omnigraph/src/db/omnigraph.rs index ba2b70e..eb6ba0a 100644 --- a/crates/omnigraph/src/db/omnigraph.rs +++ b/crates/omnigraph/src/db/omnigraph.rs @@ -628,6 +628,109 @@ impl Omnigraph { &self.root_uri } + /// Pre-stage write precondition for strict writers (Update / Delete / + /// SchemaRewrite / schema-apply index rebuild). Decides whether the writer + /// may proceed against a table whose Lance HEAD may sit ahead of the + /// manifest pin it captured in its snapshot. + /// + /// `expected_version` is the manifest pin from the caller's snapshot, which + /// may be STALE (another writer published since the caller opened). The OCC + /// fence must therefore be the *current* manifest pin, re-read fresh here on + /// the conflict path. `Lance HEAD > caller pin` is ambiguous between a stale + /// handle and genuine drift, and only the fresh pin disambiguates: + /// + /// - `HEAD == caller pin` — fresh and in sync (the current pin is squeezed + /// between them); proceed without an extra manifest read. + /// - `caller pin != current pin` — the caller's pre-write view is stale (or + /// inconsistent) relative to the live manifest: a normal optimistic- + /// concurrency conflict. Reject with `ExpectedVersionMismatch` here, BEFORE + /// any staged commit or recovery sidecar, so the client refreshes and + /// retries and no residue is left. (Tolerating this would let a stale write + /// advance Lance HEAD and write a sidecar before failing the publisher CAS.) + /// - `caller pin == current pin`, `HEAD == current pin` — no drift; proceed. + /// - `caller pin == current pin`, `HEAD > current pin` — genuine drift, the + /// caller is fresh w.r.t. the manifest but durable Lance HEAD is ahead: + /// - no sidecar pins the table — benign content-preserving drift + /// (compaction / a recovery `restore` / an old-binary optimize / an + /// external `compact_files`), which never published and leaves no + /// sidecar. Proceed: the writer's commit + the publisher CAS reconcile + /// the manifest. + /// - a sidecar pins the table — a real in-flight partial write the + /// open-time sweep will roll back. Defer; never write onto state + /// recovery may revert. + /// - `HEAD < current pin` — the manifest cannot lead durable Lance state + /// under the commit protocol; a hard, loud invariant violation. + /// + /// `exclude_operation_id` is the caller's OWN in-flight sidecar id, if it + /// has already written one (schema apply does, before its per-table head + /// re-checks); that sidecar is skipped so the writer does not defer against + /// itself. Mutation/load writers check before writing any sidecar and pass + /// `None`. + /// + /// Tolerating uncovered drift is correct only because such drift is always + /// content-preserving (see [`crate::db::manifest::sidecar_pins_table`]); a + /// future non-content-preserving HEAD advance with no sidecar would have to + /// register one or this would become a silent-data-loss vector. + pub(crate) async fn ensure_writable_or_defer( + &self, + ds: &Dataset, + table_key: &str, + branch: Option<&str>, + expected_version: u64, + exclude_operation_id: Option<&str>, + ) -> Result<()> { + let head = ds.version().version; + if head == expected_version { + // Caller's pinned version matches durable Lance HEAD: fresh and no + // drift (the live pin is squeezed between them). Fast path. + return Ok(()); + } + // head != caller pin. Re-read the CURRENT manifest pin fresh — the + // caller's snapshot may be stale — to tell a stale handle (normal OCC + // conflict) apart from genuine Lance-HEAD-vs-manifest drift. + let coordinator = self.open_coordinator_for_branch(branch).await?; + let current_pin = coordinator + .snapshot() + .entry(table_key) + .map(|entry| entry.table_version) + .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {table_key}")))?; + if expected_version != current_pin { + // Stale (or inconsistent) pre-write view vs. the live manifest. + // Reject early — before any staged commit or sidecar — so the client + // refreshes and retries with no residue left behind. + return Err(OmniError::manifest_expected_version_mismatch( + table_key, + expected_version, + current_pin, + )); + } + if head < current_pin { + return Err(OmniError::manifest_internal(format!( + "manifest pin for '{table_key}' leads durable Lance HEAD \ + (pin={current_pin}, head={head}); the manifest cannot be ahead \ + of committed Lance state under the commit protocol", + ))); + } + // caller is fresh (expected == current_pin) and head > current_pin: + // genuine drift. Benign (no sidecar) vs. a partial write (sidecar). + if crate::db::manifest::sidecar_pins_table( + self.root_uri(), + self.storage_adapter(), + table_key, + branch, + exclude_operation_id, + ) + .await? + { + return Err(OmniError::manifest_conflict(format!( + "'{table_key}' has a pending recovery sidecar (Lance head={head} \ + is ahead of manifest pin={current_pin}); reopen the graph \ + to run the recovery sweep before writing", + ))); + } + Ok(()) + } + pub(crate) async fn open_coordinator_for_branch( &self, branch: Option<&str>, @@ -1349,7 +1452,7 @@ impl Omnigraph { &self, table_key: &str, op_kind: crate::db::MutationOpKind, - ) -> Result<(Dataset, String, Option)> { + ) -> Result<(Dataset, String, Option, u64)> { table_ops::open_for_mutation(self, table_key, op_kind).await } @@ -1358,7 +1461,7 @@ impl Omnigraph { branch: Option<&str>, table_key: &str, op_kind: crate::db::MutationOpKind, - ) -> Result<(Dataset, String, Option)> { + ) -> Result<(Dataset, String, Option, u64)> { table_ops::open_for_mutation_on_branch(self, branch, table_key, op_kind).await } @@ -2131,7 +2234,7 @@ edge WorksAt: Person -> Company } async fn seed_person_row(db: &mut Omnigraph, name: &str, age: Option) { - let (mut ds, full_path, table_branch) = db + let (mut ds, full_path, table_branch, _manifest_pin) = db .open_for_mutation("node:Person", crate::db::MutationOpKind::Insert) .await .unwrap(); diff --git a/crates/omnigraph/src/db/omnigraph/schema_apply.rs b/crates/omnigraph/src/db/omnigraph/schema_apply.rs index 7cb3193..fe14b13 100644 --- a/crates/omnigraph/src/db/omnigraph/schema_apply.rs +++ b/crates/omnigraph/src/db/omnigraph/schema_apply.rs @@ -467,6 +467,13 @@ where ) }; + // This apply's own in-flight sidecar id (if one was written above). The + // per-table head re-checks below exclude it so they tolerate benign drift + // without deferring against the apply's own recovery coverage. + let recovery_operation_id = recovery_handle + .as_ref() + .map(|handle| handle.operation_id.clone()); + for table_key in &added_tables { let table_path = table_path_for_table_key(table_key)?; let dataset_uri = db.table_store.dataset_uri(&table_path); @@ -495,7 +502,8 @@ where source_table_key )) })?; - ensure_snapshot_entry_head_matches(db, source_entry).await?; + ensure_snapshot_entry_head_matches(db, source_entry, recovery_operation_id.as_deref()) + .await?; let source_ds = snapshot.open(source_table_key).await?; let current_catalog = db.catalog(); let batch = batch_for_schema_apply_rewrite( @@ -541,7 +549,7 @@ where table_key )) })?; - ensure_snapshot_entry_head_matches(db, entry).await?; + ensure_snapshot_entry_head_matches(db, entry, recovery_operation_id.as_deref()).await?; let source_ds = snapshot.open(table_key).await?; let current_catalog = db.catalog(); let batch = batch_for_schema_apply_rewrite( @@ -610,14 +618,14 @@ where table_key )) })?; - ensure_snapshot_entry_head_matches(db, entry).await?; + ensure_snapshot_entry_head_matches(db, entry, recovery_operation_id.as_deref()).await?; let dataset_uri = db.table_store.dataset_uri(&entry.table_path); let mut ds = db .table_store .open_dataset_head_for_write(table_key, &dataset_uri, entry.table_branch.as_deref()) .await?; - db.table_store - .ensure_expected_version(&ds, table_key, entry.table_version)?; + // No redundant strict re-check here: `ensure_snapshot_entry_head_matches` + // above already applied the tolerant precondition for this entry. db.build_indices_on_dataset_for_catalog(&desired_catalog, table_key, &mut ds) .await?; let state = db.table_store.table_state(&dataset_uri, &ds).await?; @@ -868,6 +876,7 @@ pub(super) async fn ensure_schema_apply_not_locked(db: &Omnigraph, operation: &s pub(super) async fn ensure_snapshot_entry_head_matches( db: &Omnigraph, entry: &SubTableEntry, + exclude_operation_id: Option<&str>, ) -> Result<()> { let dataset_uri = db.table_store.dataset_uri(&entry.table_path); let ds = db @@ -878,8 +887,20 @@ pub(super) async fn ensure_snapshot_entry_head_matches( entry.table_branch.as_deref(), ) .await?; - db.table_store - .ensure_expected_version(&ds, &entry.table_key, entry.table_version) + // Tolerate benign content-preserving drift (Lance HEAD ahead of the manifest + // pin with no recovery sidecar) — schema apply reads the source at the pinned + // version and rewrites onto HEAD, which is safe precisely because uncovered + // drift preserves content. A FOREIGN sidecar-covered drift defers to recovery; + // `exclude_operation_id` skips schema apply's OWN already-written sidecar so + // these per-table re-checks don't defer against the in-flight apply itself. + db.ensure_writable_or_defer( + &ds, + &entry.table_key, + entry.table_branch.as_deref(), + entry.table_version, + exclude_operation_id, + ) + .await } pub(super) async fn batch_for_schema_apply_rewrite( diff --git a/crates/omnigraph/src/db/omnigraph/table_ops.rs b/crates/omnigraph/src/db/omnigraph/table_ops.rs index 3ed9c43..8d49b0a 100644 --- a/crates/omnigraph/src/db/omnigraph/table_ops.rs +++ b/crates/omnigraph/src/db/omnigraph/table_ops.rs @@ -404,7 +404,7 @@ pub(super) async fn open_for_mutation( db: &Omnigraph, table_key: &str, op_kind: crate::db::MutationOpKind, -) -> Result<(Dataset, String, Option)> { +) -> Result<(Dataset, String, Option, u64)> { let current_branch = db .coordinator .read() @@ -425,7 +425,7 @@ pub(super) async fn open_for_mutation_on_branch( branch: Option<&str>, table_key: &str, op_kind: crate::db::MutationOpKind, -) -> Result<(Dataset, String, Option)> { +) -> Result<(Dataset, String, Option, u64)> { db.ensure_schema_apply_not_locked("write").await?; let resolved = db.resolved_branch_target(branch).await?; let entry = resolved @@ -433,6 +433,10 @@ pub(super) async fn open_for_mutation_on_branch( .entry(table_key) .ok_or_else(|| OmniError::manifest(format!("no manifest entry for {}", table_key)))?; let full_path = format!("{}/{}", db.root_uri, entry.table_path); + // The manifest pin (`entry.table_version`) is the OCC fence threaded back to + // the caller — staging records it as `expected_version`, never the Lance + // HEAD, so benign drift tolerated below doesn't relocate into a spurious + // post-queue 409. match resolved.branch.as_deref() { None => { let ds = db @@ -440,10 +444,10 @@ pub(super) async fn open_for_mutation_on_branch( .open_dataset_head_for_write(table_key, &full_path, None) .await?; if op_kind.strict_pre_stage_version_check() { - db.table_store - .ensure_expected_version(&ds, table_key, entry.table_version)?; + db.ensure_writable_or_defer(&ds, table_key, None, entry.table_version, None) + .await?; } - Ok((ds, full_path, None)) + Ok((ds, full_path, None, entry.table_version)) } Some(active_branch) => { let (ds, table_branch) = open_owned_dataset_for_branch_write( @@ -456,7 +460,7 @@ pub(super) async fn open_for_mutation_on_branch( op_kind, ) .await?; - Ok((ds, full_path, table_branch)) + Ok((ds, full_path, table_branch, entry.table_version)) } } } @@ -477,8 +481,8 @@ pub(super) async fn open_owned_dataset_for_branch_write( .open_dataset_head_for_write(table_key, full_path, Some(active_branch)) .await?; if op_kind.strict_pre_stage_version_check() { - db.table_store - .ensure_expected_version(&ds, table_key, entry_version)?; + db.ensure_writable_or_defer(&ds, table_key, Some(active_branch), entry_version, None) + .await?; } Ok((ds, Some(active_branch.to_string()))) } diff --git a/crates/omnigraph/src/exec/merge.rs b/crates/omnigraph/src/exec/merge.rs index eb6c4a3..1ce31ef 100644 --- a/crates/omnigraph/src/exec/merge.rs +++ b/crates/omnigraph/src/exec/merge.rs @@ -950,7 +950,7 @@ async fn publish_rewritten_merge_table( // source onto target). The inline `delete_where` later in this // function operates on rows the rewrite chose to remove, not // user-facing predicates, so Merge is the correct policy here. - let (ds, full_path, table_branch) = target_db + let (ds, full_path, table_branch, _manifest_pin) = target_db .open_for_mutation(table_key, crate::db::MutationOpKind::Merge) .await?; let mut current_ds = ds; diff --git a/crates/omnigraph/src/exec/mutation.rs b/crates/omnigraph/src/exec/mutation.rs index 02b2a21..e73818e 100644 --- a/crates/omnigraph/src/exec/mutation.rs +++ b/crates/omnigraph/src/exec/mutation.rs @@ -620,10 +620,15 @@ async fn open_table_for_mutation( .await?; return Ok((ds, path.full_path.clone(), path.table_branch.clone())); } - let (ds, full_path, table_branch) = db + let (ds, full_path, table_branch, manifest_pin) = db .open_for_mutation_on_branch(branch, table_key, op_kind) .await?; - let expected_version = ds.version().version; + // The OCC fence staging records is the manifest pin, NOT the Lance HEAD + // (`ds.version()`). With benign drift now tolerated at the pre-stage check, + // capturing HEAD here would relocate the rejection into a spurious 409 at + // the post-queue strict check (`StagedMutation::commit_all`), which compares + // this value against the freshly re-read pin. + let expected_version = manifest_pin; staging.ensure_path( table_key, full_path.clone(), diff --git a/crates/omnigraph/src/loader/mod.rs b/crates/omnigraph/src/loader/mod.rs index d5d74c0..9c11bb0 100644 --- a/crates/omnigraph/src/loader/mod.rs +++ b/crates/omnigraph/src/loader/mod.rs @@ -418,7 +418,7 @@ async fn load_jsonl_reader( // accumulator. Overwrite → concurrent inline-commit (legacy path). if use_staging { for (type_name, table_key, batch, loaded_count) in prepared_nodes { - let (ds, full_path, table_branch) = db + let (ds, full_path, table_branch, _manifest_pin) = db .open_for_mutation_on_branch(branch, &table_key, load_op_kind) .await?; let expected_version = ds.version().version; @@ -528,7 +528,7 @@ async fn load_jsonl_reader( // Phase 2e: write every edge type. Same dispatch as Phase 2b. if use_staging { for (edge_name, table_key, batch, loaded_count) in prepared_edges { - let (ds, full_path, table_branch) = db + let (ds, full_path, table_branch, _manifest_pin) = db .open_for_mutation_on_branch(branch, &table_key, load_op_kind) .await?; let expected_version = ds.version().version; @@ -1208,7 +1208,7 @@ async fn write_batch_to_dataset( LoadMode::Merge => crate::db::MutationOpKind::Merge, LoadMode::Overwrite => crate::db::MutationOpKind::SchemaRewrite, }; - let (mut ds, full_path, table_branch) = db + let (mut ds, full_path, table_branch, _manifest_pin) = db .open_for_mutation_on_branch(branch, table_key, op_kind) .await?; let table_store = db.table_store();