diff --git a/crates/omnigraph/src/table_store.rs b/crates/omnigraph/src/table_store.rs
index 6ebd7ba..6712df4 100644
--- a/crates/omnigraph/src/table_store.rs
+++ b/crates/omnigraph/src/table_store.rs
@@ -564,19 +564,41 @@ impl TableStore {
     /// uncommitted Lance transaction plus the new fragments for
     /// read-your-writes.
     ///
-    /// On stable-row-id datasets we manually populate `row_id_meta` on the
-    /// cloned `new_fragments` we expose for `scan_with_staged`. Lance's
-    /// `InsertBuilder::execute_uncommitted` produces fragments with
-    /// `row_id_meta = None`; row IDs are normally assigned by
+    /// `prior_stages` is the slice of staged writes already accumulated
+    /// against the **same dataset** in the same query. Pass `&[]` for the
+    /// first call; pass the accumulated stages for subsequent calls. The
+    /// primitive uses this to offset row-ID assignment so chained
+    /// `stage_append` calls don't produce overlapping `_rowid` ranges.
+    /// Mirrors `scan_with_staged`'s `&[StagedWrite]` shape — the same
+    /// slice gets passed to both.
+    ///
+    /// On stable-row-id datasets we manually populate `row_id_meta` on
+    /// the cloned `new_fragments` we expose for `scan_with_staged`.
+    /// Lance's `InsertBuilder::execute_uncommitted` produces fragments
+    /// with `row_id_meta = None`; row IDs are normally assigned by
     /// `Transaction::assign_row_ids` during commit. Because
-    /// `scan_with_staged` reads the staged fragments *before* commit, the
-    /// scanner trips on a stable-row-id dataset (`Error::internal("Missing
-    /// row id meta")` from `dataset/rowids.rs:22`). The transaction's
-    /// internal fragment copy stays untouched — Lance assigns IDs there
-    /// independently at commit time, and the two ID assignments don't
-    /// have to agree because no caller threads `_rowid` from the staged
-    /// scan into the commit path.
-    pub async fn stage_append(&self, ds: &Dataset, batch: RecordBatch) -> Result<StagedWrite> {
+    /// `scan_with_staged` reads the staged fragments *before* commit,
+    /// the scanner trips on a stable-row-id dataset
+    /// (`Error::internal("Missing row id meta")` from
+    /// `dataset/rowids.rs:22`). The transaction's internal fragment copy
+    /// stays untouched — Lance assigns IDs there independently at commit
+    /// time, and the two ID assignments don't have to agree because no
+    /// caller threads `_rowid` from the staged scan into the commit
+    /// path.
+    ///
+    /// **Contract: `prior_stages` must contain only previous
+    /// `stage_append` results against the same dataset.** Mixing
+    /// stage_merge_insert into `prior_stages` would over-count because
+    /// merge_insert's `new_fragments` include rewrites that don't add
+    /// rows. The engine's parse-time D₂′ check (per touched table: all
+    /// stage_append OR exactly one stage_merge_insert) guarantees this
+    /// upstream; on the primitive layer it's the caller's responsibility.
+    pub async fn stage_append(
+        &self,
+        ds: &Dataset,
+        batch: RecordBatch,
+        prior_stages: &[StagedWrite],
+    ) -> Result<StagedWrite> {
         if batch.num_rows() == 0 {
             return Err(OmniError::manifest_internal(
                 "stage_append called with empty batch".to_string(),
@@ -603,7 +625,9 @@ impl TableStore {
             }
         };
         if ds.manifest.uses_stable_row_ids() {
-            assign_row_id_meta(&mut new_fragments, ds.manifest.next_row_id)?;
+            let prior_rows = prior_stages_row_count(prior_stages)?;
+            let start_row_id = ds.manifest.next_row_id + prior_rows;
+            assign_row_id_meta(&mut new_fragments, start_row_id)?;
         }
         Ok(StagedWrite {
             transaction,
@@ -936,6 +960,29 @@ impl TableStore {
 /// OR exactly one stage_merge_insert" at parse time (D₂′ in
 /// `exec/mutation.rs`) so this primitive's caller never chains merges.
 /// See `stage_merge_insert` for the full contract.
+/// Sum `physical_rows` across all fragments in the supplied stages.
+/// Used by `stage_append` to compute the row-ID offset for chained
+/// `stage_append` calls against the same dataset.
+///
+/// Assumes `prior_stages` contains only `stage_append` results — see
+/// `stage_append`'s D₂′ contract. For `stage_merge_insert` results the
+/// `new_fragments` include rewrites that don't add new rows, so this
+/// would over-count.
+fn prior_stages_row_count(prior_stages: &[StagedWrite]) -> Result<u64> {
+    let mut total: u64 = 0;
+    for stage in prior_stages {
+        for fragment in &stage.new_fragments {
+            let physical_rows = fragment.physical_rows.ok_or_else(|| {
+                OmniError::manifest_internal(
+                    "prior_stages_row_count: fragment is missing physical_rows".to_string(),
+                )
+            })? as u64;
+            total += physical_rows;
+        }
+    }
+    Ok(total)
+}
+
 /// Assign sequential row IDs to fragments that lack them, starting from
 /// `start_row_id`. Mirrors the relevant arm of Lance's
 /// `Transaction::assign_row_ids` (lance-4.0.0 `dataset/transaction.rs:2682`)
diff --git a/crates/omnigraph/tests/staged_writes.rs b/crates/omnigraph/tests/staged_writes.rs
index 1d798a6..f0e70cd 100644
--- a/crates/omnigraph/tests/staged_writes.rs
+++ b/crates/omnigraph/tests/staged_writes.rs
@@ -17,10 +17,13 @@
 //! primitive's behavior so a future change either (a) preserves it or
 //! (b) consciously fixes it (and updates this test).
 
-use arrow_array::{Array, Int32Array, RecordBatch, StringArray};
+use arrow_array::{Array, Int32Array, RecordBatch, StringArray, UInt64Array};
 use arrow_schema::{DataType, Field, Schema};
+use futures::TryStreamExt;
+use lance::Dataset;
 use lance::dataset::{WhenMatched, WhenNotMatched};
-use omnigraph::table_store::TableStore;
+use lance_table::format::Fragment;
+use omnigraph::table_store::{StagedWrite, TableStore};
 use std::sync::Arc;
 
 fn person_schema() -> Arc<Schema> {
@@ -71,9 +74,9 @@ async fn stage_append_is_visible_via_scan_with_staged() {
         .await
         .unwrap();
 
-    // Stage a second row.
+    // Stage a second row. First call → empty prior_stages.
     let staged = store
-        .stage_append(&ds, person_batch(&[("bob", Some(25))]))
+        .stage_append(&ds, person_batch(&[("bob", Some(25))]), &[])
         .await
         .unwrap();
 
@@ -157,7 +160,11 @@ async fn count_rows_with_staged_matches_scan() {
         .await
         .unwrap();
     let staged = store
-        .stage_append(&ds, person_batch(&[("bob", Some(25)), ("carol", Some(40))]))
+        .stage_append(
+            &ds,
+            person_batch(&[("bob", Some(25)), ("carol", Some(40))]),
+            &[],
+        )
         .await
         .unwrap();
 
@@ -168,6 +175,223 @@ async fn count_rows_with_staged_matches_scan() {
     assert_eq!(count, 3);
 }
 
+/// Two `stage_append` calls on the same dataset must produce
+/// non-overlapping `_rowid` ranges. Without `prior_stages` threading,
+/// both calls would assign IDs starting from `ds.manifest.next_row_id`,
+/// producing overlapping ranges that break read paths consulting the
+/// row-ID index (prefilter, vector search). With the slice threaded
+/// through, the second call offsets by the first call's row count.
+///
+/// This is what enables the engine's multi-statement `insert Knows ...;
+/// insert Knows ...` (multiple appends to the same edge table) under
+/// the D₂′ rule.
+#[tokio::test]
+async fn chained_stage_appends_have_distinct_row_ids() {
+    let dir = tempfile::tempdir().unwrap();
+    let uri = format!("{}/people.lance", dir.path().to_str().unwrap());
+    let store = TableStore::new(dir.path().to_str().unwrap());
+
+    let ds = TableStore::write_dataset(&uri, person_batch(&[("seed", Some(0))]))
+        .await
+        .unwrap();
+
+    let s1 = store
+        .stage_append(&ds, person_batch(&[("alice", Some(30))]), &[])
+        .await
+        .unwrap();
+    let s2 = store
+        .stage_append(
+            &ds,
+            person_batch(&[("bob", Some(25))]),
+            std::slice::from_ref(&s1),
+        )
+        .await
+        .unwrap();
+
+    // Scan with row IDs requested. If s1 and s2 had overlapping _rowid
+    // ranges, Lance's scanner would conflict (or surface duplicates) on
+    // the combined fragment list.
+    let staged = vec![s1, s2];
+    let batches = store
+        .scan_with_staged(&ds, &staged, None, None)
+        .await
+        .unwrap();
+    let ids = collect_ids(&batches);
+    assert_eq!(ids, vec!["alice", "bob", "seed"]);
+
+    // Project _rowid explicitly and assert all rows have distinct IDs.
+    let mut scanner = ds.scan();
+    scanner.with_row_id();
+    scanner.with_fragments(combine_for_scan(&ds, &staged));
+    let stream = scanner.try_into_stream().await.unwrap();
+    let projected: Vec<_> = stream.try_collect().await.unwrap();
+    let row_ids: std::collections::BTreeSet<u64> = projected
+        .iter()
+        .flat_map(|b| {
+            let arr = b
+                .column_by_name("_rowid")
+                .unwrap()
+                .as_any()
+                .downcast_ref::<UInt64Array>()
+                .unwrap();
+            (0..arr.len()).map(|i| arr.value(i)).collect::<Vec<_>>()
+        })
+        .collect();
+    assert_eq!(
+        row_ids.len(),
+        3,
+        "all 3 rows (1 committed + 2 staged) should have distinct _rowid; \
+         overlap implies stage_append failed to offset by prior_stages"
+    );
+}
+
+/// Helper for the chained-append test: replicate the primitive's
+/// `combine_committed_with_staged` logic so the test can supply a custom
+/// scanner that requests `_rowid`. Kept inline here to avoid making the
+/// engine helper public.
+fn combine_for_scan(ds: &Dataset, staged: &[StagedWrite]) -> Vec<Fragment> {
+    let removed: std::collections::HashSet<u64> = staged
+        .iter()
+        .flat_map(|w| w.removed_fragment_ids.iter().copied())
+        .collect();
+    let mut combined: Vec<_> = ds
+        .manifest
+        .fragments
+        .iter()
+        .filter(|f| !removed.contains(&f.id))
+        .cloned()
+        .collect();
+    for s in staged {
+        combined.extend(s.new_fragments.iter().cloned());
+    }
+    combined
+}
+
+/// `stage_append` + `commit_staged` round-trip: after commit, the
+/// dataset's HEAD reflects the staged data and a fresh scan sees it.
+/// Validates that our pre-assigned `row_id_meta` doesn't break Lance's
+/// commit-time row-ID assignment (transaction.rs:2682).
+#[tokio::test]
+async fn stage_append_then_commit_persists_data() {
+    let dir = tempfile::tempdir().unwrap();
+    let uri = format!("{}/people.lance", dir.path().to_str().unwrap());
+    let store = TableStore::new(dir.path().to_str().unwrap());
+
+    let ds = TableStore::write_dataset(&uri, person_batch(&[("alice", Some(30))]))
+        .await
+        .unwrap();
+    let pre_version = ds.version().version;
+
+    let staged = store
+        .stage_append(&ds, person_batch(&[("bob", Some(25))]), &[])
+        .await
+        .unwrap();
+
+    let new_ds = store
+        .commit_staged(Arc::new(ds.clone()), staged.transaction)
+        .await
+        .unwrap();
+    assert!(
+        new_ds.version().version > pre_version,
+        "commit_staged must advance the dataset version"
+    );
+
+    // Reopen and confirm rows are visible at HEAD.
+    let reopened = Dataset::open(&uri).await.unwrap();
+    let batches = store.scan_batches(&reopened).await.unwrap();
+    assert_eq!(collect_ids(&batches), vec!["alice", "bob"]);
+}
+
+/// `stage_merge_insert` + `commit_staged` round-trip: after commit, the
+/// merged view (existing alice updated + new bob inserted) is visible.
+#[tokio::test]
+async fn stage_merge_insert_then_commit_persists_merged_view() {
+    let dir = tempfile::tempdir().unwrap();
+    let uri = format!("{}/people.lance", dir.path().to_str().unwrap());
+    let store = TableStore::new(dir.path().to_str().unwrap());
+
+    let ds = TableStore::write_dataset(&uri, person_batch(&[("alice", Some(30))]))
+        .await
+        .unwrap();
+
+    let staged = store
+        .stage_merge_insert(
+            ds.clone(),
+            person_batch(&[("alice", Some(31)), ("bob", Some(25))]),
+            vec!["id".to_string()],
+            WhenMatched::UpdateAll,
+            WhenNotMatched::InsertAll,
+        )
+        .await
+        .unwrap();
+
+    store
+        .commit_staged(Arc::new(ds), staged.transaction)
+        .await
+        .unwrap();
+
+    let reopened = Dataset::open(&uri).await.unwrap();
+    let batches = store.scan_batches(&reopened).await.unwrap();
+    assert_eq!(collect_ids(&batches), vec!["alice", "bob"]);
+
+    // Confirm alice was updated to age=31, not duplicated.
+    let total: usize = batches.iter().map(|b| b.num_rows()).sum();
+    assert_eq!(total, 2, "merge_insert must not duplicate the matched row");
+}
+
+/// Filter pushdown via `scan_with_staged`: a SQL filter applies across
+/// both committed and staged fragments. This validates the MR-794
+/// ticket's claim that Lance's `with_fragments` preserves pushdown
+/// behavior (per Lance tests `test_scalar_index_respects_fragment_list`
+/// etc.).
+#[tokio::test]
+async fn scan_with_staged_pushes_filter_through_committed_and_staged() {
+    let dir = tempfile::tempdir().unwrap();
+    let uri = format!("{}/people.lance", dir.path().to_str().unwrap());
+    let store = TableStore::new(dir.path().to_str().unwrap());
+
+    // Committed: alice=30, carol=40
+    let ds = TableStore::write_dataset(
+        &uri,
+        person_batch(&[("alice", Some(30)), ("carol", Some(40))]),
+    )
+    .await
+    .unwrap();
+
+    // Staged: bob=25, dave=35
+    let staged = store
+        .stage_append(
+            &ds,
+            person_batch(&[("bob", Some(25)), ("dave", Some(35))]),
+            &[],
+        )
+        .await
+        .unwrap();
+
+    // Filter: age >= 30 → expect alice, carol, dave (not bob).
+    let batches = store
+        .scan_with_staged(
+            &ds,
+            std::slice::from_ref(&staged),
+            None,
+            Some("age >= 30"),
+        )
+        .await
+        .unwrap();
+    assert_eq!(collect_ids(&batches), vec!["alice", "carol", "dave"]);
+
+    // Same filter as count: same arithmetic.
+    let count = store
+        .count_rows_with_staged(
+            &ds,
+            std::slice::from_ref(&staged),
+            Some("age >= 30".to_string()),
+        )
+        .await
+        .unwrap();
+    assert_eq!(count, 3);
+}
+
 /// **Documented contract** (see `stage_merge_insert` doc): chained
 /// `stage_merge_insert` calls on the same table whose source rows share
 /// keys cannot dedupe across stages. Each call's `MergeInsertBuilder` runs