MR-794 step 1: thread row-ID offset + add commit_staged + filter tests

Three follow-ups to the staged-writes primitives, all caught by the
"are we missing tests?" review:

(1) Path A row-ID threading (Gap 1, real bug):
stage_append now takes prior_stages: &[StagedWrite] and offsets the
assigned row IDs by the sum of prior stages' physical_rows. Without
this, two stage_appends against the same dataset both started at
ds.manifest.next_row_id, producing fragments with overlapping _rowid
ranges. This would have fired in Step 2+ on any multi-statement
mutation like `insert Knows ...; insert Knows ...` (multiple appends
to the same edge table — allowed under D₂′). The slice mirrors
scan_with_staged's API shape; the same slice is passed to both stage
and scan. Documented contract: only stage_append results in
prior_stages (D₂′ guarantees this upstream).

(2) commit_staged round-trip tests (Gap 2):
Two tests covering stage_append + commit_staged and stage_merge_insert
+ commit_staged. Validate that Lance's commit-time row-ID assignment
works correctly even after our pre-commit row_id_meta assignment in
the append path — the two assignments diverge but neither is observed
across the boundary.

(3) Filter pushdown test (Gap 3):
scan_with_staged with a SQL filter applies it across both committed
and staged fragments. Validates the MR-794 ticket's claim that Lance's
with_fragments preserves filter/vector/FTS pushdown (Lance tests
test_scalar_index_respects_fragment_list etc.).

Also adds chained_stage_appends_have_distinct_row_ids which directly
demonstrates the Gap 1 fix by projecting _rowid and asserting no
duplicates across 1 committed + 2 staged rows.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Ragnor Comerford 2026-04-30 23:59:59 +02:00
parent 714f1f0c0a
commit 61b3f5090b
No known key found for this signature in database
2 changed files with 289 additions and 18 deletions

View file

@ -564,19 +564,41 @@ impl TableStore {
/// uncommitted Lance transaction plus the new fragments for
/// read-your-writes.
///
/// On stable-row-id datasets we manually populate `row_id_meta` on the
/// cloned `new_fragments` we expose for `scan_with_staged`. Lance's
/// `InsertBuilder::execute_uncommitted` produces fragments with
/// `row_id_meta = None`; row IDs are normally assigned by
/// `prior_stages` is the slice of staged writes already accumulated
/// against the **same dataset** in the same query. Pass `&[]` for the
/// first call; pass the accumulated stages for subsequent calls. The
/// primitive uses this to offset row-ID assignment so chained
/// `stage_append` calls don't produce overlapping `_rowid` ranges.
/// Mirrors `scan_with_staged`'s `&[StagedWrite]` shape — the same
/// slice gets passed to both.
///
/// On stable-row-id datasets we manually populate `row_id_meta` on
/// the cloned `new_fragments` we expose for `scan_with_staged`.
/// Lance's `InsertBuilder::execute_uncommitted` produces fragments
/// with `row_id_meta = None`; row IDs are normally assigned by
/// `Transaction::assign_row_ids` during commit. Because
/// `scan_with_staged` reads the staged fragments *before* commit, the
/// scanner trips on a stable-row-id dataset (`Error::internal("Missing
/// row id meta")` from `dataset/rowids.rs:22`). The transaction's
/// internal fragment copy stays untouched — Lance assigns IDs there
/// independently at commit time, and the two ID assignments don't
/// have to agree because no caller threads `_rowid` from the staged
/// scan into the commit path.
pub async fn stage_append(&self, ds: &Dataset, batch: RecordBatch) -> Result<StagedWrite> {
/// `scan_with_staged` reads the staged fragments *before* commit,
/// the scanner trips on a stable-row-id dataset
/// (`Error::internal("Missing row id meta")` from
/// `dataset/rowids.rs:22`). The transaction's internal fragment copy
/// stays untouched — Lance assigns IDs there independently at commit
/// time, and the two ID assignments don't have to agree because no
/// caller threads `_rowid` from the staged scan into the commit
/// path.
///
/// **Contract: `prior_stages` must contain only previous
/// `stage_append` results against the same dataset.** Mixing
/// stage_merge_insert into `prior_stages` would over-count because
/// merge_insert's `new_fragments` include rewrites that don't add
/// rows. The engine's parse-time D₂ check (per touched table: all
/// stage_append OR exactly one stage_merge_insert) guarantees this
/// upstream; on the primitive layer it's the caller's responsibility.
pub async fn stage_append(
&self,
ds: &Dataset,
batch: RecordBatch,
prior_stages: &[StagedWrite],
) -> Result<StagedWrite> {
if batch.num_rows() == 0 {
return Err(OmniError::manifest_internal(
"stage_append called with empty batch".to_string(),
@ -603,7 +625,9 @@ impl TableStore {
}
};
if ds.manifest.uses_stable_row_ids() {
assign_row_id_meta(&mut new_fragments, ds.manifest.next_row_id)?;
let prior_rows = prior_stages_row_count(prior_stages)?;
let start_row_id = ds.manifest.next_row_id + prior_rows;
assign_row_id_meta(&mut new_fragments, start_row_id)?;
}
Ok(StagedWrite {
transaction,
@ -936,6 +960,29 @@ impl TableStore {
/// OR exactly one stage_merge_insert" at parse time (D₂ in
/// `exec/mutation.rs`) so this primitive's caller never chains merges.
/// See `stage_merge_insert` for the full contract.
/// Sum `physical_rows` across all fragments in the supplied stages.
/// Used by `stage_append` to compute the row-ID offset for chained
/// `stage_append` calls against the same dataset.
///
/// Assumes `prior_stages` contains only `stage_append` results — see
/// `stage_append`'s D₂ contract. For `stage_merge_insert` results the
/// `new_fragments` include rewrites that don't add new rows, so this
/// would over-count.
fn prior_stages_row_count(prior_stages: &[StagedWrite]) -> Result<u64> {
let mut total: u64 = 0;
for stage in prior_stages {
for fragment in &stage.new_fragments {
let physical_rows = fragment.physical_rows.ok_or_else(|| {
OmniError::manifest_internal(
"prior_stages_row_count: fragment is missing physical_rows".to_string(),
)
})? as u64;
total += physical_rows;
}
}
Ok(total)
}
/// Assign sequential row IDs to fragments that lack them, starting from
/// `start_row_id`. Mirrors the relevant arm of Lance's
/// `Transaction::assign_row_ids` (lance-4.0.0 `dataset/transaction.rs:2682`)

View file

@ -17,10 +17,13 @@
//! primitive's behavior so a future change either (a) preserves it or
//! (b) consciously fixes it (and updates this test).
use arrow_array::{Array, Int32Array, RecordBatch, StringArray};
use arrow_array::{Array, Int32Array, RecordBatch, StringArray, UInt64Array};
use arrow_schema::{DataType, Field, Schema};
use futures::TryStreamExt;
use lance::Dataset;
use lance::dataset::{WhenMatched, WhenNotMatched};
use omnigraph::table_store::TableStore;
use lance_table::format::Fragment;
use omnigraph::table_store::{StagedWrite, TableStore};
use std::sync::Arc;
fn person_schema() -> Arc<Schema> {
@ -71,9 +74,9 @@ async fn stage_append_is_visible_via_scan_with_staged() {
.await
.unwrap();
// Stage a second row.
// Stage a second row. First call → empty prior_stages.
let staged = store
.stage_append(&ds, person_batch(&[("bob", Some(25))]))
.stage_append(&ds, person_batch(&[("bob", Some(25))]), &[])
.await
.unwrap();
@ -157,7 +160,11 @@ async fn count_rows_with_staged_matches_scan() {
.await
.unwrap();
let staged = store
.stage_append(&ds, person_batch(&[("bob", Some(25)), ("carol", Some(40))]))
.stage_append(
&ds,
person_batch(&[("bob", Some(25)), ("carol", Some(40))]),
&[],
)
.await
.unwrap();
@ -168,6 +175,223 @@ async fn count_rows_with_staged_matches_scan() {
assert_eq!(count, 3);
}
/// Two `stage_append` calls on the same dataset must produce
/// non-overlapping `_rowid` ranges. Without `prior_stages` threading,
/// both calls would assign IDs starting from `ds.manifest.next_row_id`,
/// producing overlapping ranges that break read paths consulting the
/// row-ID index (prefilter, vector search). With the slice threaded
/// through, the second call offsets by the first call's row count.
///
/// This is what enables the engine's multi-statement `insert Knows ...;
/// insert Knows ...` (multiple appends to the same edge table) under
/// the D₂ rule.
#[tokio::test]
async fn chained_stage_appends_have_distinct_row_ids() {
let dir = tempfile::tempdir().unwrap();
let uri = format!("{}/people.lance", dir.path().to_str().unwrap());
let store = TableStore::new(dir.path().to_str().unwrap());
let ds = TableStore::write_dataset(&uri, person_batch(&[("seed", Some(0))]))
.await
.unwrap();
let s1 = store
.stage_append(&ds, person_batch(&[("alice", Some(30))]), &[])
.await
.unwrap();
let s2 = store
.stage_append(
&ds,
person_batch(&[("bob", Some(25))]),
std::slice::from_ref(&s1),
)
.await
.unwrap();
// Scan with row IDs requested. If s1 and s2 had overlapping _rowid
// ranges, Lance's scanner would conflict (or surface duplicates) on
// the combined fragment list.
let staged = vec![s1, s2];
let batches = store
.scan_with_staged(&ds, &staged, None, None)
.await
.unwrap();
let ids = collect_ids(&batches);
assert_eq!(ids, vec!["alice", "bob", "seed"]);
// Project _rowid explicitly and assert all rows have distinct IDs.
let mut scanner = ds.scan();
scanner.with_row_id();
scanner.with_fragments(combine_for_scan(&ds, &staged));
let stream = scanner.try_into_stream().await.unwrap();
let projected: Vec<_> = stream.try_collect().await.unwrap();
let row_ids: std::collections::BTreeSet<u64> = projected
.iter()
.flat_map(|b| {
let arr = b
.column_by_name("_rowid")
.unwrap()
.as_any()
.downcast_ref::<UInt64Array>()
.unwrap();
(0..arr.len()).map(|i| arr.value(i)).collect::<Vec<_>>()
})
.collect();
assert_eq!(
row_ids.len(),
3,
"all 3 rows (1 committed + 2 staged) should have distinct _rowid; \
overlap implies stage_append failed to offset by prior_stages"
);
}
/// Helper for the chained-append test: replicate the primitive's
/// `combine_committed_with_staged` logic so the test can supply a custom
/// scanner that requests `_rowid`. Kept inline here to avoid making the
/// engine helper public.
fn combine_for_scan(ds: &Dataset, staged: &[StagedWrite]) -> Vec<Fragment> {
let removed: std::collections::HashSet<u64> = staged
.iter()
.flat_map(|w| w.removed_fragment_ids.iter().copied())
.collect();
let mut combined: Vec<_> = ds
.manifest
.fragments
.iter()
.filter(|f| !removed.contains(&f.id))
.cloned()
.collect();
for s in staged {
combined.extend(s.new_fragments.iter().cloned());
}
combined
}
/// `stage_append` + `commit_staged` round-trip: after commit, the
/// dataset's HEAD reflects the staged data and a fresh scan sees it.
/// Validates that our pre-assigned `row_id_meta` doesn't break Lance's
/// commit-time row-ID assignment (transaction.rs:2682).
#[tokio::test]
async fn stage_append_then_commit_persists_data() {
let dir = tempfile::tempdir().unwrap();
let uri = format!("{}/people.lance", dir.path().to_str().unwrap());
let store = TableStore::new(dir.path().to_str().unwrap());
let ds = TableStore::write_dataset(&uri, person_batch(&[("alice", Some(30))]))
.await
.unwrap();
let pre_version = ds.version().version;
let staged = store
.stage_append(&ds, person_batch(&[("bob", Some(25))]), &[])
.await
.unwrap();
let new_ds = store
.commit_staged(Arc::new(ds.clone()), staged.transaction)
.await
.unwrap();
assert!(
new_ds.version().version > pre_version,
"commit_staged must advance the dataset version"
);
// Reopen and confirm rows are visible at HEAD.
let reopened = Dataset::open(&uri).await.unwrap();
let batches = store.scan_batches(&reopened).await.unwrap();
assert_eq!(collect_ids(&batches), vec!["alice", "bob"]);
}
/// `stage_merge_insert` + `commit_staged` round-trip: after commit, the
/// merged view (existing alice updated + new bob inserted) is visible.
#[tokio::test]
async fn stage_merge_insert_then_commit_persists_merged_view() {
let dir = tempfile::tempdir().unwrap();
let uri = format!("{}/people.lance", dir.path().to_str().unwrap());
let store = TableStore::new(dir.path().to_str().unwrap());
let ds = TableStore::write_dataset(&uri, person_batch(&[("alice", Some(30))]))
.await
.unwrap();
let staged = store
.stage_merge_insert(
ds.clone(),
person_batch(&[("alice", Some(31)), ("bob", Some(25))]),
vec!["id".to_string()],
WhenMatched::UpdateAll,
WhenNotMatched::InsertAll,
)
.await
.unwrap();
store
.commit_staged(Arc::new(ds), staged.transaction)
.await
.unwrap();
let reopened = Dataset::open(&uri).await.unwrap();
let batches = store.scan_batches(&reopened).await.unwrap();
assert_eq!(collect_ids(&batches), vec!["alice", "bob"]);
// Confirm alice was updated to age=31, not duplicated.
let total: usize = batches.iter().map(|b| b.num_rows()).sum();
assert_eq!(total, 2, "merge_insert must not duplicate the matched row");
}
/// Filter pushdown via `scan_with_staged`: a SQL filter applies across
/// both committed and staged fragments. This validates the MR-794
/// ticket's claim that Lance's `with_fragments` preserves pushdown
/// behavior (per Lance tests `test_scalar_index_respects_fragment_list`
/// etc.).
#[tokio::test]
async fn scan_with_staged_pushes_filter_through_committed_and_staged() {
let dir = tempfile::tempdir().unwrap();
let uri = format!("{}/people.lance", dir.path().to_str().unwrap());
let store = TableStore::new(dir.path().to_str().unwrap());
// Committed: alice=30, carol=40
let ds = TableStore::write_dataset(
&uri,
person_batch(&[("alice", Some(30)), ("carol", Some(40))]),
)
.await
.unwrap();
// Staged: bob=25, dave=35
let staged = store
.stage_append(
&ds,
person_batch(&[("bob", Some(25)), ("dave", Some(35))]),
&[],
)
.await
.unwrap();
// Filter: age >= 30 → expect alice, carol, dave (not bob).
let batches = store
.scan_with_staged(
&ds,
std::slice::from_ref(&staged),
None,
Some("age >= 30"),
)
.await
.unwrap();
assert_eq!(collect_ids(&batches), vec!["alice", "carol", "dave"]);
// Same filter as count: same arithmetic.
let count = store
.count_rows_with_staged(
&ds,
std::slice::from_ref(&staged),
Some("age >= 30".to_string()),
)
.await
.unwrap();
assert_eq!(count, 3);
}
/// **Documented contract** (see `stage_merge_insert` doc): chained
/// `stage_merge_insert` calls on the same table whose source rows share
/// keys cannot dedupe across stages. Each call's `MergeInsertBuilder` runs