MR-794 step 1: assign row_id_meta to stage_append fragments

CI exposed a real Step 1 bug surfaced by the new staged_writes tests:
stage_append → scan_with_staged fails on stable_row_id datasets with
"Missing row id meta" (lance-4.0.0/src/dataset/rowids.rs:22).

Root cause: InsertBuilder::execute_uncommitted produces fragments with
row_id_meta = None. Lance's commit phase normally populates it via
Transaction::assign_row_ids, but scan_with_staged reads the staged
fragments BEFORE commit. MergeInsertBuilder::execute_uncommitted dodges
this by populating row_id_meta inline (transaction.rs:1618) — that's
why the two merge-side tests in tests/staged_writes.rs passed and the
two append-side tests failed.

The bug was always present in the primitive — PR #66 shipped it the
same way. PR #66 had no tests calling stage_append, so neither CI nor
the bot reviewers caught it. Step 2+ would have hit it on the first
mutation that did "insert + insert with FK validation," but the failure
would have looked like a MutationStaging wiring bug; localizing it
here saves the next session the chase.

Fix: assign row_id_meta on the cloned fragments returned in
StagedWrite.new_fragments. Mirrors the relevant arm of Lance's
Transaction::assign_row_ids (transaction.rs:2682) for the
row_id_meta = None case. The transaction's internal fragment copy stays
untouched — Lance assigns its own IDs at commit time, and the two ID
assignments don't have to agree because no caller threads _rowid from
the staged scan into the commit path.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Ragnor Comerford 2026-04-30 23:49:14 +02:00
parent 2fe2669017
commit 714f1f0c0a
No known key found for this signature in database

View file

@ -15,7 +15,8 @@ use lance_file::version::LanceFileVersion;
use lance_index::scalar::{InvertedIndexParams, ScalarIndexParams};
use lance_index::{DatasetIndexExt, IndexType, is_system_index};
use lance_linalg::distance::MetricType;
use lance_table::format::{Fragment, IndexMetadata};
use lance_table::format::{Fragment, IndexMetadata, RowIdMeta};
use lance_table::rowids::{RowIdSequence, write_row_ids};
use std::sync::Arc;
use crate::db::manifest::{TableVersionMetadata, open_table_head_for_write};
@ -562,6 +563,19 @@ impl TableStore {
/// Stage an append: write fragment files for `batch`, return the
/// uncommitted Lance transaction plus the new fragments for
/// read-your-writes.
///
/// On stable-row-id datasets we manually populate `row_id_meta` on the
/// cloned `new_fragments` we expose for `scan_with_staged`. Lance's
/// `InsertBuilder::execute_uncommitted` produces fragments with
/// `row_id_meta = None`; row IDs are normally assigned by
/// `Transaction::assign_row_ids` during commit. Because
/// `scan_with_staged` reads the staged fragments *before* commit, the
/// scanner trips on a stable-row-id dataset (`Error::internal("Missing
/// row id meta")` from `dataset/rowids.rs:22`). The transaction's
/// internal fragment copy stays untouched — Lance assigns IDs there
/// independently at commit time, and the two ID assignments don't
/// have to agree because no caller threads `_rowid` from the staged
/// scan into the commit path.
pub async fn stage_append(&self, ds: &Dataset, batch: RecordBatch) -> Result<StagedWrite> {
if batch.num_rows() == 0 {
return Err(OmniError::manifest_internal(
@ -578,7 +592,7 @@ impl TableStore {
.execute_uncommitted(vec![batch])
.await
.map_err(|e| OmniError::Lance(e.to_string()))?;
let new_fragments = match &transaction.operation {
let mut new_fragments = match &transaction.operation {
Operation::Append { fragments } => fragments.clone(),
Operation::Overwrite { fragments, .. } => fragments.clone(),
other => {
@ -588,6 +602,9 @@ impl TableStore {
)));
}
};
if ds.manifest.uses_stable_row_ids() {
assign_row_id_meta(&mut new_fragments, ds.manifest.next_row_id)?;
}
Ok(StagedWrite {
transaction,
new_fragments,
@ -919,6 +936,35 @@ impl TableStore {
/// OR exactly one stage_merge_insert" at parse time (D₂ in
/// `exec/mutation.rs`) so this primitive's caller never chains merges.
/// See `stage_merge_insert` for the full contract.
/// Assign sequential row IDs to fragments that lack them, starting from
/// `start_row_id`. Mirrors the relevant arm of Lance's
/// `Transaction::assign_row_ids` (lance-4.0.0 `dataset/transaction.rs:2682`)
/// for the `row_id_meta = None` case — fragments produced by
/// `InsertBuilder::execute_uncommitted` against a stable-row-id dataset.
///
/// Used only by `stage_append` for read-your-writes — see its docstring
/// for why pre-commit assignment is needed and why diverging from Lance's
/// commit-time IDs is safe.
fn assign_row_id_meta(fragments: &mut [Fragment], start_row_id: u64) -> Result<()> {
let mut next_row_id = start_row_id;
for fragment in fragments {
if fragment.row_id_meta.is_some() {
continue;
}
let physical_rows = fragment.physical_rows.ok_or_else(|| {
OmniError::manifest_internal(
"stage_append: fragment is missing physical_rows".to_string(),
)
})? as u64;
let row_ids = next_row_id..(next_row_id + physical_rows);
let sequence = RowIdSequence::from(row_ids);
let serialized = write_row_ids(&sequence);
fragment.row_id_meta = Some(RowIdMeta::Inline(serialized));
next_row_id += physical_rows;
}
Ok(())
}
fn combine_committed_with_staged(ds: &Dataset, staged: &[StagedWrite]) -> Vec<Fragment> {
let removed: std::collections::HashSet<u64> = staged
.iter()