diff --git a/crates/omnigraph/src/table_store.rs b/crates/omnigraph/src/table_store.rs index 1404d6c..ccf4c2a 100644 --- a/crates/omnigraph/src/table_store.rs +++ b/crates/omnigraph/src/table_store.rs @@ -38,21 +38,43 @@ pub struct DeleteState { } /// A Lance write that has produced fragment files on object storage but is -/// not yet committed to the dataset's manifest. Used by `MutationStaging` -/// (`exec/mutation.rs`) and the loader to defer Lance commits to -/// end-of-query, so a mid-query failure leaves the touched table at the -/// pre-mutation HEAD instead of drifting ahead. +/// not yet committed to the dataset's manifest. The staged-write primitives +/// are defined here for later integration in `MutationStaging` +/// (`exec/mutation.rs`) and the loader (`loader/mod.rs`) — those rewires +/// land in [MR-794](https://linear.app/modernrelay/issue/MR-794) step 2+. +/// The intent: defer Lance commits to end-of-query so a mid-query failure +/// leaves the touched table at the pre-mutation HEAD instead of drifting +/// ahead. /// /// `transaction` is opaque from our side — Lance owns its semantics. We /// commit it via `CommitBuilder::execute(transaction)` (see -/// `TableStore::commit_staged`). `new_fragments` is extracted for -/// read-your-writes within the same query: pass it to -/// `Scanner::with_fragments(...)` so subsequent ops in the same mutation -/// see the staged data alongside the committed snapshot. +/// `TableStore::commit_staged`). +/// +/// For read-your-writes within the same query, `new_fragments` and +/// `removed_fragment_ids` together describe the post-stage view delta: +/// `scan_with_staged` (and `count_rows_with_staged`) compose +/// `committed - removed + new` so subsequent reads see the staged result +/// without double-counting fragments that `Operation::Update` rewrote. +/// Without `removed_fragment_ids`, a `stage_merge_insert` that rewrites +/// existing fragments would yield duplicate rows (the original fragment +/// stays in the committed manifest while its rewrite shows up in `new_fragments`). #[derive(Debug, Clone)] pub struct StagedWrite { pub transaction: Transaction, + /// Fragments to surface alongside the committed manifest in + /// `Scanner::with_fragments(committed - removed + new)`. For + /// `Operation::Append` these are the freshly-appended fragments. For + /// `Operation::Update` (merge_insert) these are + /// `updated_fragments + new_fragments` (rewrites + freshly-inserted + /// rows). pub new_fragments: Vec, + /// Fragment IDs that this staged write supersedes. The committed + /// manifest must filter these out before being combined with + /// `new_fragments` for read-your-writes scans, otherwise rewrites + /// yield duplicate rows. Empty for `stage_append` (`Operation::Append` + /// adds without removing anything); populated from + /// `Operation::Update.removed_fragment_ids` for `stage_merge_insert`. + pub removed_fragment_ids: Vec, } #[derive(Debug, Clone)] @@ -569,6 +591,8 @@ impl TableStore { Ok(StagedWrite { transaction, new_fragments, + // Append never supersedes existing fragments. + removed_fragment_ids: Vec::new(), }) } @@ -604,22 +628,28 @@ impl TableStore { .execute_uncommitted(stream) .await .map_err(|e| OmniError::Lance(e.to_string()))?; - // Operation::Update { new_fragments, updated_fragments, .. } — + // Operation::Update { removed_fragment_ids, updated_fragments, new_fragments, .. } — // `new_fragments` are the freshly inserted rows; `updated_fragments` // are rewrites of existing fragments that include both retained and - // updated rows. For read-your-writes we surface BOTH so subsequent - // reads see updated rows even before the manifest commits. - let new_fragments = match &uncommitted.transaction.operation { + // updated rows; `removed_fragment_ids` lists the committed-manifest + // fragments that those rewrites supersede. For read-your-writes we + // expose `updated_fragments + new_fragments` and the + // `removed_fragment_ids` so `scan_with_staged` can filter the + // superseded committed fragments before combining — otherwise a + // single merge_insert appears as duplicate rows (original committed + // version + rewritten staged version). + let (new_fragments, removed_fragment_ids) = match &uncommitted.transaction.operation { Operation::Update { new_fragments, updated_fragments, + removed_fragment_ids, .. } => { let mut all = updated_fragments.clone(); all.extend(new_fragments.iter().cloned()); - all + (all, removed_fragment_ids.clone()) } - Operation::Append { fragments } => fragments.clone(), + Operation::Append { fragments } => (fragments.clone(), Vec::new()), other => { return Err(OmniError::manifest_internal(format!( "stage_merge_insert: unexpected Lance operation {:?}", @@ -630,6 +660,7 @@ impl TableStore { Ok(StagedWrite { transaction: uncommitted.transaction, new_fragments, + removed_fragment_ids, }) } @@ -648,17 +679,25 @@ impl TableStore { .map_err(|e| OmniError::Lance(e.to_string())) } - /// Run a scan with optional uncommitted staged fragments visible - /// alongside the committed snapshot. When `staged_fragments` is empty - /// this is identical to `scan(...)`. + /// Run a scan with optional uncommitted staged writes visible + /// alongside the committed snapshot. When `staged` is empty this is + /// identical to `scan(...)`. + /// + /// Composes the visible fragment list as `committed - removed + new`: + /// the committed manifest's fragments, minus any fragment IDs that + /// staged `Operation::Update`s (merge_insert rewrites) have superseded, + /// plus the staged new/updated fragments. Without the `removed` + /// filter, a merge_insert that rewrites an existing fragment would + /// surface twice — once via the original committed fragment, once via + /// the rewrite in `new_fragments`. pub async fn scan_with_staged( &self, ds: &Dataset, - staged_fragments: &[Fragment], + staged: &[StagedWrite], projection: Option<&[&str]>, filter: Option<&str>, ) -> Result> { - if staged_fragments.is_empty() { + if staged.is_empty() { return self.scan(ds, projection, filter, None).await; } let mut scanner = ds.scan(); @@ -673,13 +712,7 @@ impl TableStore { .filter(f) .map_err(|e| OmniError::Lance(e.to_string()))?; } - // Combine committed fragments (from the dataset's manifest at - // current version) with the supplied staged fragments. Lance's - // Scanner::with_fragments scopes the scan to exactly this list — - // pull committed fragments out of the dataset's manifest. - let mut combined: Vec = ds.manifest.fragments.iter().cloned().collect(); - combined.extend(staged_fragments.iter().cloned()); - scanner.with_fragments(combined); + scanner.with_fragments(combine_committed_with_staged(ds, staged)); let stream = scanner .try_into_stream() .await @@ -690,16 +723,17 @@ impl TableStore { .map_err(|e| OmniError::Lance(e.to_string())) } - /// `count_rows` variant that respects staged fragments. Used for + /// `count_rows` variant that respects staged writes. Used for /// edge-cardinality validation that needs to see staged edges before - /// commit. + /// commit. Same `committed - removed + new` composition as + /// `scan_with_staged`. pub async fn count_rows_with_staged( &self, ds: &Dataset, - staged_fragments: &[Fragment], + staged: &[StagedWrite], filter: Option, ) -> Result { - if staged_fragments.is_empty() { + if staged.is_empty() { return self.count_rows(ds, filter).await; } let mut scanner = ds.scan(); @@ -708,9 +742,7 @@ impl TableStore { .filter(&f) .map_err(|e| OmniError::Lance(e.to_string()))?; } - let mut combined: Vec = ds.manifest.fragments.iter().cloned().collect(); - combined.extend(staged_fragments.iter().cloned()); - scanner.with_fragments(combined); + scanner.with_fragments(combine_committed_with_staged(ds, staged)); let count = scanner .count_rows() .await @@ -838,3 +870,34 @@ impl TableStore { .map_err(|e| OmniError::Lance(e.to_string())) } } + +/// Build the `Scanner::with_fragments` argument for read-your-writes: +/// committed manifest fragments minus any fragment IDs superseded by the +/// staged writes, plus the staged `new_fragments`. Order is: +/// 1. committed fragments whose IDs are NOT in any staged +/// `removed_fragment_ids` (preserves committed order), +/// 2. all staged `new_fragments` in stage order. +/// Lance's `Scanner` does not require any particular ordering between +/// committed and staged fragments — `with_fragments` scopes the scan to +/// exactly the supplied list. The dedup matters because merge_insert +/// rewrites a fragment in place at the Lance layer: the rewritten +/// fragment is in `new_fragments`, the original (which it supersedes) is +/// in `committed` until manifest commit, and including both would yield +/// duplicate rows. +fn combine_committed_with_staged(ds: &Dataset, staged: &[StagedWrite]) -> Vec { + let removed: std::collections::HashSet = staged + .iter() + .flat_map(|w| w.removed_fragment_ids.iter().copied()) + .collect(); + let mut combined: Vec = ds + .manifest + .fragments + .iter() + .filter(|f| !removed.contains(&f.id)) + .cloned() + .collect(); + for write in staged { + combined.extend(write.new_fragments.iter().cloned()); + } + combined +}