feat(engine): Stage the delete path; retire the inline-delete residual (#308)

* test(engine): pin zero-row cascade delete must not drift an edge table (red)

A delete <Node> cascades a delete_where into every incident edge type. The
inline delete_where (Dataset::delete) advances Lance HEAD even when zero edges
match, but the cascade records the new version only if deleted_rows > 0 — so a
node with no incident edges leaves edge:Knows HEAD>manifest drift, which trips
the next strict write's ExpectedVersionMismatch and repair refuses it.

Red today: edge:Knows manifest=v5, Lance HEAD=v6. Goes green when delete moves
to the staged two-phase path (iss-950, Lance 7.0 DeleteBuilder::execute_uncommitted),
where a 0-row delete commits no Lance version and the deleted_rows>0 gate becomes
correct by construction.

* fix(engine): a zero-row delete must not advance Lance HEAD

Lance's Dataset::delete commits a new version even when the predicate matches
nothing (build_transaction always emits Operation::Delete), so a node delete
that cascades a delete_where into an incident edge type with no matching edges
advanced that edge table's Lance HEAD while the cascade skipped record_inline
(gated on deleted_rows > 0) — leaving HEAD>manifest drift that wedged the next
strict write and that repair refused as suspicious/unverifiable.

Use Lance 7.0's two-phase DeleteBuilder::execute_uncommitted to read
num_deleted_rows before committing: a no-match delete now advances nothing (no
version, no drift) and the existing deleted_rows>0 gate is correct by
construction. Non-zero deletes commit the staged transaction with
skip_auto_cleanup + affected_rows (parity with the prior inline path).

First step of the staged-delete migration (iss-950); turns the
node_delete_with_no_incident_edges_leaves_no_edge_table_drift regression green.

* feat(engine): stage_delete two-phase primitive (MR-A step 0)

Add TableStore::stage_delete (Lance 7.0 DeleteBuilder::execute_uncommitted),
the two-phase analogue of stage_merge_insert: writes deletion files without
advancing Lance HEAD, returns Option<StagedWrite> (None on 0 rows = true no-op),
carrying the deletion-vector updated_fragments as new_fragments and the
superseded originals as removed_fragment_ids so combine_committed_with_staged
makes the deletion visible to in-query reads.

No affected_rows is threaded: like stage_merge_insert's Operation::Update commit,
the staged delete relies on OmniGraph's per-table write queue + manifest CAS, not
Lance's per-dataset conflict resolver (commit_staged is a single attempt).

Flip the two residual guards to the staged path: staged_writes.rs now asserts
stage_delete does NOT advance HEAD and that a staged delete is read-your-writes
visible (the deletion-vector RYW proof D2 retirement depends on); the
lance_surface_guards delete guard pins execute_uncommitted's UncommittedDelete.

No behavior change yet (callers still use delete_where); Step 1 wires them.

* feat(engine): TableStorage::stage_delete + migrate merge delete path (MR-A step 1a)

Add stage_delete/Option<StagedHandle> to the TableStorage trait (delegates to
TableStore::stage_delete). Migrate the two branch_merge delete sites
(three-way RewriteMerged + adopt delta) from the inline delete_where residual to
stage_delete + commit_staged — identical in shape to the stage_merge_insert +
commit_staged pair above each. HEAD still advances within the merge sequence
(via commit_staged), under the unchanged SidecarKind::BranchMerge Phase-B
confirmation; the _pre_delete/_pre_index failpoints fire by position, unchanged.

merge_truth_table, branching, composite_flow green.

* feat(engine): migrate all delete sites to staged path, retire inline delete (MR-A step 1b/1c)

Routes every delete through the staged write path so delete never advances
Lance HEAD inline — the last inline-commit residual on the mutation path is
gone. `MutationStaging` now accumulates delete predicates (`record_delete`)
alongside pending write batches; at end-of-query `stage_all` combines a
table's predicates into one `(p1) OR (p2) …` `stage_delete` (a deletion-vector
transaction, no HEAD advance) and `commit_all` commits it through the same
`commit_staged` path as inserts/updates. Deletes are now ordinary staged
entries: one sidecar pin at `expected + 1`, no inline special-casing.

Migrated callers (all 5): the 3 mutation.rs sites (delete-node, cascade,
delete-edge) and the 2 merge.rs sites (already on stage_delete in step 1a).
`affected_edges`/`affected` move from post-inline-commit `deleted_rows` to a
committed `count_rows` at record time — exact under D₂, bounded by the cascade
working set. A predicate matching zero rows stages nothing (the staged
equivalent of the old "skip record_inline on 0 deleted rows"), so the zero-row
edge-table drift class stays closed by construction.

Retired scaffolding now that no caller remains:
- `MutationStaging.inline_committed` + `record_inline` → `delete_predicates` +
  `record_delete`; `StagedMutation.inline_committed`/`paths` fields and all the
  `commit_all` inline handling (queue keys, sidecar pins with the
  `record_inline` table_version special-case, the inline recheck loop).
- `open_table_for_mutation`'s post-inline-commit reopen branch (deletes no
  longer advance HEAD mid-query, so a second touch reopens at the pinned
  version like any write).
- `InlineCommitResidual::delete_where` + its `TableStore` impl, the orphaned
  `TableStore::delete_where`, and `DeleteState`. `InlineCommitResidual` now
  carries only `create_vector_index` (Lance #6666 still open).

D₂ stays for now: staged-delete read-your-writes doesn't yet compose into the
pending accumulator (insert-then-delete on one table), so mixed
insert/update/delete in one query is still rejected at parse time. Retiring D₂
is step 2. Doc comments updated to match across exec/, storage_layer, db/.

Tests (all green): writes, consistency, validators, end_to_end, composite_flow,
merge_truth_table, maintenance, recovery, staged_writes, forbidden_apis,
lance_surface_guards, changes, point_in_time (286), plus failpoints (63).

* docs: delete is a staged write, not an inline-commit residual (MR-A step 1)

Update the docs that described `delete` as the inline-commit residual now that
MR-A routes it through `stage_delete`. Always-loaded surfaces (AGENTS.md rule
4 / capability matrix, invariants.md Invariant 4 / truth matrix / known gaps)
plus the dev write-path docs (writes.md, execution.md incl. its mutation
sequence diagram, architecture.md) now state: deletes accumulate as predicates
and stage like inserts/updates, no inline HEAD advance; `InlineCommitResidual`
carries only `create_vector_index` (Lance #6666). The parse-time D₂ rule is
documented as retained — not because delete inline-commits, but because
staged-delete read-your-writes is not yet wired into the pending accumulator
(MR-A step 2). lance.md's 7.0 audit note marked MR-A as landed.

* docs: D₂ is a deliberate boundary, not temporary scaffolding (MR-A close-out)

After MR-A staged the delete path, D₂ (a mutation query is insert/update-only
OR delete-only) was left framed as temporary — "until Lance ships two-phase
delete" / "retire in step 2". Lance shipped that and we used it for the
inline-commit fix; D₂'s original justification is gone. It now stands for a
different, permanent reason: keeping a query to one kind keeps its
read-your-writes unambiguous and each table to one version per query. Retiring
it would buy single-commit mixed atomicity (cheap workaround: split, or a
branch) at the cost of an in-query delete view, pending pruning, edge
id-resolution, and two-commit-per-table ordering in the hot mutation path —
complexity not worth earning. Decision: keep D₂ as a deliberate boundary.

Reframes the now-stale wording everywhere, no logic change:
- The D₂ parse-time error message no longer promises "this restriction lifts
  when Lance exposes a two-phase delete API"; it states the boundary and points
  to a branch+merge for one atomic commit.
- `enforce_no_mixed_destructive_constructive` doc, AGENTS.md, invariants.md
  (Invariant 4 / truth matrix / removed from the known-gaps), writes.md,
  architecture.md, lance.md, and the user mutations doc (which wrongly said
  deletes "commit through a different path" — both stage now).
- Swept remaining stale `delete_where` mentions left from the Step-1 migration:
  the merge.rs "swap when upstream ships" comments (already swapped), the
  forbidden_apis / table_ops residual notes, the staged_writes vector-index
  guard doc (was "same as stage_delete's absence" — stage_delete now exists),
  and test comments/assert messages in recovery/maintenance/writes/failpoints.
  Genuinely-historical records (dated Lance audit, rfc-013, bug-case-fix) left.

Verified: engine builds warning-free; check-agents-md OK; writes/maintenance/
recovery/staged_writes/forbidden_apis all green. Closes MR-A.

* test(engine): overlapping delete predicates must not double-count affected_* (red)

Reproduces a reporting regression from the staged-delete migration flagged in
PR #308 review. Because deletes now stage (instead of inline-committing), two
delete statements in one query both scan the same unchanged committed snapshot;
counting each predicate independently over-reports `affected_*` when they
overlap. The old inline path committed each delete before the next ran, so it
counted distinct.

`delete Person where name = "Alice"` then `delete Person where age > 29` over
the standard fixture (Alice 30, Charlie 35) removes 2 distinct nodes and 3
distinct edges, but the buggy per-statement counting returns 3 nodes / 6 edges.
RED at this commit (asserts left=3, right=2).

* fix(engine): dedup overlapping delete predicates when counting affected_*

Count each delete statement against the committed snapshot MINUS the predicates
a prior delete statement on the same table already recorded:
`(pred) AND NOT ((prior1) OR (prior2) …)`. Summed over statements this is
inclusion-exclusion — `Σ |pₙ \ (p₁ ∪ …)| = |p₁ ∪ p₂ ∪ …|` — exactly the distinct
count the combined `(p1) OR (p2)` staged delete removes. Works for nodes and
edges alike with no edge identity needed; the node ID scan uses the same
exclusion so a later statement also doesn't re-cascade already-deleted nodes.
The ORIGINAL predicate is still what gets recorded (the staged delete removes
the union); only the count uses the exclusion. The common single-delete path is
unchanged (`prior` empty → filter is just the base predicate).

New helper `dedup_delete_filter` + `MutationStaging::recorded_delete_predicates`.
Turns the red regression test green (2 nodes / 3 edges); writes (33),
end_to_end, validators, maintenance, recovery, composite_flow, merge_truth_table,
consistency, changes, and failpoints (63) all stay green.

* test(engine): delete dedup must not drop NULL-column rows (red)

Follow-up to the overlapping-delete fix flagged in PR #308 review (Greptile P1):
the `(base) AND NOT (prior)` exclusion breaks under SQL three-valued logic. If a
prior delete predicate references a NULLable column, a later statement's
matching row whose column is NULL makes `prior` evaluate to UNKNOWN, `NOT
UNKNOWN` is UNKNOWN, and the row is filtered out of the scan — even though the
prior delete never matched it. That drops it from `deleted_ids`, skipping its
cascade (orphaned edges) or, if it is the only match, leaving the node
undeleted. A data bug, not just a miscount.

Data: Charlie(age 35), Zoe(age NULL); Knows Zoe→Charlie. `delete Person where
age > 30` then `delete Person where name = "Zoe"`. Under the buggy `NOT`, Zoe's
scan `(name='Zoe') AND NOT (age>30)` is UNKNOWN → Zoe survives. RED at this
commit (Person count left=1, right=0).

* fix(engine): NULL-safe delete dedup — exclude only definitely-matched prior rows

Change `dedup_delete_filter` from `(base) AND NOT (prior)` to
`(base) AND ((prior) IS NOT TRUE)`. `IS NOT TRUE` keeps both FALSE and UNKNOWN
rows, so a prior predicate that evaluates to SQL UNKNOWN (a NULL in a referenced
column) no longer drops a row this statement legitimately matches — only rows a
prior predicate matched as definitely TRUE are excluded from the count/scan. The
distinct-count semantics are unchanged for non-NULL data.

Turns the red NULL-dedup test green (Zoe deleted, her edge cascaded), and the
overlapping-dedup + writes/end_to_end/validators/maintenance/recovery/
composite_flow/consistency suites stay green.

* docs(engine): note dedup_delete_filter's load-bearing dependency on D₂

Self-review follow-up: the overlapping-delete dedup assumes the committed
snapshot is invariant across a query's statements, which holds only because D₂
forbids mixing writes with deletes (so a delete-touched table has no pending
writes). Make that dependency explicit at the function so a future D₂ relaxation
is forced to revisit the dedup. Comment-only.

* Preserve staged write commit metadata
This commit is contained in:
Ragnor Comerford 2026-06-27 16:48:41 +02:00 committed by GitHub
parent a7d4cba53d
commit 0dcdcf5a9d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
25 changed files with 996 additions and 535 deletions

View file

@ -19,6 +19,7 @@ failpoints = ["dep:fail", "fail/failpoints"]
omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.7.2" }
omnigraph-policy = { path = "../omnigraph-policy", version = "0.7.2" }
lance = { workspace = true }
lance-core = { workspace = true }
lance-datafusion = { workspace = true }
datafusion = { workspace = true }
lance-file = { workspace = true }

View file

@ -154,7 +154,7 @@ pub struct Omnigraph {
/// write-serialization mechanism (the server holds the engine as a
/// lockless `Arc<Omnigraph>`). Reachable from engine internals
/// (mutation finalize, schema_apply, branch_merge, ensure_indices,
/// delete_where, the fork path, recovery reconciler).
/// the fork path, recovery reconciler).
write_queue: Arc<crate::db::write_queue::WriteQueueManager>,
/// Process-wide mutex held across the swap → operate → restore window
/// in `branch_merge_impl`. Two concurrent merges with distinct targets
@ -702,13 +702,14 @@ impl Omnigraph {
&self.table_store
}
/// Inline-commit residual surface (`delete_where`,
/// `create_vector_index`) — the writes Lance cannot yet express as a
/// stage-then-commit pair. Deliberately separate from [`Self::storage`] so
/// the default storage surface is staged-only and a new writer cannot couple
/// "write bytes" with "advance HEAD" by reaching for `db.storage()`. Only
/// the handful of documented residual call sites (mutation/merge deletes,
/// vector-index build) use this accessor. See
/// Inline-commit residual surface (`create_vector_index`) — the sole
/// write Lance cannot yet express as a stage-then-commit pair (segment
/// commit needs `build_index_metadata_from_segments`, Lance #6666).
/// Deliberately separate from [`Self::storage`] so the default storage
/// surface is staged-only and a new writer cannot couple "write bytes" with
/// "advance HEAD" by reaching for `db.storage()`. Only the vector-index
/// build uses this accessor — delete migrated to the staged path
/// (`stage_delete`) in MR-A. See
/// `crate::storage_layer::InlineCommitResidual` for the per-method blocker.
pub(crate) fn storage_inline_residual(
&self,
@ -726,7 +727,7 @@ impl Omnigraph {
/// Per-`(table_key, branch)` writer queues.
///
/// Engine-internal writers (mutation finalize, schema_apply,
/// branch_merge, ensure_indices, delete_where) and the future MR-870
/// branch_merge, ensure_indices) and the future MR-870
/// recovery reconciler reach the queue manager via this accessor.
/// Returns an `Arc` clone so callers can hold the manager across
/// `&mut self` engine API boundaries.

View file

@ -1171,8 +1171,9 @@ async fn prepare_updates_for_commit(
// its just-committed version. When a table's handle is present, the index
// build below reuses it and SKIPS the `reopen_for_mutation` open. Absent
// entries (other writers — schema apply, merge, ensure_indices, tests —
// pass `HashMap::new()`; inline-committed/delete tables are never staged)
// keep the byte-identical `reopen_for_mutation` path.
// pass `HashMap::new()`) keep the byte-identical `reopen_for_mutation`
// path. Delete tables ARE staged now (MR-A), so their handle is present
// like any other staged write.
mut committed_handles: std::collections::HashMap<String, SnapshotHandle>,
) -> Result<Vec<crate::db::SubTableUpdate>> {
if updates.is_empty() {

View file

@ -5,10 +5,10 @@
//! disjoint-key writes proceed concurrently and only writes to the same
//! `(table_key, branch_ref)` serialize here. This module owns the queue
//! data structure; callers in `MutationStaging::commit_all`, `branch_merge`,
//! `schema_apply`, `ensure_indices`, `delete_where`, the fork path (first
//! write to a table on a branch — acquired before the fork, held through the
//! manifest publish), and the recovery reconciler acquire guards before any
//! per-table Lance commit. Serialization is in-process only; cross-process
//! `schema_apply`, `ensure_indices`, the fork path (first write to a table on
//! a branch — acquired before the fork, held through the manifest publish),
//! and the recovery reconciler acquire guards before any per-table Lance
//! commit. Serialization is in-process only; cross-process
//! writers on one graph remain one-winner-CAS at the manifest publish.
//!
//! ## Why exclusive `tokio::sync::Mutex<()>` per key

View file

@ -1065,9 +1065,10 @@ async fn publish_rewritten_merge_table(
staged: &StagedMergeResult,
) -> Result<crate::db::SubTableUpdate> {
// Branch merge's source-rewrite path is Merge-shaped (upsert from
// source onto target). The inline `delete_where` later in this
// function operates on rows the rewrite chose to remove, not
// user-facing predicates, so Merge is the correct policy here.
// source onto target). The staged delete later in this function
// (`stage_delete` + `commit_staged`) operates on rows the rewrite chose
// to remove, not user-facing predicates, so Merge is the correct policy
// here.
// `open_for_mutation` is the no-txn entry, so collapse #1's non-strict
// open-skip (gated on `txn.is_some()`) never fires here — the handle is
// always `Some`.
@ -1130,15 +1131,10 @@ async fn publish_rewritten_merge_table(
// See tests/failpoints.rs::branch_merge_rewrite_partial_after_merge_rolls_back.
crate::failpoints::maybe_fail(crate::failpoints::names::BRANCH_MERGE_REWRITE_AFTER_MERGE_PRE_DELETE)?;
// Phase 2: delete removed rows via deletion vectors.
//
// INLINE-COMMIT RESIDUAL: lance-6.0.1 does not expose a public
// two-phase delete API (DeleteJob is `pub(crate)` —
// lance-format/lance#6658 is open with no PRs). We deliberately do
// NOT introduce a `stage_delete` wrapper that would secretly
// inline-commit (it would create a side-channel between the staged
// and inline write paths). When the upstream API ships, swap this
// `delete_where` call for `stage_delete` + `commit_staged`.
// Phase 2: delete removed rows via deletion vectors, staged through
// `stage_delete` + `commit_staged` (MR-A — Lance 7.0's
// `DeleteBuilder::execute_uncommitted`, #6658, made delete a two-phase
// staged write, so this no longer inline-commits).
if !staged.deleted_ids.is_empty() {
let escaped: Vec<String> = staged
.deleted_ids
@ -1146,11 +1142,12 @@ async fn publish_rewritten_merge_table(
.map(|id| format!("'{}'", id.replace('\'', "''")))
.collect();
let filter = format!("id IN ({})", escaped.join(", "));
let (new_ds, _) = target_db
.storage_inline_residual()
.delete_where(&full_path, current_ds, &filter)
.await?;
current_ds = new_ds;
if let Some(staged_delete) = target_db.storage().stage_delete(&current_ds, &filter).await? {
current_ds = target_db
.storage()
.commit_staged(current_ds, staged_delete)
.await?;
}
}
// Failpoint: crash after the Phase 2 delete commit, before the index build.
@ -1310,8 +1307,8 @@ async fn publish_adopted_delta(
// tests/failpoints.rs::branch_merge_adopt_partial_after_upsert_rolls_back.
crate::failpoints::maybe_fail(crate::failpoints::names::BRANCH_MERGE_ADOPT_AFTER_UPSERT_PRE_DELETE)?;
// Phase 2: delete removed rows via deletion vectors (inline-commit residual,
// same as the three-way path until Lance ships a public two-phase delete).
// Phase 2: delete removed rows via deletion vectors, staged through
// `stage_delete` + `commit_staged` (same as the three-way path; MR-A).
if !delta.deleted_ids.is_empty() {
let escaped: Vec<String> = delta
.deleted_ids
@ -1319,11 +1316,12 @@ async fn publish_adopted_delta(
.map(|id| format!("'{}'", id.replace('\'', "''")))
.collect();
let filter = format!("id IN ({})", escaped.join(", "));
let (new_ds, _) = target_db
.storage_inline_residual()
.delete_where(&full_path, current_ds, &filter)
.await?;
current_ds = new_ds;
if let Some(staged_delete) = target_db.storage().stage_delete(&current_ds, &filter).await? {
current_ds = target_db
.storage()
.commit_staged(current_ds, staged_delete)
.await?;
}
}
// Phase 4: index coverage is reconciler-owned on the adopt path. Unlike the
@ -1597,7 +1595,7 @@ impl Omnigraph {
// Pin `RewriteMerged` and `AdoptWithDelta` candidates — both advance
// Lance HEAD before the manifest publish (RewriteMerged via
// publish_rewritten_merge_table; AdoptWithDelta via publish_adopted_delta:
// stage_append + stage_merge_insert + delete_where + index — multiple
// stage_append + stage_merge_insert + stage_delete + index — multiple
// commit_staged calls per table, which the loose classification handles
// as multi-step drift).
//

View file

@ -565,42 +565,28 @@ fn apply_assignments(
use super::staging::{MutationStaging, PendingMode};
/// Open a sub-table dataset for read or inline-commit-write within the
/// current mutation query, capturing pre-write metadata in `staging` on
/// first touch. The captured version is the publisher's CAS fence at
/// end-of-query (per-table OCC).
/// Open a sub-table dataset for read or staged write within the current
/// mutation query, capturing pre-write metadata in `staging` on first touch.
/// The captured version is the publisher's CAS fence at end-of-query
/// (per-table OCC).
///
/// On first touch, opens the dataset at HEAD on the requested branch
/// via `open_for_mutation_on_branch`, which compares Lance HEAD against
/// the manifest's pinned version — that fence is the engine's
/// publisher-style OCC catching cross-writer drift before we make any
/// changes. For delete-only queries, this strict open is also the uncovered
/// drift guard that runs before `delete_where` can inline-commit.
/// drift guard.
///
/// On subsequent touches *within the same query*, behavior depends on
/// whether the table has already been inline-committed by a delete op:
///
/// - **Insert / update path (no inline commit between touches).** Lance
/// HEAD has not moved since first touch, so a fresh
/// `open_for_mutation_on_branch` would still match the manifest
/// pinned version. We just go through it again; `ensure_path` is a
/// no-op (idempotent on the captured `expected_version`).
/// - **Delete cascade or multi-delete on the same table.** A prior
/// `delete_where` on this table has already advanced Lance HEAD past
/// the manifest's pinned version (the manifest doesn't move until
/// end-of-query). Going through `open_for_mutation_on_branch` again
/// would trip its `ensure_expected_version` equality check
/// (`actual = pinned + 1` vs `expected = pinned`). Instead we route
/// through `reopen_for_mutation` at the post-inline-commit Lance
/// version captured in `staging.inline_committed[table_key]`, which
/// is the source of truth for "where is Lance HEAD right now on
/// this table within this query."
///
/// The `inline_committed` reopen branch closes the multi-delete-on-same-table
/// failure path that pre-staged-write engines inherited. The branch goes
/// away once Lance exposes a two-phase delete API
/// ([lance-format/lance#6658](https://github.com/lance-format/lance/issues/6658))
/// and we can stage deletes on the same path as inserts/updates.
/// On subsequent touches *within the same query*, Lance HEAD has not moved
/// since first touch — inserts, updates AND deletes all stage their work and
/// defer every HEAD advance to the end-of-query commit, so no op inline-commits
/// between touches. A fresh `open_for_mutation_on_branch` therefore still
/// matches the manifest pinned version; we go through it again and `ensure_path`
/// is a no-op (idempotent on the captured `expected_version`). This holds for a
/// delete cascade or multiple delete statements hitting the same table: each
/// touch records another predicate (`record_delete`), and `stage_all` combines
/// them into one staged delete — there is no post-inline-commit reopen to
/// special-case anymore.
impl Omnigraph {
/// Resolve a LIVE-HEAD read handle for an edge table's committed-state `@card`
/// scan when collapse #1 skipped the accumulation open. The edge-insert path no
@ -646,28 +632,6 @@ async fn open_table_for_mutation(
op_kind: crate::db::MutationOpKind,
txn: Option<&crate::db::WriteTxn>,
) -> Result<(Option<SnapshotHandle>, String, Option<String>)> {
if let Some(prior) = staging.inline_committed.get(table_key) {
let path = staging.paths.get(table_key).ok_or_else(|| {
OmniError::manifest_internal(format!(
"open_table_for_mutation: inline_committed[{}] without paths entry",
table_key
))
})?;
// The inline-committed reopen does NOT validate the schema contract
// (it reopens at the post-inline-commit Lance version directly), so it
// takes no `txn` — threading it here would change nothing. Deletes are
// strict ops, so this always opens (returns `Some`).
let ds = db
.reopen_for_mutation(
table_key,
&path.full_path,
path.table_branch.as_deref(),
prior.table_version,
op_kind,
)
.await?;
return Ok((Some(ds), path.full_path.clone(), path.table_branch.clone()));
}
// `open_for_mutation_on_branch` returns the expected version even when it
// skips the open (collapse #1, the non-strict insert/merge path): the version
// is the pinned base's, identical to the opened handle's `.version()`. Use it
@ -694,17 +658,62 @@ async fn open_table_for_mutation(
Ok((opened.handle, opened.full_path, opened.table_branch))
}
/// Build the committed-snapshot filter used to COUNT a delete statement's
/// `affected_*`, excluding rows a prior delete statement on the same table
/// already scheduled for removal in this query.
///
/// Deletes stage — they no longer inline-commit — so every statement in a
/// delete-only query scans the same unchanged committed snapshot. Counting each
/// predicate independently would double-count overlapping statements (the old
/// inline path did not, because each delete committed before the next ran). The
/// combined staged delete actually removes the UNION `p₁ p₂ …`; excluding
/// the prior predicates here makes each statement contribute `|pₙ \ (p₁ …)|`,
/// whose sum is exactly that distinct count. `base` (the original predicate) is
/// still what gets recorded — only the count uses this exclusion.
///
/// LOAD-BEARING on D₂: this exclusion assumes the committed snapshot is
/// invariant across the query's statements, which holds only because D₂
/// (`enforce_no_mixed_destructive_constructive`) forbids mixing inserts/updates
/// with deletes — so a delete-touched table never has pending writes that would
/// shift what a later statement sees. If D₂ is ever relaxed, this dedup must be
/// revisited (a later delete would then need to see prior in-query writes).
///
/// The exclusion uses `IS NOT TRUE`, not `NOT`, because of SQL three-valued
/// logic: a prior predicate referencing a column that is NULL for some row
/// (e.g. `age > 30` on a row with NULL `age`) evaluates to UNKNOWN, and
/// `NOT UNKNOWN` is still UNKNOWN — which a `WHERE` treats as not-matched, so
/// the row would be wrongly dropped from this statement's scan even though the
/// prior delete never matched it (dropping it from `deleted_ids` skips its
/// cascade, or — if it is the only match — leaves the node undeleted). Only
/// rows a prior predicate matched as definitely TRUE should be excluded:
/// `(prior) IS NOT TRUE` keeps both FALSE and UNKNOWN rows.
fn dedup_delete_filter(base: &str, prior: &[String]) -> String {
if prior.is_empty() {
base.to_string()
} else {
let excluded = prior
.iter()
.map(|p| format!("({p})"))
.collect::<Vec<_>>()
.join(" OR ");
format!("({base}) AND (({excluded}) IS NOT TRUE)")
}
}
/// D₂ parse-time check: a single mutation query is either insert/update-only
/// or delete-only. Mixed → reject before any I/O.
///
/// Reason: under the staged-write writer, inserts and updates
/// accumulate in memory and commit at end-of-query, while deletes still
/// inline-commit (Lance lacks a public two-phase delete in 6.0.1).
/// Mixing creates ordering hazards (same-row insert→delete becomes a no-op
/// because the staged insert isn't visible to delete; cascading deletes
/// of just-inserted edges break referential integrity by silent design).
/// Until Lance exposes `DeleteJob::execute_uncommitted`, the parse-time
/// rejection keeps both paths atomic and correct.
/// This is a deliberate semantic boundary, not temporary scaffolding. Inserts
/// and updates accumulate as pending in-memory batches and deletes accumulate
/// as predicates; both stage and commit at end-of-query. Keeping a single query
/// to one kind means read-your-writes stays unambiguous (a read never has to
/// reconcile pending inserts against same-query delete predicates) and each
/// touched table commits at most one version per query. Compose mixed
/// operations by issuing separate atomic mutations (writes, then deletes), or a
/// branch + merge when one atomic commit is required. Allowing mixing would
/// instead demand an in-query delete view, pending pruning, and per-table
/// two-commit ordering in the hot mutation path — complexity this boundary
/// deliberately avoids.
fn enforce_no_mixed_destructive_constructive(
ir: &omnigraph_compiler::ir::MutationIR,
) -> Result<()> {
@ -724,8 +733,9 @@ fn enforce_no_mixed_destructive_constructive(
return Err(OmniError::manifest(format!(
"mutation '{}' on the same query mixes inserts/updates and deletes; \
split into separate mutations: (1) inserts and updates, then (2) deletes. \
This restriction lifts when Lance exposes a two-phase delete API \
(tracked: lance-format/lance#6658).",
A query is deliberately constructive or destructive, not both, so its \
read-your-writes stays unambiguous; run the two on a branch and merge \
if you need them in one atomic commit.",
ir.name
)));
}
@ -804,11 +814,11 @@ impl Omnigraph {
let resolved_params = enrich_mutation_params(params)?;
// Per-query staging accumulator. Inserts and updates push batches
// into `pending`; deletes still inline-commit and record into
// `inline_committed`. At end-of-query, `finalize` issues one
// `stage_*` + `commit_staged` per pending table, then the
// publisher commits the manifest atomically across all touched
// tables. Branch is threaded explicitly — no coordinator swap.
// into `pending`; deletes push predicates into `delete_predicates`. At
// end-of-query, `finalize` issues one `stage_*` + `commit_staged` per
// touched table (inserts/updates/deletes alike), then the publisher
// commits the manifest atomically across all touched tables. Branch is
// threaded explicitly — no coordinator swap.
let mut staging = MutationStaging::default();
// Lower + validate up front so the touched-table set is known before
@ -1365,7 +1375,7 @@ impl Omnigraph {
let pred_sql = predicate_to_sql(predicate, params, false)?;
let table_key = format!("node:{}", type_name);
let (handle, full_path, table_branch) = open_table_for_mutation(
let (handle, _full_path, _table_branch) = open_table_for_mutation(
self,
staging,
branch,
@ -1376,14 +1386,20 @@ impl Omnigraph {
.await?;
// Delete is a STRICT op, so collapse #1 never skips its open.
let ds = handle.expect("strict Delete op always opens its dataset");
let initial_version = ds.version();
// Scan matching IDs for cascade. Per D₂ this never overlaps with
// staged inserts (mixed insert/delete in one query is rejected at
// parse time), so we scan committed only.
// parse time), so we scan committed only. Exclude IDs a prior delete
// statement on this table already scheduled (deletes stage, so the
// committed snapshot is unchanged across statements): without this,
// overlapping predicates would double-count `affected_nodes` AND
// re-cascade already-deleted nodes' edges. The combined staged delete
// still removes the union, so we record the original `pred_sql` below.
let scan_filter =
dedup_delete_filter(&pred_sql, staging.recorded_delete_predicates(&table_key));
let batches = self
.storage()
.scan(&ds, Some(&["id"]), Some(&pred_sql), None)
.scan(&ds, Some(&["id"]), Some(&scan_filter), None)
.await?;
let deleted_ids: Vec<String> = batches
@ -1409,33 +1425,15 @@ impl Omnigraph {
let affected_nodes = deleted_ids.len();
// Delete nodes — still inline-commit (Lance's `Dataset::delete` is
// not exposed as a two-phase op in 6.0.1). D₂ keeps inserts and
// deletes from coexisting in one query, so this advance of Lance
// HEAD is the only HEAD movement during the query and the
// publisher's CAS captures it intact.
let ds = self
.reopen_for_mutation(
&table_key,
&full_path,
table_branch.as_deref(),
initial_version,
crate::db::MutationOpKind::Delete,
)
.await?;
// Record the node delete as a staged predicate. D₂ keeps inserts and
// deletes from coexisting in one query, so this table carries no
// pending write batches; `stage_all` turns the predicate into one
// `stage_delete` (a deletion-vector transaction) that advances Lance
// HEAD only at the unified end-of-query commit — no inline residual.
// `open_table_for_mutation` above already captured the table's
// path/version/op-kind via `ensure_path`.
crate::failpoints::maybe_fail(crate::failpoints::names::MUTATION_DELETE_NODE_PRE_PRIMARY_DELETE)?;
let (_new_ds, delete_state) = self
.storage_inline_residual()
.delete_where(&full_path, ds, &pred_sql)
.await?;
staging.record_inline(crate::db::SubTableUpdate {
table_key: table_key.clone(),
table_version: delete_state.version,
table_branch: table_branch.clone(),
row_count: delete_state.row_count,
version_metadata: delete_state.version_metadata,
});
staging.record_delete(&table_key, pred_sql.clone());
let mut affected_edges = 0usize;
let escaped: Vec<String> = deleted_ids
@ -1465,7 +1463,7 @@ impl Omnigraph {
let edge_table_key = format!("edge:{}", edge_name);
let cascade_filter = cascade_filters.join(" OR ");
let (edge_handle, edge_full_path, edge_table_branch) = open_table_for_mutation(
let (edge_handle, _edge_full_path, _edge_table_branch) = open_table_for_mutation(
self,
staging,
branch,
@ -1477,21 +1475,26 @@ impl Omnigraph {
// Delete is a STRICT op, so collapse #1 never skips its open.
let edge_ds = edge_handle.expect("strict Delete op always opens its dataset");
let (_new_edge_ds, edge_delete) = self
.storage_inline_residual()
.delete_where(&edge_full_path, edge_ds, &cascade_filter)
// `affected_edges` was the post-inline-commit `deleted_rows`; with
// staged deletes the rows aren't removed until end-of-query, so
// count the matching committed edges now. Exact under D₂ (no staged
// inserts can add matches mid-query), and bounded by the cascade
// working set. Exclude edges a prior delete statement (a prior
// cascade, or an explicit edge delete) on this table already
// scheduled, so an edge incident to two deleted nodes — or matched
// by both a cascade and an explicit `delete <Edge>` — is counted
// once. Record the ORIGINAL cascade filter (the combined staged
// delete removes the union); skip only when nothing NEW matches.
let count_filter =
dedup_delete_filter(&cascade_filter, staging.recorded_delete_predicates(&edge_table_key));
let matched = self
.storage()
.count_rows(&edge_ds, Some(count_filter))
.await?;
affected_edges += matched;
affected_edges += edge_delete.deleted_rows;
if edge_delete.deleted_rows > 0 {
staging.record_inline(crate::db::SubTableUpdate {
table_key: edge_table_key,
table_version: edge_delete.version,
table_branch: edge_table_branch,
row_count: edge_delete.row_count,
version_metadata: edge_delete.version_metadata,
});
if matched > 0 {
staging.record_delete(&edge_table_key, cascade_filter);
}
}
@ -1517,7 +1520,7 @@ impl Omnigraph {
let pred_sql = predicate_to_sql(predicate, params, true)?;
let table_key = format!("edge:{}", type_name);
let (handle, full_path, table_branch) = open_table_for_mutation(
let (handle, _full_path, _table_branch) = open_table_for_mutation(
self,
staging,
branch,
@ -1529,20 +1532,21 @@ impl Omnigraph {
// Delete is a STRICT op, so collapse #1 never skips its open.
let ds = handle.expect("strict Delete op always opens its dataset");
let (_new_ds, delete_state) = self
.storage_inline_residual()
.delete_where(&full_path, ds, &pred_sql)
// Count matching committed edges now (the staged delete won't remove
// them until end-of-query). Exact under D₂; exclude edges a prior delete
// statement on this table (an earlier cascade or edge delete) already
// scheduled, so overlapping statements don't double-count. Record the
// ORIGINAL predicate below (the combined staged delete removes the
// union); only record when something NEW matches.
let count_filter =
dedup_delete_filter(&pred_sql, staging.recorded_delete_predicates(&table_key));
let affected = self
.storage()
.count_rows(&ds, Some(count_filter))
.await?;
let affected = delete_state.deleted_rows;
if affected > 0 {
staging.record_inline(crate::db::SubTableUpdate {
table_key,
table_version: delete_state.version,
table_branch,
row_count: delete_state.row_count,
version_metadata: delete_state.version_metadata,
});
staging.record_delete(&table_key, pred_sql.clone());
self.invalidate_graph_index().await;
}

View file

@ -14,9 +14,12 @@
//! This module is shared by the engine's mutation path (`exec/mutation.rs`)
//! and the bulk loader (`loader/mod.rs`); both feed insert/update batches
//! into `pending` and route end-of-query commits through `finalize`.
//! Deletes follow the inline-commit path and are recorded via
//! `record_inline` (parse-time D₂ rule prevents mixed insert/delete in a
//! single query, so no flushing is required).
//! Deletes accumulate as predicates in `delete_predicates` (via
//! `record_delete`) and stage through the same `stage_* → commit_staged`
//! path as writes — `stage_delete` produces a deletion-vector transaction
//! that advances no Lance HEAD until the end-of-query commit. The parse-time
//! D₂ rule keeps inserts/updates and deletes from mixing in one query, so
//! `pending` and `delete_predicates` never overlap on a table.
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
@ -78,9 +81,9 @@ pub(crate) struct StagedTablePath {
///
/// Replaces the legacy inline-commit `MutationStaging.latest` map with
/// an in-memory accumulator that defers all Lance HEAD advances to
/// end-of-query. After this rewire the bug class "Lance HEAD drifts ahead
/// of `__manifest`" is unreachable in `mutate_as` and `load` for inserts
/// and updates by construction.
/// end-of-query. Inserts, updates AND deletes all stage here, so the bug
/// class "Lance HEAD drifts ahead of `__manifest`" is unreachable in
/// `mutate_as` and `load` by construction.
#[derive(Default)]
pub(crate) struct MutationStaging {
/// Pre-write manifest version per table — the publisher's CAS fence at
@ -90,9 +93,11 @@ pub(crate) struct MutationStaging {
pub(crate) paths: HashMap<String, StagedTablePath>,
/// In-memory accumulated batches per table (insert/update path).
pub(crate) pending: HashMap<String, PendingTable>,
/// Inline-committed updates from delete-touching ops (D₂ guarantees no
/// pending batches exist on a delete-touched table).
pub(crate) inline_committed: HashMap<String, SubTableUpdate>,
/// Per-table delete predicates from delete-touching ops. D₂ guarantees a
/// table is write-XOR-delete within one query, so this never overlaps
/// `pending`. Staged as one combined `stage_delete` per table at
/// end-of-query (no inline HEAD advance) — see `stage_delete_table`.
pub(crate) delete_predicates: HashMap<String, Vec<String>>,
/// Strictest [`MutationOpKind`] seen per table within this query. Drives
/// the op-kind-aware drift check in [`StagedMutation::commit_all`]: for
/// tables whose first or any subsequent touch was a strict op
@ -210,10 +215,29 @@ impl MutationStaging {
Ok(())
}
/// Record a delete that already inline-committed at the Lance layer.
pub(crate) fn record_inline(&mut self, update: SubTableUpdate) {
self.inline_committed
.insert(update.table_key.clone(), update);
/// Record a delete predicate for `table_key`. The caller must have already
/// called `ensure_path` (via `open_table_for_mutation`) so the table's
/// path/version/op-kind are captured. D₂ guarantees a delete-touched table
/// has no pending write batches, so the predicates are staged as one
/// combined `stage_delete` at end-of-query — no inline HEAD advance.
pub(crate) fn record_delete(&mut self, table_key: &str, predicate: String) {
self.delete_predicates
.entry(table_key.to_string())
.or_default()
.push(predicate);
}
/// Delete predicates already recorded for `table_key` by earlier delete
/// statements in this query. Read before recording the current statement's
/// predicate so its `affected_*` count can exclude rows a prior statement
/// already scheduled for deletion (deletes stage, so the committed snapshot
/// is unchanged across statements — without this, overlapping predicates
/// would double-count). `&[]` if none.
pub(crate) fn recorded_delete_predicates(&self, table_key: &str) -> &[String] {
self.delete_predicates
.get(table_key)
.map(|v| v.as_slice())
.unwrap_or(&[])
}
/// Read-your-writes accessor: the accumulated pending batches for
@ -237,10 +261,10 @@ impl MutationStaging {
self.pending.get(table_key).map(|p| p.schema.clone())
}
/// `true` if neither pending nor inline_committed has any state — the
/// query made no observable writes.
/// `true` if neither pending writes nor delete predicates have any state —
/// the query made no observable writes.
pub(crate) fn is_empty(&self) -> bool {
self.pending.is_empty() && self.inline_committed.is_empty()
self.pending.is_empty() && self.delete_predicates.is_empty()
}
/// Total count of pending rows across all tables. Used by tests and
@ -282,7 +306,7 @@ impl MutationStaging {
expected_versions,
paths,
pending,
inline_committed,
delete_predicates,
op_kinds,
} = self;
@ -304,11 +328,13 @@ impl MutationStaging {
stage_inputs.push((table_key, table, path, expected));
}
let concurrency = concurrency.min(stage_inputs.len()).max(1);
let staged_entries = futures::stream::iter(stage_inputs.into_iter().map(
|(table_key, table, path, expected)| async move {
stage_pending_table(db, table_key, table, path, expected).await
},
))
let mut staged_entries: Vec<StagedTableEntry> = futures::stream::iter(
stage_inputs.into_iter().map(
|(table_key, table, path, expected)| async move {
stage_pending_table(db, table_key, table, path, expected).await
},
),
)
.buffered(concurrency)
.collect::<Vec<Result<Option<StagedTableEntry>>>>()
.await
@ -318,11 +344,48 @@ impl MutationStaging {
.flatten()
.collect();
// Second pass: stage deletes through the same staged path. D₂
// guarantees a delete-touched table carries no pending write batches,
// so `delete_predicates` and `pending` are disjoint — each is a fresh
// `StagedTableEntry`, never a merge into a write entry above. Multiple
// predicates on one table (a cascade hitting an edge table twice, or
// two delete statements) combine into a single `(p₁) OR (p₂) …` staged
// delete, so the table advances Lance HEAD exactly once at commit. A
// predicate matching zero committed rows yields `None` and is skipped
// (the staged equivalent of the old "skip record_inline on 0 rows" —
// no inline HEAD advance, closing the zero-row drift class).
for (table_key, predicates) in delete_predicates {
let path = paths.get(&table_key).cloned().ok_or_else(|| {
OmniError::manifest_internal(format!(
"MutationStaging::stage_all: missing path for delete table '{}'",
table_key
))
})?;
let expected = *expected_versions.get(&table_key).ok_or_else(|| {
OmniError::manifest_internal(format!(
"MutationStaging::stage_all: missing expected version for delete table '{}'",
table_key
))
})?;
let combined = if predicates.len() == 1 {
predicates.into_iter().next().unwrap()
} else {
predicates
.iter()
.map(|p| format!("({})", p))
.collect::<Vec<_>>()
.join(" OR ")
};
if let Some(entry) =
stage_delete_table(db, table_key, combined, path, expected).await?
{
staged_entries.push(entry);
}
}
Ok(StagedMutation {
inline_committed,
staged: staged_entries,
expected_versions,
paths,
op_kinds,
})
}
@ -395,6 +458,42 @@ async fn stage_pending_table(
}))
}
/// Stage a delete on `table_key` from a combined predicate, mirroring
/// [`stage_pending_table`] for the delete path. Reopens the dataset at the
/// pinned `expected` version (strict Delete op) and stages a deletion-vector
/// transaction via `TableStorage::stage_delete` — Phase A writes the deletion
/// file but advances no Lance HEAD until `commit_all` runs `commit_staged`.
/// Returns `None` when the predicate matches zero committed rows, so a no-op
/// delete stages nothing and never moves HEAD (the zero-row drift fix carried
/// onto the staged path).
async fn stage_delete_table(
db: &crate::db::Omnigraph,
table_key: String,
predicate: String,
path: StagedTablePath,
expected: u64,
) -> Result<Option<StagedTableEntry>> {
let ds = db
.reopen_for_mutation(
&table_key,
&path.full_path,
path.table_branch.as_deref(),
expected,
crate::db::MutationOpKind::Delete,
)
.await?;
match db.storage().stage_delete(&ds, &predicate).await? {
Some(staged) => Ok(Some(StagedTableEntry {
table_key,
path,
expected_version: expected,
dataset: ds,
staged_write: staged,
})),
None => Ok(None),
}
}
/// Output of [`MutationStaging::stage_all`]. Carries the staged Lance
/// transactions (Phase A complete; uncommitted fragments written) plus
/// the per-table metadata needed to write the recovery sidecar, run
@ -405,21 +504,13 @@ async fn stage_pending_table(
/// revalidation between Phase A and Phase B without touching staging
/// logic.
pub(crate) struct StagedMutation {
/// Updates from delete-touching ops (D₂ parse-time rule keeps
/// pending and inline_committed disjoint per table). Tables here
/// have already advanced Lance HEAD via inline `delete_where`;
/// `commit_all` builds sidecar pins for these too so the
/// commit→publish residual is recoverable for delete-only paths
/// (third-agent Finding 3).
inline_committed: HashMap<String, SubTableUpdate>,
/// One entry per table that had pending batches successfully staged.
/// One entry per table that had pending write batches or delete
/// predicates successfully staged (Phase A complete, no HEAD advance).
/// Deletes flow through this same vector as inserts/updates/overwrites —
/// there is no separate inline-commit path.
staged: Vec<StagedTableEntry>,
/// Pre-write manifest version per table — the publisher's CAS fence.
expected_versions: HashMap<String, u64>,
/// Per-table identifiers from `MutationStaging::paths`. Carried
/// through so `commit_all` can build sidecar pins for both staged
/// and inline-committed tables.
paths: HashMap<String, StagedTablePath>,
/// Strictest op_kind per touched table, propagated from
/// `MutationStaging::op_kinds` so `commit_all`'s drift check
/// fires only on read-modify-write tables.
@ -456,7 +547,8 @@ pub(crate) struct CommittedMutation {
/// Post-`commit_staged` handle per STAGED table (table_key → handle at the
/// just-committed version). Carried out (RFC-013 step 3b, collapse #4) so the
/// publish-prepare index build reuses it instead of a fresh `reopen_for_mutation`
/// at the same version. Inline-committed / delete tables are absent (no staged handle).
/// at the same version. Deletes are staged too, so their committed handle is
/// present here like any other write.
pub(crate) committed_handles: HashMap<String, SnapshotHandle>,
}
@ -508,39 +600,23 @@ impl StagedMutation {
txn: Option<&crate::db::WriteTxn>,
) -> Result<CommittedMutation> {
let StagedMutation {
inline_committed,
mut staged,
mut expected_versions,
paths,
op_kinds,
} = self;
// Per-(table_key, branch) queues for every touched table — both
// staged and inline-committed. Sorted by `acquire_many` internally
// so all multi-table writers (mutation, branch_merge, schema_apply,
// the fork path, recovery) agree on acquisition order — prevents
// lock-order inversion deadlock.
//
// For inline-committed tables (delete-only mutations), Lance HEAD
// has already advanced inside `delete_where` before `commit_all`
// runs. Holding the queue here prevents another writer from
// interleaving between our delete and our publish, which would
// otherwise leave a Lance-HEAD-ahead residual the delete-only
// sidecar (added below) would have to recover.
// Per-(table_key, branch) queues for every touched table. Sorted by
// `acquire_many` internally so all multi-table writers (mutation,
// branch_merge, schema_apply, the fork path, recovery) agree on
// acquisition order — prevents lock-order inversion deadlock. Deletes
// are staged like every other write, so holding the queue from before
// `commit_staged` through the publish keeps no Lance HEAD ahead of the
// manifest on the happy path.
let mut queue_keys: Vec<(String, Option<String>)> =
Vec::with_capacity(staged.len() + inline_committed.len());
Vec::with_capacity(staged.len());
for entry in &staged {
queue_keys.push((entry.table_key.clone(), entry.path.table_branch.clone()));
}
for table_key in inline_committed.keys() {
let path = paths.get(table_key).ok_or_else(|| {
OmniError::manifest_internal(format!(
"StagedMutation::commit_all: missing path for inline-committed table '{}'",
table_key
))
})?;
queue_keys.push((table_key.clone(), path.table_branch.clone()));
}
// Reuse the caller's guards (fork path) when handed in, else acquire
// our own. When reusing, every key we would acquire MUST already be
// covered — re-acquiring a held non-re-entrant key would deadlock, and
@ -723,23 +799,12 @@ impl StagedMutation {
expected_versions.insert(entry.table_key.clone(), current);
}
// Sidecar protocol: build the per-table pin list and write the
// sidecar BEFORE any later error can return after Lance HEAD has
// already moved. For staged tables this still happens before any
// Lance commit_staged runs. For inline-committed delete tables,
// Lance HEAD moved inside delete_where before commit_all, so the
// sidecar must also exist before the inline manifest-version check
// below can reject a stale query.
//
// Pins cover BOTH staged tables (Lance HEAD will advance below
// when `commit_staged` runs) AND inline-committed tables
// (Lance HEAD already advanced inside `delete_where` — we still
// need a sidecar so that an upcoming publish failure is
// recoverable on next open). This closes the third-agent
// Finding 3 hazard: delete-only mutations would otherwise skip
// the sidecar, leaving any commit→publish residual unreachable
// by recovery.
let mut pins: Vec<SidecarTablePin> =
Vec::with_capacity(staged.len() + inline_committed.len());
// sidecar BEFORE any `commit_staged` advances Lance HEAD, so any
// commit→publish residual is recoverable on the next open. Deletes
// are staged like every other write, so each delete table is a normal
// `staged` entry here — one pin at `expected + 1` (a single staged
// commit advances exactly one version), no inline special-casing.
let mut pins: Vec<SidecarTablePin> = Vec::with_capacity(staged.len());
for entry in &staged {
pins.push(SidecarTablePin {
table_key: entry.table_key.clone(),
@ -752,33 +817,6 @@ impl StagedMutation {
table_branch: entry.path.table_branch.clone(),
});
}
for (table_key, update) in &inline_committed {
let path = paths.get(table_key).ok_or_else(|| {
OmniError::manifest_internal(format!(
"StagedMutation::commit_all: missing path for inline-committed table '{}'",
table_key
))
})?;
let expected = *expected_versions.get(table_key).ok_or_else(|| {
OmniError::manifest_internal(format!(
"StagedMutation::commit_all: missing expected version for inline-committed table '{}'",
table_key
))
})?;
pins.push(SidecarTablePin {
table_key: table_key.clone(),
table_path: path.full_path.clone(),
expected_version: expected,
// For inline-committed tables, the post-commit pin is
// the actual post-delete version recorded by
// `record_inline`, NOT `expected + 1` — `delete_where`
// can advance HEAD by more than one version (e.g.,
// when Lance internally compacts deletion vectors).
post_commit_pin: update.table_version,
confirmed_version: None,
table_branch: path.table_branch.clone(),
});
}
let sidecar_handle = if pins.is_empty() {
None
@ -792,33 +830,7 @@ impl StagedMutation {
Some(write_sidecar(db.root_uri(), db.storage_adapter(), &sidecar).await?)
};
for (table_key, _update) in inline_committed.iter() {
let current = snapshot
.entry(table_key)
.map(|e| e.table_version)
.ok_or_else(|| {
OmniError::manifest_conflict(format!(
"table '{}' missing from manifest at commit time",
table_key,
))
})?;
let expected = expected_versions.get(table_key).copied().ok_or_else(|| {
OmniError::manifest_internal(format!(
"StagedMutation::commit_all: missing expected version for inline-committed table '{}'",
table_key
))
})?;
if expected != current {
return Err(OmniError::manifest_expected_version_mismatch(
table_key.clone(),
expected,
current,
));
}
expected_versions.insert(table_key.clone(), current);
}
let mut updates: Vec<SubTableUpdate> = inline_committed.into_values().collect();
let mut updates: Vec<SubTableUpdate> = Vec::with_capacity(staged.len());
// Carry each staged table's post-`commit_staged` handle out so the
// publish-prepare index build reuses it (collapse #4) instead of

View file

@ -14,17 +14,19 @@
//! [`InlineCommitResidual`], reachable only via
//! `Omnigraph::storage_inline_residual()`, so the default `db.storage()`
//! surface is staged-only and cannot couple "write bytes" with "advance
//! HEAD" — MR-793 acceptance §1 closes by construction. The residuals:
//! HEAD" — MR-793 acceptance §1 closes by construction. The sole remaining
//! residual:
//!
//! * `delete_where` — Lance #6658 (`DeleteBuilder::execute_uncommitted`)
//! did not backport to the 6.x line; it first ships in `v7.0.0-beta.10`.
//! Migration to staged two-phase delete is tracked as MR-A, gated on the
//! Lance v7.x bump.
//! * `create_vector_index` — segment-commit-path needs
//! `build_index_metadata_from_segments`, still `pub(crate)` in Lance
//! 6.0.1 ([#6666](https://github.com/lance-format/lance/issues/6666),
//! 7.0.0 ([#6666](https://github.com/lance-format/lance/issues/6666),
//! open). Scalar indices already stage.
//!
//! `delete_where` was the other residual until MR-A: Lance 7.0's
//! `DeleteBuilder::execute_uncommitted` (#6658) made delete a staged write
//! (`TableStorage::stage_delete` → `commit_staged`), so delete no longer
//! advances Lance HEAD inline and the residual is gone.
//!
//! Each is named honestly at its call site; the forbidden-API guard test
//! catches direct lance::* misuse outside the storage layer.
//!
@ -64,7 +66,7 @@ use lance::dataset::{WhenMatched, WhenNotMatched};
use crate::db::{Snapshot, SubTableEntry};
use crate::error::Result;
use crate::table_store::{DeleteState, StagedWrite, TableState, TableStore};
use crate::table_store::{StagedWrite, TableState, TableStore};
// ─── sealed module ──────────────────────────────────────────────────────────
@ -156,7 +158,8 @@ impl SnapshotHandle {
///
/// Produced by `TableStorage::stage_*`, consumed by
/// `TableStorage::commit_staged`. Carries the underlying `StagedWrite`
/// (transaction + read-your-writes deltas) behind `pub(crate)`.
/// (transaction + commit metadata + read-your-writes deltas) behind
/// `pub(crate)`.
#[derive(Debug, Clone)]
pub struct StagedHandle {
pub(crate) inner: StagedWrite,
@ -167,8 +170,7 @@ impl StagedHandle {
Self { inner: staged }
}
/// Take ownership of the inner `StagedWrite`. Used by
/// `commit_staged`.
/// Take ownership of the inner `StagedWrite`. Used by `commit_staged`.
pub(crate) fn into_staged(self) -> StagedWrite {
self.inner
}
@ -179,7 +181,8 @@ impl StagedHandle {
/// `TableStore::stage_append`'s `prior_stages` parameter. The result is
/// owned (not borrowed) — callers that already had a `&[StagedHandle]`
/// pay a clone cost per element. `StagedWrite::clone` is cheap because
/// `Transaction` and `Vec<Fragment>` are shallow-clone friendly.
/// `Transaction`, commit metadata, and `Vec<Fragment>` are shallow-clone
/// friendly.
pub(crate) fn staged_handles_as_writes(handles: &[StagedHandle]) -> Vec<StagedWrite> {
handles.iter().map(|h| h.inner.clone()).collect()
}
@ -384,6 +387,15 @@ pub trait TableStorage: sealed::Sealed + Send + Sync + Debug {
batch: RecordBatch,
) -> Result<StagedHandle>;
/// Stage a delete (two-phase, no HEAD advance). `None` when 0 rows match —
/// the table is not touched (no transaction, no version). See
/// `TableStore::stage_delete`.
async fn stage_delete(
&self,
snapshot: &SnapshotHandle,
filter: &str,
) -> Result<Option<StagedHandle>>;
/// Stage a BTREE scalar index build. MR-793 Phase 2.
async fn stage_create_btree_index(
&self,
@ -400,8 +412,8 @@ pub trait TableStorage: sealed::Sealed + Send + Sync + Debug {
// ── Index presence (reads, no HEAD advance) ──────────────────────
//
// The inline-commit writes (`delete_where`, `create_vector_index`) are
// deliberately NOT on this trait. They live on
// The inline-commit residual (`create_vector_index`) is deliberately NOT
// on this trait. It lives on
// the separate `InlineCommitResidual` trait, reachable only through
// `Omnigraph::storage_inline_residual()`. As a result the default
// `db.storage()` surface cannot couple "write bytes" with "advance HEAD"
@ -447,21 +459,16 @@ pub trait TableStorage: sealed::Sealed + Send + Sync + Debug {
/// by accident (MR-793 acceptance §1, by construction).
///
/// Residual reasons (each is named honestly at its call site):
/// * `delete_where` — Lance has no public two-phase delete on the 6.x line
/// (`DeleteBuilder::execute_uncommitted` first ships in v7.x; MR-A / Lance
/// #6658). The D2 parse-time rule + recovery sidecars cover the gap meanwhile.
/// * `create_vector_index` — vector-index segment-commit needs
/// `build_index_metadata_from_segments`, still `pub(crate)` in Lance 6.0.1
/// `build_index_metadata_from_segments`, still `pub(crate)` in Lance 7.0.0
/// (Lance #6666). Scalar indices already stage.
///
/// `delete_where` used to live here, but Lance 7.0's
/// `DeleteBuilder::execute_uncommitted` (#6658) made delete a staged write
/// (`TableStorage::stage_delete` → `commit_staged`); it was retired in MR-A so
/// delete no longer advances Lance HEAD inline.
#[async_trait]
pub(crate) trait InlineCommitResidual: sealed::Sealed + Send + Sync + Debug {
async fn delete_where(
&self,
dataset_uri: &str,
snapshot: SnapshotHandle,
filter: &str,
) -> Result<(SnapshotHandle, DeleteState)>;
async fn create_vector_index(
&self,
snapshot: SnapshotHandle,
@ -725,8 +732,7 @@ impl TableStorage for TableStore {
staged: StagedHandle,
) -> Result<SnapshotHandle> {
let ds_arc = snapshot.into_arc();
let transaction = staged.into_staged().transaction;
TableStore::commit_staged(self, ds_arc, transaction)
TableStore::commit_staged(self, ds_arc, staged.into_staged())
.await
.map(SnapshotHandle::new)
}
@ -741,6 +747,16 @@ impl TableStorage for TableStore {
.map(StagedHandle::new)
}
async fn stage_delete(
&self,
snapshot: &SnapshotHandle,
filter: &str,
) -> Result<Option<StagedHandle>> {
Ok(TableStore::stage_delete(self, snapshot.dataset(), filter)
.await?
.map(StagedHandle::new))
}
async fn stage_create_btree_index(
&self,
snapshot: &SnapshotHandle,
@ -805,17 +821,6 @@ impl TableStorage for TableStore {
#[async_trait]
impl InlineCommitResidual for TableStore {
async fn delete_where(
&self,
dataset_uri: &str,
snapshot: SnapshotHandle,
filter: &str,
) -> Result<(SnapshotHandle, DeleteState)> {
let mut ds = Arc::try_unwrap(snapshot.into_arc()).unwrap_or_else(|arc| (*arc).clone());
let state = TableStore::delete_where(self, dataset_uri, &mut ds, filter).await?;
Ok((SnapshotHandle::new(ds), state))
}
async fn create_vector_index(
&self,
snapshot: SnapshotHandle,

View file

@ -10,12 +10,13 @@ use lance::dataset::scanner::{ColumnOrdering, DatasetRecordBatchStream, Scanner}
use lance::dataset::transaction::{Operation, Transaction, TransactionBuilder};
use lance::dataset::write::merge_insert::SourceDedupeBehavior;
use lance::dataset::{
CommitBuilder, InsertBuilder, MergeInsertBuilder, WhenMatched, WhenNotMatched, WriteMode,
WriteParams,
CommitBuilder, DeleteBuilder, InsertBuilder, MergeInsertBuilder, WhenMatched, WhenNotMatched,
WriteMode, WriteParams,
};
use lance::datatypes::{BlobKind, Schema as LanceSchema};
use lance::index::DatasetIndexExt;
use lance::index::scalar::IndexDetails;
use lance_core::utils::mask::RowAddrTreeMap;
use lance_file::version::LanceFileVersion;
use lance_index::scalar::{InvertedIndexParams, ScalarIndexParams};
use lance_index::{IndexType, is_system_index};
@ -36,14 +37,6 @@ pub struct TableState {
pub(crate) version_metadata: TableVersionMetadata,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DeleteState {
pub version: u64,
pub row_count: u64,
pub deleted_rows: usize,
pub(crate) version_metadata: TableVersionMetadata,
}
/// Whether a `key_col IN (...)` scan on a dataset will be served by the
/// persisted scalar (BTREE) index, or silently fall back to a full filtered
/// scan. Detection-only (metadata, no IO); the scan returns the correct rows
@ -66,9 +59,9 @@ pub enum IndexCoverage {
/// drifting ahead. See `docs/dev/writes.md` for the publisher-CAS contract
/// this builds on.
///
/// `transaction` is opaque from our side — Lance owns its semantics. We
/// commit it via `CommitBuilder::execute(transaction)` (see
/// `TableStore::commit_staged`).
/// `transaction` and `commit_metadata` are opaque from our side — Lance owns
/// their semantics. They must travel together so `commit_staged` can preserve
/// Lance's row-level conflict resolution metadata for staged deletes/updates.
///
/// For read-your-writes within the same query, `new_fragments` and
/// `removed_fragment_ids` together describe the post-stage view delta:
@ -80,21 +73,70 @@ pub enum IndexCoverage {
/// stays in the committed manifest while its rewrite shows up in `new_fragments`).
#[derive(Debug, Clone)]
pub struct StagedWrite {
pub transaction: Transaction,
transaction: Transaction,
commit_metadata: StagedCommitMetadata,
/// Fragments to surface alongside the committed manifest in
/// `Scanner::with_fragments(committed - removed + new)`. For
/// `Operation::Append` these are the freshly-appended fragments. For
/// `Operation::Update` (merge_insert) these are
/// `updated_fragments + new_fragments` (rewrites + freshly-inserted
/// rows).
pub new_fragments: Vec<Fragment>,
new_fragments: Vec<Fragment>,
/// Fragment IDs that this staged write supersedes. The committed
/// manifest must filter these out before being combined with
/// `new_fragments` for read-your-writes scans, otherwise rewrites
/// yield duplicate rows. Empty for `stage_append` (`Operation::Append`
/// adds without removing anything); populated from
/// `Operation::Update.removed_fragment_ids` for `stage_merge_insert`.
pub removed_fragment_ids: Vec<u64>,
removed_fragment_ids: Vec<u64>,
}
#[derive(Debug, Clone, Default)]
struct StagedCommitMetadata {
affected_rows: Option<RowAddrTreeMap>,
}
impl StagedCommitMetadata {
fn affected_rows(affected_rows: Option<RowAddrTreeMap>) -> Self {
Self { affected_rows }
}
}
impl StagedWrite {
fn new(
transaction: Transaction,
new_fragments: Vec<Fragment>,
removed_fragment_ids: Vec<u64>,
) -> Self {
Self {
transaction,
commit_metadata: StagedCommitMetadata::default(),
new_fragments,
removed_fragment_ids,
}
}
fn with_commit_metadata(
transaction: Transaction,
commit_metadata: StagedCommitMetadata,
new_fragments: Vec<Fragment>,
removed_fragment_ids: Vec<u64>,
) -> Self {
Self {
transaction,
commit_metadata,
new_fragments,
removed_fragment_ids,
}
}
pub fn new_fragments(&self) -> &[Fragment] {
&self.new_fragments
}
pub fn removed_fragment_ids(&self) -> &[u64] {
&self.removed_fragment_ids
}
}
#[derive(Debug, Clone)]
@ -387,10 +429,8 @@ impl TableStore {
if has_blob_columns {
let arrow_schema: SchemaRef = Arc::new(ds.schema().into());
let batches = self.scan_batches_for_rewrite(ds).await?;
let reader = arrow_array::RecordBatchIterator::new(
batches.into_iter().map(Ok),
arrow_schema,
);
let reader =
arrow_array::RecordBatchIterator::new(batches.into_iter().map(Ok), arrow_schema);
return Ok(lance_datafusion::utils::reader_to_stream(Box::new(reader)));
}
// Non-blob: a true lazy scan. `DatasetRecordBatchStream` converts to the
@ -879,23 +919,56 @@ impl TableStore {
}
}
pub(crate) async fn delete_where(
&self,
dataset_uri: &str,
ds: &mut Dataset,
filter: &str,
) -> Result<DeleteState> {
let delete_result = ds
.delete(filter)
/// Stage a delete without advancing Lance HEAD — the two-phase analogue of
/// `stage_merge_insert`. `DeleteBuilder::execute_uncommitted` writes the
/// per-fragment deletion files to object storage (Phase A) and returns an
/// uncommitted `Operation::Delete` transaction; HEAD does NOT advance until
/// `commit_staged`. A 0-row delete is a TRUE no-op: `None` (no transaction,
/// no fragments, no version). For a non-empty delete the returned
/// `StagedWrite` carries the deletion-vector-bearing `updated_fragments` as
/// `new_fragments` and the superseded originals (+ any fully-removed
/// fragments) as `removed_fragment_ids`, so `combine_committed_with_staged`
/// (`committed - removed + new`) makes an in-query read see the deletion.
/// Like `stage_merge_insert`, this must carry Lance's `affected_rows`
/// metadata through to `commit_staged`; otherwise a staged transaction loses
/// the row-level conflict information Lance's rebase path needs.
pub async fn stage_delete(&self, ds: &Dataset, filter: &str) -> Result<Option<StagedWrite>> {
let uncommitted = DeleteBuilder::new(Arc::new(ds.clone()), filter)
.execute_uncommitted()
.await
.map_err(|e| OmniError::Lance(e.to_string()))?;
Ok(DeleteState {
version: delete_result.new_dataset.version().version,
row_count: self.count_rows(&delete_result.new_dataset, None).await? as u64,
deleted_rows: delete_result.num_deleted_rows as usize,
version_metadata: self
.dataset_version_metadata(dataset_uri, &delete_result.new_dataset)?,
})
if uncommitted.num_deleted_rows == 0 {
return Ok(None);
}
let (new_fragments, removed_fragment_ids) = match &uncommitted.transaction.operation {
Operation::Delete {
updated_fragments,
deleted_fragment_ids,
..
} => {
// The originals superseded by their deletion-vector rewrites must
// be filtered out of the read view; `deleted_fragment_ids` are
// whole-fragment removals.
let mut removed = deleted_fragment_ids.clone();
removed.extend(updated_fragments.iter().map(|f| f.id));
(updated_fragments.clone(), removed)
}
other => {
return Err(OmniError::manifest_internal(format!(
"stage_delete: expected Operation::Delete, got {:?}",
std::mem::discriminant(other)
)));
}
};
Ok(Some(StagedWrite::with_commit_metadata(
uncommitted.transaction,
StagedCommitMetadata::affected_rows(uncommitted.affected_rows),
new_fragments,
removed_fragment_ids,
)))
}
// ─── Staged-write API ────────────────────────────────────────────────────
@ -1005,12 +1078,12 @@ impl TableStore {
let start_row_id = ds.manifest.next_row_id + prior_rows;
assign_row_id_meta(&mut new_fragments, start_row_id)?;
}
Ok(StagedWrite {
Ok(StagedWrite::new(
transaction,
new_fragments,
// Append never supersedes existing fragments.
removed_fragment_ids: Vec::new(),
})
Vec::new(),
))
}
/// Streaming variant of [`Self::stage_append`]: appends the rows of `source`
@ -1071,11 +1144,7 @@ impl TableStore {
let start_row_id = ds.manifest.next_row_id + prior_rows;
assign_row_id_meta(&mut new_fragments, start_row_id)?;
}
Ok(StagedWrite {
transaction,
new_fragments,
removed_fragment_ids: Vec::new(),
})
Ok(StagedWrite::new(transaction, new_fragments, Vec::new()))
}
/// Stage a merge_insert (upsert): write fragment files describing the
@ -1192,22 +1261,20 @@ impl TableStore {
)));
}
};
Ok(StagedWrite {
transaction: uncommitted.transaction,
Ok(StagedWrite::with_commit_metadata(
uncommitted.transaction,
StagedCommitMetadata::affected_rows(uncommitted.affected_rows),
new_fragments,
removed_fragment_ids,
})
))
}
/// Commit a previously-staged transaction onto `ds`, returning the new
/// dataset (with HEAD advanced). Wraps `CommitBuilder::execute`. Used by
/// Commit a previously-staged write onto `ds`, returning the new dataset
/// (with HEAD advanced). The staged packet owns the Lance transaction plus
/// any commit metadata (`affected_rows` for delete/merge rebase). Used by
/// the publisher at end-of-query to materialize all staged writes before
/// the meta-manifest commit.
pub async fn commit_staged(
&self,
ds: Arc<Dataset>,
transaction: Transaction,
) -> Result<Dataset> {
pub async fn commit_staged(&self, ds: Arc<Dataset>, staged: StagedWrite) -> Result<Dataset> {
// Skip Lance's auto-cleanup hook on every commit. OmniGraph owns version
// GC explicitly (optimize.rs::cleanup_all_tables); Lance's hook fires off
// the *dataset's stored* `lance.auto_cleanup.*` config, which graphs
@ -1216,9 +1283,12 @@ impl TableStore {
// upgraded graphs. Skipping here covers the staged write path (the main
// data path) for new and legacy datasets alike, preventing Lance from
// GC'ing versions the __manifest still pins for snapshots/time-travel.
CommitBuilder::new(ds)
.with_skip_auto_cleanup(true)
.execute(transaction)
let mut builder = CommitBuilder::new(ds).with_skip_auto_cleanup(true);
if let Some(affected_rows) = staged.commit_metadata.affected_rows {
builder = builder.with_affected_rows(affected_rows);
}
builder
.execute(staged.transaction)
.await
.map_err(|e| OmniError::Lance(e.to_string()))
}
@ -1305,11 +1375,11 @@ impl TableStore {
// fragment in removed_fragment_ids so the post-stage view shows
// ONLY the staged fragments.
let removed_fragment_ids: Vec<u64> = ds.manifest.fragments.iter().map(|f| f.id).collect();
Ok(StagedWrite {
Ok(StagedWrite::new(
transaction,
new_fragments,
removed_fragment_ids,
})
))
}
/// Stage a BTREE scalar index build. Returns a StagedWrite whose
@ -1358,11 +1428,7 @@ impl TableStore {
},
)
.build();
Ok(StagedWrite {
transaction,
new_fragments: Vec::new(),
removed_fragment_ids: Vec::new(),
})
Ok(StagedWrite::new(transaction, Vec::new(), Vec::new()))
}
/// Stage an INVERTED (FTS) scalar index build. Same shape as
@ -1397,11 +1463,7 @@ impl TableStore {
},
)
.build();
Ok(StagedWrite {
transaction,
new_fragments: Vec::new(),
removed_fragment_ids: Vec::new(),
})
Ok(StagedWrite::new(transaction, Vec::new(), Vec::new()))
}
/// Run a scan with optional uncommitted staged writes visible

View file

@ -3599,7 +3599,7 @@ async fn branch_merge_phase_b_failure_recovered_on_next_open() {
// Recovery: reopen runs the sweep. BranchMerge uses LOOSE
// classification — `publish_rewritten_merge_table` runs multiple
// commit_staged calls per table (stage_merge_insert + delete_where +
// commit_staged calls per table (stage_merge_insert + stage_delete +
// index rebuilds), so post_commit_pin in the sidecar is a lower
// bound; the loose-match classifier accepts any HEAD > expected_version
// when expected_version == manifest_pinned.

View file

@ -37,8 +37,9 @@
//! `Omnigraph::storage_inline_residual()`, so the default storage surface
//! cannot couple "write bytes" with "advance HEAD" — engine code that
//! wants an inline residual must name the residual accessor explicitly.
//! The only residuals are `delete_where` (Lance #6658 / v7.x) and
//! `create_vector_index` (Lance #6666). The dead legacy methods
//! The sole residual is `create_vector_index` (Lance #6666); `delete`
//! migrated to the staged `stage_delete` path in MR-A (Lance 7.0 #6658).
//! The dead legacy methods
//! (trait `append_batch` / `merge_insert_batches`, inherent
//! `merge_insert_batch{,es}`, `create_{btree,inverted}_index`) were
//! removed entirely. This guard's scope is unchanged: it catches direct

View file

@ -98,8 +98,9 @@ async fn lance_error_too_much_write_contention_variant_exists() {
#[tokio::test]
async fn lance_error_incompatible_transaction_variant_exists() {
let err =
lance::Error::incompatible_transaction_source("concurrent UpdateConfig at version N".into());
let err = lance::Error::incompatible_transaction_source(
"concurrent UpdateConfig at version N".into(),
);
assert!(
matches!(err, lance::Error::IncompatibleTransaction { .. }),
"Lance::Error::IncompatibleTransaction variant missing or renamed; \
@ -334,12 +335,15 @@ async fn _compile_transaction_history_for_repair_signature() -> lance::Result<()
Ok(())
}
// --- Guard 8: Dataset::delete returns DeleteResult { new_dataset, num_deleted_rows } ---
// --- Guard 8: DeleteBuilder::execute_uncommitted returns
// UncommittedDelete { transaction, affected_rows, num_deleted_rows } ---
//
// `table_store.rs::delete_where` consumes both fields. When MR-A migrates
// `delete_where` to two-phase via `DeleteBuilder::execute_uncommitted`, this
// guard updates to pin the staged path. Compile-only.
// `table_store.rs::stage_delete` uses the two-phase delete (lance#6658, Lance
// 7.0): it reads `num_deleted_rows` (0 ⇒ no-op `None`) and stages `transaction`
// WITHOUT committing, instead of the inline `Dataset::delete`. It must also
// preserve `affected_rows` and pass it to `CommitBuilder::with_affected_rows`
// through `StagedWrite`; dropping it disables Lance's row-level rebase metadata.
// Compile-only.
#[allow(
dead_code,
unreachable_code,
@ -347,11 +351,42 @@ async fn _compile_transaction_history_for_repair_signature() -> lance::Result<()
unused_mut,
clippy::diverging_sub_expression
)]
async fn _compile_delete_result_field_shape() -> lance::Result<()> {
let mut ds: Dataset = unimplemented!();
let result: DeleteResult = ds.delete("x = 1").await?;
let _new_dataset: Arc<Dataset> = result.new_dataset;
let _num_deleted: u64 = result.num_deleted_rows;
async fn _compile_uncommitted_delete_field_shape() -> lance::Result<()> {
use lance::dataset::DeleteBuilder;
use lance_core::utils::mask::RowAddrTreeMap;
let ds: Arc<Dataset> = unimplemented!();
let staged = DeleteBuilder::new(ds, "x = 1")
.execute_uncommitted()
.await?;
let _txn: lance::dataset::transaction::Transaction = staged.transaction;
let _num_deleted: u64 = staged.num_deleted_rows;
let _affected: Option<RowAddrTreeMap> = staged.affected_rows;
Ok(())
}
// --- Guard 8b: MergeInsertJob::execute_uncommitted returns
// UncommittedMergeInsert { transaction, affected_rows, stats, inserted_rows_filter } ---
//
// `TableStore::stage_merge_insert` has the same staged commit contract as
// delete: the Lance transaction and `affected_rows` metadata must travel
// together into `commit_staged`.
#[allow(
dead_code,
unreachable_code,
unused_variables,
unused_mut,
clippy::diverging_sub_expression
)]
async fn _compile_uncommitted_merge_insert_field_shape() -> lance::Result<()> {
use lance_core::utils::mask::RowAddrTreeMap;
let ds: Arc<Dataset> = unimplemented!();
let source: Box<dyn arrow_array::RecordBatchReader + Send> = unimplemented!();
let job = MergeInsertBuilder::try_new(ds, vec!["x".to_string()])?.try_build()?;
let staged = job.execute_uncommitted(source).await?;
let _txn: lance::dataset::transaction::Transaction = staged.transaction;
let _affected: Option<RowAddrTreeMap> = staged.affected_rows;
let _stats = staged.stats;
let _inserted_rows_filter = staged.inserted_rows_filter;
Ok(())
}
@ -757,7 +792,9 @@ async fn scalar_index_use_requires_matched_literal_type() {
vec![
Arc::new(StringArray::from(vec!["a", "b", "c", "d"])),
Arc::new(Int32Array::from(vec![1, 5, 9, 13])),
Arc::new(arrow_array::Date32Array::from(vec![19000, 19723, 20000, 20500])),
Arc::new(arrow_array::Date32Array::from(vec![
19000, 19723, 20000, 20500,
])),
],
)
.unwrap();
@ -786,7 +823,11 @@ async fn scalar_index_use_requires_matched_literal_type() {
// (label, filter, expect_index_used)
let cases = [
("n32 = 5i32 (matched Int32)", col("n32").eq(lit(5i32)), true),
("n32 = 5i64 (widened Int64)", col("n32").eq(lit(5i64)), false),
(
"n32 = 5i64 (widened Int64)",
col("n32").eq(lit(5i64)),
false,
),
(
"d32 = Date32 (matched)",
col("d32").eq(lit(ScalarValue::Date32(Some(19723)))),
@ -917,7 +958,10 @@ async fn skip_auto_cleanup_suppresses_version_gc() {
async fn set_legacy_cleanup(ds: &mut Dataset) {
let mut cfg = HashMap::new();
cfg.insert("lance.auto_cleanup.interval".to_string(), "1".to_string());
cfg.insert("lance.auto_cleanup.older_than".to_string(), "0ms".to_string());
cfg.insert(
"lance.auto_cleanup.older_than".to_string(),
"0ms".to_string(),
);
ds.update_config(cfg).await.unwrap();
}
fn row(i: i32) -> (Arc<Schema>, RecordBatch) {
@ -1112,10 +1156,14 @@ async fn camelcase_index_equality_routes_to_scalar_index() {
..Default::default()
};
let mut ds = Dataset::write(reader, uri, Some(params)).await.unwrap();
ds.create_index_builder(&["repoName"], IndexType::BTree, &ScalarIndexParams::default())
.replace(true)
.await
.unwrap();
ds.create_index_builder(
&["repoName"],
IndexType::BTree,
&ScalarIndexParams::default(),
)
.replace(true)
.await
.unwrap();
async fn plan_str(ds: &Dataset, filter: datafusion::prelude::Expr) -> lance::Result<String> {
let mut scanner = ds.scan();

View file

@ -869,7 +869,7 @@ async fn delete_only_mutation_refuses_uncovered_drift_before_inline_commit() {
&mixed_params(&[("$name", "Alice")], &[]),
)
.await
.expect_err("strict delete must reject uncovered drift before delete_where");
.expect_err("strict delete must reject uncovered drift before staging the delete");
assert!(
err.to_string().contains("expected"),
"delete should fail as a strict stale-version write; got: {err}"
@ -879,7 +879,7 @@ async fn delete_only_mutation_refuses_uncovered_drift_before_inline_commit() {
assert_eq!(manifest_after, manifest_before);
assert_eq!(
head_after, head_before,
"delete_where must not run after the strict drift guard fails"
"the staged delete must not commit after the strict drift guard fails"
);
assert_eq!(
count_rows(&db, "node:Person").await,

View file

@ -409,7 +409,8 @@ async fn recovery_rolls_back_synthetic_drift_on_open() {
// leave (with no sidecar — the writer never wrote one because we're
// simulating the residual class directly).
//
// Use `delete_where` with a never-matching predicate: it inline-commits
// Use `lance_delete_inline` (a test helper that calls Lance directly) with
// a never-matching predicate: it inline-commits
// a Lance transaction (advancing HEAD by one) without removing data
// and without depending on the dataset's exact column set. The actual
// residual the sweep recovers from is the manifest-vs-Lance-HEAD gap;
@ -456,7 +457,7 @@ async fn recovery_rolls_back_synthetic_drift_on_open() {
// sidecar.post_commit_pin != observed head), decide RollBack, and call
// restore_table_to_version(person_uri, head_before_drift). The
// fragment-set short-circuit may make this a no-op if the synthetic
// drift produced no fragment changes (delete_where with a never-matching
// drift produced no fragment changes (lance_delete_inline with a never-matching
// predicate is one such case — Lance bumps version but fragments are
// unchanged). Either way the sweep must complete without error and
// delete the sidecar; the actual rollback HEAD-advance behavior is
@ -725,7 +726,7 @@ async fn recovery_rolls_forward_after_phase_b_completes() {
let head_before = ds.version().version;
// Synthesize a successful Phase B: advance Lance HEAD by one
// (delete_where with no-match — no fragment changes, but version bumps).
// (lance_delete_inline with no-match — no fragment changes, but version bumps).
let _ = helpers::lance_delete_inline(&mut ds, "1 = 2").await;
let head_after = ds.version().version;
assert_eq!(head_after, head_before + 1);

View file

@ -22,7 +22,7 @@ use arrow_array::{Array, Int32Array, RecordBatch, StringArray, UInt64Array};
use arrow_schema::{DataType, Field, Schema};
use futures::TryStreamExt;
use lance::Dataset;
use lance::dataset::{WhenMatched, WhenNotMatched};
use lance::dataset::{DeleteBuilder, WhenMatched, WhenNotMatched};
use lance::index::DatasetIndexExt;
use lance_index::IndexType;
use lance_linalg::distance::MetricType;
@ -66,6 +66,19 @@ fn person_batch(rows: &[(&str, Option<i32>)]) -> RecordBatch {
.unwrap()
}
fn numbered_person_batch(range: std::ops::Range<i32>) -> RecordBatch {
let ids: Vec<String> = range.clone().map(|i| format!("p{i}")).collect();
let ages: Vec<Option<i32>> = range.map(Some).collect();
RecordBatch::try_new(
person_schema(),
vec![
Arc::new(StringArray::from(ids)),
Arc::new(Int32Array::from(ages)),
],
)
.unwrap()
}
fn collect_ids(batches: &[RecordBatch]) -> Vec<String> {
let mut out = Vec::new();
for b in batches {
@ -83,6 +96,29 @@ fn collect_ids(batches: &[RecordBatch]) -> Vec<String> {
out
}
fn collect_age_for_id(batches: &[RecordBatch], needle: &str) -> Option<i32> {
for batch in batches {
let ids = batch
.column_by_name("id")
.unwrap()
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
let ages = batch
.column_by_name("age")
.unwrap()
.as_any()
.downcast_ref::<Int32Array>()
.unwrap();
for row in 0..batch.num_rows() {
if ids.value(row) == needle && !ages.is_null(row) {
return Some(ages.value(row));
}
}
}
None
}
#[tokio::test]
async fn stage_append_is_visible_via_scan_with_staged() {
let dir = tempfile::tempdir().unwrap();
@ -138,7 +174,7 @@ async fn stage_merge_insert_dedupes_superseded_committed_fragment() {
.await
.unwrap();
assert!(
!staged.removed_fragment_ids.is_empty(),
!staged.removed_fragment_ids().is_empty(),
"merge_insert that rewrites a committed row must set removed_fragment_ids \
so the scan-with-staged composer can shadow the superseded committed \
fragment without it, the committed row and its rewrite both appear, \
@ -277,7 +313,7 @@ async fn chained_stage_appends_have_distinct_row_ids() {
fn combine_for_scan(ds: &Dataset, staged: &[StagedWrite]) -> Vec<Fragment> {
let removed: std::collections::HashSet<u64> = staged
.iter()
.flat_map(|w| w.removed_fragment_ids.iter().copied())
.flat_map(|w| w.removed_fragment_ids().iter().copied())
.collect();
let mut combined: Vec<_> = ds
.manifest
@ -287,7 +323,7 @@ fn combine_for_scan(ds: &Dataset, staged: &[StagedWrite]) -> Vec<Fragment> {
.cloned()
.collect();
for s in staged {
combined.extend(s.new_fragments.iter().cloned());
combined.extend(s.new_fragments().iter().cloned());
}
combined
}
@ -313,7 +349,7 @@ async fn stage_append_then_commit_persists_data() {
.unwrap();
let new_ds = store
.commit_staged(Arc::new(ds.clone()), staged.transaction)
.commit_staged(Arc::new(ds.clone()), staged)
.await
.unwrap();
assert!(
@ -350,10 +386,7 @@ async fn stage_merge_insert_then_commit_persists_merged_view() {
.await
.unwrap();
store
.commit_staged(Arc::new(ds), staged.transaction)
.await
.unwrap();
store.commit_staged(Arc::new(ds), staged).await.unwrap();
let reopened = Dataset::open(&uri).await.unwrap();
let batches = store.scan_batches(&reopened).await.unwrap();
@ -364,6 +397,53 @@ async fn stage_merge_insert_then_commit_persists_merged_view() {
assert_eq!(total, 2, "merge_insert must not duplicate the matched row");
}
#[tokio::test]
async fn stage_merge_insert_commit_rebases_over_disjoint_committed_delete() {
let dir = tempfile::tempdir().unwrap();
let uri = format!("{}/people.lance", dir.path().to_str().unwrap());
let store = TableStore::new(dir.path().to_str().unwrap());
let ds = TableStore::write_dataset(&uri, numbered_person_batch(0..100))
.await
.unwrap();
let update_ids: Vec<String> = (0..10).map(|i| format!("p{i}")).collect();
let update_ages: Vec<Option<i32>> = (0..10).map(|i| Some(1000 + i)).collect();
let update_batch = RecordBatch::try_new(
person_schema(),
vec![
Arc::new(StringArray::from(update_ids)),
Arc::new(Int32Array::from(update_ages)),
],
)
.unwrap();
let staged = store
.stage_merge_insert(
ds.clone(),
update_batch,
vec!["id".to_string()],
WhenMatched::UpdateAll,
WhenNotMatched::DoNothing,
)
.await
.unwrap();
DeleteBuilder::new(Arc::new(ds.clone()), "age >= 10 AND age < 20")
.execute()
.await
.unwrap();
let committed = store
.commit_staged(Arc::new(ds.clone()), staged)
.await
.unwrap();
assert_eq!(committed.count_rows(None).await.unwrap(), 90);
let batches = store.scan_batches(&committed).await.unwrap();
assert_eq!(collect_age_for_id(&batches, "p0"), Some(1000));
assert_eq!(collect_age_for_id(&batches, "p10"), None);
}
/// **Documented limitation** (see `scan_with_staged` doc): when a filter
/// is supplied, Lance's stats-based pruning drops the staged fragment from
/// the filtered scan because uncommitted fragments produced by
@ -537,7 +617,7 @@ async fn stage_overwrite_does_not_advance_head_until_commit() {
// After commit_staged, HEAD advances and the dataset shows the
// overwrite result (zoe alone — alice replaced).
let new_ds = store
.commit_staged(Arc::new(ds.clone()), staged.transaction)
.commit_staged(Arc::new(ds.clone()), staged)
.await
.unwrap();
assert!(new_ds.version().version > pre_version);
@ -578,7 +658,7 @@ async fn stage_overwrite_preserves_stable_row_ids() {
.await
.unwrap();
let new_ds = store
.commit_staged(Arc::new(ds.clone()), staged.transaction)
.commit_staged(Arc::new(ds.clone()), staged)
.await
.unwrap();
@ -616,7 +696,7 @@ async fn stage_overwrite_replaces_all_fragments() {
.await
.unwrap();
let removed: std::collections::HashSet<u64> =
staged.removed_fragment_ids.iter().copied().collect();
staged.removed_fragment_ids().iter().copied().collect();
assert_eq!(
removed, committed_fragment_ids,
"stage_overwrite must list every committed fragment as removed so \
@ -659,11 +739,11 @@ async fn stage_overwrite_empty_batch_replaces_all_rows() {
.await
.unwrap();
assert!(
staged.new_fragments.is_empty(),
staged.new_fragments().is_empty(),
"empty overwrite should produce a zero-fragment Lance Overwrite transaction"
);
assert_eq!(
staged.removed_fragment_ids.len(),
staged.removed_fragment_ids().len(),
ds.manifest.fragments.len(),
"empty overwrite still removes every committed fragment"
);
@ -674,7 +754,7 @@ async fn stage_overwrite_empty_batch_replaces_all_rows() {
);
let new_ds = store
.commit_staged(Arc::new(ds.clone()), staged.transaction)
.commit_staged(Arc::new(ds.clone()), staged)
.await
.unwrap();
assert_eq!(new_ds.version().version, pre_version + 1);
@ -726,7 +806,7 @@ async fn stage_create_btree_index_does_not_advance_head_until_commit() {
);
let new_ds = store
.commit_staged(Arc::new(ds.clone()), staged.transaction)
.commit_staged(Arc::new(ds.clone()), staged)
.await
.unwrap();
assert!(new_ds.version().version > pre_version);
@ -760,7 +840,7 @@ async fn stage_create_inverted_index_does_not_advance_head_until_commit() {
assert!(!store.has_fts_index(&ds, "id").await.unwrap());
let new_ds = store
.commit_staged(Arc::new(ds.clone()), staged.transaction)
.commit_staged(Arc::new(ds.clone()), staged)
.await
.unwrap();
assert!(new_ds.version().version > pre_version);
@ -770,23 +850,21 @@ async fn stage_create_inverted_index_does_not_advance_head_until_commit() {
);
}
/// Pin the inline-commit behavior of `delete_where`. Lance 6.0.1 does
/// NOT expose a public `DeleteJob::execute_uncommitted`
/// (`pub(crate)` — see lance-format/lance#6658). The trait deliberately
/// does NOT introduce a `stage_delete` wrapper that would secretly
/// inline-commit (a side-channel between the staged and inline write
/// paths). Instead, the trait keeps `delete_where` as the only delete
/// entry point, named honestly.
///
/// **When Lance #6658 lands**: this test will need to flip — replace
/// the assertion with a `stage_delete` + `commit_staged` round-trip
/// and remove the residual line in `docs/dev/writes.md`.
/// Staged delete (Lance 7.0 `DeleteBuilder::execute_uncommitted`, lance#6658):
/// `stage_delete` does NOT advance Lance HEAD (two-phase); an in-query
/// `scan_with_staged` sees the deletion via the staged deletion-vector
/// fragments (read-your-writes — proves `Scanner::with_fragments` applies the
/// staged deletion files); `commit_staged` then advances HEAD and persists it;
/// and a 0-row delete is a true no-op (`None`, no version, no fragments).
/// Flipped from the old `delete_where_advances_head_inline_documents_residual`
/// once the two-phase delete landed.
#[tokio::test]
async fn delete_where_advances_head_inline_documents_residual() {
async fn stage_delete_does_not_advance_head_and_reads_through_staged() {
let dir = tempfile::tempdir().unwrap();
let uri = format!("{}/people.lance", dir.path().to_str().unwrap());
let store = TableStore::new(dir.path().to_str().unwrap());
let mut ds = TableStore::write_dataset(
let ds = TableStore::write_dataset(
&uri,
person_batch(&[("alice", Some(30)), ("bob", Some(25))]),
)
@ -794,26 +872,82 @@ async fn delete_where_advances_head_inline_documents_residual() {
.unwrap();
let pre_version = ds.version().version;
let result = ds.delete("id = 'alice'").await.unwrap();
ds = (*result.new_dataset).clone();
assert_eq!(result.num_deleted_rows, 1);
assert!(
ds.version().version > pre_version,
"delete_where ADVANCES Lance HEAD inline (the residual). When \
lance-format/lance#6658 ships and we migrate to stage_delete + \
commit_staged, flip this assertion to assert that staging does \
NOT advance HEAD."
// Stage a delete of alice — writes the deletion file (Phase A) but does
// NOT advance HEAD.
let staged = store
.stage_delete(&ds, "id = 'alice'")
.await
.unwrap()
.expect("alice matches → Some(StagedWrite)");
assert_eq!(
ds.version().version,
pre_version,
"stage_delete must NOT advance Lance HEAD (two-phase)"
);
// Read-your-writes: a scan over the staged delete sees the deletion vector
// — alice is gone, bob remains.
let batches = store
.scan_with_staged(&ds, std::slice::from_ref(&staged), None, None)
.await
.unwrap();
assert_eq!(
collect_ids(&batches),
vec!["bob"],
"the staged deletion must be visible to an in-query read (deletion-vector RYW)"
);
// Commit advances HEAD and persists the deletion.
let committed = store
.commit_staged(std::sync::Arc::new(ds.clone()), staged)
.await
.unwrap();
assert!(committed.version().version > pre_version);
assert_eq!(committed.count_rows(None).await.unwrap(), 1);
// A 0-row delete is a true no-op: None, no version, no fragments.
let none = store
.stage_delete(&committed, "id = 'nobody'")
.await
.unwrap();
assert!(none.is_none(), "a 0-row delete must stage nothing");
}
/// Companion to `delete_where_*`: pin the inline-commit behavior of
/// `create_vector_index`. Lance 6.0.1 vector indices take the
/// "segment commit path" which calls `build_index_metadata_from_segments`
/// (`pub(crate)` in lance-6.0.1 `src/index.rs:111`). Until upstream
/// exposes that helper (companion ticket to lance-format/lance#6658),
/// the trait surface deliberately does NOT include
/// `stage_create_vector_index` — same rationale as `stage_delete`'s
/// absence (no side-channel between staged and inline write paths).
#[tokio::test]
async fn stage_delete_commit_rebases_over_disjoint_committed_delete() {
let dir = tempfile::tempdir().unwrap();
let uri = format!("{}/people.lance", dir.path().to_str().unwrap());
let store = TableStore::new(dir.path().to_str().unwrap());
let ds = TableStore::write_dataset(&uri, numbered_person_batch(0..100))
.await
.unwrap();
let staged = store
.stage_delete(&ds, "age < 10")
.await
.unwrap()
.expect("delete should match rows");
DeleteBuilder::new(Arc::new(ds.clone()), "age >= 10 AND age < 20")
.execute()
.await
.unwrap();
let committed = store
.commit_staged(Arc::new(ds.clone()), staged)
.await
.unwrap();
assert_eq!(committed.count_rows(None).await.unwrap(), 80);
}
/// Pin the inline-commit behavior of `create_vector_index` — the SOLE
/// remaining inline residual now that `delete` has migrated to `stage_delete`
/// (MR-A). Vector indices take Lance's "segment commit path" which calls
/// `build_index_metadata_from_segments` (`pub(crate)` in Lance 7.0.0). Until
/// upstream exposes that helper (lance-format/lance#6666), the trait surface
/// deliberately does NOT include `stage_create_vector_index` — keeping the
/// inline coupling off `TableStorage` so no side-channel exists between the
/// staged and inline write paths.
#[tokio::test]
async fn create_vector_index_advances_head_inline_documents_residual() {
use arrow_array::FixedSizeListArray;
@ -1073,7 +1207,10 @@ async fn commit_staged_skips_auto_cleanup_so_pinned_versions_survive() {
// every commit, delete anything older than now).
let mut cfg = HashMap::new();
cfg.insert("lance.auto_cleanup.interval".to_string(), "1".to_string());
cfg.insert("lance.auto_cleanup.older_than".to_string(), "0ms".to_string());
cfg.insert(
"lance.auto_cleanup.older_than".to_string(),
"0ms".to_string(),
);
ds.update_config(cfg).await.unwrap();
// Several writes through the engine's staged commit path.
@ -1084,7 +1221,7 @@ async fn commit_staged_skips_auto_cleanup_so_pinned_versions_survive() {
.await
.unwrap();
ds = store
.commit_staged(Arc::new(ds.clone()), staged.transaction)
.commit_staged(Arc::new(ds.clone()), staged)
.await
.unwrap();
}

View file

@ -15,6 +15,7 @@
mod helpers;
use arrow_array::Array;
use lance::Dataset;
use omnigraph::db::commit_graph::CommitGraph;
use omnigraph::db::{Omnigraph, ReadTarget};
use omnigraph::error::OmniError;
@ -504,6 +505,11 @@ query delete_two_persons($first: String, $second: String) {
delete Person where name = $second
}
query delete_overlapping_persons($name: String, $threshold: I32) {
delete Person where name = $name
delete Person where age > $threshold
}
query update_age_by_name($name: String, $age: I32) {
update Person set { age: $age } where name = $name
}
@ -554,6 +560,111 @@ async fn mutation_rejects_mixed_insert_and_delete_at_parse_time() {
assert_eq!(db.branch_list().await.unwrap(), vec!["main".to_string()]);
}
/// Overlapping delete predicates within one query must NOT double-count
/// `affected_*`. Deletes stage (they no longer inline-commit), so both
/// statements scan the same unchanged committed snapshot; counting each
/// predicate independently over-reports when they overlap. The contract —
/// matching the old inline path, where each delete committed before the next
/// ran — is the DISTINCT count of rows removed (= what the combined
/// `(p1) OR (p2)` staged delete actually removes).
///
/// Fixture: Alice(30), Bob(25), Charlie(35), Diana(28); Knows Alice→Bob,
/// Alice→Charlie, Bob→Diana; WorksAt Alice→Acme, Bob→Globex. `name = "Alice"`
/// `age > 29` = {Alice, Charlie} (2 distinct nodes); the combined cascade
/// removes {Alice→Bob, Alice→Charlie, Alice→Acme} (3 distinct edges — Charlie
/// adds none new). Buggy per-statement counting reports 3 nodes / 6 edges.
#[tokio::test]
async fn overlapping_delete_predicates_do_not_double_count_affected() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
let r = db
.mutate(
"main",
STAGED_QUERIES,
"delete_overlapping_persons",
&mixed_params(&[("$name", "Alice")], &[("$threshold", 29)]),
)
.await
.expect("delete-only mutation must succeed");
assert_eq!(
r.affected_nodes, 2,
"distinct nodes removed are {{Alice, Charlie}}; overlapping predicates must not double-count",
);
assert_eq!(
r.affected_edges, 3,
"distinct edges removed are {{Alice→Bob, Alice→Charlie, Alice→Acme}}; cascade must not double-count",
);
// The data is correct regardless of the count: Bob + Diana remain.
assert_eq!(count_rows(&db, "node:Person").await, 2, "Bob and Diana remain");
assert_eq!(count_rows(&db, "edge:Knows").await, 1, "only Bob→Diana remains");
assert_eq!(
count_rows(&db, "edge:WorksAt").await,
1,
"only Bob→Globex remains",
);
}
/// The overlap-exclusion filter must use SQL `IS NOT TRUE`, not `NOT`: a prior
/// delete predicate referencing a NULLable column must NOT drop a later
/// statement's matching row just because that column is NULL (SQL UNKNOWN).
/// With `NOT (age > 30)`, a row with NULL `age` makes the clause UNKNOWN and the
/// row is filtered out of `deleted_ids` — skipping its cascade (orphaned edges),
/// or, if it is the only match, leaving the node undeleted. This is a data bug,
/// not just a miscount.
///
/// Data: Charlie (age 35), Zoe (age NULL); Knows Zoe→Charlie. The query deletes
/// `age > 30` (Charlie) then `name = "Zoe"`. Zoe must still be deleted and her
/// edge cascaded despite the prior `age > 30` evaluating to UNKNOWN for her.
#[tokio::test]
async fn delete_dedup_filter_does_not_drop_null_column_rows() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let schema = r#"
node Person {
name: String @key
age: I32?
}
edge Knows: Person -> Person
"#;
let data = r#"{"type":"Person","data":{"name":"Charlie","age":35}}
{"type":"Person","data":{"name":"Zoe"}}
{"edge":"Knows","from":"Zoe","to":"Charlie"}"#;
let mut db = Omnigraph::init(uri, schema).await.unwrap();
load_jsonl(&mut db, data, LoadMode::Overwrite).await.unwrap();
let q = r#"
query del_age_then_name($threshold: I32, $name: String) {
delete Person where age > $threshold
delete Person where name = $name
}
"#;
let r = db
.mutate(
"main",
q,
"del_age_then_name",
&mixed_params(&[("$name", "Zoe")], &[("$threshold", 30)]),
)
.await
.expect("delete-only mutation must succeed");
assert_eq!(
count_rows(&db, "node:Person").await,
0,
"both Charlie (age>30) and Zoe (name=Zoe, NULL age) must be deleted",
);
assert_eq!(
count_rows(&db, "edge:Knows").await,
0,
"Zoe→Charlie must cascade — Zoe's NULL age must not skip her cascade",
);
assert_eq!(r.affected_nodes, 2, "Charlie + Zoe");
assert_eq!(r.affected_edges, 1, "Zoe→Charlie, counted once");
}
/// `insert Person 'X'; update Person where name='X' set age=...` — both
/// ops produce content on `node:Person` and coalesce into one
/// `stage_merge_insert` at end-of-query. The accumulator's last-write-wins
@ -1662,7 +1773,7 @@ async fn branch_cascade_delete_forks_node_and_edges_under_held_queues() {
// #283: a mutation predicate (`where camelField = ...`) on a camelCase column
// must execute, not fail at the Lance scan with "No field named ...". Covers
// both `update` (committed scan via scan_with_pending) and `delete`
// (delete_where), which share the same emitted SQL filter string.
// (stage_delete), which share the same emitted SQL filter string.
const CC_SCHEMA: &str = r#"
node Doc {
slug: String @key
@ -1726,6 +1837,63 @@ query chain($repo: String) {
assert_eq!(r.affected_nodes, 2, "both ops should touch the acme Doc (read-your-writes)");
}
/// A zero-row cascade delete must not advance an edge table's Lance HEAD past
/// its manifest version. A `delete <Node>` cascades a delete into every incident
/// edge type (`exec/mutation.rs`). The original bug this guards against: the old
/// inline `delete_where` (`Dataset::delete`) advanced Lance HEAD **even when zero
/// edges matched**, while the cascade recorded the new version in the manifest
/// only `if deleted_rows > 0`. So deleting a node with no incident edges advanced
/// `edge:Knows` Lance HEAD while the manifest stayed behind — a `HEAD > manifest`
/// drift that then tripped the next strict write's `ExpectedVersionMismatch`, and
/// `repair` refused (delete-class drift), wedging the graph.
///
/// This pins the invariant directly: after any node delete, every edge table's
/// manifest version must equal its on-disk Lance HEAD — no write may advance HEAD
/// past the manifest (invariant 2 / the deny-list). Now GREEN: `delete` is staged
/// (MR-A / iss-950, via Lance 7.0's `DeleteBuilder::execute_uncommitted`), so a
/// 0-row delete commits no Lance version at all — correct by construction.
#[tokio::test]
async fn node_delete_with_no_incident_edges_leaves_no_edge_table_drift() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
let root = dir.path().to_str().unwrap().to_string();
// A person with NO Knows edges. Deleting it cascades a 0-row delete
// into `edge:Knows` (the cascade runs for every incident edge type).
mutate_main(
&mut db,
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", "Loner")], &[("$age", 30)]),
)
.await
.unwrap();
mutate_main(
&mut db,
MUTATION_QUERIES,
"remove_person",
&params(&[("$name", "Loner")]),
)
.await
.expect("the first delete itself succeeds — it leaves the drift for the NEXT write");
// The invariant: edge:Knows manifest version == its on-disk Lance HEAD.
let snap = snapshot_main(&db).await.unwrap();
let entry = snap
.entry("edge:Knows")
.expect("edge:Knows must be in the manifest");
let full = format!("{}/{}", root.trim_end_matches('/'), entry.table_path);
let head = Dataset::open(&full).await.unwrap().version().version;
assert_eq!(
entry.table_version, head,
"a node delete matching no edges advanced edge:Knows Lance HEAD to v{head} but the \
manifest still records v{} HEAD>manifest drift from a 0-row cascade delete. A staged \
0-row delete must commit no Lance version at all (MR-A); this drift means that \
regressed.",
entry.table_version,
);
}
/// RFC-013 PR2 #1b: the publisher folds the new `known_state` in-memory after a
/// publish instead of re-scanning `__manifest`. That fold MUST be byte-identical
/// to a fresh re-scan, or the warm coordinator silently desyncs. After a sequence