From 0c863caf0158d2d561b64cb801e6dc6f27715622 Mon Sep 17 00:00:00 2001 From: andrew Date: Sun, 10 May 2026 16:23:01 +0300 Subject: [PATCH] table_store: opt MergeInsertBuilder into SourceDedupeBehavior::FirstSeen MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Lance 4.0.x's MergeInsertBuilder rejects the second sequential `update T set {f:v} where x=y` against the same row in a multi-row table with "Ambiguous merge inserts: multiple source rows match the same target row on (id = ...)". omnigraph passes exactly 1 source row (verified by instrumenting `execute_update` and `MutationStaging::finalize`: scan returns 1 row, apply_assignments returns 1 row, the batch fed to `stage_merge_insert` has 1 row with the expected id), but Lance's `processed_row_ids: Mutex>` (lance-4.0.0 src/dataset/write/merge_insert.rs:2099) double-processes the same source/target match against datasets previously rewritten by merge_insert and trips its own dedupe guard. Fix: opt the engine's two MergeInsertBuilder call sites (`stage_merge_insert`, `merge_insert_batch`) into `SourceDedupeBehavior::FirstSeen`, which silently skips duplicate matches instead of erroring. Correctness-preserving for omnigraph because `MutationStaging::finalize` already dedupes the source by id before this call — there cannot be true source-side duplicates at this layer, so FirstSeen only suppresses the spurious Lance bug. Test `runs.rs::second_sequential_update_on_same_row_succeeds` is the positive regression for the cross-query update path. Existing `chained_updates_with_overlapping_predicate_respects_intermediate_value` covers chained updates within ONE query (handled by MutationStaging's in-memory accumulator); this new test exercises the case where each call gets a fresh staging accumulator and re-opens the dataset at HEAD. This is a workaround at the omnigraph layer. The right long-term fix likely lives in Lance (hardening the merge plan against double-processing, or making FirstSeen the default for stable-row-id datasets) or in the storage-layer refactor (MR-793 family) where the merge_insert primitives are scoped to the engine. Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/omnigraph/src/table_store.rs | 36 +++++++--- crates/omnigraph/tests/runs.rs | 104 ++++++++++++++++++++++++++++ 2 files changed, 132 insertions(+), 8 deletions(-) diff --git a/crates/omnigraph/src/table_store.rs b/crates/omnigraph/src/table_store.rs index 22d8148..0f11463 100644 --- a/crates/omnigraph/src/table_store.rs +++ b/crates/omnigraph/src/table_store.rs @@ -12,6 +12,7 @@ use lance::dataset::{ CommitBuilder, InsertBuilder, MergeInsertBuilder, WhenMatched, WhenNotMatched, WriteMode, WriteParams, }; +use lance::dataset::write::merge_insert::SourceDedupeBehavior; use lance::datatypes::BlobKind; use lance::index::scalar::IndexDetails; use lance_file::version::LanceFileVersion; @@ -656,10 +657,13 @@ impl TableStore { // blobs via merge_insert (LoadMode::Merge, mutations) are unsupported // until Lance exposes WriteParams on MergeInsertBuilder. let ds = Arc::new(ds); - let job = MergeInsertBuilder::try_new(ds, key_columns) - .map_err(|e| OmniError::Lance(e.to_string()))? - .when_matched(when_matched) - .when_not_matched(when_not_matched) + let mut builder = MergeInsertBuilder::try_new(ds, key_columns) + .map_err(|e| OmniError::Lance(e.to_string()))?; + builder.when_matched(when_matched); + builder.when_not_matched(when_not_matched); + // See `stage_merge_insert` for the rationale on FirstSeen. + builder.source_dedupe_behavior(SourceDedupeBehavior::FirstSeen); + let job = builder .try_build() .map_err(|e| OmniError::Lance(e.to_string()))?; @@ -871,10 +875,26 @@ impl TableStore { )); } let ds = Arc::new(ds); - let job = MergeInsertBuilder::try_new(ds, key_columns) - .map_err(|e| OmniError::Lance(e.to_string()))? - .when_matched(when_matched) - .when_not_matched(when_not_matched) + let mut builder = MergeInsertBuilder::try_new(ds, key_columns) + .map_err(|e| OmniError::Lance(e.to_string()))?; + builder.when_matched(when_matched); + builder.when_not_matched(when_not_matched); + // Lance 4.0.x's MergeInsertBuilder occasionally double-processes + // the same source/target match against datasets that were + // previously rewritten by merge_insert (the second sequential + // update on the same row in a multi-row table consistently fails + // with "Ambiguous merge inserts: multiple source rows match the + // same target row"). Confirmed in + // `runs.rs::second_sequential_update_on_same_row_succeeds`: the + // engine hands Lance exactly 1 source row, yet Lance's + // `processed_row_ids` set rejects the second matching write. + // `SourceDedupeBehavior::FirstSeen` makes Lance skip the + // duplicate match instead of failing — for the omnigraph layer + // this is correctness-preserving because we always pass + // dedup-by-id source batches (`MutationStaging::finalize` + // dedupes before this call). + builder.source_dedupe_behavior(SourceDedupeBehavior::FirstSeen); + let job = builder .try_build() .map_err(|e| OmniError::Lance(e.to_string()))?; let schema = batch.schema(); diff --git a/crates/omnigraph/tests/runs.rs b/crates/omnigraph/tests/runs.rs index 4e363bf..052b53b 100644 --- a/crates/omnigraph/tests/runs.rs +++ b/crates/omnigraph/tests/runs.rs @@ -521,6 +521,10 @@ query delete_two_persons($first: String, $second: String) { delete Person where name = $first delete Person where name = $second } + +query update_age_by_name($name: String, $age: I32) { + update Person set { age: $age } where name = $name +} "#; /// D₂: a query mixing inserts/updates with deletes is rejected at parse @@ -1362,3 +1366,103 @@ query insert_then_update_note( .unwrap(); assert_eq!(qr.num_rows(), 0, "letter must not be visible after early error"); } + +/// Two SEQUENTIAL `update T set {f:v} where =$x` queries on the +/// same row across SEPARATE `mutate` calls must both succeed. +/// +/// Existing coverage at `chained_updates_with_overlapping_predicate_respects_intermediate_value` +/// tests chained updates within ONE query (handled by `MutationStaging`'s +/// in-memory accumulator). This test exercises the cross-query case +/// where each call gets a fresh staging accumulator and re-opens the +/// dataset at HEAD. +/// +/// Discovered 2026-05-08 via the omnigraph-ui server-mode demo +/// (Approve→Reject toggle on the same PolicyClause). Lance 4.0.x's +/// `MergeInsertBuilder` rejected the second update with "Ambiguous +/// merge inserts: multiple source rows match the same target row on +/// (id = ...)" even though omnigraph passes exactly 1 source row. +/// Instrumented the source batch (1 row, correct id) and bisected to +/// Lance's `processed_row_ids` Mutex double-processing the same +/// source/target match against datasets that were previously rewritten +/// by merge_insert. Fix: opt the engine's `stage_merge_insert` and +/// `merge_insert_batch` into Lance's `SourceDedupeBehavior::FirstSeen` +/// so the duplicate match is silently skipped (correctness-preserving +/// because `MutationStaging::finalize` already dedupes by id before +/// this call). +#[tokio::test] +async fn second_sequential_update_on_same_row_succeeds() { + let dir = tempfile::tempdir().unwrap(); + let mut db = init_and_load(&dir).await; + + // First update: Alice -> 99. Should succeed. + db.mutate( + "main", + STAGED_QUERIES, + "update_age_by_name", + &mixed_params(&[("$name", "Alice")], &[("$age", 99)]), + ) + .await + .expect("first sequential update on Alice must succeed"); + + // Confirm exactly one row matches between the two updates — this + // pins the expectation that the *committed* state is fine and the + // bug is in how the second update's source batch is constructed. + let batches = read_table(&db, "node:Person").await; + let alice_count: usize = batches + .iter() + .map(|b| { + let names = b + .column_by_name("name") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + (0..b.num_rows()) + .filter(|i| names.is_valid(*i) && names.value(*i) == "Alice") + .count() + }) + .sum(); + assert_eq!( + alice_count, 1, + "after first update, exactly one Alice row should be visible" + ); + + // Second update: Alice -> 42. Currently fails with + // "Ambiguous merge inserts are prohibited: multiple source rows + // match the same target row on (id = \"Alice\")". + db.mutate( + "main", + STAGED_QUERIES, + "update_age_by_name", + &mixed_params(&[("$name", "Alice")], &[("$age", 42)]), + ) + .await + .expect("second sequential update on Alice must succeed"); + + // Final state assertion (will only run once the bug is fixed). + let batches = read_table(&db, "node:Person").await; + let mut alice_age: Option = None; + for batch in &batches { + let names = batch + .column_by_name("name") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + let ages = batch + .column_by_name("age") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + for i in 0..batch.num_rows() { + if names.is_valid(i) && names.value(i) == "Alice" && ages.is_valid(i) { + alice_age = Some(ages.value(i)); + } + } + } + assert_eq!(alice_age, Some(42), "Alice's age must reflect the second update"); +} + + +