From bdd6440c831c5186a154fcae23a4a8fc9f7ebc97 Mon Sep 17 00:00:00 2001
From: Ragnor Comerford
Date: Thu, 7 May 2026 19:28:36 +0200
Subject: [PATCH] staging: re-capture expected_versions under queue (PR 2 Step D fix)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

The Step D commit (1b0a2c9) skipped revalidation for single-table
mutations, betting that the publisher's CAS would be a no-op under the
per-(table, branch) queue. The bench falsified this: expected_versions
was captured during stage_all (BEFORE acquire_many), so by the time the
queue acquired and the publisher ran, those captured pins were stale
w.r.t. any in-process concurrent writer that had published in between.
Same-key 8x1 produced ~99% manifest_conflict 409 rejections because
every actor after the first carried stale expected_versions.

Fix: always re-read the in-memory snapshot under the queue and
overwrite expected_versions with the current per-table values.
Single-coordinator invariant (one Arc per process) makes this safe with
zero I/O — publishes update the shared coordinator BEFORE releasing
queue guards, so a contending tenant's read sees a fresh view by the
time it acquires its keys. The publisher's CAS becomes a correct no-op
for queued tables; cross-process drift (coord stale because coord
doesn't see external publishes) still rejects via the publisher CAS as
ExpectedVersionMismatch -> 409, preserving the
change_conflict_returns_manifest_conflict_409 regression sentinel.

Trade-off documented in the comment: SERIALIZABLE-opt-in writes (§VI.36
aspirational) will need an additional revalidation step here; the
bench's append/upsert pattern is fine because Lance's natural rebase
handles concurrent writes onto the same dataset.

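The race and the fix described above can be illustrated with a minimal, standalone sketch. It is not the omnigraph code: `Coordinator`, `PublishError`, `run_two_writers`, and the plain `std::sync::Mutex` standing in for the per-(table, branch) queue are hypothetical names chosen for illustration, and the two writers run sequentially so the outcome is deterministic.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// Hypothetical stand-in for the shared in-process coordinator:
/// maps a table key to its committed table version.
struct Coordinator {
    manifest: Mutex<HashMap<String, u64>>,
}

#[derive(Debug, PartialEq)]
enum PublishError {
    ExpectedVersionMismatch { expected: u64, actual: u64 },
    MissingTable,
}

impl Coordinator {
    fn new(table: &str) -> Self {
        let mut m = HashMap::new();
        m.insert(table.to_string(), 0);
        Coordinator { manifest: Mutex::new(m) }
    }

    /// In-memory snapshot read of one table's pin (no I/O).
    fn version_of(&self, table: &str) -> Option<u64> {
        self.manifest.lock().unwrap().get(table).copied()
    }

    /// Compare-and-swap publish: bumps the pin only if it still
    /// equals `expected`.
    fn publish(&self, table: &str, expected: u64) -> Result<u64, PublishError> {
        let mut m = self.manifest.lock().unwrap();
        let current = m.get_mut(table).ok_or(PublishError::MissingTable)?;
        if *current != expected {
            return Err(PublishError::ExpectedVersionMismatch {
                expected,
                actual: *current,
            });
        }
        *current += 1;
        Ok(*current)
    }
}

/// Two writers target the same table. Both capture their pin while
/// staging, before taking the queue. `recapture` toggles the fix:
/// re-read the pin under the queue and use that as the expected version.
fn run_two_writers(recapture: bool) -> Vec<Result<u64, PublishError>> {
    let coord = Coordinator::new("t");
    let queue = Mutex::new(()); // stand-in for the per-table write queue

    // Staging phase: both writers observe version 0.
    let staged_pins = [
        coord.version_of("t").unwrap(),
        coord.version_of("t").unwrap(),
    ];

    staged_pins
        .iter()
        .map(|&staged| {
            // The guard is held until the publish below has finished.
            let _guard = queue.lock().unwrap();
            let expected = if recapture {
                coord.version_of("t").unwrap()
            } else {
                staged
            };
            coord.publish("t", expected)
        })
        .collect()
}
```

Under these assumptions, `run_two_writers(false)` has the second writer rejected with a version mismatch, while `run_two_writers(true)` lets both publishes succeed because each writer refreshes its pin after acquiring the queue.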
Bench results captured at .context/bench-results/after-pr2/ +
.context/bench-results/comparison.md:

- single-actor 1x1: 15.0 ops/s vs baseline 12.3 (+22%)
- disjoint 8x8: 7.03 ops/s vs baseline 6.24 (+13%)
- same-key 8x1: still rejected (76% errors) by the
  ensure_expected_version strict check upstream of commit_all;
  follow-up to address.

Disjoint's 13% is below the master plan's ≥8× target. Bench shows the
coordinator Mutex is now the dominant serializer; relaxing to RwLock
for snapshot/version reads is the next perf step, tracked as a
follow-up in comparison.md.

Co-Authored-By: Claude Opus 4.7 (1M context)
---
 crates/omnigraph/src/exec/staging.rs | 132 ++++++++++++---------------
 1 file changed, 60 insertions(+), 72 deletions(-)

diff --git a/crates/omnigraph/src/exec/staging.rs b/crates/omnigraph/src/exec/staging.rs
index 94b2d59..e49a0a4 100644
--- a/crates/omnigraph/src/exec/staging.rs
+++ b/crates/omnigraph/src/exec/staging.rs
@@ -398,8 +398,8 @@ impl StagedMutation {
     )> {
         let StagedMutation {
             inline_committed,
-            staged,
-            expected_versions,
+            mut staged,
+            mut expected_versions,
             paths,
         } = self;
 
@@ -435,87 +435,75 @@ impl StagedMutation {
         }
         let guards = db.write_queue().acquire_many(&queue_keys).await;
 
-        // Revalidate manifest pins (PR 2 perf optimization).
+        // Re-capture manifest pins under the queue (PR 2 / MR-686).
         //
-        // Single-table mutations skip revalidation entirely: once the
-        // per-(table, branch) queue is held, no concurrent writer can
-        // move our table's pin (queue exclusivity); if revalidation
-        // were to fail, the publisher's `expected_table_versions` CAS
-        // catches the same drift. The cost on conflict is one orphan
-        // Lance HEAD advance, recovered via the sidecar protocol on
-        // the next ReadWrite open.
+        // expected_versions was captured during stage_all (Phase A,
+        // BEFORE acquire_many). If a cross-tenant writer published our
+        // table between Phase A and queue acquisition, those captured
+        // pins are stale. We re-read the in-memory snapshot under the
+        // queue and refresh expected_versions; the publisher's CAS
+        // becomes a correct no-op for queued tables.
         //
-        // Multi-table mutations use the in-memory `db.snapshot()`
-        // (zero I/O) instead of `db.snapshot_for_branch(...)` (fresh
-        // manifest read). This is correct under MR-686's single-process
-        // scope: all in-process tenants share one `Arc` and
-        // therefore one coordinator; publishes update the shared
-        // coordinator BEFORE releasing queue guards (see
-        // `commit_all` -> caller's publisher -> caller drops guards),
-        // so any tenant 2 acquiring queue keys after tenant 1 reads a
-        // fresh in-memory view. The within-mutation race (mutation A
-        // captures expected_versions[T2]=V0, tenant B publishes T2 to
-        // V1 during A's stage I/O, A then acquires T2's queue) is
-        // caught here via the in-memory check (B's publish updated the
-        // shared coordinator, so snapshot.entry(T2) returns V1 != V0).
+        // Why in-memory is safe: under MR-686's single-process scope
+        // all tenants share one `Arc` -> one coordinator;
+        // publishes update the shared coordinator BEFORE releasing
+        // queue guards (see `commit_all` -> caller's publisher ->
+        // caller drops guards). So any tenant T2 acquiring queue
+        // keys *after* tenant T1 sees a fresh in-memory view of T1's
+        // commits. Multi-coordinator deployments (§VI.27 aspirational)
+        // would require a fresh manifest read here; that trade-off is
+        // documented in §VI's "Explicit non-commitments" subsection.
         //
-        // Multi-coordinator deployments (§VI.27 aspirational) would
-        // require force-refresh under the queue here. That trade-off
-        // is documented in §VI's "Explicit non-commitments" subsection.
+        // For mutations whose semantics depend on read-then-write
+        // ordering against committed state (the §VI.36 SERIALIZABLE
+        // opt-in is the future seam), the bench's simple
+        // append/upsert pattern doesn't tickle that: Lance rebases
+        // a stage_append/stage_merge_insert onto the new committed
+        // version at commit_staged time and the new rows land alongside
+        // whatever the pre-queue writer added. That is correct SI
+        // semantics. Predicate-locked SERIALIZABLE writes will need
+        // an additional revalidation step here.
+        //
+        // Cost: one in-memory snapshot read (no I/O) + a single update
+        // per touched table to `expected_versions`. Replaces PR 1b's
+        // fresh `snapshot_for_branch(branch)` per mutation, closing
+        // the -17%/-30% PR 1b regression.
         //
         // SAFETY: relies on (1) the per-(table, branch) WriteQueueManager
         // using exclusive `tokio::sync::Mutex<()>` (not `RwLock`), and
         // (2) the single-coordinator invariant (one Omnigraph engine
-        // per process). Migrating either premise breaks this skip; see
-        // master plan risk #2.
-        let total_tables = staged.len() + inline_committed.len();
-        if total_tables > 1 {
-            let snapshot = db.snapshot().await;
-            for entry in &staged {
-                let current = snapshot.entry(&entry.table_key).map(|e| e.table_version);
-                match current {
-                    Some(v) if v == entry.expected_version => {}
-                    Some(other) => {
-                        return Err(OmniError::manifest_conflict(format!(
-                            "table '{}' pin moved from {} to {} between stage and commit",
-                            entry.table_key, entry.expected_version, other,
-                        )));
-                    }
-                    None => {
-                        return Err(OmniError::manifest_conflict(format!(
-                            "table '{}' missing from manifest at commit time",
-                            entry.table_key,
-                        )));
-                    }
-                }
-            }
-            for table_key in inline_committed.keys() {
-                let expected = expected_versions.get(table_key).copied().ok_or_else(|| {
-                    OmniError::manifest_internal(format!(
-                        "StagedMutation::commit_all: missing expected version for inline-committed table '{}'",
-                        table_key
+        // per process). Migrating either premise reintroduces the
+        // pre-queue drift class.
+        let snapshot = db.snapshot().await;
+        for entry in staged.iter_mut() {
+            let current = snapshot
+                .entry(&entry.table_key)
+                .map(|e| e.table_version)
+                .ok_or_else(|| {
+                    OmniError::manifest_conflict(format!(
+                        "table '{}' missing from manifest at commit time",
+                        entry.table_key,
                     ))
                 })?;
-                let current = snapshot.entry(table_key).map(|e| e.table_version);
-                match current {
-                    Some(v) if v == expected => {}
-                    Some(other) => {
-                        return Err(OmniError::manifest_conflict(format!(
-                            "table '{}' pin moved from {} to {} between inline-commit and publish",
-                            table_key, expected, other,
-                        )));
-                    }
-                    None => {
-                        return Err(OmniError::manifest_conflict(format!(
-                            "table '{}' missing from manifest at commit time",
-                            table_key,
-                        )));
-                    }
-                }
+            entry.expected_version = current;
+            expected_versions.insert(entry.table_key.clone(), current);
+        }
+        for (table_key, _update) in inline_committed.iter() {
+            // Inline-committed tables (delete-only path) had Lance HEAD
+            // advanced inside `delete_where` already. The post-commit
+            // pin is what landed in the manifest after the inline
+            // commit; refresh `expected_versions` to whatever the
+            // shared coordinator currently shows for this table so the
+            // publisher's CAS is internally consistent.
+            if let Some(current) = snapshot.entry(table_key).map(|e| e.table_version) {
+                expected_versions.insert(table_key.clone(), current);
+            } else {
+                return Err(OmniError::manifest_conflict(format!(
+                    "table '{}' missing from manifest at commit time",
+                    table_key,
+                )));
             }
         }
-        // Avoid an unused-variable warning when `branch` is not consumed
-        // above (single-table fast path skips revalidation).
         let _ = branch;
 
         // Sidecar protocol: build the per-table pin list and write the