diff --git a/crates/omnigraph/src/exec/mutation.rs b/crates/omnigraph/src/exec/mutation.rs index 071b35a..d1ac9c3 100644 --- a/crates/omnigraph/src/exec/mutation.rs +++ b/crates/omnigraph/src/exec/mutation.rs @@ -629,6 +629,7 @@ async fn open_table_for_mutation( full_path.clone(), table_branch.clone(), expected_version, + op_kind, ); Ok((ds, full_path, table_branch)) } diff --git a/crates/omnigraph/src/exec/staging.rs b/crates/omnigraph/src/exec/staging.rs index eddaa6d..b13239e 100644 --- a/crates/omnigraph/src/exec/staging.rs +++ b/crates/omnigraph/src/exec/staging.rs @@ -26,7 +26,7 @@ use arrow_schema::SchemaRef; use lance::Dataset; use omnigraph_compiler::catalog::EdgeType; -use crate::db::SubTableUpdate; +use crate::db::{MutationOpKind, SubTableUpdate}; use crate::db::manifest::{ new_sidecar, write_sidecar, RecoverySidecarHandle, SidecarKind, SidecarTablePin, }; @@ -94,18 +94,30 @@ pub(crate) struct MutationStaging { /// Inline-committed updates from delete-touching ops (Dā‚‚ guarantees no /// pending batches exist on a delete-touched table). pub(crate) inline_committed: HashMap, + /// Strictest [`MutationOpKind`] seen per table within this query. Drives + /// the op-kind-aware drift check in [`StagedMutation::commit_all`]: for + /// tables whose first or any subsequent touch was a strict op + /// (Update / Delete / SchemaRewrite), commit_all fails fast with 409 + /// when the staged dataset version drifts from the fresh manifest pin + /// rather than letting Lance's `commit_staged` surface + /// `RetryableCommitConflict` as a 500. See + /// [`MutationOpKind::strict_pre_stage_version_check`]. + pub(crate) op_kinds: HashMap, } impl MutationStaging { /// Capture pre-write metadata on first touch of a table. Subsequent - /// touches are no-ops (paths and `expected_version` are stable for the - /// lifetime of one query). + /// touches preserve the original `paths` and `expected_versions` + /// entries; `op_kinds` upgrades to the strictest kind seen so far so + /// that mixed insert+update on the same table still fires the strict + /// drift check at commit time. pub(crate) fn ensure_path( &mut self, table_key: &str, full_path: String, table_branch: Option, expected_version: u64, + op_kind: MutationOpKind, ) { self.paths.entry(table_key.to_string()).or_insert(StagedTablePath { full_path, @@ -114,6 +126,19 @@ impl MutationStaging { self.expected_versions .entry(table_key.to_string()) .or_insert(expected_version); + self.op_kinds + .entry(table_key.to_string()) + .and_modify(|existing| { + // Upgrade to the stricter kind if a later op needs it. + // Insert + later Update → Update wins; Update + later Insert + // keeps Update. + if op_kind.strict_pre_stage_version_check() + && !existing.strict_pre_stage_version_check() + { + *existing = op_kind; + } + }) + .or_insert(op_kind); } /// Append a batch to the per-table accumulator. @@ -230,6 +255,7 @@ impl MutationStaging { paths, pending, inline_committed, + op_kinds, } = self; let mut staged_entries: Vec = Vec::with_capacity(pending.len()); @@ -330,6 +356,7 @@ impl MutationStaging { staged: staged_entries, expected_versions, paths, + op_kinds, }) } } @@ -359,6 +386,10 @@ pub(crate) struct StagedMutation { /// through so `commit_all` can build sidecar pins for both staged /// and inline-committed tables. paths: HashMap, + /// Strictest op_kind per touched table, propagated from + /// `MutationStaging::op_kinds` so `commit_all`'s drift check + /// fires only on read-modify-write tables. + op_kinds: HashMap, } /// Per-table state captured during `stage_all` and consumed by @@ -413,6 +444,7 @@ impl StagedMutation { mut staged, mut expected_versions, paths, + op_kinds, } = self; // Acquire per-(table_key, branch) queues for every touched @@ -485,6 +517,39 @@ impl StagedMutation { entry.table_key, )) })?; + + // Op-kind-aware drift check (MR-686 / Block 1.2 fix). For tables + // whose first or any subsequent touch was a strict op + // (Update / Delete / SchemaRewrite) — see + // [`MutationOpKind::strict_pre_stage_version_check`] — surface a + // clean 409 ExpectedVersionMismatch *before* `commit_staged` if + // the staged dataset's version has drifted from the fresh + // manifest pin under the queue. Without this guard, Lance's + // transaction conflict resolver fires `RetryableCommitConflict` + // on Update vs Update touching the same row and bubbles up as + // `OmniError::Lance()` mapped to HTTP 500. Pinned by + // `change_concurrent_updates_same_key_serialize_via_publisher_cas` + // in `crates/omnigraph-server/tests/server.rs`. + // + // Insert / Merge tables skip this check: concurrent inserts on + // disjoint keys legitimately coexist via Lance's auto-rebase, so + // the check would over-reject the existing Phase 2 same-key + // insert path (`change_concurrent_inserts_same_key_serialize_without_409`). + let strict = op_kinds + .get(&entry.table_key) + .map(|k| k.strict_pre_stage_version_check()) + .unwrap_or(false); + if strict { + let staged_version = entry.dataset.version().version; + if staged_version != current { + return Err(OmniError::manifest_expected_version_mismatch( + entry.table_key.clone(), + staged_version, + current, + )); + } + } + entry.expected_version = current; expected_versions.insert(entry.table_key.clone(), current); } diff --git a/crates/omnigraph/src/loader/mod.rs b/crates/omnigraph/src/loader/mod.rs index 40f0a12..a795f28 100644 --- a/crates/omnigraph/src/loader/mod.rs +++ b/crates/omnigraph/src/loader/mod.rs @@ -383,6 +383,7 @@ async fn load_jsonl_reader( full_path, table_branch, expected_version, + load_op_kind, ); let schema = batch.schema(); staging.append_batch(&table_key, schema, pending_mode, batch)?; @@ -504,6 +505,7 @@ async fn load_jsonl_reader( full_path, table_branch, expected_version, + load_op_kind, ); let schema = batch.schema(); staging.append_batch(&table_key, schema, pending_mode, batch)?;