From 4ca527cc539efcb70849c1163145ff7eee98925a Mon Sep 17 00:00:00 2001 From: Ragnor Comerford Date: Fri, 8 May 2026 16:42:14 +0200 Subject: [PATCH] staging: op-kind-aware drift check at commit_all entry MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes the bug class "Lance internal conflict surfaces as 500 instead of 409" for in-process concurrent strict-op writers on the same row. Pre-fix: in `MutationStaging::commit_all`, after queue acquisition, the staged Lance transaction (built against V0) was handed straight to `commit_staged`. When Lance HEAD has advanced past V0 (because the queue's prior winner already published), Lance's transaction conflict resolver fires `RetryableCommitConflict` for Update vs Update on the same row, which wraps as `OmniError::Lance()` and the API maps it to HTTP 500. Users see "internal server error" instead of a clean retryable conflict. Fix: track the strictest `MutationOpKind` per touched table on `MutationStaging` and propagate through `StagedMutation`. In `commit_all`'s recapture loop, before each `commit_staged`, fail-fast with `OmniError::manifest_expected_version_mismatch` (→ HTTP 409 ExpectedVersionMismatch) for tables whose tracked op_kind has `strict_pre_stage_version_check() == true` (Update/Delete/SchemaRewrite) when the staged dataset's version doesn't match the fresh manifest pin under the queue. Insert/Merge tables skip the check — concurrent inserts on disjoint keys legitimately coexist via Lance's auto-rebase, so the check would over-reject the existing same-key insert path. Threading: `ensure_path` now takes `op_kind` and stores it in a new `op_kinds: HashMap` on `MutationStaging`, with strictness-upgrade semantics so mixed insert+update on the same table still fires the strict check at commit time. `StagedMutation` carries `op_kinds` through to `commit_all`. Pinned by `change_concurrent_updates_same_key_serialize_via_publisher_cas` in `crates/omnigraph-server/tests/server.rs` (added in the previous commit). All Phase 2 sentinels still pass: change_concurrent_inserts_same_key_serialize_without_409, change_conflict_returns_manifest_conflict_409, branch_merge_conflict_response_includes_structured_conflicts. Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/omnigraph/src/exec/mutation.rs | 1 + crates/omnigraph/src/exec/staging.rs | 71 +++++++++++++++++++++++++-- crates/omnigraph/src/loader/mod.rs | 2 + 3 files changed, 71 insertions(+), 3 deletions(-) diff --git a/crates/omnigraph/src/exec/mutation.rs b/crates/omnigraph/src/exec/mutation.rs index 071b35a..d1ac9c3 100644 --- a/crates/omnigraph/src/exec/mutation.rs +++ b/crates/omnigraph/src/exec/mutation.rs @@ -629,6 +629,7 @@ async fn open_table_for_mutation( full_path.clone(), table_branch.clone(), expected_version, + op_kind, ); Ok((ds, full_path, table_branch)) } diff --git a/crates/omnigraph/src/exec/staging.rs b/crates/omnigraph/src/exec/staging.rs index eddaa6d..b13239e 100644 --- a/crates/omnigraph/src/exec/staging.rs +++ b/crates/omnigraph/src/exec/staging.rs @@ -26,7 +26,7 @@ use arrow_schema::SchemaRef; use lance::Dataset; use omnigraph_compiler::catalog::EdgeType; -use crate::db::SubTableUpdate; +use crate::db::{MutationOpKind, SubTableUpdate}; use crate::db::manifest::{ new_sidecar, write_sidecar, RecoverySidecarHandle, SidecarKind, SidecarTablePin, }; @@ -94,18 +94,30 @@ pub(crate) struct MutationStaging { /// Inline-committed updates from delete-touching ops (D₂ guarantees no /// pending batches exist on a delete-touched table). pub(crate) inline_committed: HashMap, + /// Strictest [`MutationOpKind`] seen per table within this query. Drives + /// the op-kind-aware drift check in [`StagedMutation::commit_all`]: for + /// tables whose first or any subsequent touch was a strict op + /// (Update / Delete / SchemaRewrite), commit_all fails fast with 409 + /// when the staged dataset version drifts from the fresh manifest pin + /// rather than letting Lance's `commit_staged` surface + /// `RetryableCommitConflict` as a 500. See + /// [`MutationOpKind::strict_pre_stage_version_check`]. + pub(crate) op_kinds: HashMap, } impl MutationStaging { /// Capture pre-write metadata on first touch of a table. Subsequent - /// touches are no-ops (paths and `expected_version` are stable for the - /// lifetime of one query). + /// touches preserve the original `paths` and `expected_versions` + /// entries; `op_kinds` upgrades to the strictest kind seen so far so + /// that mixed insert+update on the same table still fires the strict + /// drift check at commit time. pub(crate) fn ensure_path( &mut self, table_key: &str, full_path: String, table_branch: Option, expected_version: u64, + op_kind: MutationOpKind, ) { self.paths.entry(table_key.to_string()).or_insert(StagedTablePath { full_path, @@ -114,6 +126,19 @@ impl MutationStaging { self.expected_versions .entry(table_key.to_string()) .or_insert(expected_version); + self.op_kinds + .entry(table_key.to_string()) + .and_modify(|existing| { + // Upgrade to the stricter kind if a later op needs it. + // Insert + later Update → Update wins; Update + later Insert + // keeps Update. + if op_kind.strict_pre_stage_version_check() + && !existing.strict_pre_stage_version_check() + { + *existing = op_kind; + } + }) + .or_insert(op_kind); } /// Append a batch to the per-table accumulator. @@ -230,6 +255,7 @@ impl MutationStaging { paths, pending, inline_committed, + op_kinds, } = self; let mut staged_entries: Vec = Vec::with_capacity(pending.len()); @@ -330,6 +356,7 @@ impl MutationStaging { staged: staged_entries, expected_versions, paths, + op_kinds, }) } } @@ -359,6 +386,10 @@ pub(crate) struct StagedMutation { /// through so `commit_all` can build sidecar pins for both staged /// and inline-committed tables. paths: HashMap, + /// Strictest op_kind per touched table, propagated from + /// `MutationStaging::op_kinds` so `commit_all`'s drift check + /// fires only on read-modify-write tables. + op_kinds: HashMap, } /// Per-table state captured during `stage_all` and consumed by @@ -413,6 +444,7 @@ impl StagedMutation { mut staged, mut expected_versions, paths, + op_kinds, } = self; // Acquire per-(table_key, branch) queues for every touched @@ -485,6 +517,39 @@ impl StagedMutation { entry.table_key, )) })?; + + // Op-kind-aware drift check (MR-686 / Block 1.2 fix). For tables + // whose first or any subsequent touch was a strict op + // (Update / Delete / SchemaRewrite) — see + // [`MutationOpKind::strict_pre_stage_version_check`] — surface a + // clean 409 ExpectedVersionMismatch *before* `commit_staged` if + // the staged dataset's version has drifted from the fresh + // manifest pin under the queue. Without this guard, Lance's + // transaction conflict resolver fires `RetryableCommitConflict` + // on Update vs Update touching the same row and bubbles up as + // `OmniError::Lance()` mapped to HTTP 500. Pinned by + // `change_concurrent_updates_same_key_serialize_via_publisher_cas` + // in `crates/omnigraph-server/tests/server.rs`. + // + // Insert / Merge tables skip this check: concurrent inserts on + // disjoint keys legitimately coexist via Lance's auto-rebase, so + // the check would over-reject the existing Phase 2 same-key + // insert path (`change_concurrent_inserts_same_key_serialize_without_409`). + let strict = op_kinds + .get(&entry.table_key) + .map(|k| k.strict_pre_stage_version_check()) + .unwrap_or(false); + if strict { + let staged_version = entry.dataset.version().version; + if staged_version != current { + return Err(OmniError::manifest_expected_version_mismatch( + entry.table_key.clone(), + staged_version, + current, + )); + } + } + entry.expected_version = current; expected_versions.insert(entry.table_key.clone(), current); } diff --git a/crates/omnigraph/src/loader/mod.rs b/crates/omnigraph/src/loader/mod.rs index 40f0a12..a795f28 100644 --- a/crates/omnigraph/src/loader/mod.rs +++ b/crates/omnigraph/src/loader/mod.rs @@ -383,6 +383,7 @@ async fn load_jsonl_reader( full_path, table_branch, expected_version, + load_op_kind, ); let schema = batch.schema(); staging.append_batch(&table_key, schema, pending_mode, batch)?; @@ -504,6 +505,7 @@ async fn load_jsonl_reader( full_path, table_branch, expected_version, + load_op_kind, ); let schema = batch.schema(); staging.append_batch(&table_key, schema, pending_mode, batch)?;