diff --git a/crates/omnigraph/src/table_store.rs b/crates/omnigraph/src/table_store.rs index 6712df4..430c2c7 100644 --- a/crates/omnigraph/src/table_store.rs +++ b/crates/omnigraph/src/table_store.rs @@ -624,6 +624,20 @@ impl TableStore { ))); } }; + // Assign real fragment IDs. Lance's `InsertBuilder::execute_uncommitted` + // returns fragments with `id = 0` ("Temporary ID" — see lance-4.0.0 + // `dataset/write.rs:1044/1712`); the real assignment happens during + // commit via `Transaction::fragments_with_ids`. Because we expose + // these fragments to `scan_with_staged` *before* commit, two staged + // fragments (or one staged + the seed) would collide on `id = 0`, + // causing Lance's scanner to mishandle the combined list (silent + // duplicates / dropped rows). Mirror the commit-time renumbering + // here, using `ds.manifest.max_fragment_id() + 1` as the base and + // accounting for prior stages. + let next_id_base = ds.manifest.max_fragment_id.unwrap_or(0) + + 1 + + prior_stages_fragment_count(prior_stages); + assign_fragment_ids(&mut new_fragments, next_id_base); if ds.manifest.uses_stable_row_ids() { let prior_rows = prior_stages_row_count(prior_stages)?; let start_row_id = ds.manifest.next_row_id + prior_rows; @@ -968,6 +982,28 @@ impl TableStore { /// `stage_append`'s D₂′ contract. For `stage_merge_insert` results the /// `new_fragments` include rewrites that don't add new rows, so this /// would over-count. +fn prior_stages_fragment_count(prior_stages: &[StagedWrite]) -> u64 { + prior_stages + .iter() + .map(|s| s.new_fragments.len() as u64) + .sum() +} + +/// Assign sequential fragment IDs starting at `start_id`. Mirrors Lance's +/// commit-time `Transaction::fragments_with_ids` (lance-4.0.0 +/// `dataset/transaction.rs:1456`) — fragments produced by +/// `InsertBuilder::execute_uncommitted` start with `id = 0` as a temporary +/// placeholder; we renumber here so they don't collide with committed +/// fragments (or with each other across chained stages) when the slice is +/// passed to `Scanner::with_fragments`. +fn assign_fragment_ids(fragments: &mut [Fragment], start_id: u64) { + for (i, fragment) in fragments.iter_mut().enumerate() { + if fragment.id == 0 { + fragment.id = start_id + i as u64; + } + } +} + fn prior_stages_row_count(prior_stages: &[StagedWrite]) -> Result { let mut total: u64 = 0; for stage in prior_stages {