MR-794 step 1: address PR #66 review — track removed_fragment_ids

Three independent automated reviews (Cubic P1, Cursor High, Codex P1)
flagged a real correctness bug in stage_merge_insert: Operation::Update
returns three fields — removed_fragment_ids, updated_fragments,
new_fragments — and we were collecting only the latter two into
StagedWrite.new_fragments while discarding removed_fragment_ids.

That breaks read-your-writes for any merge_insert that rewrites an
existing fragment: scan_with_staged combines the dataset's full committed
manifest with the staged new_fragments, so the *original* committed
fragment (which the rewrite supersedes) and its rewritten version both
end up in the Scanner's fragment list. Result: duplicate rows.

Fix:

- StagedWrite gains `removed_fragment_ids: Vec<u64>` populated from
  Operation::Update; empty for Operation::Append (which never supersedes
  existing fragments).
- scan_with_staged / count_rows_with_staged take `&[StagedWrite]` instead
  of `&[Fragment]` so they have access to both fields.
- A new `combine_committed_with_staged` helper composes the visible
  fragment list as `committed - removed + new`, deduping by fragment ID.

Also addresses cubic's P3 doc-fab note: the StagedWrite doc comment
claimed the type was "used by MutationStaging and the loader" but those
callers don't exist in this PR (they're MR-794 step 2+). Reword to
"defined here for later integration" so the doc doesn't lie about the
current state.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
Ragnor Comerford 2026-04-30 15:54:28 +02:00
parent 3601002440
commit 6dc4167291
No known key found for this signature in database

View file

@ -38,21 +38,43 @@ pub struct DeleteState {
}
/// A Lance write that has produced fragment files on object storage but is
/// not yet committed to the dataset's manifest. Used by `MutationStaging`
/// (`exec/mutation.rs`) and the loader to defer Lance commits to
/// end-of-query, so a mid-query failure leaves the touched table at the
/// pre-mutation HEAD instead of drifting ahead.
/// not yet committed to the dataset's manifest. The staged-write primitives
/// are defined here for later integration in `MutationStaging`
/// (`exec/mutation.rs`) and the loader (`loader/mod.rs`) — those rewires
/// land in [MR-794](https://linear.app/modernrelay/issue/MR-794) step 2+.
/// The intent: defer Lance commits to end-of-query so a mid-query failure
/// leaves the touched table at the pre-mutation HEAD instead of drifting
/// ahead.
///
/// `transaction` is opaque from our side — Lance owns its semantics. We
/// commit it via `CommitBuilder::execute(transaction)` (see
/// `TableStore::commit_staged`). `new_fragments` is extracted for
/// read-your-writes within the same query: pass it to
/// `Scanner::with_fragments(...)` so subsequent ops in the same mutation
/// see the staged data alongside the committed snapshot.
/// `TableStore::commit_staged`).
///
/// For read-your-writes within the same query, `new_fragments` and
/// `removed_fragment_ids` together describe the post-stage view delta:
/// `scan_with_staged` (and `count_rows_with_staged`) compose
/// `committed - removed + new` so subsequent reads see the staged result
/// without double-counting fragments that `Operation::Update` rewrote.
/// Without `removed_fragment_ids`, a `stage_merge_insert` that rewrites
/// existing fragments would yield duplicate rows (the original fragment
/// stays in the committed manifest while its rewrite shows up in `new_fragments`).
#[derive(Debug, Clone)]
pub struct StagedWrite {
pub transaction: Transaction,
/// Fragments to surface alongside the committed manifest in
/// `Scanner::with_fragments(committed - removed + new)`. For
/// `Operation::Append` these are the freshly-appended fragments. For
/// `Operation::Update` (merge_insert) these are
/// `updated_fragments + new_fragments` (rewrites + freshly-inserted
/// rows).
pub new_fragments: Vec<Fragment>,
/// Fragment IDs that this staged write supersedes. The committed
/// manifest must filter these out before being combined with
/// `new_fragments` for read-your-writes scans, otherwise rewrites
/// yield duplicate rows. Empty for `stage_append` (`Operation::Append`
/// adds without removing anything); populated from
/// `Operation::Update.removed_fragment_ids` for `stage_merge_insert`.
pub removed_fragment_ids: Vec<u64>,
}
#[derive(Debug, Clone)]
@ -569,6 +591,8 @@ impl TableStore {
Ok(StagedWrite {
transaction,
new_fragments,
// Append never supersedes existing fragments.
removed_fragment_ids: Vec::new(),
})
}
@ -604,22 +628,28 @@ impl TableStore {
.execute_uncommitted(stream)
.await
.map_err(|e| OmniError::Lance(e.to_string()))?;
// Operation::Update { new_fragments, updated_fragments, .. } —
// Operation::Update { removed_fragment_ids, updated_fragments, new_fragments, .. } —
// `new_fragments` are the freshly inserted rows; `updated_fragments`
// are rewrites of existing fragments that include both retained and
// updated rows. For read-your-writes we surface BOTH so subsequent
// reads see updated rows even before the manifest commits.
let new_fragments = match &uncommitted.transaction.operation {
// updated rows; `removed_fragment_ids` lists the committed-manifest
// fragments that those rewrites supersede. For read-your-writes we
// expose `updated_fragments + new_fragments` and the
// `removed_fragment_ids` so `scan_with_staged` can filter the
// superseded committed fragments before combining — otherwise a
// single merge_insert appears as duplicate rows (original committed
// version + rewritten staged version).
let (new_fragments, removed_fragment_ids) = match &uncommitted.transaction.operation {
Operation::Update {
new_fragments,
updated_fragments,
removed_fragment_ids,
..
} => {
let mut all = updated_fragments.clone();
all.extend(new_fragments.iter().cloned());
all
(all, removed_fragment_ids.clone())
}
Operation::Append { fragments } => fragments.clone(),
Operation::Append { fragments } => (fragments.clone(), Vec::new()),
other => {
return Err(OmniError::manifest_internal(format!(
"stage_merge_insert: unexpected Lance operation {:?}",
@ -630,6 +660,7 @@ impl TableStore {
Ok(StagedWrite {
transaction: uncommitted.transaction,
new_fragments,
removed_fragment_ids,
})
}
@ -648,17 +679,25 @@ impl TableStore {
.map_err(|e| OmniError::Lance(e.to_string()))
}
/// Run a scan with optional uncommitted staged fragments visible
/// alongside the committed snapshot. When `staged_fragments` is empty
/// this is identical to `scan(...)`.
/// Run a scan with optional uncommitted staged writes visible
/// alongside the committed snapshot. When `staged` is empty this is
/// identical to `scan(...)`.
///
/// Composes the visible fragment list as `committed - removed + new`:
/// the committed manifest's fragments, minus any fragment IDs that
/// staged `Operation::Update`s (merge_insert rewrites) have superseded,
/// plus the staged new/updated fragments. Without the `removed`
/// filter, a merge_insert that rewrites an existing fragment would
/// surface twice — once via the original committed fragment, once via
/// the rewrite in `new_fragments`.
pub async fn scan_with_staged(
&self,
ds: &Dataset,
staged_fragments: &[Fragment],
staged: &[StagedWrite],
projection: Option<&[&str]>,
filter: Option<&str>,
) -> Result<Vec<RecordBatch>> {
if staged_fragments.is_empty() {
if staged.is_empty() {
return self.scan(ds, projection, filter, None).await;
}
let mut scanner = ds.scan();
@ -673,13 +712,7 @@ impl TableStore {
.filter(f)
.map_err(|e| OmniError::Lance(e.to_string()))?;
}
// Combine committed fragments (from the dataset's manifest at
// current version) with the supplied staged fragments. Lance's
// Scanner::with_fragments scopes the scan to exactly this list —
// pull committed fragments out of the dataset's manifest.
let mut combined: Vec<Fragment> = ds.manifest.fragments.iter().cloned().collect();
combined.extend(staged_fragments.iter().cloned());
scanner.with_fragments(combined);
scanner.with_fragments(combine_committed_with_staged(ds, staged));
let stream = scanner
.try_into_stream()
.await
@ -690,16 +723,17 @@ impl TableStore {
.map_err(|e| OmniError::Lance(e.to_string()))
}
/// `count_rows` variant that respects staged fragments. Used for
/// `count_rows` variant that respects staged writes. Used for
/// edge-cardinality validation that needs to see staged edges before
/// commit.
/// commit. Same `committed - removed + new` composition as
/// `scan_with_staged`.
pub async fn count_rows_with_staged(
&self,
ds: &Dataset,
staged_fragments: &[Fragment],
staged: &[StagedWrite],
filter: Option<String>,
) -> Result<usize> {
if staged_fragments.is_empty() {
if staged.is_empty() {
return self.count_rows(ds, filter).await;
}
let mut scanner = ds.scan();
@ -708,9 +742,7 @@ impl TableStore {
.filter(&f)
.map_err(|e| OmniError::Lance(e.to_string()))?;
}
let mut combined: Vec<Fragment> = ds.manifest.fragments.iter().cloned().collect();
combined.extend(staged_fragments.iter().cloned());
scanner.with_fragments(combined);
scanner.with_fragments(combine_committed_with_staged(ds, staged));
let count = scanner
.count_rows()
.await
@ -838,3 +870,34 @@ impl TableStore {
.map_err(|e| OmniError::Lance(e.to_string()))
}
}
/// Build the `Scanner::with_fragments` argument for read-your-writes:
/// committed manifest fragments minus any fragment IDs superseded by the
/// staged writes, plus the staged `new_fragments`. Order is:
/// 1. committed fragments whose IDs are NOT in any staged
/// `removed_fragment_ids` (preserves committed order),
/// 2. all staged `new_fragments` in stage order.
/// Lance's `Scanner` does not require any particular ordering between
/// committed and staged fragments — `with_fragments` scopes the scan to
/// exactly the supplied list. The dedup matters because merge_insert
/// rewrites a fragment in place at the Lance layer: the rewritten
/// fragment is in `new_fragments`, the original (which it supersedes) is
/// in `committed` until manifest commit, and including both would yield
/// duplicate rows.
fn combine_committed_with_staged(ds: &Dataset, staged: &[StagedWrite]) -> Vec<Fragment> {
let removed: std::collections::HashSet<u64> = staged
.iter()
.flat_map(|w| w.removed_fragment_ids.iter().copied())
.collect();
let mut combined: Vec<Fragment> = ds
.manifest
.fragments
.iter()
.filter(|f| !removed.contains(&f.id))
.cloned()
.collect();
for write in staged {
combined.extend(write.new_fragments.iter().cloned());
}
combined
}