From a6d244e648e743fec5850bda97de632755e8ab1f Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Sat, 9 May 2026 20:06:25 +0000 Subject: [PATCH] engine: strict drift check uses read-time pin --- crates/omnigraph/src/exec/staging.rs | 77 ++++++++++++++++------------ 1 file changed, 44 insertions(+), 33 deletions(-) diff --git a/crates/omnigraph/src/exec/staging.rs b/crates/omnigraph/src/exec/staging.rs index b13239e..8054e9c 100644 --- a/crates/omnigraph/src/exec/staging.rs +++ b/crates/omnigraph/src/exec/staging.rs @@ -481,12 +481,24 @@ impl StagedMutation { // Re-capture manifest pins under the queue (PR 2 / MR-686). // - // expected_versions was captured during stage_all (Phase A, - // BEFORE acquire_many). If a cross-tenant writer published our - // table between Phase A and queue acquisition, those captured - // pins are stale. We re-read the per-branch snapshot under the - // queue and refresh expected_versions; the publisher's CAS - // becomes a correct no-op for queued tables. + // expected_versions was captured when the mutation first opened + // each table for mutation (the query's read-time pin). For + // non-strict inserts / merge-style appends, a writer may advance + // the table before we acquire the queue and Lance can still + // safely rebase the write, so we refresh expected_versions to + // the queued manifest pin. + // + // Strict read-modify-write ops (update / delete / + // schema-rewrite) are different: the staged batch was computed + // against the read-time pin, even if stage_all later re-opened + // the dataset at HEAD. For those ops, compare read-time + // expected_version to the queued manifest pin and fail before + // any Lance HEAD movement if the target drifted. This can + // over-reject a single mutation that inserts, then upgrades to + // update, while another writer advances the table between the + // two touches; that is safe-by-default and keeps one invariant + // until `ensure_path` learns how to bump expected_version on + // op-kind upgrade. // // Why per-branch (and not the bound-branch `db.snapshot()`): // when the caller mutates a branch other than the engine's @@ -518,19 +530,6 @@ impl StagedMutation { )) })?; - // Op-kind-aware drift check (MR-686 / Block 1.2 fix). For tables - // whose first or any subsequent touch was a strict op - // (Update / Delete / SchemaRewrite) — see - // [`MutationOpKind::strict_pre_stage_version_check`] — surface a - // clean 409 ExpectedVersionMismatch *before* `commit_staged` if - // the staged dataset's version has drifted from the fresh - // manifest pin under the queue. Without this guard, Lance's - // transaction conflict resolver fires `RetryableCommitConflict` - // on Update vs Update touching the same row and bubbles up as - // `OmniError::Lance()` mapped to HTTP 500. Pinned by - // `change_concurrent_updates_same_key_serialize_via_publisher_cas` - // in `crates/omnigraph-server/tests/server.rs`. - // // Insert / Merge tables skip this check: concurrent inserts on // disjoint keys legitimately coexist via Lance's auto-rebase, so // the check would over-reject the existing Phase 2 same-key @@ -539,29 +538,41 @@ impl StagedMutation { .get(&entry.table_key) .map(|k| k.strict_pre_stage_version_check()) .unwrap_or(false); - if strict { - let staged_version = entry.dataset.version().version; - if staged_version != current { - return Err(OmniError::manifest_expected_version_mismatch( - entry.table_key.clone(), - staged_version, - current, - )); - } + if strict && entry.expected_version != current { + return Err(OmniError::manifest_expected_version_mismatch( + entry.table_key.clone(), + entry.expected_version, + current, + )); } entry.expected_version = current; expected_versions.insert(entry.table_key.clone(), current); } for (table_key, _update) in inline_committed.iter() { - if let Some(current) = snapshot.entry(table_key).map(|e| e.table_version) { - expected_versions.insert(table_key.clone(), current); - } else { - return Err(OmniError::manifest_conflict(format!( + let current = snapshot + .entry(table_key) + .map(|e| e.table_version) + .ok_or_else(|| { + OmniError::manifest_conflict(format!( "table '{}' missing from manifest at commit time", table_key, - ))); + )) + })?; + let expected = expected_versions.get(table_key).copied().ok_or_else(|| { + OmniError::manifest_internal(format!( + "StagedMutation::commit_all: missing expected version for inline-committed table '{}'", + table_key + )) + })?; + if expected != current { + return Err(OmniError::manifest_expected_version_mismatch( + table_key.clone(), + expected, + current, + )); } + expected_versions.insert(table_key.clone(), current); } // Sidecar protocol: build the per-table pin list and write the