staging: op-kind-aware drift check at commit_all entry

Closes the bug class "Lance internal conflict surfaces as 500 instead
of 409" for in-process concurrent strict-op writers on the same row.

Pre-fix: in `StagedMutation::commit_all`, after queue acquisition, the
staged Lance transaction (built against V0) was handed straight to
`commit_staged`. When Lance HEAD has advanced past V0 (because the
queue's prior winner already published), Lance's transaction conflict
resolver fires `RetryableCommitConflict` for Update vs Update on the
same row, which wraps as `OmniError::Lance(<string>)` and the API maps
it to HTTP 500. Users see "internal server error" instead of a clean
retryable conflict.

Fix: track the strictest `MutationOpKind` per touched table on
`MutationStaging` and propagate through `StagedMutation`. In
`commit_all`'s recapture loop, before each `commit_staged`, fail-fast
with `OmniError::manifest_expected_version_mismatch` (→ HTTP 409
ExpectedVersionMismatch) for tables whose tracked op_kind has
`strict_pre_stage_version_check() == true` (Update/Delete/SchemaRewrite)
when the staged dataset's version doesn't match the fresh manifest pin
under the queue. Insert/Merge tables skip the check — concurrent
inserts on disjoint keys legitimately coexist via Lance's auto-rebase,
so the check would over-reject the existing same-key insert path.

Threading: `ensure_path` now takes `op_kind` and stores it in a new
`op_kinds: HashMap<String, MutationOpKind>` on `MutationStaging`, with
strictness-upgrade semantics so mixed insert+update on the same table
still fires the strict check at commit time. `StagedMutation` carries
`op_kinds` through to `commit_all`.

Pinned by `change_concurrent_updates_same_key_serialize_via_publisher_cas`
in `crates/omnigraph-server/tests/server.rs` (added in the previous
commit). All Phase 2 sentinels still pass:
change_concurrent_inserts_same_key_serialize_without_409,
change_conflict_returns_manifest_conflict_409,
branch_merge_conflict_response_includes_structured_conflicts.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Ragnor Comerford 2026-05-08 16:42:14 +02:00
parent ebf5a5769d
commit 4ca527cc53
No known key found for this signature in database
3 changed files with 71 additions and 3 deletions

View file

@ -629,6 +629,7 @@ async fn open_table_for_mutation(
full_path.clone(),
table_branch.clone(),
expected_version,
op_kind,
);
Ok((ds, full_path, table_branch))
}

View file

@ -26,7 +26,7 @@ use arrow_schema::SchemaRef;
use lance::Dataset;
use omnigraph_compiler::catalog::EdgeType;
use crate::db::SubTableUpdate;
use crate::db::{MutationOpKind, SubTableUpdate};
use crate::db::manifest::{
new_sidecar, write_sidecar, RecoverySidecarHandle, SidecarKind, SidecarTablePin,
};
@ -94,18 +94,30 @@ pub(crate) struct MutationStaging {
/// Inline-committed updates from delete-touching ops (D₂ guarantees no
/// pending batches exist on a delete-touched table).
pub(crate) inline_committed: HashMap<String, SubTableUpdate>,
/// Strictest [`MutationOpKind`] seen per table within this query. Drives
/// the op-kind-aware drift check in [`StagedMutation::commit_all`]: for
/// tables whose first or any subsequent touch was a strict op
/// (Update / Delete / SchemaRewrite), commit_all fails fast with 409
/// when the staged dataset version drifts from the fresh manifest pin
/// rather than letting Lance's `commit_staged` surface
/// `RetryableCommitConflict` as a 500. See
/// [`MutationOpKind::strict_pre_stage_version_check`].
pub(crate) op_kinds: HashMap<String, MutationOpKind>,
}
impl MutationStaging {
/// Capture pre-write metadata on first touch of a table. Subsequent
/// touches are no-ops (paths and `expected_version` are stable for the
/// lifetime of one query).
/// touches preserve the original `paths` and `expected_versions`
/// entries; `op_kinds` upgrades to the strictest kind seen so far so
/// that mixed insert+update on the same table still fires the strict
/// drift check at commit time.
pub(crate) fn ensure_path(
&mut self,
table_key: &str,
full_path: String,
table_branch: Option<String>,
expected_version: u64,
op_kind: MutationOpKind,
) {
self.paths.entry(table_key.to_string()).or_insert(StagedTablePath {
full_path,
@ -114,6 +126,19 @@ impl MutationStaging {
self.expected_versions
.entry(table_key.to_string())
.or_insert(expected_version);
self.op_kinds
.entry(table_key.to_string())
.and_modify(|existing| {
// Upgrade to the stricter kind if a later op needs it.
// Insert + later Update → Update wins; Update + later Insert
// keeps Update.
if op_kind.strict_pre_stage_version_check()
&& !existing.strict_pre_stage_version_check()
{
*existing = op_kind;
}
})
.or_insert(op_kind);
}
/// Append a batch to the per-table accumulator.
@ -230,6 +255,7 @@ impl MutationStaging {
paths,
pending,
inline_committed,
op_kinds,
} = self;
let mut staged_entries: Vec<StagedTableEntry> = Vec::with_capacity(pending.len());
@ -330,6 +356,7 @@ impl MutationStaging {
staged: staged_entries,
expected_versions,
paths,
op_kinds,
})
}
}
@ -359,6 +386,10 @@ pub(crate) struct StagedMutation {
/// through so `commit_all` can build sidecar pins for both staged
/// and inline-committed tables.
paths: HashMap<String, StagedTablePath>,
/// Strictest op_kind per touched table, propagated from
/// `MutationStaging::op_kinds` so `commit_all`'s drift check
/// fires only on read-modify-write tables.
op_kinds: HashMap<String, MutationOpKind>,
}
/// Per-table state captured during `stage_all` and consumed by
@ -413,6 +444,7 @@ impl StagedMutation {
mut staged,
mut expected_versions,
paths,
op_kinds,
} = self;
// Acquire per-(table_key, branch) queues for every touched
@ -485,6 +517,39 @@ impl StagedMutation {
entry.table_key,
))
})?;
// Op-kind-aware drift check (MR-686 / Block 1.2 fix). For tables
// whose first or any subsequent touch was a strict op
// (Update / Delete / SchemaRewrite) — see
// [`MutationOpKind::strict_pre_stage_version_check`] — surface a
// clean 409 ExpectedVersionMismatch *before* `commit_staged` if
// the staged dataset's version has drifted from the fresh
// manifest pin under the queue. Without this guard, Lance's
// transaction conflict resolver fires `RetryableCommitConflict`
// on Update vs Update touching the same row and bubbles up as
// `OmniError::Lance(<string>)` mapped to HTTP 500. Pinned by
// `change_concurrent_updates_same_key_serialize_via_publisher_cas`
// in `crates/omnigraph-server/tests/server.rs`.
//
// Insert / Merge tables skip this check: concurrent inserts on
// disjoint keys legitimately coexist via Lance's auto-rebase, so
// the check would over-reject the existing Phase 2 same-key
// insert path (`change_concurrent_inserts_same_key_serialize_without_409`).
let strict = op_kinds
.get(&entry.table_key)
.map(|k| k.strict_pre_stage_version_check())
.unwrap_or(false);
if strict {
let staged_version = entry.dataset.version().version;
if staged_version != current {
return Err(OmniError::manifest_expected_version_mismatch(
entry.table_key.clone(),
staged_version,
current,
));
}
}
entry.expected_version = current;
expected_versions.insert(entry.table_key.clone(), current);
}

View file

@ -383,6 +383,7 @@ async fn load_jsonl_reader<R: BufRead>(
full_path,
table_branch,
expected_version,
load_op_kind,
);
let schema = batch.schema();
staging.append_batch(&table_key, schema, pending_mode, batch)?;
@ -504,6 +505,7 @@ async fn load_jsonl_reader<R: BufRead>(
full_path,
table_branch,
expected_version,
load_op_kind,
);
let schema = batch.schema();
staging.append_batch(&table_key, schema, pending_mode, batch)?;