From 4c5fa3d8b81ceb62f561ca5c701a3e0036c72449 Mon Sep 17 00:00:00 2001 From: Ragnor Comerford Date: Thu, 30 Apr 2026 23:10:19 +0200 Subject: [PATCH] =?UTF-8?q?MR-794=20step=201:=20address=20PR=20#67=20Codex?= =?UTF-8?q?=20P1=20=E2=80=94=20document=20chained-merge=20contract?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Codex flagged that combine_committed_with_staged can return duplicates on chained stage_merge_inserts: each call's MergeInsertBuilder runs against the committed view (it does not see prior staged fragments), so two staged merges whose source rows share keys both produce Operation::Update transactions whose new_fragments contain the shared row. The combined scan returns it twice. The bug is intrinsic to Lance's API: there is no public way to make MergeInsertBuilder see uncommitted fragments. Fixing the primitive itself requires either a Lance API extension or in-memory pre-merge logic, neither in scope for v1. The v1 fix is a parse-time companion (D₂′) added with the engine rewire in MR-794 step 2+: per touched table, ops must be all stage_append OR exactly one stage_merge_insert. Multi-table queries and append-chains remain safe; only chained merges on a single table are rejected. This commit: - Documents the contract on stage_merge_insert and combine_committed_with_staged so callers know the invariant the primitive relies on. - Adds tests/staged_writes.rs with four primitive-level tests: - stage_append + scan_with_staged shows committed + staged - stage_merge_insert dedupes superseded committed fragments (regression for the removed_fragment_ids fix that PR #66's 730631c added) - count_rows_with_staged matches scan - chained stage_merge_insert with shared key documents the duplicate-row behavior; assertion pins it so a future change either preserves the contract or consciously fixes it (and updates the test) Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/omnigraph/src/table_store.rs | 35 ++++ crates/omnigraph/tests/staged_writes.rs | 241 ++++++++++++++++++++++++ 2 files changed, 276 insertions(+) create mode 100644 crates/omnigraph/tests/staged_writes.rs diff --git a/crates/omnigraph/src/table_store.rs b/crates/omnigraph/src/table_store.rs index ccf4c2a..99ff2aa 100644 --- a/crates/omnigraph/src/table_store.rs +++ b/crates/omnigraph/src/table_store.rs @@ -601,6 +601,29 @@ impl TableStore { /// fragments. The transaction's `Operation::Update` carries the /// fragments-to-remove and fragments-to-add; for read-your-writes we /// expose `new_fragments` (rows that will be visible after commit). + /// + /// **Contract: do not chain `stage_merge_insert` calls on the same + /// table within one query.** Each call's `MergeInsertBuilder` runs + /// against the supplied dataset's committed view — it does not see + /// fragments produced by a previous staged merge on the same table. + /// Two chained `stage_merge_insert`s whose source rows share keys will + /// each independently produce `Operation::Update` transactions whose + /// `new_fragments` contain a row for the shared key. `scan_with_staged` + /// (and `count_rows_with_staged`) will then return both — i.e. + /// **duplicates by key**. + /// + /// This is intrinsic to the underlying Lance API: there is no public + /// way to make `MergeInsertBuilder` see uncommitted fragments. The + /// engine's mutation path enforces the rule "per touched table: all + /// stage_append OR exactly one stage_merge_insert" at parse time + /// (the D₂′ check landing with [MR-794](https://linear.app/modernrelay/issue/MR-794) + /// step 2+ in `exec/mutation.rs`). Multi-table queries and append-chains + /// remain safe; only chained merges on a single table are rejected. + /// + /// Lift path: either a Lance API extension that lets + /// `MergeInsertBuilder` accept additional staged fragments, or an + /// in-memory pre-merge here that folds prior staged batches into the + /// input stream. See `docs/runs.md` and MR-793. pub async fn stage_merge_insert( &self, ds: Dataset, @@ -877,6 +900,7 @@ impl TableStore { /// 1. committed fragments whose IDs are NOT in any staged /// `removed_fragment_ids` (preserves committed order), /// 2. all staged `new_fragments` in stage order. +/// /// Lance's `Scanner` does not require any particular ordering between /// committed and staged fragments — `with_fragments` scopes the scan to /// exactly the supplied list. The dedup matters because merge_insert @@ -884,6 +908,17 @@ impl TableStore { /// fragment is in `new_fragments`, the original (which it supersedes) is /// in `committed` until manifest commit, and including both would yield /// duplicate rows. +/// +/// **Inter-stage supersession is not handled here.** Each StagedWrite's +/// `removed_fragment_ids` lists committed-manifest fragment IDs only; a +/// later staged merge cannot know about an earlier staged merge's +/// fragments (Lance's `MergeInsertBuilder` runs against the committed +/// view). If two `stage_merge_insert`s on the same table produce rows +/// with the same key, the combined view returns duplicates by key. The +/// engine's mutation path enforces "per touched table: all stage_append +/// OR exactly one stage_merge_insert" at parse time (D₂′ in +/// `exec/mutation.rs`) so this primitive's caller never chains merges. +/// See `stage_merge_insert` for the full contract. fn combine_committed_with_staged(ds: &Dataset, staged: &[StagedWrite]) -> Vec { let removed: std::collections::HashSet = staged .iter() diff --git a/crates/omnigraph/tests/staged_writes.rs b/crates/omnigraph/tests/staged_writes.rs new file mode 100644 index 0000000..06fffc2 --- /dev/null +++ b/crates/omnigraph/tests/staged_writes.rs @@ -0,0 +1,241 @@ +//! Primitive-level tests for `TableStore`'s staged-write API +//! (MR-794 step 1). These exercise `stage_append`, `stage_merge_insert`, +//! `scan_with_staged`, and `count_rows_with_staged` directly against a +//! Lance dataset — no Omnigraph engine involved. The engine-level rewire +//! (MR-794 step 2+) lives in `tests/runs.rs` once it lands. +//! +//! Test surface here: +//! 1. `stage_append` + `scan_with_staged` shows committed + staged data +//! without duplicates. +//! 2. `stage_merge_insert` of a row that supersedes a committed fragment +//! surfaces only the rewritten row, not both — the +//! `removed_fragment_ids` dedup landed in PR #66's `730631c`. +//! 3. **Documented contract**: chained `stage_merge_insert` calls on the +//! same dataset whose source rows share keys produce duplicate rows in +//! `scan_with_staged`. The engine's parse-time D₂′ check (MR-794 step +//! 2+) prevents callers from triggering this; this test pins the +//! primitive's behavior so a future change either (a) preserves it or +//! (b) consciously fixes it (and updates this test). + +use arrow_array::{Int32Array, RecordBatch, StringArray}; +use arrow_schema::{DataType, Field, Schema}; +use lance::dataset::{WhenMatched, WhenNotMatched}; +use omnigraph::table_store::TableStore; +use std::sync::Arc; + +fn person_schema() -> Arc { + Arc::new(Schema::new(vec![ + Field::new("id", DataType::Utf8, false), + Field::new("age", DataType::Int32, true), + ])) +} + +fn person_batch(rows: &[(&str, Option)]) -> RecordBatch { + let ids: Vec<&str> = rows.iter().map(|(id, _)| *id).collect(); + let ages: Vec> = rows.iter().map(|(_, age)| *age).collect(); + RecordBatch::try_new( + person_schema(), + vec![ + Arc::new(StringArray::from(ids)), + Arc::new(Int32Array::from(ages)), + ], + ) + .unwrap() +} + +fn collect_ids(batches: &[RecordBatch]) -> Vec { + let mut out = Vec::new(); + for b in batches { + let ids = b + .column_by_name("id") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + for i in 0..ids.len() { + out.push(ids.value(i).to_string()); + } + } + out.sort(); + out +} + +#[tokio::test] +async fn stage_append_is_visible_via_scan_with_staged() { + let dir = tempfile::tempdir().unwrap(); + let uri = format!("{}/people.lance", dir.path().to_str().unwrap()); + let store = TableStore::new(dir.path().to_str().unwrap()); + + // Seed: one committed row. + let ds = TableStore::write_dataset(&uri, person_batch(&[("alice", Some(30))])) + .await + .unwrap(); + + // Stage a second row. + let staged = store + .stage_append(&ds, person_batch(&[("bob", Some(25))])) + .await + .unwrap(); + + // scan_with_staged sees both committed alice + staged bob, no duplicates. + let batches = store + .scan_with_staged(&ds, std::slice::from_ref(&staged), None, None) + .await + .unwrap(); + assert_eq!(collect_ids(&batches), vec!["alice", "bob"]); + + // Plain scan (no staged) still sees only committed alice — dataset HEAD + // hasn't moved. + let plain = store.scan_batches(&ds).await.unwrap(); + assert_eq!(collect_ids(&plain), vec!["alice"]); +} + +#[tokio::test] +async fn stage_merge_insert_dedupes_superseded_committed_fragment() { + let dir = tempfile::tempdir().unwrap(); + let uri = format!("{}/people.lance", dir.path().to_str().unwrap()); + let store = TableStore::new(dir.path().to_str().unwrap()); + + // Seed: alice age 30 in one committed fragment. + let ds = TableStore::write_dataset(&uri, person_batch(&[("alice", Some(30))])) + .await + .unwrap(); + + // Stage a merge_insert that rewrites alice's row. This produces an + // Operation::Update whose removed_fragment_ids excludes the committed + // fragment that contained the old alice. + let staged = store + .stage_merge_insert( + ds.clone(), + person_batch(&[("alice", Some(31))]), + vec!["id".to_string()], + WhenMatched::UpdateAll, + WhenNotMatched::InsertAll, + ) + .await + .unwrap(); + assert!( + !staged.removed_fragment_ids.is_empty(), + "merge_insert that rewrites a committed row must set removed_fragment_ids \ + (this is the dedup invariant from PR #66 commit 730631c — its absence \ + was caught by Cubic/Cursor/Codex on PR #66)" + ); + + // scan_with_staged: alice appears exactly once, with the new age. + let batches = store + .scan_with_staged(&ds, std::slice::from_ref(&staged), None, None) + .await + .unwrap(); + let ids = collect_ids(&batches); + assert_eq!(ids, vec!["alice"], "merge_insert must not surface duplicates"); + + // Confirm the visible row is the rewritten one. + let total: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!(total, 1); + let ages: Vec = batches + .iter() + .flat_map(|b| { + let col = b + .column_by_name("age") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + (0..col.len()).map(|i| col.value(i)).collect::>() + }) + .collect(); + assert_eq!(ages, vec![31]); +} + +#[tokio::test] +async fn count_rows_with_staged_matches_scan() { + let dir = tempfile::tempdir().unwrap(); + let uri = format!("{}/people.lance", dir.path().to_str().unwrap()); + let store = TableStore::new(dir.path().to_str().unwrap()); + + let ds = TableStore::write_dataset(&uri, person_batch(&[("alice", Some(30))])) + .await + .unwrap(); + let staged = store + .stage_append(&ds, person_batch(&[("bob", Some(25)), ("carol", Some(40))])) + .await + .unwrap(); + + let count = store + .count_rows_with_staged(&ds, std::slice::from_ref(&staged), None) + .await + .unwrap(); + assert_eq!(count, 3); +} + +/// **Documented contract** (see `stage_merge_insert` doc): chained +/// `stage_merge_insert` calls on the same table whose source rows share +/// keys cannot dedupe across stages. Each call's `MergeInsertBuilder` runs +/// against the committed view; neither sees the other's staged fragments. +/// The combined `scan_with_staged` therefore returns the shared key +/// twice. +/// +/// The engine's mutation path enforces D₂′ (per touched table: all +/// stage_append OR exactly one stage_merge_insert) at parse time so this +/// scenario is unreachable through public APIs. This test pins the +/// primitive behavior — if a future change makes the primitive itself +/// dedupe across stages (e.g. via a Lance API extension or in-memory +/// pre-merge), update this assertion. +#[tokio::test] +async fn chained_stage_merge_insert_with_shared_key_documents_duplicate_behavior() { + let dir = tempfile::tempdir().unwrap(); + let uri = format!("{}/people.lance", dir.path().to_str().unwrap()); + let store = TableStore::new(dir.path().to_str().unwrap()); + + // Seed empty (an unrelated row keeps the schema unambiguous). + let ds = TableStore::write_dataset(&uri, person_batch(&[("seed", Some(0))])) + .await + .unwrap(); + + // Op-1: stage merge_insert of alice. Against committed view: alice + // doesn't exist, so this lands as a fresh insert into Operation::Update.new_fragments. + let staged_1 = store + .stage_merge_insert( + ds.clone(), + person_batch(&[("alice", Some(30))]), + vec!["id".to_string()], + WhenMatched::UpdateAll, + WhenNotMatched::InsertAll, + ) + .await + .unwrap(); + + // Op-2: stage merge_insert of alice with a different age. Also runs + // against the committed view (alice doesn't exist there either), so + // Lance produces another fresh insert. Op-2 has no knowledge of + // op-1's staged fragments. + let staged_2 = store + .stage_merge_insert( + ds.clone(), + person_batch(&[("alice", Some(31))]), + vec!["id".to_string()], + WhenMatched::UpdateAll, + WhenNotMatched::InsertAll, + ) + .await + .unwrap(); + + // scan_with_staged sees committed (seed) + op-1.new (alice age=30) + + // op-2.new (alice age=31). Alice appears twice — the documented + // contract violation that D₂′ prevents at the engine layer. + let batches = store + .scan_with_staged(&ds, &[staged_1, staged_2], None, None) + .await + .unwrap(); + let ids = collect_ids(&batches); + let alice_count = ids.iter().filter(|id| *id == "alice").count(); + assert_eq!( + alice_count, 2, + "chained stage_merge_insert with shared key produces duplicates — \ + this is the contract documented on stage_merge_insert. If you're \ + here because this assertion failed: either (a) the primitive was \ + improved to dedupe across stages (good — update to assert == 1) \ + or (b) something subtler broke (investigate before changing the \ + assertion). See PR #67 Codex P1 thread + .context/mr-794-step2-design.md §3.1." + ); +}