diff --git a/crates/omnigraph/src/exec/staging.rs b/crates/omnigraph/src/exec/staging.rs index e49a0a4..4ee5d0d 100644 --- a/crates/omnigraph/src/exec/staging.rs +++ b/crates/omnigraph/src/exec/staging.rs @@ -440,41 +440,29 @@ impl StagedMutation { // expected_versions was captured during stage_all (Phase A, // BEFORE acquire_many). If a cross-tenant writer published our // table between Phase A and queue acquisition, those captured - // pins are stale. We re-read the in-memory snapshot under the + // pins are stale. We re-read the per-branch snapshot under the // queue and refresh expected_versions; the publisher's CAS // becomes a correct no-op for queued tables. // - // Why in-memory is safe: under MR-686's single-process scope - // all tenants share one `Arc` -> one coordinator; - // publishes update the shared coordinator BEFORE releasing - // queue guards (see `commit_all` -> caller's publisher -> - // caller drops guards). So any tenant T2 acquiring queue - // keys *after* tenant T1 sees a fresh in-memory view of T1's - // commits. Multi-coordinator deployments (§VI.27 aspirational) - // would require a fresh manifest read here; that trade-off is - // documented in §VI's "Explicit non-commitments" subsection. + // Why per-branch (and not the bound-branch `db.snapshot()`): + // when the caller mutates a branch other than the engine's + // bound branch (e.g., feature-branch ingest from a server + // handle bound to main), `db.snapshot()` returns the bound + // branch's view of each table — which is the wrong pin for + // the publisher's CAS on a different branch. Using + // `snapshot_for_branch(branch)` resolves the per-branch + // entries correctly. The cost is one fresh manifest read per + // mutation; PR 1b's regression came from this same read, but + // that read is now strictly necessary for cross-branch + // correctness. Single-table same-branch mutations could still + // skip this read (queue exclusivity makes the publisher CAS a + // no-op), but the conditional adds complexity for marginal + // gain — left as a follow-up perf optimization. // - // For mutations whose semantics depend on read-then-write - // ordering against committed state (the §VI.36 SERIALIZABLE - // opt-in is the future seam), the bench's simple - // append/upsert pattern doesn't tickle that: Lance rebases - // a stage_append/stage_merge_insert onto the new committed - // version at commit_staged time and the new rows land alongside - // whatever the pre-queue writer added. That is correct SI - // semantics. Predicate-locked SERIALIZABLE writes will need - // an additional revalidation step here. - // - // Cost: one in-memory snapshot read (no I/O) + a single update - // per touched table to `expected_versions`. Replaces PR 1b's - // fresh `snapshot_for_branch(branch)` per mutation, closing - // the -17%/-30% PR 1b regression. - // - // SAFETY: relies on (1) the per-(table, branch) WriteQueueManager - // using exclusive `tokio::sync::Mutex<()>` (not `RwLock`), and - // (2) the single-coordinator invariant (one Omnigraph engine - // per process). Migrating either premise reintroduces the - // pre-queue drift class. - let snapshot = db.snapshot().await; + // Multi-coordinator deployments (§VI.27 aspirational) get + // genuine cross-process drift detection from this read for + // free. + let snapshot = db.snapshot_for_branch(branch).await?; for entry in staged.iter_mut() { let current = snapshot .entry(&entry.table_key) @@ -489,12 +477,6 @@ impl StagedMutation { expected_versions.insert(entry.table_key.clone(), current); } for (table_key, _update) in inline_committed.iter() { - // Inline-committed tables (delete-only path) had Lance HEAD - // advanced inside `delete_where` already. The post-commit - // pin is what landed in the manifest after the inline - // commit; refresh `expected_versions` to whatever the - // shared coordinator currently shows for this table so the - // publisher's CAS is internally consistent. if let Some(current) = snapshot.entry(table_key).map(|e| e.table_version) { expected_versions.insert(table_key.clone(), current); } else { @@ -504,7 +486,6 @@ impl StagedMutation { ))); } } - let _ = branch; // Sidecar protocol: build the per-table pin list and write the // sidecar BEFORE any Lance commit_staged runs, so a crash