diff --git a/crates/omnigraph/src/table_store.rs b/crates/omnigraph/src/table_store.rs index 22d8148..9616e0d 100644 --- a/crates/omnigraph/src/table_store.rs +++ b/crates/omnigraph/src/table_store.rs @@ -8,6 +8,7 @@ use lance::Dataset; use lance::blob::BlobArrayBuilder; use lance::dataset::scanner::{ColumnOrdering, DatasetRecordBatchStream, Scanner}; use lance::dataset::transaction::{Operation, Transaction, TransactionBuilder}; +use lance::dataset::write::merge_insert::SourceDedupeBehavior; use lance::dataset::{ CommitBuilder, InsertBuilder, MergeInsertBuilder, WhenMatched, WhenNotMatched, WriteMode, WriteParams, @@ -651,15 +652,58 @@ impl TableStore { return self.table_state(dataset_uri, &ds).await; } + // Precondition for the FirstSeen workaround below: every caller of + // this primitive must hand in a source batch that is unique by + // `key_columns`. Without this check, `SourceDedupeBehavior::FirstSeen` + // would silently collapse genuine duplicates instead of erroring. + check_batch_unique_by_keys(&batch, &key_columns, "merge_insert_batch")?; + // TODO(lance-upstream): MergeInsertBuilder does not accept WriteParams, // so allow_external_blob_outside_bases cannot be set here. External URI // blobs via merge_insert (LoadMode::Merge, mutations) are unsupported // until Lance exposes WriteParams on MergeInsertBuilder. let ds = Arc::new(ds); - let job = MergeInsertBuilder::try_new(ds, key_columns) - .map_err(|e| OmniError::Lance(e.to_string()))? 
- .when_matched(when_matched) - .when_not_matched(when_not_matched) + let mut builder = MergeInsertBuilder::try_new(ds, key_columns) + .map_err(|e| OmniError::Lance(e.to_string()))?; + builder.when_matched(when_matched); + builder.when_not_matched(when_not_matched); + // Workaround for a Lance 4.0.x bug class where sequential + // merge_insert calls against rows previously rewritten by + // merge_insert produce a spurious "Ambiguous merge inserts: + // multiple source rows match the same target row on (id = ...)" + // error. Lance's `processed_row_ids: Mutex<HashSet<u64>>` + // (lance-4.0.0 `src/dataset/write/merge_insert.rs:2099`) + // double-processes the same source/target match against + // datasets previously rewritten by merge_insert, and the default + // `SourceDedupeBehavior::Fail` errors on the second insertion. + // `FirstSeen` makes Lance skip the duplicate match instead. + // + // Covers both observed surfaces: + // - PR #98 (sequential `load --mode merge` against same keys). + // - MR-920 (sequential `update T set {f} where x=y` on same row). + // + // Correctness-preserving for OmniGraph because every call path + // that reaches this primitive either pre-dedupes the source batch + // by id, or surfaces a real source dup via the + // `check_batch_unique_by_keys` precondition above (which fires + // before the FirstSeen setter has a chance to silently collapse + // anything): + // - Load path: `enforce_unique_constraints_intra_batch` + // (`loader/mod.rs:1453`) errors on intra-batch `@key` dups. + // - Mutate path: `MutationStaging::finalize` (`exec/staging.rs`) + // accumulates and dedupes by `id`. + // - Branch-merge path: `compute_source_delta` / + // `compute_three_way_delta` (`exec/merge.rs`) walk via + // `OrderedTableCursor` and `push_row` each id at most once. + // So FirstSeen only suppresses the spurious Lance behavior, never + // user data. 
Pinned by `loader_rejects_intra_batch_duplicate_keys` + // in `tests/consistency.rs` plus the + // `check_batch_unique_by_keys` precondition. + // + // Retire when upstream Lance fixes the bug class. Tracked at + // MR-957; upstream: lance-format/lance#6877. + builder.source_dedupe_behavior(SourceDedupeBehavior::FirstSeen); + let job = builder .try_build() .map_err(|e| OmniError::Lance(e.to_string()))?; @@ -870,11 +914,26 @@ impl TableStore { "stage_merge_insert called with empty batch".to_string(), )); } + + // Precondition for FirstSeen below. See the comment on + // `merge_insert_batch` for why this check is here, not on the caller: + // every call path that reaches stage_merge_insert (load, + // MutationStaging::finalize, branch_merge::publish_rewritten_merge_table) + // must hand in a source batch that is unique by `key_columns`. + check_batch_unique_by_keys(&batch, &key_columns, "stage_merge_insert")?; + let ds = Arc::new(ds); - let job = MergeInsertBuilder::try_new(ds, key_columns) - .map_err(|e| OmniError::Lance(e.to_string()))? - .when_matched(when_matched) - .when_not_matched(when_not_matched) + let mut builder = MergeInsertBuilder::try_new(ds, key_columns) + .map_err(|e| OmniError::Lance(e.to_string()))?; + builder.when_matched(when_matched); + builder.when_not_matched(when_not_matched); + // See `merge_insert_batch` for the FirstSeen rationale. Workaround + // for the Lance 4.0.x bug class where sequential merge_insert / + // update against rows previously rewritten by merge_insert trips + // Lance's `processed_row_ids` HashSet and errors under the default + // `SourceDedupeBehavior::Fail`. Retire when upstream Lance is fixed. 
+ builder.source_dedupe_behavior(SourceDedupeBehavior::FirstSeen); + let job = builder .try_build() .map_err(|e| OmniError::Lance(e.to_string()))?; let schema = batch.schema(); @@ -1651,3 +1710,107 @@ fn combine_committed_with_staged(ds: &Dataset, staged: &[StagedWrite]) -> Vec Result<()> { + if key_columns.len() != 1 { + return Err(OmniError::manifest_internal(format!( + "{}: check_batch_unique_by_keys currently supports single-column keys only, got {:?}", + context, key_columns + ))); + } + let key_col_name = &key_columns[0]; + let column = batch.column_by_name(key_col_name).ok_or_else(|| { + OmniError::manifest_internal(format!( + "{}: source batch missing key column '{}'", + context, key_col_name + )) + })?; + let strs = column + .as_any() + .downcast_ref::() + .ok_or_else(|| { + OmniError::manifest_internal(format!( + "{}: key column '{}' is not a StringArray (got {:?})", + context, + key_col_name, + column.data_type() + )) + })?; + + let mut seen: std::collections::HashSet<&str> = + std::collections::HashSet::with_capacity(batch.num_rows()); + for i in 0..strs.len() { + if !strs.is_valid(i) { + continue; + } + let v = strs.value(i); + if !seen.insert(v) { + return Err(OmniError::manifest(format!( + "{}: duplicate source row for key '{}' (column '{}'); \ + callers must hand in a batch unique by `key_columns` \ + — see MR-957", + context, v, key_col_name + ))); + } + } + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + use arrow_array::StringArray; + use arrow_schema::{DataType, Field, Schema}; + + fn batch_with_ids(ids: &[&str]) -> RecordBatch { + let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Utf8, false)])); + let col = Arc::new(StringArray::from(ids.to_vec())) as ArrayRef; + RecordBatch::try_new(schema, vec![col]).unwrap() + } + + #[test] + fn check_batch_unique_by_keys_passes_when_all_unique() { + let batch = batch_with_ids(&["a", "b", "c"]); + check_batch_unique_by_keys(&batch, &["id".to_string()], "test").unwrap(); + } + + 
#[test] + fn check_batch_unique_by_keys_errors_on_duplicate_id() { + let batch = batch_with_ids(&["a", "b", "a"]); + let err = + check_batch_unique_by_keys(&batch, &["id".to_string()], "test").unwrap_err(); + let msg = err.to_string(); + assert!( + msg.contains("duplicate source row for key 'a'"), + "unexpected error: {msg}" + ); + assert!(msg.contains("MR-957"), "error should reference MR-957: {msg}"); + } + + #[test] + fn check_batch_unique_by_keys_rejects_multi_column_keys() { + let batch = batch_with_ids(&["a"]); + let err = check_batch_unique_by_keys( + &batch, + &["id".to_string(), "other".to_string()], + "test", + ) + .unwrap_err(); + assert!(err.to_string().contains("single-column keys only")); + } +} diff --git a/crates/omnigraph/tests/consistency.rs b/crates/omnigraph/tests/consistency.rs index 63dc3f7..8986ecb 100644 --- a/crates/omnigraph/tests/consistency.rs +++ b/crates/omnigraph/tests/consistency.rs @@ -119,6 +119,189 @@ async fn load_merge_upserts_existing_and_inserts_new() { } } +/// Regression: two sequential `LoadMode::Merge` invocations against the +/// same set of keys must both succeed. Pre-fix, the second one failed +/// with `Ambiguous merge inserts are prohibited: multiple source rows +/// match the same target row on (id = "TEST-1")` even though every +/// source batch had one row per key. +/// +/// Triggered by Lance's `processed_row_ids: Mutex<HashSet<u64>>` +/// (lance-4.0.0 `src/dataset/write/merge_insert.rs:2099`) double- +/// processing the same source/target match against datasets previously +/// rewritten by merge_insert. Worked around by opting +/// `MergeInsertBuilder` into `SourceDedupeBehavior::FirstSeen` in +/// `crates/omnigraph/src/table_store.rs` — see that file for the full +/// rationale and the safety pin (`loader_rejects_intra_batch_duplicate_keys`). +/// Tracked at MR-957; upstream: lance-format/lance#6877. 
+#[tokio::test] +async fn load_merge_repeated_against_overlapping_keys_succeeds() { + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap(); + let schema = r#" +node Thing { + key: String @key + required_val: String + optional_val: String? +} +"#; + let mut db = Omnigraph::init(uri, schema).await.unwrap(); + + // Seed with 50 fully-populated rows (id + required + optional). + let mut seed = String::new(); + for i in 1..=50 { + seed.push_str(&format!( + r#"{{"type":"Thing","data":{{"key":"TEST-{i}","required_val":"required {i}","optional_val":"optional {i}"}}}} +"#, + )); + } + load_jsonl(&mut db, &seed, LoadMode::Overwrite) + .await + .unwrap(); + + // Partial-schema delta — mirrors the bug report exactly: omits + // `optional_val`. 25 existing keys + 5 new keys, one row per key. + let mut delta = String::new(); + for i in (1..=25).chain(51..=55) { + delta.push_str(&format!( + r#"{{"type":"Thing","data":{{"key":"TEST-{i}","required_val":"required {i} UPDATED"}}}} +"#, + )); + } + + load_jsonl(&mut db, &delta, LoadMode::Merge) + .await + .expect("first merge must succeed"); + assert_eq!(count_rows(&db, "node:Thing").await, 55); + + load_jsonl(&mut db, &delta, LoadMode::Merge) + .await + .expect("second merge against same keys must succeed"); + assert_eq!(count_rows(&db, "node:Thing").await, 55); +} + +/// Safety pin for the `SourceDedupeBehavior::FirstSeen` workaround in +/// `crates/omnigraph/src/table_store.rs`. FirstSeen tells Lance to +/// silently skip a duplicate source row instead of erroring. Our use of +/// it depends on user-provided duplicates being rejected *before* the +/// batch reaches Lance — otherwise FirstSeen could silently drop user +/// data. +/// +/// Defense in depth: +/// 1. 
The loader's `enforce_unique_constraints_intra_batch` +/// (`loader/mod.rs:1453`), invoked unconditionally on any node type +/// with a `@key`, errors on intra-batch duplicate `@key` values at +/// intake — pinned by this test across every `LoadMode`. +/// 2. The `check_batch_unique_by_keys` precondition at the top of +/// `merge_insert_batch` and `stage_merge_insert` is the final +/// fail-fast guard: even if a future caller bypasses the loader path +/// (e.g. branch-merge's `publish_rewritten_merge_table` builds its +/// own source batch directly), a real duplicate id reaches Lance +/// only after surfacing as an `OmniError::Manifest`, never silently +/// via FirstSeen. Pinned by the unit tests in `table_store::tests`. +#[tokio::test] +async fn loader_rejects_intra_batch_duplicate_keys() { + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap(); + let schema = r#" +node Thing { + key: String @key + value: String +} +"#; + let mut db = Omnigraph::init(uri, schema).await.unwrap(); + + let dupes = r#"{"type":"Thing","data":{"key":"DUP","value":"first"}} +{"type":"Thing","data":{"key":"DUP","value":"second"}} +"#; + + for mode in [LoadMode::Overwrite, LoadMode::Append, LoadMode::Merge] { + let err = load_jsonl(&mut db, dupes, mode).await.unwrap_err(); + let msg = err.to_string(); + assert!( + msg.contains("@unique violation") && msg.contains("DUP"), + "load mode {mode:?} must reject intra-batch duplicate @key (got: {msg})" + ); + assert_eq!( + count_rows(&db, "node:Thing").await, + 0, + "load mode {mode:?} must not persist any rows when the batch is rejected" + ); + } +} + +/// Canary for the upstream Lance gap that the `FirstSeen` workaround +/// in `table_store.rs` masks. The bug class is "Window 2": load → +/// indices built explicitly → merge → merge. 
Even with the engine +/// fully aligned to the "indexes are derived state" invariant +/// (MR-848), as long as an `id` index has been built between the +/// first and second merge_insert, the Lance internal that triggers +/// the bug remains reachable. +/// +/// This test runs the Window-2 sequence under the FirstSeen workaround. +/// It is expected to pass today. If a future Lance upgrade or local +/// change makes it START failing, the workaround has lost effectiveness +/// (upstream Lance changed something, or the FirstSeen setter was +/// dropped from `table_store.rs`). If a future Lance upgrade fixes the +/// bug class, this test continues to pass and the FirstSeen setter can +/// be retired. +/// +/// Tracked at MR-957; upstream: lance-format/lance#6877. +#[tokio::test] +async fn load_merge_window_2_documents_upstream_lance_gap() { + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap(); + let schema = r#" +node Thing { + key: String @key + required_val: String + optional_val: String? +} +"#; + let mut db = Omnigraph::init(uri, schema).await.unwrap(); + + let mut seed = String::new(); + for i in 1..=50 { + seed.push_str(&format!( + r#"{{"type":"Thing","data":{{"key":"TEST-{i}","required_val":"required {i}","optional_val":"optional {i}"}}}} +"#, + )); + } + load_jsonl(&mut db, &seed, LoadMode::Overwrite) + .await + .unwrap(); + + // Explicit ensure_indices between seed and the merges — the Window + // 2 trigger. The eager-build behavior (MR-583) means the BTREE on + // `id` is already present here, but calling explicitly pins the + // invariant for the post-MR-848 future where the eager build is + // gone. + db.ensure_indices().await.unwrap(); + + let mut delta = String::new(); + for i in (1..=25).chain(51..=55) { + delta.push_str(&format!( + r#"{{"type":"Thing","data":{{"key":"TEST-{i}","required_val":"required {i} UPDATED"}}}} +"#, + )); + } + + // Both merges must succeed under the FirstSeen workaround. 
+ // `processed_row_ids` re-processes the same target row_id under + // the default `SourceDedupeBehavior::Fail`; FirstSeen tolerates it. + load_jsonl(&mut db, &delta, LoadMode::Merge) + .await + .expect("first merge after ensure_indices must succeed"); + db.ensure_indices().await.unwrap(); + load_jsonl(&mut db, &delta, LoadMode::Merge) + .await + .expect( + "second merge after ensure_indices must succeed \ + (Window 2 canary: drop the FirstSeen setter in table_store.rs \ + only when this stays green WITHOUT it)", + ); + assert_eq!(count_rows(&db, "node:Thing").await, 55); +} + #[tokio::test] async fn cross_type_traversal_deduplicates_duplicate_edges() { let dir = tempfile::tempdir().unwrap(); diff --git a/crates/omnigraph/tests/runs.rs b/crates/omnigraph/tests/runs.rs index 4e363bf..f2d7dc3 100644 --- a/crates/omnigraph/tests/runs.rs +++ b/crates/omnigraph/tests/runs.rs @@ -521,6 +521,10 @@ query delete_two_persons($first: String, $second: String) { delete Person where name = $first delete Person where name = $second } + +query update_age_by_name($name: String, $age: I32) { + update Person set { age: $age } where name = $name +} "#; /// D₂: a query mixing inserts/updates with deletes is rejected at parse @@ -1362,3 +1366,85 @@ query insert_then_update_note( .unwrap(); assert_eq!(qr.num_rows(), 0, "letter must not be visible after early error"); } + +/// MR-920 regression: two sequential `update T set {f:v} where x=y` +/// invocations against the same row must both succeed. Pre-fix, the +/// second one failed with `Ambiguous merge inserts are prohibited: +/// multiple source rows match the same target row on (id = "Alice")` +/// even though the scan returned exactly one row. +/// +/// Root cause hypothesis (per MR-920): Lance's +/// `processed_row_ids: Mutex<HashSet<u64>>` +/// (`src/dataset/write/merge_insert.rs:2099`) double-processes the +/// same target row_id against datasets previously rewritten by +/// merge_insert. 
`SourceDedupeBehavior::FirstSeen` makes Lance skip +/// rather than error. +/// +/// Companion to `consistency.rs::load_merge_repeated_against_overlapping_keys_succeeds` +/// (PR #98 / Window 1 of the bug class via the load surface). +#[tokio::test] +async fn second_sequential_update_on_same_row_succeeds() { + let dir = tempfile::tempdir().unwrap(); + let mut db = init_and_load(&dir).await; + + db.mutate( + "main", + STAGED_QUERIES, + "update_age_by_name", + &mixed_params(&[("$name", "Alice")], &[("$age", 99)]), + ) + .await + .expect("first sequential update on Alice must succeed"); + + let batches = read_table(&db, "node:Person").await; + let alice_count: usize = batches + .iter() + .map(|b| { + let names = b + .column_by_name("name") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + (0..b.num_rows()) + .filter(|i| names.is_valid(*i) && names.value(*i) == "Alice") + .count() + }) + .sum(); + assert_eq!( + alice_count, 1, + "after first update, exactly one Alice row should be visible" + ); + + db.mutate( + "main", + STAGED_QUERIES, + "update_age_by_name", + &mixed_params(&[("$name", "Alice")], &[("$age", 42)]), + ) + .await + .expect("second sequential update on Alice must succeed"); + + let batches = read_table(&db, "node:Person").await; + let mut alice_age: Option = None; + for batch in &batches { + let names = batch + .column_by_name("name") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + let ages = batch + .column_by_name("age") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + for i in 0..batch.num_rows() { + if names.is_valid(i) && names.value(i) == "Alice" && ages.is_valid(i) { + alice_age = Some(ages.value(i)); + } + } + } + assert_eq!(alice_age, Some(42), "Alice's age must reflect the second update"); +}