From 714f1f0c0a34bd229661b9acb7fe0553224c21ce Mon Sep 17 00:00:00 2001 From: Ragnor Comerford Date: Thu, 30 Apr 2026 23:49:14 +0200 Subject: [PATCH] MR-794 step 1: assign row_id_meta to stage_append fragments MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit CI exposed a real Step 1 bug surfaced by the new staged_writes tests: stage_append → scan_with_staged fails on stable_row_id datasets with "Missing row id meta" (lance-4.0.0/src/dataset/rowids.rs:22). Root cause: InsertBuilder::execute_uncommitted produces fragments with row_id_meta = None. Lance's commit phase normally populates it via Transaction::assign_row_ids, but scan_with_staged reads the staged fragments BEFORE commit. MergeInsertBuilder::execute_uncommitted dodges this by populating row_id_meta inline (transaction.rs:1618) — that's why the two merge-side tests in tests/staged_writes.rs passed and the two append-side tests failed. The bug was always present in the primitive — PR #66 shipped it the same way. PR #66 had no tests calling stage_append, so neither CI nor the bot reviewers caught it. Step 2+ would have hit it on the first mutation that did "insert + insert with FK validation," but the failure would have looked like a MutationStaging wiring bug; localizing it here saves the next session the chase. Fix: assign row_id_meta on the cloned fragments returned in StagedWrite.new_fragments. Mirrors the relevant arm of Lance's Transaction::assign_row_ids (transaction.rs:2682) for the row_id_meta = None case. The transaction's internal fragment copy stays untouched — Lance assigns its own IDs at commit time, and the two ID assignments don't have to agree because no caller threads _rowid from the staged scan into the commit path. Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/omnigraph/src/table_store.rs | 50 +++++++++++++++++++++++++++-- 1 file changed, 48 insertions(+), 2 deletions(-) diff --git a/crates/omnigraph/src/table_store.rs b/crates/omnigraph/src/table_store.rs index 99ff2aa..6ebd7ba 100644 --- a/crates/omnigraph/src/table_store.rs +++ b/crates/omnigraph/src/table_store.rs @@ -15,7 +15,8 @@ use lance_file::version::LanceFileVersion; use lance_index::scalar::{InvertedIndexParams, ScalarIndexParams}; use lance_index::{DatasetIndexExt, IndexType, is_system_index}; use lance_linalg::distance::MetricType; -use lance_table::format::{Fragment, IndexMetadata}; +use lance_table::format::{Fragment, IndexMetadata, RowIdMeta}; +use lance_table::rowids::{RowIdSequence, write_row_ids}; use std::sync::Arc; use crate::db::manifest::{TableVersionMetadata, open_table_head_for_write}; @@ -562,6 +563,19 @@ impl TableStore { /// Stage an append: write fragment files for `batch`, return the /// uncommitted Lance transaction plus the new fragments for /// read-your-writes. + /// + /// On stable-row-id datasets we manually populate `row_id_meta` on the + /// cloned `new_fragments` we expose for `scan_with_staged`. Lance's + /// `InsertBuilder::execute_uncommitted` produces fragments with + /// `row_id_meta = None`; row IDs are normally assigned by + /// `Transaction::assign_row_ids` during commit. Because + /// `scan_with_staged` reads the staged fragments *before* commit, the + /// scanner trips on a stable-row-id dataset (`Error::internal("Missing + /// row id meta")` from `dataset/rowids.rs:22`). The transaction's + /// internal fragment copy stays untouched — Lance assigns IDs there + /// independently at commit time, and the two ID assignments don't + /// have to agree because no caller threads `_rowid` from the staged + /// scan into the commit path. pub async fn stage_append(&self, ds: &Dataset, batch: RecordBatch) -> Result { if batch.num_rows() == 0 { return Err(OmniError::manifest_internal( @@ -578,7 +592,7 @@ impl TableStore { .execute_uncommitted(vec![batch]) .await .map_err(|e| OmniError::Lance(e.to_string()))?; - let new_fragments = match &transaction.operation { + let mut new_fragments = match &transaction.operation { Operation::Append { fragments } => fragments.clone(), Operation::Overwrite { fragments, .. } => fragments.clone(), other => { @@ -588,6 +602,9 @@ impl TableStore { ))); } }; + if ds.manifest.uses_stable_row_ids() { + assign_row_id_meta(&mut new_fragments, ds.manifest.next_row_id)?; + } Ok(StagedWrite { transaction, new_fragments, @@ -919,6 +936,35 @@ impl TableStore { /// OR exactly one stage_merge_insert" at parse time (D₂′ in /// `exec/mutation.rs`) so this primitive's caller never chains merges. /// See `stage_merge_insert` for the full contract. +/// Assign sequential row IDs to fragments that lack them, starting from +/// `start_row_id`. Mirrors the relevant arm of Lance's +/// `Transaction::assign_row_ids` (lance-4.0.0 `dataset/transaction.rs:2682`) +/// for the `row_id_meta = None` case — fragments produced by +/// `InsertBuilder::execute_uncommitted` against a stable-row-id dataset. +/// +/// Used only by `stage_append` for read-your-writes — see its docstring +/// for why pre-commit assignment is needed and why diverging from Lance's +/// commit-time IDs is safe. +fn assign_row_id_meta(fragments: &mut [Fragment], start_row_id: u64) -> Result<()> { + let mut next_row_id = start_row_id; + for fragment in fragments { + if fragment.row_id_meta.is_some() { + continue; + } + let physical_rows = fragment.physical_rows.ok_or_else(|| { + OmniError::manifest_internal( + "stage_append: fragment is missing physical_rows".to_string(), + ) + })? as u64; + let row_ids = next_row_id..(next_row_id + physical_rows); + let sequence = RowIdSequence::from(row_ids); + let serialized = write_row_ids(&sequence); + fragment.row_id_meta = Some(RowIdMeta::Inline(serialized)); + next_row_id += physical_rows; + } + Ok(()) +} + fn combine_committed_with_staged(ds: &Dataset, staged: &[StagedWrite]) -> Vec { let removed: std::collections::HashSet = staged .iter()