Merge pull request #67 from ModernRelay/ragnorc/staged-writes-redo

MR-794 step 1: re-land staged-write primitives in TableStore
This commit is contained in:
Ragnor Comerford 2026-05-01 01:13:47 +02:00 committed by GitHub
commit facab010e5
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 966 additions and 8 deletions

View file

@ -544,12 +544,15 @@ fn apply_assignments(
/// with `Lance HEAD > manifest_version`. The next `mutate_as` against the
/// same table will surface `ExpectedVersionMismatch` (Lance HEAD ahead of
/// the manifest snapshot). Lance's `restore()` is *not* a rewind — it
/// creates a new commit, monotonically advancing the version. A proper fix
/// requires per-table Lance-internal branches (write to a transient branch,
/// fast-forward main on success, drop branch on failure); tracked as a
/// follow-up to MR-771. In practice this path is narrow: most validation
/// runs before any Lance write, so single-statement mutations are
/// unaffected. See `docs/runs.md`.
/// creates a new commit, monotonically advancing the version. The proper
/// fix uses Lance's distributed-write API (`write_fragments` /
/// `Scanner::with_fragments` / `Operation::Append { fragments }`) — see
/// [MR-794](https://linear.app/modernrelay/issue/MR-794). The staging
/// primitives `TableStore::stage_append` / `stage_merge_insert` /
/// `commit_staged` / `scan_with_staged` are in place; full integration
/// with this struct is the MR-794 follow-up. In practice this path is
/// narrow today: most validation runs before any Lance write, so
/// single-statement mutations are unaffected. See `docs/runs.md`.
#[derive(Default)]
struct MutationStaging {
expected_versions: HashMap<String, u64>,

View file

@ -4,14 +4,19 @@ use arrow_select::concat::concat_batches;
use futures::TryStreamExt;
use lance::Dataset;
use lance::dataset::scanner::{ColumnOrdering, DatasetRecordBatchStream, Scanner};
use lance::dataset::{MergeInsertBuilder, WhenMatched, WhenNotMatched, WriteMode, WriteParams};
use lance::dataset::transaction::{Operation, Transaction};
use lance::dataset::{
CommitBuilder, InsertBuilder, MergeInsertBuilder, WhenMatched, WhenNotMatched, WriteMode,
WriteParams,
};
use lance::datatypes::BlobHandling;
use lance::index::scalar::IndexDetails;
use lance_file::version::LanceFileVersion;
use lance_index::scalar::{InvertedIndexParams, ScalarIndexParams};
use lance_index::{DatasetIndexExt, IndexType, is_system_index};
use lance_linalg::distance::MetricType;
use lance_table::format::IndexMetadata;
use lance_table::format::{Fragment, IndexMetadata, RowIdMeta};
use lance_table::rowids::{RowIdSequence, write_row_ids};
use std::sync::Arc;
use crate::db::manifest::{TableVersionMetadata, open_table_head_for_write};
@ -33,6 +38,46 @@ pub struct DeleteState {
pub(crate) version_metadata: TableVersionMetadata,
}
/// A Lance write that has produced fragment files on object storage but is
/// not yet committed to the dataset's manifest. The staged-write primitives
/// are defined here for later integration in `MutationStaging`
/// (`exec/mutation.rs`) and the loader (`loader/mod.rs`) — those rewires
/// land in [MR-794](https://linear.app/modernrelay/issue/MR-794) step 2+.
/// The intent: defer Lance commits to end-of-query so a mid-query failure
/// leaves the touched table at the pre-mutation HEAD instead of drifting
/// ahead.
///
/// `transaction` is opaque from our side — Lance owns its semantics. We
/// commit it via `CommitBuilder::execute(transaction)` (see
/// `TableStore::commit_staged`).
///
/// For read-your-writes within the same query, `new_fragments` and
/// `removed_fragment_ids` together describe the post-stage view delta:
/// `scan_with_staged` (and `count_rows_with_staged`) compose
/// `committed - removed + new` so subsequent reads see the staged result
/// without double-counting fragments that `Operation::Update` rewrote.
/// Without `removed_fragment_ids`, a `stage_merge_insert` that rewrites
/// existing fragments would yield duplicate rows (the original fragment
/// stays in the committed manifest while its rewrite shows up in `new_fragments`).
///
/// `new_fragments` is a clone taken from the transaction's operation, not a
/// view into it: `stage_append` renumbers the fragment IDs of its clone (and
/// fills in row-ID metadata on stable-row-id datasets) without touching the
/// fragments held inside `transaction`.
#[derive(Debug, Clone)]
pub struct StagedWrite {
/// The uncommitted Lance transaction returned by the staging call
/// (`InsertBuilder::execute_uncommitted` for `stage_append`, the merge
/// job's `execute_uncommitted` for `stage_merge_insert`). Hand it to
/// `TableStore::commit_staged` to advance the dataset HEAD.
pub transaction: Transaction,
/// Fragments to surface alongside the committed manifest in
/// `Scanner::with_fragments(committed - removed + new)`. For
/// `Operation::Append` these are the freshly-appended fragments. For
/// `Operation::Update` (merge_insert) these are
/// `updated_fragments + new_fragments` (rewrites + freshly-inserted
/// rows).
pub new_fragments: Vec<Fragment>,
/// Fragment IDs that this staged write supersedes. The committed
/// manifest must filter these out before being combined with
/// `new_fragments` for read-your-writes scans, otherwise rewrites
/// yield duplicate rows. Empty for `stage_append` (`Operation::Append`
/// adds without removing anything); populated from
/// `Operation::Update.removed_fragment_ids` for `stage_merge_insert`.
pub removed_fragment_ids: Vec<u64>,
}
#[derive(Debug, Clone)]
pub struct TableStore {
root_uri: String,
@ -500,6 +545,309 @@ impl TableStore {
})
}
// ─── Staged-write API (MR-794) ───────────────────────────────────────────
//
// These primitives wrap Lance's distributed-write API: each call writes
// fragment files to object storage but does NOT advance the dataset's
// HEAD or commit a manifest entry. The returned `Transaction` is held by
// the caller (typically `MutationStaging` or the loader's accumulator)
// and committed at end-of-query via `commit_staged`. On failure the
// fragments remain unreferenced and are reclaimed by `cleanup_old_versions`.
//
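// The extracted `Vec<Fragment>` is for read-your-writes within the same
// query: `scan_with_staged` / `count_rows_with_staged` construct a
// `Scanner` and call `scanner.with_fragments(committed - removed + staged)`
// so staged data is visible alongside the committed snapshot. Lance scopes
// the scan (filter pushdown, vector search, FTS) to the supplied fragment
// list, with one known gap: filtered scans can silently omit rows from
// staged fragments — see the filter note on `scan_with_staged`.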
/// Stage an append: write fragment files for `batch`, return the
/// uncommitted Lance transaction plus the new fragments for
/// read-your-writes.
///
/// `prior_stages` is the slice of staged writes already accumulated
/// against the **same dataset** in the same query. Pass `&[]` for the
/// first call; pass the accumulated stages for subsequent calls. The
/// primitive uses this to offset fragment-ID and row-ID assignment so
/// chained `stage_append` calls don't produce overlapping fragment IDs or
/// `_rowid` ranges. Mirrors `scan_with_staged`'s `&[StagedWrite]` shape —
/// the same slice gets passed to both.
///
/// On stable-row-id datasets we manually populate `row_id_meta` on
/// the cloned `new_fragments` we expose for `scan_with_staged`.
/// Lance's `InsertBuilder::execute_uncommitted` produces fragments
/// with `row_id_meta = None`; row IDs are normally assigned by
/// `Transaction::assign_row_ids` during commit. Because
/// `scan_with_staged` reads the staged fragments *before* commit,
/// the scanner trips on a stable-row-id dataset
/// (`Error::internal("Missing row id meta")` from
/// `dataset/rowids.rs:22`). The transaction's internal fragment copy
/// stays untouched — Lance assigns IDs there independently at commit
/// time, and the two ID assignments don't have to agree because no
/// caller threads `_rowid` from the staged scan into the commit
/// path.
///
/// **Contract: `prior_stages` must contain only previous
/// `stage_append` results against the same dataset.** Mixing
/// stage_merge_insert into `prior_stages` would over-count because
/// merge_insert's `new_fragments` include rewrites that don't add
/// rows. The engine's parse-time D₂ check (per touched table: all
/// stage_append OR exactly one stage_merge_insert) guarantees this
/// upstream; on the primitive layer it's the caller's responsibility.
///
/// # Errors
///
/// * `OmniError::manifest_internal` when `batch` has no rows, when Lance
///   returns an operation other than `Append` / `Overwrite`, or when a
///   fragment is missing `physical_rows` on a stable-row-id dataset.
/// * `OmniError::Lance` when the uncommitted write itself fails.
pub async fn stage_append(
    &self,
    ds: &Dataset,
    batch: RecordBatch,
    prior_stages: &[StagedWrite],
) -> Result<StagedWrite> {
    if batch.num_rows() == 0 {
        return Err(OmniError::manifest_internal(
            "stage_append called with empty batch".to_string(),
        ));
    }

    let params = WriteParams {
        mode: WriteMode::Append,
        allow_external_blob_outside_bases: true,
        ..Default::default()
    };
    let transaction = InsertBuilder::new(Arc::new(ds.clone()))
        .with_params(&params)
        .execute_uncommitted(vec![batch])
        .await
        .map_err(|e| OmniError::Lance(e.to_string()))?;

    // Work on a clone: the fragments inside `transaction` must stay exactly
    // as Lance produced them so commit-time assignment is unaffected.
    let mut new_fragments = match &transaction.operation {
        Operation::Append { fragments } => fragments.clone(),
        Operation::Overwrite { fragments, .. } => fragments.clone(),
        other => {
            return Err(OmniError::manifest_internal(format!(
                "stage_append: unexpected Lance operation {:?}",
                std::mem::discriminant(other)
            )));
        }
    };

    // Assign real fragment IDs. Lance's `InsertBuilder::execute_uncommitted`
    // returns fragments with `id = 0` ("Temporary ID" — see lance-4.0.0
    // `dataset/write.rs:1044/1712`); the real assignment happens during
    // commit via `Transaction::fragments_with_ids`. Because we expose
    // these fragments to `scan_with_staged` *before* commit, two staged
    // fragments (or one staged + the seed) would collide on `id = 0`,
    // causing Lance's scanner to mishandle the combined list (silent
    // duplicates / dropped rows). Mirror the commit-time renumbering
    // here, using `ds.manifest.max_fragment_id() + 1` as the base and
    // accounting for prior stages.
    //
    // Use the `max_fragment_id()` accessor rather than the raw
    // `Option<u32>` field: the accessor yields `Option<u64>` and, when the
    // manifest never recorded a high-water mark, derives it from the
    // fragment list. Reading the bare field would treat that case as
    // "max = 0" and hand out IDs that already belong to committed
    // fragments.
    let next_id_base = ds.manifest.max_fragment_id().unwrap_or(0)
        + 1
        + prior_stages_fragment_count(prior_stages);
    assign_fragment_ids(&mut new_fragments, next_id_base);

    if ds.manifest.uses_stable_row_ids() {
        // Continue the row-ID sequence after everything already staged.
        let prior_rows = prior_stages_row_count(prior_stages)?;
        let start_row_id = ds.manifest.next_row_id + prior_rows;
        assign_row_id_meta(&mut new_fragments, start_row_id)?;
    }

    Ok(StagedWrite {
        transaction,
        new_fragments,
        // Append never supersedes existing fragments.
        removed_fragment_ids: Vec::new(),
    })
}
/// Stage a merge_insert (upsert): write fragment files describing the
/// merge result, return the uncommitted transaction plus the new
/// fragments. The transaction's `Operation::Update` carries the
/// fragments-to-remove and fragments-to-add; for read-your-writes we
/// expose `new_fragments` (rows that will be visible after commit).
///
/// **Contract: do not chain `stage_merge_insert` calls on the same
/// table within one query.** Each call's `MergeInsertBuilder` runs
/// against the supplied dataset's committed view — it does not see
/// fragments produced by a previous staged merge on the same table.
/// Two chained `stage_merge_insert`s whose source rows share keys will
/// each independently produce `Operation::Update` transactions whose
/// `new_fragments` contain a row for the shared key. `scan_with_staged`
/// (and `count_rows_with_staged`) will then return both — i.e.
/// **duplicates by key**.
///
/// This is intrinsic to the underlying Lance API: there is no public
/// way to make `MergeInsertBuilder` see uncommitted fragments. The
/// engine's mutation path enforces the rule "per touched table: all
/// stage_append OR exactly one stage_merge_insert" at parse time
/// (the D₂ check landing with [MR-794](https://linear.app/modernrelay/issue/MR-794)
/// step 2+ in `exec/mutation.rs`). Multi-table queries and append-chains
/// remain safe; only chained merges on a single table are rejected.
///
/// Lift path: either a Lance API extension that lets
/// `MergeInsertBuilder` accept additional staged fragments, or an
/// in-memory pre-merge here that folds prior staged batches into the
/// input stream. See `docs/runs.md` and MR-793.
pub async fn stage_merge_insert(
&self,
ds: Dataset,
batch: RecordBatch,
key_columns: Vec<String>,
when_matched: WhenMatched,
when_not_matched: WhenNotMatched,
) -> Result<StagedWrite> {
if batch.num_rows() == 0 {
return Err(OmniError::manifest_internal(
"stage_merge_insert called with empty batch".to_string(),
));
}
let ds = Arc::new(ds);
let job = MergeInsertBuilder::try_new(ds, key_columns)
.map_err(|e| OmniError::Lance(e.to_string()))?
.when_matched(when_matched)
.when_not_matched(when_not_matched)
.try_build()
.map_err(|e| OmniError::Lance(e.to_string()))?;
let schema = batch.schema();
let reader = arrow_array::RecordBatchIterator::new(vec![Ok(batch)], schema);
let stream = lance_datafusion::utils::reader_to_stream(Box::new(reader));
let uncommitted = job
.execute_uncommitted(stream)
.await
.map_err(|e| OmniError::Lance(e.to_string()))?;
// Operation::Update { removed_fragment_ids, updated_fragments, new_fragments, .. } —
// `new_fragments` are the freshly inserted rows; `updated_fragments`
// are rewrites of existing fragments that include both retained and
// updated rows; `removed_fragment_ids` lists the committed-manifest
// fragments that those rewrites supersede. For read-your-writes we
// expose `updated_fragments + new_fragments` and the
// `removed_fragment_ids` so `scan_with_staged` can filter the
// superseded committed fragments before combining — otherwise a
// single merge_insert appears as duplicate rows (original committed
// version + rewritten staged version).
let (new_fragments, removed_fragment_ids) = match &uncommitted.transaction.operation {
Operation::Update {
new_fragments,
updated_fragments,
removed_fragment_ids,
..
} => {
let mut all = updated_fragments.clone();
all.extend(new_fragments.iter().cloned());
(all, removed_fragment_ids.clone())
}
Operation::Append { fragments } => (fragments.clone(), Vec::new()),
other => {
return Err(OmniError::manifest_internal(format!(
"stage_merge_insert: unexpected Lance operation {:?}",
std::mem::discriminant(other)
)));
}
};
Ok(StagedWrite {
transaction: uncommitted.transaction,
new_fragments,
removed_fragment_ids,
})
}
/// Apply a previously staged `transaction` to `ds` and return the resulting
/// dataset, whose HEAD has advanced. Thin wrapper over
/// `CommitBuilder::execute`; the publisher calls it at end-of-query to
/// materialize every staged write before the meta-manifest commit.
///
/// # Errors
///
/// Any Lance commit failure is surfaced as `OmniError::Lance` carrying the
/// stringified Lance error.
pub async fn commit_staged(
    &self,
    ds: Arc<Dataset>,
    transaction: Transaction,
) -> Result<Dataset> {
    let outcome = CommitBuilder::new(ds).execute(transaction).await;
    match outcome {
        Ok(committed) => Ok(committed),
        Err(e) => Err(OmniError::Lance(e.to_string())),
    }
}
/// Scan `ds` with uncommitted staged writes made visible next to the
/// committed snapshot. With an empty `staged` slice this simply delegates to
/// `scan(...)`.
///
/// The visible fragment list is `committed - removed + new`: the committed
/// manifest's fragments, minus every fragment ID superseded by a staged
/// `Operation::Update` (merge_insert rewrite), plus the staged new/updated
/// fragments. Skipping the `removed` filter would surface a rewritten
/// fragment twice — once as the original committed fragment and once as the
/// rewrite in `new_fragments`.
///
/// **Filter contract is incomplete on staged fragments.** With
/// `filter = Some(...)`, Lance pushes the predicate into per-fragment scans
/// using stats-based pruning. Uncommitted fragments produced by
/// `write_fragments_internal` lack the per-column statistics committed
/// fragments carry, so Lance's optimizer drops them from the filtered scan
/// even when their data would match: staged rows are silently missing from
/// the result. `scanner.use_stats(false)` does not fix this in lance 4.0.0.
/// Callers that need correct filtered reads over staged data should use a
/// different strategy (the engine's MR-794 step 2+ design uses in-memory
/// pending-batch accumulation + DataFusion `MemTable` instead — see
/// `.context/mr-794-step2-design.md`).
///
/// The method stays on the surface for primitive-level testing (basic
/// stage + scan correctness without filters works) and for callers that do
/// not need filter pushdown.
pub async fn scan_with_staged(
    &self,
    ds: &Dataset,
    staged: &[StagedWrite],
    projection: Option<&[&str]>,
    filter: Option<&str>,
) -> Result<Vec<RecordBatch>> {
    // Nothing staged: the ordinary scan path already does the right thing.
    if staged.is_empty() {
        return self.scan(ds, projection, filter, None).await;
    }

    let mut scanner = ds.scan();
    if let Some(columns) = projection {
        let column_names: Vec<String> = columns.iter().copied().map(String::from).collect();
        scanner
            .project(&column_names)
            .map_err(|e| OmniError::Lance(e.to_string()))?;
    }
    if let Some(predicate) = filter {
        scanner
            .filter(predicate)
            .map_err(|e| OmniError::Lance(e.to_string()))?;
    }

    // Restrict the scan to exactly `committed - removed + new`.
    let visible_fragments = combine_committed_with_staged(ds, staged);
    scanner.with_fragments(visible_fragments);

    let batch_stream = scanner
        .try_into_stream()
        .await
        .map_err(|e| OmniError::Lance(e.to_string()))?;
    let batches: Vec<RecordBatch> = batch_stream
        .try_collect()
        .await
        .map_err(|e| OmniError::Lance(e.to_string()))?;
    Ok(batches)
}
/// Row count that takes staged writes into account. Intended for
/// edge-cardinality validation, which has to see staged edges before they
/// are committed. The fragment list is composed exactly like in
/// `scan_with_staged` (`committed - removed + new`); with an empty `staged`
/// slice the call is forwarded to `count_rows`.
///
/// # Errors
///
/// Lance failures while applying the filter or counting are returned as
/// `OmniError::Lance`.
pub async fn count_rows_with_staged(
    &self,
    ds: &Dataset,
    staged: &[StagedWrite],
    filter: Option<String>,
) -> Result<usize> {
    if staged.is_empty() {
        return self.count_rows(ds, filter).await;
    }

    let mut scanner = ds.scan();
    if let Some(predicate) = filter.as_deref() {
        scanner
            .filter(predicate)
            .map_err(|e| OmniError::Lance(e.to_string()))?;
    }
    scanner.with_fragments(combine_committed_with_staged(ds, staged));

    scanner
        .count_rows()
        .await
        .map(|rows| rows as usize)
        .map_err(|e| OmniError::Lance(e.to_string()))
}
async fn user_indices_for_column(
&self,
ds: &Dataset,
@ -620,3 +968,120 @@ impl TableStore {
.map_err(|e| OmniError::Lance(e.to_string()))
}
}
/// Count the fragments exposed by the supplied stages (the total length of
/// their `new_fragments`). Used by `stage_append` to offset fragment-ID
/// assignment so chained `stage_append` calls against the same dataset do
/// not hand out the same fragment IDs twice.
fn prior_stages_fragment_count(prior_stages: &[StagedWrite]) -> u64 {
    prior_stages
        .iter()
        .map(|stage| stage.new_fragments.len() as u64)
        .sum()
}

/// Assign sequential fragment IDs starting at `start_id`. Mirrors Lance's
/// commit-time `Transaction::fragments_with_ids` (lance-4.0.0
/// `dataset/transaction.rs:1456`) — fragments produced by
/// `InsertBuilder::execute_uncommitted` start with `id = 0` as a temporary
/// placeholder; we renumber here so they don't collide with committed
/// fragments (or with each other across chained stages) when the slice is
/// passed to `Scanner::with_fragments`.
///
/// Only fragments whose ID is still the `0` placeholder are touched; the
/// candidate ID advances with the slice position either way.
fn assign_fragment_ids(fragments: &mut [Fragment], start_id: u64) {
    for (fragment, candidate_id) in fragments.iter_mut().zip(start_id..) {
        if fragment.id == 0 {
            fragment.id = candidate_id;
        }
    }
}

/// Sum `physical_rows` across all fragments in the supplied stages.
/// Used by `stage_append` to compute the row-ID offset for chained
/// `stage_append` calls against the same dataset.
///
/// Assumes `prior_stages` contains only `stage_append` results — see
/// `stage_append`'s D₂ contract. For `stage_merge_insert` results the
/// `new_fragments` include rewrites that don't add new rows, so this
/// would over-count.
///
/// # Errors
///
/// Returns `OmniError::manifest_internal` if any fragment has no
/// `physical_rows` recorded.
fn prior_stages_row_count(prior_stages: &[StagedWrite]) -> Result<u64> {
    let mut total: u64 = 0;
    for fragment in prior_stages.iter().flat_map(|stage| stage.new_fragments.iter()) {
        let physical_rows = fragment.physical_rows.ok_or_else(|| {
            OmniError::manifest_internal(
                "prior_stages_row_count: fragment is missing physical_rows".to_string(),
            )
        })? as u64;
        total += physical_rows;
    }
    Ok(total)
}

/// Assign sequential row IDs to fragments that lack them, starting from
/// `start_row_id`. Mirrors the relevant arm of Lance's
/// `Transaction::assign_row_ids` (lance-4.0.0 `dataset/transaction.rs:2682`)
/// for the `row_id_meta = None` case — fragments produced by
/// `InsertBuilder::execute_uncommitted` against a stable-row-id dataset.
///
/// Used only by `stage_append` for read-your-writes — see its docstring
/// for why pre-commit assignment is needed and why diverging from Lance's
/// commit-time IDs is safe.
///
/// # Errors
///
/// Returns `OmniError::manifest_internal` if a fragment that needs row IDs
/// has no `physical_rows` recorded.
fn assign_row_id_meta(fragments: &mut [Fragment], start_row_id: u64) -> Result<()> {
    let mut next_row_id = start_row_id;
    for fragment in fragments {
        // Fragments that already carry row-ID metadata keep it and do not
        // consume any IDs from the sequence.
        if fragment.row_id_meta.is_some() {
            continue;
        }
        let physical_rows = fragment.physical_rows.ok_or_else(|| {
            OmniError::manifest_internal(
                "stage_append: fragment is missing physical_rows".to_string(),
            )
        })? as u64;
        let row_ids = next_row_id..(next_row_id + physical_rows);
        let sequence = RowIdSequence::from(row_ids);
        let serialized = write_row_ids(&sequence);
        fragment.row_id_meta = Some(RowIdMeta::Inline(serialized));
        next_row_id += physical_rows;
    }
    Ok(())
}

/// Build the `Scanner::with_fragments` argument for read-your-writes:
/// committed manifest fragments minus any fragment IDs superseded by the
/// staged writes, plus the staged `new_fragments`. Order is:
///   1. committed fragments whose IDs are NOT in any staged
///      `removed_fragment_ids` (preserves committed order),
///   2. all staged `new_fragments` in stage order.
///
/// Lance's `Scanner` does not require any particular ordering between
/// committed and staged fragments — `with_fragments` scopes the scan to
/// exactly the supplied list. The dedup matters because merge_insert
/// rewrites a fragment in place at the Lance layer: the rewritten
/// fragment is in `new_fragments`, the original (which it supersedes) is
/// in `committed` until manifest commit, and including both would yield
/// duplicate rows.
///
/// **Inter-stage supersession is not handled here.** Each StagedWrite's
/// `removed_fragment_ids` lists committed-manifest fragment IDs only; a
/// later staged merge cannot know about an earlier staged merge's
/// fragments (Lance's `MergeInsertBuilder` runs against the committed
/// view). If two `stage_merge_insert`s on the same table produce rows
/// with the same key, the combined view returns duplicates by key. The
/// engine's mutation path enforces "per touched table: all stage_append
/// OR exactly one stage_merge_insert" at parse time (D₂ in
/// `exec/mutation.rs`) so this primitive's caller never chains merges.
/// See `stage_merge_insert` for the full contract.
fn combine_committed_with_staged(ds: &Dataset, staged: &[StagedWrite]) -> Vec<Fragment> {
    let removed: std::collections::HashSet<u64> = staged
        .iter()
        .flat_map(|w| w.removed_fragment_ids.iter().copied())
        .collect();
    let mut combined: Vec<Fragment> = ds
        .manifest
        .fragments
        .iter()
        .filter(|f| !removed.contains(&f.id))
        .cloned()
        .collect();
    combined.extend(
        staged
            .iter()
            .flat_map(|write| write.new_fragments.iter().cloned()),
    );
    combined
}

View file

@ -0,0 +1,490 @@
//! Primitive-level tests for `TableStore`'s staged-write API
//! (MR-794 step 1). These exercise `stage_append`, `stage_merge_insert`,
//! `scan_with_staged`, and `count_rows_with_staged` directly against a
//! Lance dataset — no Omnigraph engine involved. The engine-level rewire
//! (MR-794 step 2+) lives in `tests/runs.rs` once it lands.
//!
//! Test surface here:
//! 1. `stage_append` + `scan_with_staged` shows committed + staged data
//! without duplicates.
//! 2. `stage_merge_insert` of a row that supersedes a committed fragment
//! surfaces only the rewritten row, not both — the
//! `removed_fragment_ids` dedup landed in PR #66's `730631c`.
//! 3. **Documented contract**: chained `stage_merge_insert` calls on the
//! same dataset whose source rows share keys produce duplicate rows in
//! `scan_with_staged`. The engine's parse-time D₂ check (MR-794 step
//! 2+) prevents callers from triggering this; this test pins the
//! primitive's behavior so a future change either (a) preserves it or
//! (b) consciously fixes it (and updates this test).
use arrow_array::{Array, Int32Array, RecordBatch, StringArray, UInt64Array};
use arrow_schema::{DataType, Field, Schema};
use futures::TryStreamExt;
use lance::Dataset;
use lance::dataset::{WhenMatched, WhenNotMatched};
use lance_table::format::Fragment;
use omnigraph::table_store::{StagedWrite, TableStore};
use std::sync::Arc;
/// Arrow schema shared by every test table: a non-nullable UTF-8 `id`
/// column and a nullable 32-bit integer `age` column.
fn person_schema() -> Arc<Schema> {
    let id_field = Field::new("id", DataType::Utf8, false);
    let age_field = Field::new("age", DataType::Int32, true);
    Arc::new(Schema::new(vec![id_field, age_field]))
}
/// Build a `RecordBatch` matching `person_schema()` from `(id, age)` pairs.
/// Panics if Arrow rejects the columns, which would indicate a broken test
/// fixture.
fn person_batch(rows: &[(&str, Option<i32>)]) -> RecordBatch {
    let (ids, ages): (Vec<&str>, Vec<Option<i32>>) = rows.iter().copied().unzip();
    RecordBatch::try_new(
        person_schema(),
        vec![
            Arc::new(StringArray::from(ids)),
            Arc::new(Int32Array::from(ages)),
        ],
    )
    .unwrap()
}
/// Gather the `id` column of every batch into one sorted list, so
/// assertions do not depend on scan order. Panics if a batch has no
/// string-typed `id` column.
fn collect_ids(batches: &[RecordBatch]) -> Vec<String> {
    let mut names: Vec<String> = batches
        .iter()
        .flat_map(|batch| {
            let id_column = batch
                .column_by_name("id")
                .unwrap()
                .as_any()
                .downcast_ref::<StringArray>()
                .unwrap();
            (0..id_column.len())
                .map(|row| id_column.value(row).to_string())
                .collect::<Vec<_>>()
        })
        .collect();
    names.sort();
    names
}
/// A staged append is visible through `scan_with_staged` while a plain scan
/// of the same dataset handle still shows only the committed row.
#[tokio::test]
async fn stage_append_is_visible_via_scan_with_staged() {
    let tmp = tempfile::tempdir().unwrap();
    let root = tmp.path().to_str().unwrap();
    let uri = format!("{}/people.lance", root);
    let store = TableStore::new(root);

    // Committed seed: a single row.
    let seed = person_batch(&[("alice", Some(30))]);
    let ds = TableStore::write_dataset(&uri, seed).await.unwrap();

    // First staged write against this dataset, hence no prior stages.
    let bob = person_batch(&[("bob", Some(25))]);
    let staged = store.stage_append(&ds, bob, &[]).await.unwrap();

    // The staged view holds committed alice plus staged bob, each once.
    let with_staged = store
        .scan_with_staged(&ds, std::slice::from_ref(&staged), None, None)
        .await
        .unwrap();
    assert_eq!(collect_ids(&with_staged), vec!["alice", "bob"]);

    // The dataset HEAD has not moved: an ordinary scan sees alice only.
    let committed_only = store.scan_batches(&ds).await.unwrap();
    assert_eq!(collect_ids(&committed_only), vec!["alice"]);
}
/// A staged merge_insert that rewrites a committed row must surface only the
/// rewritten row through `scan_with_staged`, never the original as well.
#[tokio::test]
async fn stage_merge_insert_dedupes_superseded_committed_fragment() {
    let tmp = tempfile::tempdir().unwrap();
    let root = tmp.path().to_str().unwrap();
    let uri = format!("{}/people.lance", root);
    let store = TableStore::new(root);

    // Committed seed: alice aged 30, stored in one fragment.
    let ds = TableStore::write_dataset(&uri, person_batch(&[("alice", Some(30))]))
        .await
        .unwrap();

    // Stage an upsert that rewrites alice's row. The resulting
    // Operation::Update lists, in removed_fragment_ids, the committed
    // fragment that held the old alice.
    let staged = store
        .stage_merge_insert(
            ds.clone(),
            person_batch(&[("alice", Some(31))]),
            vec!["id".to_string()],
            WhenMatched::UpdateAll,
            WhenNotMatched::InsertAll,
        )
        .await
        .unwrap();
    assert!(
        !staged.removed_fragment_ids.is_empty(),
        "merge_insert that rewrites a committed row must set removed_fragment_ids \
         (this is the dedup invariant from PR #66 commit 730631c its absence \
         was caught by Cubic/Cursor/Codex on PR #66)"
    );

    // Through the staged view alice shows up exactly once...
    let visible = store
        .scan_with_staged(&ds, std::slice::from_ref(&staged), None, None)
        .await
        .unwrap();
    assert_eq!(
        collect_ids(&visible),
        vec!["alice"],
        "merge_insert must not surface duplicates"
    );
    let row_count: usize = visible.iter().map(|batch| batch.num_rows()).sum();
    assert_eq!(row_count, 1);

    // ...and that single row carries the rewritten age.
    let mut ages: Vec<i32> = Vec::new();
    for batch in &visible {
        let age_column = batch
            .column_by_name("age")
            .unwrap()
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        ages.extend((0..age_column.len()).map(|row| age_column.value(row)));
    }
    assert_eq!(ages, vec![31]);
}
/// `count_rows_with_staged` counts the committed row together with the rows
/// of a staged append.
#[tokio::test]
async fn count_rows_with_staged_matches_scan() {
    let tmp = tempfile::tempdir().unwrap();
    let root = tmp.path().to_str().unwrap();
    let uri = format!("{}/people.lance", root);
    let store = TableStore::new(root);

    // One committed row...
    let ds = TableStore::write_dataset(&uri, person_batch(&[("alice", Some(30))]))
        .await
        .unwrap();

    // ...plus two staged rows.
    let pending = person_batch(&[("bob", Some(25)), ("carol", Some(40))]);
    let staged = store.stage_append(&ds, pending, &[]).await.unwrap();

    let total = store
        .count_rows_with_staged(&ds, std::slice::from_ref(&staged), None)
        .await
        .unwrap();
    assert_eq!(total, 3);
}
/// Two `stage_append` calls on the same dataset must produce
/// non-overlapping `_rowid` ranges. Without `prior_stages` threading,
/// both calls would assign IDs starting from `ds.manifest.next_row_id`,
/// producing overlapping ranges that break read paths consulting the
/// row-ID index (prefilter, vector search). With the slice threaded
/// through, the second call offsets by the first call's row count.
///
/// This is what enables the engine's multi-statement `insert Knows ...;
/// insert Knows ...` (multiple appends to the same edge table) under
/// the D₂ rule.
#[tokio::test]
async fn chained_stage_appends_have_distinct_row_ids() {
let dir = tempfile::tempdir().unwrap();
let uri = format!("{}/people.lance", dir.path().to_str().unwrap());
let store = TableStore::new(dir.path().to_str().unwrap());
let ds = TableStore::write_dataset(&uri, person_batch(&[("seed", Some(0))]))
.await
.unwrap();
let s1 = store
.stage_append(&ds, person_batch(&[("alice", Some(30))]), &[])
.await
.unwrap();
let s2 = store
.stage_append(
&ds,
person_batch(&[("bob", Some(25))]),
std::slice::from_ref(&s1),
)
.await
.unwrap();
// Scan with row IDs requested. If s1 and s2 had overlapping _rowid
// ranges, Lance's scanner would conflict (or surface duplicates) on
// the combined fragment list.
let staged = vec![s1, s2];
let batches = store
.scan_with_staged(&ds, &staged, None, None)
.await
.unwrap();
let ids = collect_ids(&batches);
assert_eq!(ids, vec!["alice", "bob", "seed"]);
// Project _rowid explicitly and assert all rows have distinct IDs.
let mut scanner = ds.scan();
scanner.with_row_id();
scanner.with_fragments(combine_for_scan(&ds, &staged));
let stream = scanner.try_into_stream().await.unwrap();
let projected: Vec<_> = stream.try_collect().await.unwrap();
let row_ids: std::collections::BTreeSet<u64> = projected
.iter()
.flat_map(|b| {
let arr = b
.column_by_name("_rowid")
.unwrap()
.as_any()
.downcast_ref::<UInt64Array>()
.unwrap();
(0..arr.len()).map(|i| arr.value(i)).collect::<Vec<_>>()
})
.collect();
assert_eq!(
row_ids.len(),
3,
"all 3 rows (1 committed + 2 staged) should have distinct _rowid; \
overlap implies stage_append failed to offset by prior_stages"
);
}
/// Test-local copy of the primitive's `combine_committed_with_staged`
/// logic, used by the chained-append test to drive a hand-built scanner that
/// requests `_rowid`. Duplicated here so the engine helper can stay private.
///
/// Returns the committed fragments that no stage supersedes, followed by
/// every stage's `new_fragments` in stage order.
fn combine_for_scan(ds: &Dataset, staged: &[StagedWrite]) -> Vec<Fragment> {
    let superseded: std::collections::HashSet<u64> = staged
        .iter()
        .flat_map(|stage| stage.removed_fragment_ids.iter().copied())
        .collect();
    let surviving_committed = ds
        .manifest
        .fragments
        .iter()
        .filter(|fragment| !superseded.contains(&fragment.id))
        .cloned();
    let staged_fragments = staged
        .iter()
        .flat_map(|stage| stage.new_fragments.iter().cloned());
    surviving_committed.chain(staged_fragments).collect()
}
/// Round-trip of `stage_append` followed by `commit_staged`: once committed,
/// the dataset HEAD includes the staged rows and a freshly opened dataset
/// sees them. Also checks that pre-assigning `row_id_meta` on the exposed
/// fragments does not interfere with Lance's commit-time row-ID assignment
/// (transaction.rs:2682).
#[tokio::test]
async fn stage_append_then_commit_persists_data() {
    let tmp = tempfile::tempdir().unwrap();
    let root = tmp.path().to_str().unwrap();
    let uri = format!("{}/people.lance", root);
    let store = TableStore::new(root);

    let ds = TableStore::write_dataset(&uri, person_batch(&[("alice", Some(30))]))
        .await
        .unwrap();
    let version_before = ds.version().version;

    let staged = store
        .stage_append(&ds, person_batch(&[("bob", Some(25))]), &[])
        .await
        .unwrap();
    let committed = store
        .commit_staged(Arc::new(ds.clone()), staged.transaction)
        .await
        .unwrap();
    assert!(
        committed.version().version > version_before,
        "commit_staged must advance the dataset version"
    );

    // A fresh handle on the same URI must see both rows at HEAD.
    let reopened = Dataset::open(&uri).await.unwrap();
    let at_head = store.scan_batches(&reopened).await.unwrap();
    assert_eq!(collect_ids(&at_head), vec!["alice", "bob"]);
}
/// Round-trips `stage_merge_insert` through `commit_staged`.
///
/// The committed table holds `alice`; the staged source carries `alice`
/// again (the matched key, handled with `WhenMatched::UpdateAll`) plus `bob`
/// (unmatched, handled with `WhenNotMatched::InsertAll`). After the commit, a
/// freshly opened handle must scan exactly the ids `alice` and `bob` in two
/// rows total.
///
/// Note: the assertions cover ids and row count only; the updated `age`
/// value is not inspected here.
#[tokio::test]
async fn stage_merge_insert_then_commit_persists_merged_view() {
    let tmp = tempfile::tempdir().unwrap();
    let root = tmp.path().to_str().unwrap();
    let store = TableStore::new(root);
    let uri = format!("{root}/people.lance");

    let base = TableStore::write_dataset(&uri, person_batch(&[("alice", Some(30))]))
        .await
        .unwrap();

    // Source batch: one key that already exists, one that does not.
    let source = person_batch(&[("alice", Some(31)), ("bob", Some(25))]);
    let key_columns = vec!["id".to_string()];
    let staged = store
        .stage_merge_insert(
            base.clone(),
            source,
            key_columns,
            WhenMatched::UpdateAll,
            WhenNotMatched::InsertAll,
        )
        .await
        .unwrap();

    store
        .commit_staged(Arc::new(base), staged.transaction)
        .await
        .unwrap();

    let head = Dataset::open(&uri).await.unwrap();
    let batches = store.scan_batches(&head).await.unwrap();
    assert_eq!(collect_ids(&batches), vec!["alice", "bob"]);

    // Two rows in total means the matched `alice` row was not duplicated.
    let total = batches
        .iter()
        .fold(0usize, |rows, batch| rows + batch.num_rows());
    assert_eq!(total, 2, "merge_insert must not duplicate the matched row");
}
/// **Documented limitation** (see the `scan_with_staged` doc): a filtered
/// scan does not return matching rows from staged fragments.
///
/// Uncommitted fragments produced by `write_fragments_internal` lack
/// per-column statistics, so Lance's stats-based pruning drops them once a
/// filter is supplied. Only matching *committed* rows come back; matching
/// staged rows are silently absent. `scanner.use_stats(false)` does not
/// bypass this in lance 4.0.0.
///
/// The test pins today's behavior on purpose: a future change must either
/// keep it (and update the doc) or fix it (and rewrite this test). Production
/// code is unaffected because the engine's MR-794 step 2+ design gets
/// read-your-writes from in-memory pending-batch accumulation plus a
/// DataFusion `MemTable` instead.
#[tokio::test]
async fn scan_with_staged_with_filter_silently_drops_staged_rows() {
    let tmp = tempfile::tempdir().unwrap();
    let root = tmp.path().to_str().unwrap();
    let uri = format!("{root}/people.lance");
    let store = TableStore::new(root);

    // Committed rows: alice=30, carol=40.
    let committed_rows = person_batch(&[("alice", Some(30)), ("carol", Some(40))]);
    let ds = TableStore::write_dataset(&uri, committed_rows)
        .await
        .unwrap();

    // Staged (uncommitted) rows: bob=25, dave=35.
    let staged_rows = person_batch(&[("bob", Some(25)), ("dave", Some(35))]);
    let staged = store.stage_append(&ds, staged_rows, &[]).await.unwrap();
    // Single-element slice view over the staged write, shared by both scans.
    let overlay = std::slice::from_ref(&staged);

    // With the filter `age >= 30` the correct answer would be alice, carol
    // and dave. What actually comes back omits dave (staged, age=35): only
    // the committed matches are returned.
    let filtered = store
        .scan_with_staged(&ds, overlay, None, Some("age >= 30"))
        .await
        .unwrap();
    let filtered_ids = collect_ids(&filtered);
    assert_eq!(
        filtered_ids,
        vec!["alice", "carol"],
        "documented limitation: filter pushdown drops staged fragments. \
         If you're here because this assertion failed: either (a) Lance \
         exposed a way to scan uncommitted fragments without stats-based \
         pruning (good update to assert == [alice, carol, dave]), or \
         (b) something changed in our scan_with_staged path. See PR #67 \
         test fix discussion + .context/mr-794-step2-design.md §1.1."
    );

    // The same overlay scanned without a filter does surface the staged
    // rows, which isolates the problem to filter pushdown rather than to
    // scanning staged fragments in general.
    let unfiltered = store
        .scan_with_staged(&ds, overlay, None, None)
        .await
        .unwrap();
    let unfiltered_ids = collect_ids(&unfiltered);
    assert_eq!(
        unfiltered_ids,
        vec!["alice", "bob", "carol", "dave"],
        "unfiltered scan_with_staged returns all rows correctly"
    );
}
/// **Documented contract** (see the `stage_merge_insert` doc): chained
/// `stage_merge_insert` calls against the same table cannot dedupe source
/// rows that share a key.
///
/// Every call's `MergeInsertBuilder` runs against the committed view only,
/// so no call can see fragments staged by another. Overlaying both staged
/// writes through `scan_with_staged` therefore yields the shared key twice.
///
/// The engine's mutation path enforces D₂ (per touched table: all
/// stage_append OR exactly one stage_merge_insert) at parse time, which makes
/// this scenario unreachable through public APIs. The test exists to pin the
/// primitive's behavior: if the primitive ever learns to dedupe across stages
/// (e.g. via a Lance API extension or an in-memory pre-merge), update the
/// assertion.
#[tokio::test]
async fn chained_stage_merge_insert_with_shared_key_documents_duplicate_behavior() {
    let tmp = tempfile::tempdir().unwrap();
    let root = tmp.path().to_str().unwrap();
    let uri = format!("{root}/people.lance");
    let store = TableStore::new(root);

    // Seed with a single unrelated row so the table exists with an
    // unambiguous schema but contains no `alice`.
    let ds = TableStore::write_dataset(&uri, person_batch(&[("seed", Some(0))]))
        .await
        .unwrap();

    // Stage two independent merge_inserts of `alice`, first with age 30 and
    // then with age 31. Both run against the committed view, where `alice`
    // does not exist, so each one lands as a fresh insert (in
    // Operation::Update.new_fragments). The second call has no knowledge of
    // the fragments staged by the first.
    let mut stages = Vec::with_capacity(2);
    for age in [30, 31] {
        let staged = store
            .stage_merge_insert(
                ds.clone(),
                person_batch(&[("alice", Some(age))]),
                vec!["id".to_string()],
                WhenMatched::UpdateAll,
                WhenNotMatched::InsertAll,
            )
            .await
            .unwrap();
        stages.push(staged);
    }

    // The overlay is committed (seed) + first stage (alice age=30) + second
    // stage (alice age=31), so `alice` shows up twice. This is the contract
    // violation that D₂ rules out at the engine layer.
    let batches = store
        .scan_with_staged(&ds, &stages, None, None)
        .await
        .unwrap();
    let alice_count = collect_ids(&batches)
        .iter()
        .filter(|id| *id == "alice")
        .count();
    assert_eq!(
        alice_count, 2,
        "chained stage_merge_insert with shared key produces duplicates — \
         this is the contract documented on stage_merge_insert. If you're \
         here because this assertion failed: either (a) the primitive was \
         improved to dedupe across stages (good update to assert == 1) \
         or (b) something subtler broke (investigate before changing the \
         assertion). See PR #67 Codex P1 thread + .context/mr-794-step2-design.md §3.1."
    );
}