mirror of
https://github.com/ModernRelay/omnigraph.git
synced 2026-06-09 01:35:18 +02:00
engine: strict drift check uses read-time pin
This commit is contained in:
parent
f9a0f31f80
commit
a6d244e648
1 changed files with 44 additions and 33 deletions
|
|
@ -481,12 +481,24 @@ impl StagedMutation {
|
|||
|
||||
// Re-capture manifest pins under the queue (PR 2 / MR-686).
|
||||
//
|
||||
// expected_versions was captured during stage_all (Phase A,
|
||||
// BEFORE acquire_many). If a cross-tenant writer published our
|
||||
// table between Phase A and queue acquisition, those captured
|
||||
// pins are stale. We re-read the per-branch snapshot under the
|
||||
// queue and refresh expected_versions; the publisher's CAS
|
||||
// becomes a correct no-op for queued tables.
|
||||
// expected_versions was captured when the mutation first opened
|
||||
// each table for mutation (the query's read-time pin). For
|
||||
// non-strict inserts / merge-style appends, a writer may advance
|
||||
// the table before we acquire the queue and Lance can still
|
||||
// safely rebase the write, so we refresh expected_versions to
|
||||
// the queued manifest pin.
|
||||
//
|
||||
// Strict read-modify-write ops (update / delete /
|
||||
// schema-rewrite) are different: the staged batch was computed
|
||||
// against the read-time pin, even if stage_all later re-opened
|
||||
// the dataset at HEAD. For those ops, compare read-time
|
||||
// expected_version to the queued manifest pin and fail before
|
||||
// any Lance HEAD movement if the target drifted. This can
|
||||
// over-reject a single mutation that inserts, then upgrades to
|
||||
// update, while another writer advances the table between the
|
||||
// two touches; that is safe-by-default and keeps one invariant
|
||||
// until `ensure_path` learns how to bump expected_version on
|
||||
// op-kind upgrade.
|
||||
//
|
||||
// Why per-branch (and not the bound-branch `db.snapshot()`):
|
||||
// when the caller mutates a branch other than the engine's
|
||||
|
|
@ -518,19 +530,6 @@ impl StagedMutation {
|
|||
))
|
||||
})?;
|
||||
|
||||
// Op-kind-aware drift check (MR-686 / Block 1.2 fix). For tables
|
||||
// whose first or any subsequent touch was a strict op
|
||||
// (Update / Delete / SchemaRewrite) — see
|
||||
// [`MutationOpKind::strict_pre_stage_version_check`] — surface a
|
||||
// clean 409 ExpectedVersionMismatch *before* `commit_staged` if
|
||||
// the staged dataset's version has drifted from the fresh
|
||||
// manifest pin under the queue. Without this guard, Lance's
|
||||
// transaction conflict resolver fires `RetryableCommitConflict`
|
||||
// on Update vs Update touching the same row and bubbles up as
|
||||
// `OmniError::Lance(<string>)` mapped to HTTP 500. Pinned by
|
||||
// `change_concurrent_updates_same_key_serialize_via_publisher_cas`
|
||||
// in `crates/omnigraph-server/tests/server.rs`.
|
||||
//
|
||||
// Insert / Merge tables skip this check: concurrent inserts on
|
||||
// disjoint keys legitimately coexist via Lance's auto-rebase, so
|
||||
// the check would over-reject the existing Phase 2 same-key
|
||||
|
|
@ -539,29 +538,41 @@ impl StagedMutation {
|
|||
.get(&entry.table_key)
|
||||
.map(|k| k.strict_pre_stage_version_check())
|
||||
.unwrap_or(false);
|
||||
if strict {
|
||||
let staged_version = entry.dataset.version().version;
|
||||
if staged_version != current {
|
||||
return Err(OmniError::manifest_expected_version_mismatch(
|
||||
entry.table_key.clone(),
|
||||
staged_version,
|
||||
current,
|
||||
));
|
||||
}
|
||||
if strict && entry.expected_version != current {
|
||||
return Err(OmniError::manifest_expected_version_mismatch(
|
||||
entry.table_key.clone(),
|
||||
entry.expected_version,
|
||||
current,
|
||||
));
|
||||
}
|
||||
|
||||
entry.expected_version = current;
|
||||
expected_versions.insert(entry.table_key.clone(), current);
|
||||
}
|
||||
for (table_key, _update) in inline_committed.iter() {
|
||||
if let Some(current) = snapshot.entry(table_key).map(|e| e.table_version) {
|
||||
expected_versions.insert(table_key.clone(), current);
|
||||
} else {
|
||||
return Err(OmniError::manifest_conflict(format!(
|
||||
let current = snapshot
|
||||
.entry(table_key)
|
||||
.map(|e| e.table_version)
|
||||
.ok_or_else(|| {
|
||||
OmniError::manifest_conflict(format!(
|
||||
"table '{}' missing from manifest at commit time",
|
||||
table_key,
|
||||
)));
|
||||
))
|
||||
})?;
|
||||
let expected = expected_versions.get(table_key).copied().ok_or_else(|| {
|
||||
OmniError::manifest_internal(format!(
|
||||
"StagedMutation::commit_all: missing expected version for inline-committed table '{}'",
|
||||
table_key
|
||||
))
|
||||
})?;
|
||||
if expected != current {
|
||||
return Err(OmniError::manifest_expected_version_mismatch(
|
||||
table_key.clone(),
|
||||
expected,
|
||||
current,
|
||||
));
|
||||
}
|
||||
expected_versions.insert(table_key.clone(), current);
|
||||
}
|
||||
|
||||
// Sidecar protocol: build the per-table pin list and write the
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue