table_store: opt MergeInsertBuilder into SourceDedupeBehavior::FirstSeen

Lance 4.0.x's MergeInsertBuilder rejects the second sequential
`update T set {f:v} where x=y` against the same row in a multi-row
table with "Ambiguous merge inserts: multiple source rows match the
same target row on (id = ...)". omnigraph passes exactly 1 source row
(verified by instrumenting `execute_update` and `MutationStaging::finalize`:
scan returns 1 row, apply_assignments returns 1 row, the batch fed to
`stage_merge_insert` has 1 row with the expected id), but Lance's
`processed_row_ids: Mutex<HashSet<u64>>` (lance-4.0.0
src/dataset/write/merge_insert.rs:2099) double-processes the same
source/target match against datasets previously rewritten by
merge_insert and trips its own dedupe guard.

Fix: opt the engine's two MergeInsertBuilder call sites
(`stage_merge_insert`, `merge_insert_batch`) into
`SourceDedupeBehavior::FirstSeen`, which silently skips duplicate
matches instead of erroring. Correctness-preserving for omnigraph
because `MutationStaging::finalize` already dedupes the source by id
before this call — there cannot be true source-side duplicates at
this layer, so FirstSeen only suppresses the spurious Lance bug.

Test `runs.rs::second_sequential_update_on_same_row_succeeds` is the
positive regression for the cross-query update path. Existing
`chained_updates_with_overlapping_predicate_respects_intermediate_value`
covers chained updates within ONE query (handled by MutationStaging's
in-memory accumulator); this new test exercises the case where each
call gets a fresh staging accumulator and re-opens the dataset at HEAD.

This is a workaround at the omnigraph layer. The right long-term fix
likely lives in Lance (hardening the merge plan against
double-processing, or making FirstSeen the default for stable-row-id
datasets) or in the storage-layer refactor (MR-793 family) where the
merge_insert primitives are scoped to the engine.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
andrew 2026-05-10 16:23:01 +03:00 committed by aaltshuler
parent 60eee78465
commit 0c863caf01
2 changed files with 132 additions and 8 deletions

View file

@ -12,6 +12,7 @@ use lance::dataset::{
CommitBuilder, InsertBuilder, MergeInsertBuilder, WhenMatched, WhenNotMatched, WriteMode,
WriteParams,
};
use lance::dataset::write::merge_insert::SourceDedupeBehavior;
use lance::datatypes::BlobKind;
use lance::index::scalar::IndexDetails;
use lance_file::version::LanceFileVersion;
@ -656,10 +657,13 @@ impl TableStore {
// blobs via merge_insert (LoadMode::Merge, mutations) are unsupported
// until Lance exposes WriteParams on MergeInsertBuilder.
let ds = Arc::new(ds);
let job = MergeInsertBuilder::try_new(ds, key_columns)
.map_err(|e| OmniError::Lance(e.to_string()))?
.when_matched(when_matched)
.when_not_matched(when_not_matched)
let mut builder = MergeInsertBuilder::try_new(ds, key_columns)
.map_err(|e| OmniError::Lance(e.to_string()))?;
builder.when_matched(when_matched);
builder.when_not_matched(when_not_matched);
// See `stage_merge_insert` for the rationale on FirstSeen.
builder.source_dedupe_behavior(SourceDedupeBehavior::FirstSeen);
let job = builder
.try_build()
.map_err(|e| OmniError::Lance(e.to_string()))?;
@ -871,10 +875,26 @@ impl TableStore {
));
}
let ds = Arc::new(ds);
let job = MergeInsertBuilder::try_new(ds, key_columns)
.map_err(|e| OmniError::Lance(e.to_string()))?
.when_matched(when_matched)
.when_not_matched(when_not_matched)
let mut builder = MergeInsertBuilder::try_new(ds, key_columns)
.map_err(|e| OmniError::Lance(e.to_string()))?;
builder.when_matched(when_matched);
builder.when_not_matched(when_not_matched);
// Lance 4.0.x's MergeInsertBuilder occasionally double-processes
// the same source/target match against datasets that were
// previously rewritten by merge_insert (the second sequential
// update on the same row in a multi-row table consistently fails
// with "Ambiguous merge inserts: multiple source rows match the
// same target row"). Confirmed in
// `runs.rs::second_sequential_update_on_same_row_succeeds`: the
// engine hands Lance exactly 1 source row, yet Lance's
// `processed_row_ids` set rejects the second matching write.
// `SourceDedupeBehavior::FirstSeen` makes Lance skip the
// duplicate match instead of failing — for the omnigraph layer
// this is correctness-preserving because we always pass
// dedup-by-id source batches (`MutationStaging::finalize`
// dedupes before this call).
builder.source_dedupe_behavior(SourceDedupeBehavior::FirstSeen);
let job = builder
.try_build()
.map_err(|e| OmniError::Lance(e.to_string()))?;
let schema = batch.schema();

View file

@ -521,6 +521,10 @@ query delete_two_persons($first: String, $second: String) {
delete Person where name = $first
delete Person where name = $second
}
query update_age_by_name($name: String, $age: I32) {
update Person set { age: $age } where name = $name
}
"#;
/// D₂: a query mixing inserts/updates with deletes is rejected at parse
@ -1362,3 +1366,103 @@ query insert_then_update_note(
.unwrap();
assert_eq!(qr.num_rows(), 0, "letter must not be visible after early error");
}
/// Two SEQUENTIAL `update T set {f:v} where <key>=$x` queries on the
/// same row across SEPARATE `mutate` calls must both succeed.
///
/// Existing coverage at `chained_updates_with_overlapping_predicate_respects_intermediate_value`
/// tests chained updates within ONE query (handled by `MutationStaging`'s
/// in-memory accumulator). This test exercises the cross-query case
/// where each call gets a fresh staging accumulator and re-opens the
/// dataset at HEAD.
///
/// Discovered 2026-05-08 via the omnigraph-ui server-mode demo
/// (Approve→Reject toggle on the same PolicyClause). Lance 4.0.x's
/// `MergeInsertBuilder` rejected the second update with "Ambiguous
/// merge inserts: multiple source rows match the same target row on
/// (id = ...)" even though omnigraph passes exactly 1 source row.
/// Instrumented the source batch (1 row, correct id) and bisected to
/// Lance's `processed_row_ids` Mutex double-processing the same
/// source/target match against datasets that were previously rewritten
/// by merge_insert. Fix: opt the engine's `stage_merge_insert` and
/// `merge_insert_batch` into Lance's `SourceDedupeBehavior::FirstSeen`
/// so the duplicate match is silently skipped (correctness-preserving
/// because `MutationStaging::finalize` already dedupes by id before
/// this call).
#[tokio::test]
async fn second_sequential_update_on_same_row_succeeds() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
// First update: Alice -> 99. Should succeed.
db.mutate(
"main",
STAGED_QUERIES,
"update_age_by_name",
&mixed_params(&[("$name", "Alice")], &[("$age", 99)]),
)
.await
.expect("first sequential update on Alice must succeed");
// Confirm exactly one row matches between the two updates — this
// pins the expectation that the *committed* state is fine and the
// bug is in how the second update's source batch is constructed.
let batches = read_table(&db, "node:Person").await;
let alice_count: usize = batches
.iter()
.map(|b| {
let names = b
.column_by_name("name")
.unwrap()
.as_any()
.downcast_ref::<arrow_array::StringArray>()
.unwrap();
(0..b.num_rows())
.filter(|i| names.is_valid(*i) && names.value(*i) == "Alice")
.count()
})
.sum();
assert_eq!(
alice_count, 1,
"after first update, exactly one Alice row should be visible"
);
// Second update: Alice -> 42. Currently fails with
// "Ambiguous merge inserts are prohibited: multiple source rows
// match the same target row on (id = \"Alice\")".
db.mutate(
"main",
STAGED_QUERIES,
"update_age_by_name",
&mixed_params(&[("$name", "Alice")], &[("$age", 42)]),
)
.await
.expect("second sequential update on Alice must succeed");
// Final state assertion (will only run once the bug is fixed).
let batches = read_table(&db, "node:Person").await;
let mut alice_age: Option<i32> = None;
for batch in &batches {
let names = batch
.column_by_name("name")
.unwrap()
.as_any()
.downcast_ref::<arrow_array::StringArray>()
.unwrap();
let ages = batch
.column_by_name("age")
.unwrap()
.as_any()
.downcast_ref::<arrow_array::Int32Array>()
.unwrap();
for i in 0..batch.num_rows() {
if names.is_valid(i) && names.value(i) == "Alice" && ages.is_valid(i) {
alice_age = Some(ages.value(i));
}
}
}
assert_eq!(alice_age, Some(42), "Alice's age must reflect the second update");
}