diff --git a/crates/omnigraph/src/exec/mutation.rs b/crates/omnigraph/src/exec/mutation.rs index ab7df62..d7513a8 100644 --- a/crates/omnigraph/src/exec/mutation.rs +++ b/crates/omnigraph/src/exec/mutation.rs @@ -544,12 +544,15 @@ fn apply_assignments( /// with `Lance HEAD > manifest_version`. The next `mutate_as` against the /// same table will surface `ExpectedVersionMismatch` (Lance HEAD ahead of /// the manifest snapshot). Lance's `restore()` is *not* a rewind — it -/// creates a new commit, monotonically advancing the version. A proper fix -/// requires per-table Lance-internal branches (write to a transient branch, -/// fast-forward main on success, drop branch on failure); tracked as a -/// follow-up to MR-771. In practice this path is narrow: most validation -/// runs before any Lance write, so single-statement mutations are -/// unaffected. See `docs/runs.md`. +/// creates a new commit, monotonically advancing the version. The proper +/// fix uses Lance's distributed-write API (`write_fragments` / +/// `Scanner::with_fragments` / `Operation::Append { fragments }`) — see +/// [MR-794](https://linear.app/modernrelay/issue/MR-794). The staging +/// primitives `TableStore::stage_append` / `stage_merge_insert` / +/// `commit_staged` / `scan_with_staged` are in place; full integration +/// with this struct is the MR-794 follow-up. In practice this path is +/// narrow today: most validation runs before any Lance write, so +/// single-statement mutations are unaffected. See `docs/runs.md`. #[derive(Default)] struct MutationStaging { expected_versions: HashMap, diff --git a/crates/omnigraph/src/table_store.rs b/crates/omnigraph/src/table_store.rs index f9d641f..c9ff90a 100644 --- a/crates/omnigraph/src/table_store.rs +++ b/crates/omnigraph/src/table_store.rs @@ -4,14 +4,19 @@ use arrow_select::concat::concat_batches; use futures::TryStreamExt; use lance::Dataset; use lance::dataset::scanner::{ColumnOrdering, DatasetRecordBatchStream, Scanner}; -use lance::dataset::{MergeInsertBuilder, WhenMatched, WhenNotMatched, WriteMode, WriteParams}; +use lance::dataset::transaction::{Operation, Transaction}; +use lance::dataset::{ + CommitBuilder, InsertBuilder, MergeInsertBuilder, WhenMatched, WhenNotMatched, WriteMode, + WriteParams, +}; use lance::datatypes::BlobHandling; use lance::index::scalar::IndexDetails; use lance_file::version::LanceFileVersion; use lance_index::scalar::{InvertedIndexParams, ScalarIndexParams}; use lance_index::{DatasetIndexExt, IndexType, is_system_index}; use lance_linalg::distance::MetricType; -use lance_table::format::IndexMetadata; +use lance_table::format::{Fragment, IndexMetadata, RowIdMeta}; +use lance_table::rowids::{RowIdSequence, write_row_ids}; use std::sync::Arc; use crate::db::manifest::{TableVersionMetadata, open_table_head_for_write}; @@ -33,6 +38,46 @@ pub struct DeleteState { pub(crate) version_metadata: TableVersionMetadata, } +/// A Lance write that has produced fragment files on object storage but is +/// not yet committed to the dataset's manifest. The staged-write primitives +/// are defined here for later integration in `MutationStaging` +/// (`exec/mutation.rs`) and the loader (`loader/mod.rs`) — those rewires +/// land in [MR-794](https://linear.app/modernrelay/issue/MR-794) step 2+. +/// The intent: defer Lance commits to end-of-query so a mid-query failure +/// leaves the touched table at the pre-mutation HEAD instead of drifting +/// ahead. +/// +/// `transaction` is opaque from our side — Lance owns its semantics. We +/// commit it via `CommitBuilder::execute(transaction)` (see +/// `TableStore::commit_staged`). +/// +/// For read-your-writes within the same query, `new_fragments` and +/// `removed_fragment_ids` together describe the post-stage view delta: +/// `scan_with_staged` (and `count_rows_with_staged`) compose +/// `committed - removed + new` so subsequent reads see the staged result +/// without double-counting fragments that `Operation::Update` rewrote. +/// Without `removed_fragment_ids`, a `stage_merge_insert` that rewrites +/// existing fragments would yield duplicate rows (the original fragment +/// stays in the committed manifest while its rewrite shows up in `new_fragments`). +#[derive(Debug, Clone)] +pub struct StagedWrite { + pub transaction: Transaction, + /// Fragments to surface alongside the committed manifest in + /// `Scanner::with_fragments(committed - removed + new)`. For + /// `Operation::Append` these are the freshly-appended fragments. For + /// `Operation::Update` (merge_insert) these are + /// `updated_fragments + new_fragments` (rewrites + freshly-inserted + /// rows). + pub new_fragments: Vec, + /// Fragment IDs that this staged write supersedes. The committed + /// manifest must filter these out before being combined with + /// `new_fragments` for read-your-writes scans, otherwise rewrites + /// yield duplicate rows. Empty for `stage_append` (`Operation::Append` + /// adds without removing anything); populated from + /// `Operation::Update.removed_fragment_ids` for `stage_merge_insert`. + pub removed_fragment_ids: Vec, +} + #[derive(Debug, Clone)] pub struct TableStore { root_uri: String, @@ -500,6 +545,309 @@ impl TableStore { }) } + // ─── Staged-write API (MR-794) ─────────────────────────────────────────── + // + // These primitives wrap Lance's distributed-write API: each call writes + // fragment files to object storage but does NOT advance the dataset's + // HEAD or commit a manifest entry. The returned `Transaction` is held by + // the caller (typically `MutationStaging` or the loader's accumulator) + // and committed at end-of-query via `commit_staged`. On failure the + // fragments remain unreferenced and are reclaimed by `cleanup_old_versions`. + // + // The extracted `Vec` is for read-your-writes within the same + // query: subsequent ops construct a `Scanner` and call + // `scanner.with_fragments(staged.clone())` to see staged data alongside + // the committed snapshot. Lance's filter pushdown, vector search, and + // FTS all respect the supplied fragment list. + + /// Stage an append: write fragment files for `batch`, return the + /// uncommitted Lance transaction plus the new fragments for + /// read-your-writes. + /// + /// `prior_stages` is the slice of staged writes already accumulated + /// against the **same dataset** in the same query. Pass `&[]` for the + /// first call; pass the accumulated stages for subsequent calls. The + /// primitive uses this to offset row-ID assignment so chained + /// `stage_append` calls don't produce overlapping `_rowid` ranges. + /// Mirrors `scan_with_staged`'s `&[StagedWrite]` shape — the same + /// slice gets passed to both. + /// + /// On stable-row-id datasets we manually populate `row_id_meta` on + /// the cloned `new_fragments` we expose for `scan_with_staged`. + /// Lance's `InsertBuilder::execute_uncommitted` produces fragments + /// with `row_id_meta = None`; row IDs are normally assigned by + /// `Transaction::assign_row_ids` during commit. Because + /// `scan_with_staged` reads the staged fragments *before* commit, + /// the scanner trips on a stable-row-id dataset + /// (`Error::internal("Missing row id meta")` from + /// `dataset/rowids.rs:22`). The transaction's internal fragment copy + /// stays untouched — Lance assigns IDs there independently at commit + /// time, and the two ID assignments don't have to agree because no + /// caller threads `_rowid` from the staged scan into the commit + /// path. + /// + /// **Contract: `prior_stages` must contain only previous + /// `stage_append` results against the same dataset.** Mixing + /// stage_merge_insert into `prior_stages` would over-count because + /// merge_insert's `new_fragments` include rewrites that don't add + /// rows. The engine's parse-time D₂′ check (per touched table: all + /// stage_append OR exactly one stage_merge_insert) guarantees this + /// upstream; on the primitive layer it's the caller's responsibility. + pub async fn stage_append( + &self, + ds: &Dataset, + batch: RecordBatch, + prior_stages: &[StagedWrite], + ) -> Result { + if batch.num_rows() == 0 { + return Err(OmniError::manifest_internal( + "stage_append called with empty batch".to_string(), + )); + } + let params = WriteParams { + mode: WriteMode::Append, + allow_external_blob_outside_bases: true, + ..Default::default() + }; + let transaction = InsertBuilder::new(Arc::new(ds.clone())) + .with_params(¶ms) + .execute_uncommitted(vec![batch]) + .await + .map_err(|e| OmniError::Lance(e.to_string()))?; + let mut new_fragments = match &transaction.operation { + Operation::Append { fragments } => fragments.clone(), + Operation::Overwrite { fragments, .. } => fragments.clone(), + other => { + return Err(OmniError::manifest_internal(format!( + "stage_append: unexpected Lance operation {:?}", + std::mem::discriminant(other) + ))); + } + }; + // Assign real fragment IDs. Lance's `InsertBuilder::execute_uncommitted` + // returns fragments with `id = 0` ("Temporary ID" — see lance-4.0.0 + // `dataset/write.rs:1044/1712`); the real assignment happens during + // commit via `Transaction::fragments_with_ids`. Because we expose + // these fragments to `scan_with_staged` *before* commit, two staged + // fragments (or one staged + the seed) would collide on `id = 0`, + // causing Lance's scanner to mishandle the combined list (silent + // duplicates / dropped rows). Mirror the commit-time renumbering + // here, using `ds.manifest.max_fragment_id() + 1` as the base and + // accounting for prior stages. + // ds.manifest.max_fragment_id is Option; cast up to u64 because + // Lance's Fragment::id (and the commit-time renumbering counter in + // Transaction::fragments_with_ids) operate on u64. + let next_id_base = ds.manifest.max_fragment_id.unwrap_or(0) as u64 + + 1 + + prior_stages_fragment_count(prior_stages); + assign_fragment_ids(&mut new_fragments, next_id_base); + if ds.manifest.uses_stable_row_ids() { + let prior_rows = prior_stages_row_count(prior_stages)?; + let start_row_id = ds.manifest.next_row_id + prior_rows; + assign_row_id_meta(&mut new_fragments, start_row_id)?; + } + Ok(StagedWrite { + transaction, + new_fragments, + // Append never supersedes existing fragments. + removed_fragment_ids: Vec::new(), + }) + } + + /// Stage a merge_insert (upsert): write fragment files describing the + /// merge result, return the uncommitted transaction plus the new + /// fragments. The transaction's `Operation::Update` carries the + /// fragments-to-remove and fragments-to-add; for read-your-writes we + /// expose `new_fragments` (rows that will be visible after commit). + /// + /// **Contract: do not chain `stage_merge_insert` calls on the same + /// table within one query.** Each call's `MergeInsertBuilder` runs + /// against the supplied dataset's committed view — it does not see + /// fragments produced by a previous staged merge on the same table. + /// Two chained `stage_merge_insert`s whose source rows share keys will + /// each independently produce `Operation::Update` transactions whose + /// `new_fragments` contain a row for the shared key. `scan_with_staged` + /// (and `count_rows_with_staged`) will then return both — i.e. + /// **duplicates by key**. + /// + /// This is intrinsic to the underlying Lance API: there is no public + /// way to make `MergeInsertBuilder` see uncommitted fragments. The + /// engine's mutation path enforces the rule "per touched table: all + /// stage_append OR exactly one stage_merge_insert" at parse time + /// (the D₂′ check landing with [MR-794](https://linear.app/modernrelay/issue/MR-794) + /// step 2+ in `exec/mutation.rs`). Multi-table queries and append-chains + /// remain safe; only chained merges on a single table are rejected. + /// + /// Lift path: either a Lance API extension that lets + /// `MergeInsertBuilder` accept additional staged fragments, or an + /// in-memory pre-merge here that folds prior staged batches into the + /// input stream. See `docs/runs.md` and MR-793. + pub async fn stage_merge_insert( + &self, + ds: Dataset, + batch: RecordBatch, + key_columns: Vec, + when_matched: WhenMatched, + when_not_matched: WhenNotMatched, + ) -> Result { + if batch.num_rows() == 0 { + return Err(OmniError::manifest_internal( + "stage_merge_insert called with empty batch".to_string(), + )); + } + let ds = Arc::new(ds); + let job = MergeInsertBuilder::try_new(ds, key_columns) + .map_err(|e| OmniError::Lance(e.to_string()))? + .when_matched(when_matched) + .when_not_matched(when_not_matched) + .try_build() + .map_err(|e| OmniError::Lance(e.to_string()))?; + let schema = batch.schema(); + let reader = arrow_array::RecordBatchIterator::new(vec![Ok(batch)], schema); + let stream = lance_datafusion::utils::reader_to_stream(Box::new(reader)); + let uncommitted = job + .execute_uncommitted(stream) + .await + .map_err(|e| OmniError::Lance(e.to_string()))?; + // Operation::Update { removed_fragment_ids, updated_fragments, new_fragments, .. } — + // `new_fragments` are the freshly inserted rows; `updated_fragments` + // are rewrites of existing fragments that include both retained and + // updated rows; `removed_fragment_ids` lists the committed-manifest + // fragments that those rewrites supersede. For read-your-writes we + // expose `updated_fragments + new_fragments` and the + // `removed_fragment_ids` so `scan_with_staged` can filter the + // superseded committed fragments before combining — otherwise a + // single merge_insert appears as duplicate rows (original committed + // version + rewritten staged version). + let (new_fragments, removed_fragment_ids) = match &uncommitted.transaction.operation { + Operation::Update { + new_fragments, + updated_fragments, + removed_fragment_ids, + .. + } => { + let mut all = updated_fragments.clone(); + all.extend(new_fragments.iter().cloned()); + (all, removed_fragment_ids.clone()) + } + Operation::Append { fragments } => (fragments.clone(), Vec::new()), + other => { + return Err(OmniError::manifest_internal(format!( + "stage_merge_insert: unexpected Lance operation {:?}", + std::mem::discriminant(other) + ))); + } + }; + Ok(StagedWrite { + transaction: uncommitted.transaction, + new_fragments, + removed_fragment_ids, + }) + } + + /// Commit a previously-staged transaction onto `ds`, returning the new + /// dataset (with HEAD advanced). Wraps `CommitBuilder::execute`. Used by + /// the publisher at end-of-query to materialize all staged writes before + /// the meta-manifest commit. + pub async fn commit_staged( + &self, + ds: Arc, + transaction: Transaction, + ) -> Result { + CommitBuilder::new(ds) + .execute(transaction) + .await + .map_err(|e| OmniError::Lance(e.to_string())) + } + + /// Run a scan with optional uncommitted staged writes visible + /// alongside the committed snapshot. When `staged` is empty this is + /// identical to `scan(...)`. + /// + /// Composes the visible fragment list as `committed - removed + new`: + /// the committed manifest's fragments, minus any fragment IDs that + /// staged `Operation::Update`s (merge_insert rewrites) have superseded, + /// plus the staged new/updated fragments. Without the `removed` + /// filter, a merge_insert that rewrites an existing fragment would + /// surface twice — once via the original committed fragment, once via + /// the rewrite in `new_fragments`. + /// + /// **Filter contract is incomplete on staged fragments.** When `filter` + /// is `Some(...)`, Lance pushes the predicate to per-fragment scans + /// with stats-based pruning. Uncommitted fragments produced by + /// `write_fragments_internal` lack the per-column statistics that + /// committed fragments carry; Lance's optimizer drops them from the + /// filtered scan even when their data would match. Staged-fragment + /// rows are silently absent from the result. `scanner.use_stats(false)` + /// does not fix this in lance 4.0.0. Callers needing correct filtered + /// reads against staged data should use a different strategy (the + /// engine's MR-794 step 2+ design uses in-memory pending-batch + /// accumulation + DataFusion `MemTable` instead — see + /// `.context/mr-794-step2-design.md`). + /// + /// This method remains on the surface for primitive-level testing + /// (basic stage + scan correctness without filters works) and for + /// callers that don't need filter pushdown. + pub async fn scan_with_staged( + &self, + ds: &Dataset, + staged: &[StagedWrite], + projection: Option<&[&str]>, + filter: Option<&str>, + ) -> Result> { + if staged.is_empty() { + return self.scan(ds, projection, filter, None).await; + } + let mut scanner = ds.scan(); + if let Some(cols) = projection { + let owned: Vec = cols.iter().map(|s| s.to_string()).collect(); + scanner + .project(&owned) + .map_err(|e| OmniError::Lance(e.to_string()))?; + } + if let Some(f) = filter { + scanner + .filter(f) + .map_err(|e| OmniError::Lance(e.to_string()))?; + } + scanner.with_fragments(combine_committed_with_staged(ds, staged)); + let stream = scanner + .try_into_stream() + .await + .map_err(|e| OmniError::Lance(e.to_string()))?; + stream + .try_collect() + .await + .map_err(|e| OmniError::Lance(e.to_string())) + } + + /// `count_rows` variant that respects staged writes. Used for + /// edge-cardinality validation that needs to see staged edges before + /// commit. Same `committed - removed + new` composition as + /// `scan_with_staged`. + pub async fn count_rows_with_staged( + &self, + ds: &Dataset, + staged: &[StagedWrite], + filter: Option, + ) -> Result { + if staged.is_empty() { + return self.count_rows(ds, filter).await; + } + let mut scanner = ds.scan(); + if let Some(f) = filter { + scanner + .filter(&f) + .map_err(|e| OmniError::Lance(e.to_string()))?; + } + scanner.with_fragments(combine_committed_with_staged(ds, staged)); + let count = scanner + .count_rows() + .await + .map_err(|e| OmniError::Lance(e.to_string()))?; + Ok(count as usize) + } + async fn user_indices_for_column( &self, ds: &Dataset, @@ -620,3 +968,120 @@ impl TableStore { .map_err(|e| OmniError::Lance(e.to_string())) } } + +/// Build the `Scanner::with_fragments` argument for read-your-writes: +/// committed manifest fragments minus any fragment IDs superseded by the +/// staged writes, plus the staged `new_fragments`. Order is: +/// 1. committed fragments whose IDs are NOT in any staged +/// `removed_fragment_ids` (preserves committed order), +/// 2. all staged `new_fragments` in stage order. +/// +/// Lance's `Scanner` does not require any particular ordering between +/// committed and staged fragments — `with_fragments` scopes the scan to +/// exactly the supplied list. The dedup matters because merge_insert +/// rewrites a fragment in place at the Lance layer: the rewritten +/// fragment is in `new_fragments`, the original (which it supersedes) is +/// in `committed` until manifest commit, and including both would yield +/// duplicate rows. +/// +/// **Inter-stage supersession is not handled here.** Each StagedWrite's +/// `removed_fragment_ids` lists committed-manifest fragment IDs only; a +/// later staged merge cannot know about an earlier staged merge's +/// fragments (Lance's `MergeInsertBuilder` runs against the committed +/// view). If two `stage_merge_insert`s on the same table produce rows +/// with the same key, the combined view returns duplicates by key. The +/// engine's mutation path enforces "per touched table: all stage_append +/// OR exactly one stage_merge_insert" at parse time (D₂′ in +/// `exec/mutation.rs`) so this primitive's caller never chains merges. +/// See `stage_merge_insert` for the full contract. +/// Sum `physical_rows` across all fragments in the supplied stages. +/// Used by `stage_append` to compute the row-ID offset for chained +/// `stage_append` calls against the same dataset. +/// +/// Assumes `prior_stages` contains only `stage_append` results — see +/// `stage_append`'s D₂′ contract. For `stage_merge_insert` results the +/// `new_fragments` include rewrites that don't add new rows, so this +/// would over-count. +fn prior_stages_fragment_count(prior_stages: &[StagedWrite]) -> u64 { + prior_stages + .iter() + .map(|s| s.new_fragments.len() as u64) + .sum() +} + +/// Assign sequential fragment IDs starting at `start_id`. Mirrors Lance's +/// commit-time `Transaction::fragments_with_ids` (lance-4.0.0 +/// `dataset/transaction.rs:1456`) — fragments produced by +/// `InsertBuilder::execute_uncommitted` start with `id = 0` as a temporary +/// placeholder; we renumber here so they don't collide with committed +/// fragments (or with each other across chained stages) when the slice is +/// passed to `Scanner::with_fragments`. +fn assign_fragment_ids(fragments: &mut [Fragment], start_id: u64) { + for (i, fragment) in fragments.iter_mut().enumerate() { + if fragment.id == 0 { + fragment.id = start_id + i as u64; + } + } +} + +fn prior_stages_row_count(prior_stages: &[StagedWrite]) -> Result { + let mut total: u64 = 0; + for stage in prior_stages { + for fragment in &stage.new_fragments { + let physical_rows = fragment.physical_rows.ok_or_else(|| { + OmniError::manifest_internal( + "prior_stages_row_count: fragment is missing physical_rows".to_string(), + ) + })? as u64; + total += physical_rows; + } + } + Ok(total) +} + +/// Assign sequential row IDs to fragments that lack them, starting from +/// `start_row_id`. Mirrors the relevant arm of Lance's +/// `Transaction::assign_row_ids` (lance-4.0.0 `dataset/transaction.rs:2682`) +/// for the `row_id_meta = None` case — fragments produced by +/// `InsertBuilder::execute_uncommitted` against a stable-row-id dataset. +/// +/// Used only by `stage_append` for read-your-writes — see its docstring +/// for why pre-commit assignment is needed and why diverging from Lance's +/// commit-time IDs is safe. +fn assign_row_id_meta(fragments: &mut [Fragment], start_row_id: u64) -> Result<()> { + let mut next_row_id = start_row_id; + for fragment in fragments { + if fragment.row_id_meta.is_some() { + continue; + } + let physical_rows = fragment.physical_rows.ok_or_else(|| { + OmniError::manifest_internal( + "stage_append: fragment is missing physical_rows".to_string(), + ) + })? as u64; + let row_ids = next_row_id..(next_row_id + physical_rows); + let sequence = RowIdSequence::from(row_ids); + let serialized = write_row_ids(&sequence); + fragment.row_id_meta = Some(RowIdMeta::Inline(serialized)); + next_row_id += physical_rows; + } + Ok(()) +} + +fn combine_committed_with_staged(ds: &Dataset, staged: &[StagedWrite]) -> Vec { + let removed: std::collections::HashSet = staged + .iter() + .flat_map(|w| w.removed_fragment_ids.iter().copied()) + .collect(); + let mut combined: Vec = ds + .manifest + .fragments + .iter() + .filter(|f| !removed.contains(&f.id)) + .cloned() + .collect(); + for write in staged { + combined.extend(write.new_fragments.iter().cloned()); + } + combined +} diff --git a/crates/omnigraph/tests/staged_writes.rs b/crates/omnigraph/tests/staged_writes.rs new file mode 100644 index 0000000..811ccaf --- /dev/null +++ b/crates/omnigraph/tests/staged_writes.rs @@ -0,0 +1,490 @@ +//! Primitive-level tests for `TableStore`'s staged-write API +//! (MR-794 step 1). These exercise `stage_append`, `stage_merge_insert`, +//! `scan_with_staged`, and `count_rows_with_staged` directly against a +//! Lance dataset — no Omnigraph engine involved. The engine-level rewire +//! (MR-794 step 2+) lives in `tests/runs.rs` once it lands. +//! +//! Test surface here: +//! 1. `stage_append` + `scan_with_staged` shows committed + staged data +//! without duplicates. +//! 2. `stage_merge_insert` of a row that supersedes a committed fragment +//! surfaces only the rewritten row, not both — the +//! `removed_fragment_ids` dedup landed in PR #66's `730631c`. +//! 3. **Documented contract**: chained `stage_merge_insert` calls on the +//! same dataset whose source rows share keys produce duplicate rows in +//! `scan_with_staged`. The engine's parse-time D₂′ check (MR-794 step +//! 2+) prevents callers from triggering this; this test pins the +//! primitive's behavior so a future change either (a) preserves it or +//! (b) consciously fixes it (and updates this test). + +use arrow_array::{Array, Int32Array, RecordBatch, StringArray, UInt64Array}; +use arrow_schema::{DataType, Field, Schema}; +use futures::TryStreamExt; +use lance::Dataset; +use lance::dataset::{WhenMatched, WhenNotMatched}; +use lance_table::format::Fragment; +use omnigraph::table_store::{StagedWrite, TableStore}; +use std::sync::Arc; + +fn person_schema() -> Arc { + Arc::new(Schema::new(vec![ + Field::new("id", DataType::Utf8, false), + Field::new("age", DataType::Int32, true), + ])) +} + +fn person_batch(rows: &[(&str, Option)]) -> RecordBatch { + let ids: Vec<&str> = rows.iter().map(|(id, _)| *id).collect(); + let ages: Vec> = rows.iter().map(|(_, age)| *age).collect(); + RecordBatch::try_new( + person_schema(), + vec![ + Arc::new(StringArray::from(ids)), + Arc::new(Int32Array::from(ages)), + ], + ) + .unwrap() +} + +fn collect_ids(batches: &[RecordBatch]) -> Vec { + let mut out = Vec::new(); + for b in batches { + let ids = b + .column_by_name("id") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + for i in 0..ids.len() { + out.push(ids.value(i).to_string()); + } + } + out.sort(); + out +} + +#[tokio::test] +async fn stage_append_is_visible_via_scan_with_staged() { + let dir = tempfile::tempdir().unwrap(); + let uri = format!("{}/people.lance", dir.path().to_str().unwrap()); + let store = TableStore::new(dir.path().to_str().unwrap()); + + // Seed: one committed row. + let ds = TableStore::write_dataset(&uri, person_batch(&[("alice", Some(30))])) + .await + .unwrap(); + + // Stage a second row. First call → empty prior_stages. + let staged = store + .stage_append(&ds, person_batch(&[("bob", Some(25))]), &[]) + .await + .unwrap(); + + // scan_with_staged sees both committed alice + staged bob, no duplicates. + let batches = store + .scan_with_staged(&ds, std::slice::from_ref(&staged), None, None) + .await + .unwrap(); + assert_eq!(collect_ids(&batches), vec!["alice", "bob"]); + + // Plain scan (no staged) still sees only committed alice — dataset HEAD + // hasn't moved. + let plain = store.scan_batches(&ds).await.unwrap(); + assert_eq!(collect_ids(&plain), vec!["alice"]); +} + +#[tokio::test] +async fn stage_merge_insert_dedupes_superseded_committed_fragment() { + let dir = tempfile::tempdir().unwrap(); + let uri = format!("{}/people.lance", dir.path().to_str().unwrap()); + let store = TableStore::new(dir.path().to_str().unwrap()); + + // Seed: alice age 30 in one committed fragment. + let ds = TableStore::write_dataset(&uri, person_batch(&[("alice", Some(30))])) + .await + .unwrap(); + + // Stage a merge_insert that rewrites alice's row. This produces an + // Operation::Update whose removed_fragment_ids excludes the committed + // fragment that contained the old alice. + let staged = store + .stage_merge_insert( + ds.clone(), + person_batch(&[("alice", Some(31))]), + vec!["id".to_string()], + WhenMatched::UpdateAll, + WhenNotMatched::InsertAll, + ) + .await + .unwrap(); + assert!( + !staged.removed_fragment_ids.is_empty(), + "merge_insert that rewrites a committed row must set removed_fragment_ids \ + (this is the dedup invariant from PR #66 commit 730631c — its absence \ + was caught by Cubic/Cursor/Codex on PR #66)" + ); + + // scan_with_staged: alice appears exactly once, with the new age. + let batches = store + .scan_with_staged(&ds, std::slice::from_ref(&staged), None, None) + .await + .unwrap(); + let ids = collect_ids(&batches); + assert_eq!(ids, vec!["alice"], "merge_insert must not surface duplicates"); + + // Confirm the visible row is the rewritten one. + let total: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!(total, 1); + let ages: Vec = batches + .iter() + .flat_map(|b| { + let col = b + .column_by_name("age") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + (0..col.len()).map(|i| col.value(i)).collect::>() + }) + .collect(); + assert_eq!(ages, vec![31]); +} + +#[tokio::test] +async fn count_rows_with_staged_matches_scan() { + let dir = tempfile::tempdir().unwrap(); + let uri = format!("{}/people.lance", dir.path().to_str().unwrap()); + let store = TableStore::new(dir.path().to_str().unwrap()); + + let ds = TableStore::write_dataset(&uri, person_batch(&[("alice", Some(30))])) + .await + .unwrap(); + let staged = store + .stage_append( + &ds, + person_batch(&[("bob", Some(25)), ("carol", Some(40))]), + &[], + ) + .await + .unwrap(); + + let count = store + .count_rows_with_staged(&ds, std::slice::from_ref(&staged), None) + .await + .unwrap(); + assert_eq!(count, 3); +} + +/// Two `stage_append` calls on the same dataset must produce +/// non-overlapping `_rowid` ranges. Without `prior_stages` threading, +/// both calls would assign IDs starting from `ds.manifest.next_row_id`, +/// producing overlapping ranges that break read paths consulting the +/// row-ID index (prefilter, vector search). With the slice threaded +/// through, the second call offsets by the first call's row count. +/// +/// This is what enables the engine's multi-statement `insert Knows ...; +/// insert Knows ...` (multiple appends to the same edge table) under +/// the D₂′ rule. +#[tokio::test] +async fn chained_stage_appends_have_distinct_row_ids() { + let dir = tempfile::tempdir().unwrap(); + let uri = format!("{}/people.lance", dir.path().to_str().unwrap()); + let store = TableStore::new(dir.path().to_str().unwrap()); + + let ds = TableStore::write_dataset(&uri, person_batch(&[("seed", Some(0))])) + .await + .unwrap(); + + let s1 = store + .stage_append(&ds, person_batch(&[("alice", Some(30))]), &[]) + .await + .unwrap(); + let s2 = store + .stage_append( + &ds, + person_batch(&[("bob", Some(25))]), + std::slice::from_ref(&s1), + ) + .await + .unwrap(); + + // Scan with row IDs requested. If s1 and s2 had overlapping _rowid + // ranges, Lance's scanner would conflict (or surface duplicates) on + // the combined fragment list. + let staged = vec![s1, s2]; + let batches = store + .scan_with_staged(&ds, &staged, None, None) + .await + .unwrap(); + let ids = collect_ids(&batches); + assert_eq!(ids, vec!["alice", "bob", "seed"]); + + // Project _rowid explicitly and assert all rows have distinct IDs. + let mut scanner = ds.scan(); + scanner.with_row_id(); + scanner.with_fragments(combine_for_scan(&ds, &staged)); + let stream = scanner.try_into_stream().await.unwrap(); + let projected: Vec<_> = stream.try_collect().await.unwrap(); + let row_ids: std::collections::BTreeSet = projected + .iter() + .flat_map(|b| { + let arr = b + .column_by_name("_rowid") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + (0..arr.len()).map(|i| arr.value(i)).collect::>() + }) + .collect(); + assert_eq!( + row_ids.len(), + 3, + "all 3 rows (1 committed + 2 staged) should have distinct _rowid; \ + overlap implies stage_append failed to offset by prior_stages" + ); +} + +/// Helper for the chained-append test: replicate the primitive's +/// `combine_committed_with_staged` logic so the test can supply a custom +/// scanner that requests `_rowid`. Kept inline here to avoid making the +/// engine helper public. +fn combine_for_scan(ds: &Dataset, staged: &[StagedWrite]) -> Vec { + let removed: std::collections::HashSet = staged + .iter() + .flat_map(|w| w.removed_fragment_ids.iter().copied()) + .collect(); + let mut combined: Vec<_> = ds + .manifest + .fragments + .iter() + .filter(|f| !removed.contains(&f.id)) + .cloned() + .collect(); + for s in staged { + combined.extend(s.new_fragments.iter().cloned()); + } + combined +} + +/// `stage_append` + `commit_staged` round-trip: after commit, the +/// dataset's HEAD reflects the staged data and a fresh scan sees it. +/// Validates that our pre-assigned `row_id_meta` doesn't break Lance's +/// commit-time row-ID assignment (transaction.rs:2682). +#[tokio::test] +async fn stage_append_then_commit_persists_data() { + let dir = tempfile::tempdir().unwrap(); + let uri = format!("{}/people.lance", dir.path().to_str().unwrap()); + let store = TableStore::new(dir.path().to_str().unwrap()); + + let ds = TableStore::write_dataset(&uri, person_batch(&[("alice", Some(30))])) + .await + .unwrap(); + let pre_version = ds.version().version; + + let staged = store + .stage_append(&ds, person_batch(&[("bob", Some(25))]), &[]) + .await + .unwrap(); + + let new_ds = store + .commit_staged(Arc::new(ds.clone()), staged.transaction) + .await + .unwrap(); + assert!( + new_ds.version().version > pre_version, + "commit_staged must advance the dataset version" + ); + + // Reopen and confirm rows are visible at HEAD. + let reopened = Dataset::open(&uri).await.unwrap(); + let batches = store.scan_batches(&reopened).await.unwrap(); + assert_eq!(collect_ids(&batches), vec!["alice", "bob"]); +} + +/// `stage_merge_insert` + `commit_staged` round-trip: after commit, the +/// merged view (existing alice updated + new bob inserted) is visible. +#[tokio::test] +async fn stage_merge_insert_then_commit_persists_merged_view() { + let dir = tempfile::tempdir().unwrap(); + let uri = format!("{}/people.lance", dir.path().to_str().unwrap()); + let store = TableStore::new(dir.path().to_str().unwrap()); + + let ds = TableStore::write_dataset(&uri, person_batch(&[("alice", Some(30))])) + .await + .unwrap(); + + let staged = store + .stage_merge_insert( + ds.clone(), + person_batch(&[("alice", Some(31)), ("bob", Some(25))]), + vec!["id".to_string()], + WhenMatched::UpdateAll, + WhenNotMatched::InsertAll, + ) + .await + .unwrap(); + + store + .commit_staged(Arc::new(ds), staged.transaction) + .await + .unwrap(); + + let reopened = Dataset::open(&uri).await.unwrap(); + let batches = store.scan_batches(&reopened).await.unwrap(); + assert_eq!(collect_ids(&batches), vec!["alice", "bob"]); + + // Confirm alice was updated to age=31, not duplicated. + let total: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!(total, 2, "merge_insert must not duplicate the matched row"); +} + +/// **Documented limitation** (see `scan_with_staged` doc): when a filter +/// is supplied, Lance's stats-based pruning drops the staged fragment from +/// the filtered scan because uncommitted fragments produced by +/// `write_fragments_internal` lack per-column statistics. The result +/// contains only matching committed rows; matching staged rows are +/// silently absent. `scanner.use_stats(false)` does not bypass this in +/// lance 4.0.0. +/// +/// This test pins the actual behavior so a future change either preserves +/// it (and updates the doc) or fixes it (and rewrites this test). The +/// engine's MR-794 step 2+ design uses in-memory pending-batch +/// accumulation + DataFusion `MemTable` for read-your-writes instead, so +/// production code is unaffected. +#[tokio::test] +async fn scan_with_staged_with_filter_silently_drops_staged_rows() { + let dir = tempfile::tempdir().unwrap(); + let uri = format!("{}/people.lance", dir.path().to_str().unwrap()); + let store = TableStore::new(dir.path().to_str().unwrap()); + + // Committed: alice=30, carol=40 + let ds = TableStore::write_dataset( + &uri, + person_batch(&[("alice", Some(30)), ("carol", Some(40))]), + ) + .await + .unwrap(); + + // Staged: bob=25, dave=35 + let staged = store + .stage_append( + &ds, + person_batch(&[("bob", Some(25)), ("dave", Some(35))]), + &[], + ) + .await + .unwrap(); + + // Filter: age >= 30. Correct semantics would return alice, carol, dave. + // Actual: dave (staged, age=35) is dropped — only the committed matches + // come back. + let batches = store + .scan_with_staged( + &ds, + std::slice::from_ref(&staged), + None, + Some("age >= 30"), + ) + .await + .unwrap(); + assert_eq!( + collect_ids(&batches), + vec!["alice", "carol"], + "documented limitation: filter pushdown drops staged fragments. \ + If you're here because this assertion failed: either (a) Lance \ + exposed a way to scan uncommitted fragments without stats-based \ + pruning (good — update to assert == [alice, carol, dave]), or \ + (b) something changed in our scan_with_staged path. See PR #67 \ + test fix discussion + .context/mr-794-step2-design.md §1.1." + ); + + // Without filter, staged data IS visible — confirms the issue is + // specifically filter pushdown, not fragment scanning per se. + let unfiltered = store + .scan_with_staged( + &ds, + std::slice::from_ref(&staged), + None, + None, + ) + .await + .unwrap(); + assert_eq!( + collect_ids(&unfiltered), + vec!["alice", "bob", "carol", "dave"], + "unfiltered scan_with_staged returns all rows correctly" + ); +} + +/// **Documented contract** (see `stage_merge_insert` doc): chained +/// `stage_merge_insert` calls on the same table whose source rows share +/// keys cannot dedupe across stages. Each call's `MergeInsertBuilder` runs +/// against the committed view; neither sees the other's staged fragments. +/// The combined `scan_with_staged` therefore returns the shared key +/// twice. +/// +/// The engine's mutation path enforces D₂′ (per touched table: all +/// stage_append OR exactly one stage_merge_insert) at parse time so this +/// scenario is unreachable through public APIs. This test pins the +/// primitive behavior — if a future change makes the primitive itself +/// dedupe across stages (e.g. via a Lance API extension or in-memory +/// pre-merge), update this assertion. +#[tokio::test] +async fn chained_stage_merge_insert_with_shared_key_documents_duplicate_behavior() { + let dir = tempfile::tempdir().unwrap(); + let uri = format!("{}/people.lance", dir.path().to_str().unwrap()); + let store = TableStore::new(dir.path().to_str().unwrap()); + + // Seed empty (an unrelated row keeps the schema unambiguous). + let ds = TableStore::write_dataset(&uri, person_batch(&[("seed", Some(0))])) + .await + .unwrap(); + + // Op-1: stage merge_insert of alice. Against committed view: alice + // doesn't exist, so this lands as a fresh insert into Operation::Update.new_fragments. + let staged_1 = store + .stage_merge_insert( + ds.clone(), + person_batch(&[("alice", Some(30))]), + vec!["id".to_string()], + WhenMatched::UpdateAll, + WhenNotMatched::InsertAll, + ) + .await + .unwrap(); + + // Op-2: stage merge_insert of alice with a different age. Also runs + // against the committed view (alice doesn't exist there either), so + // Lance produces another fresh insert. Op-2 has no knowledge of + // op-1's staged fragments. + let staged_2 = store + .stage_merge_insert( + ds.clone(), + person_batch(&[("alice", Some(31))]), + vec!["id".to_string()], + WhenMatched::UpdateAll, + WhenNotMatched::InsertAll, + ) + .await + .unwrap(); + + // scan_with_staged sees committed (seed) + op-1.new (alice age=30) + + // op-2.new (alice age=31). Alice appears twice — the documented + // contract violation that D₂′ prevents at the engine layer. + let batches = store + .scan_with_staged(&ds, &[staged_1, staged_2], None, None) + .await + .unwrap(); + let ids = collect_ids(&batches); + let alice_count = ids.iter().filter(|id| *id == "alice").count(); + assert_eq!( + alice_count, 2, + "chained stage_merge_insert with shared key produces duplicates — \ + this is the contract documented on stage_merge_insert. If you're \ + here because this assertion failed: either (a) the primitive was \ + improved to dedupe across stages (good — update to assert == 1) \ + or (b) something subtler broke (investigate before changing the \ + assertion). See PR #67 Codex P1 thread + .context/mr-794-step2-design.md §3.1." + ); +}