diff --git a/crates/omnigraph/src/table_store.rs b/crates/omnigraph/src/table_store.rs index 99ff2aa..6ebd7ba 100644 --- a/crates/omnigraph/src/table_store.rs +++ b/crates/omnigraph/src/table_store.rs @@ -15,7 +15,8 @@ use lance_file::version::LanceFileVersion; use lance_index::scalar::{InvertedIndexParams, ScalarIndexParams}; use lance_index::{DatasetIndexExt, IndexType, is_system_index}; use lance_linalg::distance::MetricType; -use lance_table::format::{Fragment, IndexMetadata}; +use lance_table::format::{Fragment, IndexMetadata, RowIdMeta}; +use lance_table::rowids::{RowIdSequence, write_row_ids}; use std::sync::Arc; use crate::db::manifest::{TableVersionMetadata, open_table_head_for_write}; @@ -562,6 +563,19 @@ impl TableStore { /// Stage an append: write fragment files for `batch`, return the /// uncommitted Lance transaction plus the new fragments for /// read-your-writes. + /// + /// On stable-row-id datasets we manually populate `row_id_meta` on the + /// cloned `new_fragments` we expose for `scan_with_staged`. Lance's + /// `InsertBuilder::execute_uncommitted` produces fragments with + /// `row_id_meta = None`; row IDs are normally assigned by + /// `Transaction::assign_row_ids` during commit. Because + /// `scan_with_staged` reads the staged fragments *before* commit, the + /// scanner trips on a stable-row-id dataset (`Error::internal("Missing + /// row id meta")` from `dataset/rowids.rs:22`). The transaction's + /// internal fragment copy stays untouched — Lance assigns IDs there + /// independently at commit time, and the two ID assignments don't + /// have to agree because no caller threads `_rowid` from the staged + /// scan into the commit path. pub async fn stage_append(&self, ds: &Dataset, batch: RecordBatch) -> Result { if batch.num_rows() == 0 { return Err(OmniError::manifest_internal( @@ -578,7 +592,7 @@ impl TableStore { .execute_uncommitted(vec![batch]) .await .map_err(|e| OmniError::Lance(e.to_string()))?; - let new_fragments = match &transaction.operation { + let mut new_fragments = match &transaction.operation { Operation::Append { fragments } => fragments.clone(), Operation::Overwrite { fragments, .. } => fragments.clone(), other => { @@ -588,6 +602,9 @@ impl TableStore { ))); } }; + if ds.manifest.uses_stable_row_ids() { + assign_row_id_meta(&mut new_fragments, ds.manifest.next_row_id)?; + } Ok(StagedWrite { transaction, new_fragments, @@ -919,6 +936,35 @@ impl TableStore { /// OR exactly one stage_merge_insert" at parse time (D₂′ in /// `exec/mutation.rs`) so this primitive's caller never chains merges. /// See `stage_merge_insert` for the full contract. +/// Assign sequential row IDs to fragments that lack them, starting from +/// `start_row_id`. Mirrors the relevant arm of Lance's +/// `Transaction::assign_row_ids` (lance-4.0.0 `dataset/transaction.rs:2682`) +/// for the `row_id_meta = None` case — fragments produced by +/// `InsertBuilder::execute_uncommitted` against a stable-row-id dataset. +/// +/// Used only by `stage_append` for read-your-writes — see its docstring +/// for why pre-commit assignment is needed and why diverging from Lance's +/// commit-time IDs is safe. +fn assign_row_id_meta(fragments: &mut [Fragment], start_row_id: u64) -> Result<()> { + let mut next_row_id = start_row_id; + for fragment in fragments { + if fragment.row_id_meta.is_some() { + continue; + } + let physical_rows = fragment.physical_rows.ok_or_else(|| { + OmniError::manifest_internal( + "stage_append: fragment is missing physical_rows".to_string(), + ) + })? as u64; + let row_ids = next_row_id..(next_row_id + physical_rows); + let sequence = RowIdSequence::from(row_ids); + let serialized = write_row_ids(&sequence); + fragment.row_id_meta = Some(RowIdMeta::Inline(serialized)); + next_row_id += physical_rows; + } + Ok(()) +} + fn combine_committed_with_staged(ds: &Dataset, staged: &[StagedWrite]) -> Vec { let removed: std::collections::HashSet = staged .iter()