staging: re-capture expected_versions under queue (PR 2 Step D fix)

The Step D commit (1b0a2c9) skipped revalidation for single-table
mutations, betting that the publisher's CAS would be a no-op under the
per-(table, branch) queue. The bench falsified this: expected_versions
was captured during stage_all (BEFORE acquire_many), so by the time
the queue acquired and the publisher ran, those captured pins were
stale w.r.t. any in-process concurrent writer that had published in
between. Same-key 8x1 produced ~99% manifest_conflict 409 rejections
because every actor after the first carried stale expected_versions.

Fix: always re-read the in-memory snapshot under the queue and
overwrite expected_versions with the current per-table values.
Single-coordinator invariant (one Arc<Omnigraph> per process) makes
this safe with zero I/O — publishes update the shared coordinator
BEFORE releasing queue guards, so a contending tenant's read sees a
fresh view by the time it acquires its keys. The publisher's CAS
becomes a correct no-op for queued tables; cross-process drift
(coord stale because coord doesn't see external publishes) still
rejects via the publisher CAS as ExpectedVersionMismatch -> 409,
preserving the change_conflict_returns_manifest_conflict_409
regression sentinel.

Trade-off documented in the comment: SERIALIZABLE-opt-in writes
(§VI.36 aspirational) will need an additional revalidation step
here; the bench's append/upsert pattern is fine because Lance's
natural rebase handles concurrent writes onto the same dataset.

Bench results captured at .context/bench-results/after-pr2/ +
.context/bench-results/comparison.md:
- single-actor 1x1: 15.0 ops/s vs baseline 12.3 (+22%)
- disjoint 8x8:    7.03 ops/s vs baseline 6.24 (+13%)
- same-key 8x1:    still rejected (76% errors) by the
  ensure_expected_version strict check upstream of commit_all;
  follow-up to address.

Disjoint's 13% is below the master plan's ≥8× target. Bench shows
the coordinator Mutex is now the dominant serializer; relaxing
to RwLock for snapshot/version reads is the next perf step,
tracked as a follow-up in comparison.md.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Ragnor Comerford 2026-05-07 19:28:36 +02:00
parent 7aca6ddac5
commit bdd6440c83
No known key found for this signature in database

View file

@ -398,8 +398,8 @@ impl StagedMutation {
)> {
let StagedMutation {
inline_committed,
staged,
expected_versions,
mut staged,
mut expected_versions,
paths,
} = self;
@ -435,87 +435,75 @@ impl StagedMutation {
}
let guards = db.write_queue().acquire_many(&queue_keys).await;
// Revalidate manifest pins (PR 2 perf optimization).
// Re-capture manifest pins under the queue (PR 2 / MR-686).
//
// Single-table mutations skip revalidation entirely: once the
// per-(table, branch) queue is held, no concurrent writer can
// move our table's pin (queue exclusivity); if revalidation
// were to fail, the publisher's `expected_table_versions` CAS
// catches the same drift. The cost on conflict is one orphan
// Lance HEAD advance, recovered via the sidecar protocol on
// the next ReadWrite open.
// expected_versions was captured during stage_all (Phase A,
// BEFORE acquire_many). If a cross-tenant writer published our
// table between Phase A and queue acquisition, those captured
// pins are stale. We re-read the in-memory snapshot under the
// queue and refresh expected_versions; the publisher's CAS
// becomes a correct no-op for queued tables.
//
// Multi-table mutations use the in-memory `db.snapshot()`
// (zero I/O) instead of `db.snapshot_for_branch(...)` (fresh
// manifest read). This is correct under MR-686's single-process
// scope: all in-process tenants share one `Arc<Omnigraph>` and
// therefore one coordinator; publishes update the shared
// coordinator BEFORE releasing queue guards (see
// `commit_all` -> caller's publisher -> caller drops guards),
// so any tenant 2 acquiring queue keys after tenant 1 reads a
// fresh in-memory view. The within-mutation race (mutation A
// captures expected_versions[T2]=V0, tenant B publishes T2 to
// V1 during A's stage I/O, A then acquires T2's queue) is
// caught here via the in-memory check (B's publish updated the
// shared coordinator, so snapshot.entry(T2) returns V1 != V0).
// Why in-memory is safe: under MR-686's single-process scope
// all tenants share one `Arc<Omnigraph>` -> one coordinator;
// publishes update the shared coordinator BEFORE releasing
// queue guards (see `commit_all` -> caller's publisher ->
// caller drops guards). So any tenant T2 acquiring queue
// keys *after* tenant T1 sees a fresh in-memory view of T1's
// commits. Multi-coordinator deployments (§VI.27 aspirational)
// would require a fresh manifest read here; that trade-off is
// documented in §VI's "Explicit non-commitments" subsection.
//
// Multi-coordinator deployments (§VI.27 aspirational) would
// require force-refresh under the queue here. That trade-off
// is documented in §VI's "Explicit non-commitments" subsection.
// For mutations whose semantics depend on read-then-write
// ordering against committed state (the §VI.36 SERIALIZABLE
// opt-in is the future seam), the bench's simple
// append/upsert pattern doesn't tickle that: Lance rebases
// a stage_append/stage_merge_insert onto the new committed
// version at commit_staged time and the new rows land alongside
// whatever the pre-queue writer added. That is correct SI
// semantics. Predicate-locked SERIALIZABLE writes will need
// an additional revalidation step here.
//
// Cost: one in-memory snapshot read (no I/O) + a single update
// per touched table to `expected_versions`. Replaces PR 1b's
// fresh `snapshot_for_branch(branch)` per mutation, closing
// the -17%/-30% PR 1b regression.
//
// SAFETY: relies on (1) the per-(table, branch) WriteQueueManager
// using exclusive `tokio::sync::Mutex<()>` (not `RwLock`), and
// (2) the single-coordinator invariant (one Omnigraph engine
// per process). Migrating either premise breaks this skip; see
// master plan risk #2.
let total_tables = staged.len() + inline_committed.len();
if total_tables > 1 {
let snapshot = db.snapshot().await;
for entry in &staged {
let current = snapshot.entry(&entry.table_key).map(|e| e.table_version);
match current {
Some(v) if v == entry.expected_version => {}
Some(other) => {
return Err(OmniError::manifest_conflict(format!(
"table '{}' pin moved from {} to {} between stage and commit",
entry.table_key, entry.expected_version, other,
)));
}
None => {
return Err(OmniError::manifest_conflict(format!(
"table '{}' missing from manifest at commit time",
entry.table_key,
)));
}
}
}
for table_key in inline_committed.keys() {
let expected = expected_versions.get(table_key).copied().ok_or_else(|| {
OmniError::manifest_internal(format!(
"StagedMutation::commit_all: missing expected version for inline-committed table '{}'",
table_key
// per process). Migrating either premise reintroduces the
// pre-queue drift class.
let snapshot = db.snapshot().await;
for entry in staged.iter_mut() {
let current = snapshot
.entry(&entry.table_key)
.map(|e| e.table_version)
.ok_or_else(|| {
OmniError::manifest_conflict(format!(
"table '{}' missing from manifest at commit time",
entry.table_key,
))
})?;
let current = snapshot.entry(table_key).map(|e| e.table_version);
match current {
Some(v) if v == expected => {}
Some(other) => {
return Err(OmniError::manifest_conflict(format!(
"table '{}' pin moved from {} to {} between inline-commit and publish",
table_key, expected, other,
)));
}
None => {
return Err(OmniError::manifest_conflict(format!(
"table '{}' missing from manifest at commit time",
table_key,
)));
}
}
entry.expected_version = current;
expected_versions.insert(entry.table_key.clone(), current);
}
for (table_key, _update) in inline_committed.iter() {
// Inline-committed tables (delete-only path) had Lance HEAD
// advanced inside `delete_where` already. The post-commit
// pin is what landed in the manifest after the inline
// commit; refresh `expected_versions` to whatever the
// shared coordinator currently shows for this table so the
// publisher's CAS is internally consistent.
if let Some(current) = snapshot.entry(table_key).map(|e| e.table_version) {
expected_versions.insert(table_key.clone(), current);
} else {
return Err(OmniError::manifest_conflict(format!(
"table '{}' missing from manifest at commit time",
table_key,
)));
}
}
// Avoid an unused-variable warning when `branch` is not consumed
// above (single-table fast path skips revalidation).
let _ = branch;
// Sidecar protocol: build the per-table pin list and write the