fix(engine): stop branch-merge fast-forward OOM on embedding tables (#277)

* fix(engine): stop branch-merge fast-forward OOM on embedding tables

A branch→main fast-forward merge of a forked, embedding-bearing table
re-derived the whole branch row-by-row: it lumped new + changed rows into
one Lance `merge_insert`, i.e. a full-outer hash join over the entire
delta that exhausts the DataFusion memory pool (8k rows × 3072-dim →
`Resources exhausted: 188MB HashJoinInput, 100MB pool`), so the merge
hung/failed instead of completing.

Fix the data path on existing, substrate-supported primitives:

- Adopt-with-delta split: new rows → `stage_append` (a streaming
  `Operation::Append`, no hash join), only genuinely-changed rows →
  a bounded `stage_merge_insert`, deletes inline. New `AdoptDelta` /
  `compute_adopt_delta` / `publish_adopted_delta` replace the combined
  `compute_source_delta` path; the three-way merge path is untouched.
- Stream the append via `stage_append_stream` →
  `execute_uncommitted_stream` (the substrate-blessed bulk-append path),
  removing the `Vec`+`concat` full-delta materialization. Blob-aware via
  `scan_stream_for_rewrite`. Exposed on the sealed `TableStorage` trait.
- Lazy row-signature: stop stringifying every row's embedding eagerly;
  compute the signature only for the `(Some,Some)` changed-candidate arm.
- Index coverage is reconciler-owned: the adopt path no longer rebuilds
  vector/FTS indexes inline; `optimize`/`ensure_indices` folds the new
  rows in (reads stay correct via brute-force tail). Post-merge
  index-coverage contract documented in docs/user/branching/merge.md.
- Recovery pin: new `CandidateTableState::AdoptWithDelta` is classified
  and pinned so the append's HEAD advance is sidecar-covered
  (invariant 5); the `BranchMerge` sidecar's loose classification covers
  the two-commit shape.

The regression gate is structural, not a brittle size threshold: task-local
write probes assert an append-only fast-forward merge does 0
`stage_merge_insert` (the OOM hash join), appends via `stage_append`, and
streams (0 whole-delta materialization). Plus functional correctness,
blob round-trip, index-defer, and a Phase-B failpoint recovery test.

Residual: the classify-time staging round-trip is still O(N) in memory
(architecturally required for the all-or-nothing multi-table publish);
bounding it fully is the fragment-adopt follow-up.

* test(engine): partial branch-merge Phase B must roll back (RED regression)

A branch-merge per-table publish is a multi-commit sequence — adopt:
append → upsert → delete; three-way: merge_insert → delete → index — each
step advancing Lance HEAD before the single manifest publish. Add four
failpoint sites at those windows and four regression tests (mixed delta:
a fresh id, a modified base id, a removed base id) asserting that a crash
mid-sequence rolls the whole merge BACK on the next open and a re-run
re-applies the full delta.

RED against current code: the loose `BranchMerge` classification rolls any
`lance_head > manifest_pinned` forward, so the partial is published and the
merge recorded — the rolled-back-to-base assertion fails with the partial
state visible (e.g. bob appended, dave not deleted). The fix lands next.

The failpoint sites are no-ops unless the `failpoints` feature activates them.

* fix(engine): roll back partial branch-merge Phase B (recovery WAL confirmation)

A branch-merge publishes each table with several Lance commits (adopt:
append → upsert → delete; three-way: merge_insert → delete → index), then
one manifest publish makes them atomic. Recovery classified `BranchMerge`
loosely: any `lance_head > manifest_pinned` with a matching CAS pin rolled
*forward* to the observed HEAD. So a crash mid-sequence published a partial
delta (e.g. the append without its sibling upsert/delete) and recorded the
merge as complete — silent data loss; a re-merge sees "already up to date"
and never repairs it.

Fix: make the recovery sidecar a two-phase WAL for `BranchMerge`. After the
whole per-table publish loop completes, stamp each pin's `confirmed_version`
with its exact achieved Lance version (a second sidecar write), then publish
the manifest. Recovery now:

- rolls FORWARD only to a pin's `confirmed_version` (set ⇒ Phase B finished);
- rolls BACK (`TableClassification::IncompletePhaseB`) when the HEAD moved but
  no confirmation was recorded ⇒ a partial publish ⇒ all-or-nothing restore to
  the manifest pin, so a re-run re-applies the full delta.

Scope: `BranchMerge` only. Other loose writers (`SchemaApply`,
`EnsureIndices`, `Optimize`) keep the loose roll-forward — their drift is
derived state (index coverage, compaction) a partial roll-forward never
corrupts, so confirmation would be cost without benefit.

This is the write-ahead intent record + idempotent roll-forward that the
fast-forward-main commit model requires to be crash-atomic across N tables;
version-recorded (not phase-count-derived), so it survives later changes to
the per-table commit sequence.

Regression tests (failpoints): four partial-window crashes — adopt
after-append / after-upsert, three-way after-merge / after-delete — each with
a mixed delta (new id, modified id, removed id) now roll the whole merge back;
the existing complete-Phase-B tests still roll forward.

* fix(engine): scope merge index docs to fast-forward; record append probe after write

Two PR-review fixes:

- docs(merge): the "a merge does not build indexes inline" note only holds for
  the fast-forward / adopt path (deferred to the reconciler). The three-way
  `Merged` path still rebuilds indexes inline in its publish, so a
  Merged-outcome merge of an embedding table pays the build up front. Scope the
  doc so a Merged-outcome user isn't surprised or led to skip a post-merge
  optimize.

- `stage_append` recorded its instrumentation probe before the fallible
  `execute_uncommitted`, so a failed staging write left the call/row counters
  inflated — and diverged from `stage_append_stream`, which records after the
  transaction is built. Record after the write succeeds.

* fix(engine): record stage_merge_insert / vector-index probes after write too

The prior commit moved `stage_append`'s instrumentation probe to after the
write, but left the two sibling write primitives with the identical ordering
bug: `stage_merge_insert` recorded before `execute_uncommitted`, and
`create_vector_index` before the index build. A failed write on either would
inflate the probe counter. Move both to record only after the write succeeds,
so all write-primitive probes share one rule (record-after-success) — closing
the class rather than the single instance the review flagged.

* docs(engine): mark the fragment-adopt excision boundary in the merge code

Comment the transitional row-level merge code so a future fragment-adopt
implementation (Lance branch-merge/rebase #7263 + UUID branch paths #7185)
knows exactly what it deletes and what it keeps:

- `AdoptDelta` / `compute_adopt_delta` / `publish_adopted_delta` — the row-level
  re-derivation; removed wholesale when a fast-forward merge becomes a fragment
  graft (adopt the source table version's fragments + indexes by reference).
- `stage_append_stream` — its only caller is that merge append; dead with it
  unless re-adopted as a general bulk-append path.
- `confirm_sidecar_phase_b` — the boundary marker: this SURVIVES. The recovery
  sidecar is the cross-table WAL a fast-forward-main commit still needs; only the
  within-table multi-commit reason for `IncompletePhaseB` narrows once each table
  is a single graft commit. Keep the sidecar; only simplify the classifier.

Comments only; no behavior change.

* test(engine): pre-upgrade v1 branch-merge sidecar must roll forward (RED)

Phase-B confirmation made the recovery classifier strict for every BranchMerge
sidecar — including ones written by a binary that predates confirmation. A
pre-upgrade crash in the Phase-B→C gap can leave such a sidecar over a COMPLETED
merge; the new classifier reads its absent confirmed_version as a partial and
rolls it back, silently discarding the finished merge (greptile P1 / Cursor High).

This regression test synthesizes that sidecar realistically: crash after Phase B
(real sidecar + advanced Lance HEAD), downgrade the on-disk JSON to the
pre-confirmation v1 shape (schema_version=1, strip confirmed_version), reopen.
RED: the merge rolls back, `bob` is discarded (left ["alice"], want
["alice","bob"]). The versioning fix lands next.

* fix(engine): version the recovery sidecar; read pre-confirmation merges as loose

Phase-B confirmation changed how a BranchMerge sidecar's absent confirmed_version
is interpreted (roll forward → roll back) without versioning the artifact, so the
new classifier silently discarded completed pre-upgrade merges (greptile P1 /
Cursor High). A capability flag would not fix the symmetric direction — keeping
schema_version=1, an OLD binary reading a NEW sidecar sails through its
already-shipped strict gate, ignores the unknown flag, and applies loose
semantics to a new partial → the same data loss on downgrade. Use the versioning
system instead.

- Bump SIDECAR_SCHEMA_VERSION 1 → 2; add a fixed CONFIRMATION_SCHEMA_VERSION = 2
  (the generation at which confirmation shipped — pinned, so a later v3 keeps v2
  confirmation-aware).
- Make the read gate version-aware (`parse_sidecar`): refuse only versions NEWER
  than this binary; accept and interpret older ones with their original
  semantics — no operator toil draining pre-upgrade sidecars. Rename
  `SidecarSchemaError.supported_version` → `max_supported_version` and reword.
- Dispatch classification by version: the strict BranchMerge confirmation path is
  gated on `schema_version >= CONFIRMATION_SCHEMA_VERSION`; a v1 BranchMerge
  sidecar falls through to the existing loose roll-forward. Thread
  `sidecar.schema_version` from `process_sidecar`.

This is bidirectionally safe: a new binary interprets v1 (loose) and v2 (strict)
and refuses the future; an old binary's `!= 1` gate already refuses v2, so it
never misreads a new sidecar. The flag was an additive-field pattern misapplied
to a semantics change; versioning is the correct mechanism.

Honest residual (any approach): an old *partial* sidecar still rolls forward —
v1 carries no confirmation, so partialness is undetectable in it. The fix stops
us from interpreting old sidecars under new rules; it can't retrofit information
they never had.

* fix(engine): harden recovery — mode resolver, loud divergence check, publish classified version

Three correct-by-design fixes from the holistic review of the recovery path, all
in recovery.rs (each closes a class, not an instance):

A. Resolve the classification mode from `(kind, schema_version)` once, instead of
   a kind×version match accreting fall-through guards in `classify_table`. New
   `ClassificationMode { Strict, Loose, Confirmed }` + an exhaustive
   `SidecarKind::classification_mode` — adding a writer kind or version floor is
   now one arm in the resolver (the compiler forces it), not a guard threaded
   through the classifier. No behavior change; existing classify/decide tests are
   the guard.

B. `confirm_sidecar_phase_b` now errors loudly when a pinned table has no achieved
   version in the publish `updates`, instead of silently skipping it (which left
   the pin unconfirmed → `IncompletePhaseB` → a silent rollback of a COMPLETE
   merge). Guards the implicit `pins ⊆ updates` invariant against a future
   divergence between the two filters (invariants 9/13). + a unit test.

C. Recovery roll-forward publishes the version classification OBSERVED
   (`state.lance_head`), not a fresh HEAD re-read at publish time. For a Confirmed
   pin classify already validated `lance_head == confirmed_version`, so this
   publishes the recorded WAL intent by construction and closes the
   classify→publish re-derivation/TOCTOU for every writer (invariant 15).
   `push_table_update_at_head` → `push_table_update(target_version: Option<u64>)`:
   roll-forward pins the classified version; roll-back keeps `None` (publishes the
   restore commit it just made). In-scope behavior is preserved, so the existing
   roll-forward integration tests are the guard; the drift-hardening is
   correct-by-construction (deterministic mid-sweep drift injection isn't feasible
   — a sync failpoint can't do an async Lance write).
This commit is contained in:
Ragnor Comerford 2026-06-19 00:15:06 +02:00 committed by GitHub
parent f2c512ae26
commit 7168ee0ed0
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
15 changed files with 1670 additions and 234 deletions

View file

@ -33,10 +33,10 @@ pub(crate) use namespace::open_table_head_for_write;
use namespace::{branch_manifest_namespace, staged_table_namespace};
use publisher::{GraphNamespacePublisher, ManifestBatchPublisher};
pub(crate) use recovery::{
RecoveryMode, RecoverySidecarHandle, SidecarKind, SidecarTablePin, SidecarTableRegistration,
SidecarTombstone, delete_sidecar, has_schema_apply_sidecar, heal_pending_sidecars_roll_forward,
list_sidecars, new_sidecar, recover_manifest_drift, schema_apply_serial_queue_key,
write_sidecar,
RecoveryMode, RecoverySidecar, RecoverySidecarHandle, SidecarKind, SidecarTablePin,
SidecarTableRegistration, SidecarTombstone, confirm_sidecar_phase_b, delete_sidecar,
has_schema_apply_sidecar, heal_pending_sidecars_roll_forward, list_sidecars, new_sidecar,
recover_manifest_drift, schema_apply_serial_queue_key, write_sidecar,
};
pub use state::SubTableEntry;
#[cfg(test)]

View file

@ -62,10 +62,26 @@ pub(crate) const RECOVERY_ACTOR: &str = "omnigraph:recovery";
/// Subdirectory under the graph root holding sidecar files.
pub(crate) const RECOVERY_DIR_NAME: &str = "__recovery";
/// Current sidecar JSON shape version. Bumping this is a breaking change:
/// older binaries will refuse to interpret newer sidecars (intentional —
/// see [`SidecarSchemaError`]).
pub(crate) const SIDECAR_SCHEMA_VERSION: u32 = 1;
/// Max sidecar JSON shape/semantics version this binary writes and understands.
/// The reader accepts every version `<= ` this and refuses only versions ABOVE
/// it (an older binary cannot guess semantics a newer writer baked in — see
/// [`SidecarSchemaError`] and [`parse_sidecar`]). Bump this whenever a change
/// alters how an existing field is *interpreted* (not merely adds an optional
/// one), and add a fixed `*_SCHEMA_VERSION` floor like the one below so older
/// generations keep their original semantics.
///
/// v1 → v2: Phase-B confirmation. A `BranchMerge` sidecar at v2 carries
/// `confirmed_version` and is classified strictly (unconfirmed ⇒ partial ⇒ roll
/// back); at v1 it predates confirmation and keeps the loose roll-forward. The
/// reader must distinguish the two, so this is a real version bump, not an
/// additive field.
pub(crate) const SIDECAR_SCHEMA_VERSION: u32 = 2;
/// The version at which Phase-B confirmation shipped. A `BranchMerge` sidecar is
/// confirmation-aware (strict classification) iff `schema_version >=` this.
/// FIXED at 2 — NOT derived from [`SIDECAR_SCHEMA_VERSION`] — so a future bump to
/// v3+ still treats v2 sidecars as confirmation-aware.
pub(crate) const CONFIRMATION_SCHEMA_VERSION: u32 = 2;
/// Selects which recovery actions are allowed in a sweep.
///
@ -115,6 +131,54 @@ pub(crate) enum SidecarKind {
Optimize,
}
/// Which recovery-classification semantics a sidecar's tables use. Resolved once
/// from `(writer_kind, schema_version)` — see [`SidecarKind::classification_mode`]
/// — so [`classify_table`] dispatches on the mode instead of re-deriving it from
/// a kind×version match. Adding a writer kind or a version floor is then one arm
/// in the resolver, not a guard threaded through `classify_table`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum ClassificationMode {
/// Exactly one `commit_staged` per table (`Mutation`, `Load`): require
/// `lance_head == manifest_pinned + 1` and the pin to match.
Strict,
/// N ≥ 1 commits per table whose drift is content-preserving / derived
/// state (`SchemaApply`, `EnsureIndices`, `Optimize`, and pre-confirmation
/// `BranchMerge`): any `lance_head > manifest_pinned` rolls forward.
Loose,
/// Multi-commit publish of *distinct logical rows* with a recorded
/// `confirmed_version` (`BranchMerge` at `schema_version >=
/// CONFIRMATION_SCHEMA_VERSION`): roll forward ONLY to the confirmed
/// version; an unconfirmed moved HEAD is a partial publish and rolls back.
Confirmed,
}
impl SidecarKind {
/// Resolve the classification mode for this writer at a given sidecar
/// `schema_version`. Exhaustive over `SidecarKind`, so adding a variant is a
/// compile error here until its recovery semantics are declared.
pub(crate) fn classification_mode(self, schema_version: u32) -> ClassificationMode {
match self {
SidecarKind::Mutation | SidecarKind::Load => ClassificationMode::Strict,
// BranchMerge gained two-phase confirmation at
// `CONFIRMATION_SCHEMA_VERSION`. A sidecar written before that
// carries no `confirmed_version` and must keep the prior loose
// roll-forward — classifying it strictly would misread a *completed*
// pre-upgrade merge as a partial and roll it back. (The read gate
// already refused any version newer than this binary.)
SidecarKind::BranchMerge => {
if schema_version >= CONFIRMATION_SCHEMA_VERSION {
ClassificationMode::Confirmed
} else {
ClassificationMode::Loose
}
}
SidecarKind::SchemaApply | SidecarKind::EnsureIndices | SidecarKind::Optimize => {
ClassificationMode::Loose
}
}
}
}
/// One table's contribution to a sidecar's intended commit. The classifier
/// uses these to decide per-table state at recovery time.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
@ -126,8 +190,22 @@ pub(crate) struct SidecarTablePin {
/// Manifest-pinned version at writer start (CAS expectation).
pub expected_version: u64,
/// Lance HEAD that the writer's `commit_staged` would produce
/// (typically `expected_version + 1`).
/// (typically `expected_version + 1`). For multi-commit writers this is
/// only a *lower bound* — see `confirmed_version`.
pub post_commit_pin: u64,
/// Phase-B confirmation: the exact Lance HEAD this table reached once the
/// writer's *entire* multi-commit publish for it finished, recorded by a
/// second sidecar write immediately before the manifest publish (Phase C).
/// `None` means Phase B did not complete (the writer crashed mid-publish),
/// so the on-disk drift is a *partial* commit and recovery must roll the
/// whole operation BACK rather than publish an incomplete state. Only the
/// `BranchMerge` writer records this today (its per-table publish is
/// append → upsert → delete, several HEAD advances that the manifest
/// publish makes atomic); other writers leave it `None` and keep their
/// existing loose roll-forward. Backward-compatible: absent on older
/// sidecars.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub confirmed_version: Option<u64>,
/// Lance branch ref this table lives on (mirrors
/// `SubTableEntry::table_branch`). Required for the recovery sweep
/// to open the dataset at the correct ref — `Dataset::open(path)`
@ -218,25 +296,27 @@ pub(crate) struct RecoverySidecarHandle {
pub(crate) sidecar_uri: String,
}
/// Error returned when the sidecar's `schema_version` is unknown to this
/// binary. We refuse-and-error rather than read-and-warn: an old binary
/// cannot guess what semantics a newer writer baked into a future shape.
/// Operator action is required (typically: upgrade the binary).
/// Error returned when the sidecar's `schema_version` is NEWER than this binary
/// understands. We refuse-and-error rather than read-and-warn: an old binary
/// cannot guess what semantics a newer writer baked into a future shape. (Older
/// versions are accepted and interpreted with their original semantics — see
/// [`parse_sidecar`].) Operator action is required (typically: upgrade the
/// binary).
#[derive(Debug)]
pub(crate) struct SidecarSchemaError {
pub sidecar_uri: String,
pub found_version: u32,
pub supported_version: u32,
pub max_supported_version: u32,
}
impl std::fmt::Display for SidecarSchemaError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"recovery sidecar at '{}' declares schema_version={}, but this \
binary supports only schema_version={}; refusing to interpret \
"recovery sidecar at '{}' declares schema_version={}, newer than the \
maximum this binary supports (schema_version={}); refusing to interpret \
upgrade omnigraph or remove the sidecar with operator review",
self.sidecar_uri, self.found_version, self.supported_version,
self.sidecar_uri, self.found_version, self.max_supported_version,
)
}
}
@ -271,6 +351,14 @@ pub(crate) enum TableClassification {
/// previous restore attempt or an external mutation. Roll back to
/// the manifest pin.
UnexpectedMultistep,
/// A confirmation-using writer (`BranchMerge`) advanced this table's HEAD
/// (`lance_head > manifest_pinned`) but the sidecar carries no
/// `confirmed_version` — its multi-commit publish crashed mid-flight, so
/// the drift is a *partial* commit (e.g. an append without its sibling
/// upsert/delete). Roll back to the manifest pin; the whole operation is
/// re-run from scratch. Distinct from `UnexpectedMultistep` so the audit
/// records a partial Phase B, not a foreign mutation.
IncompletePhaseB,
/// `lance_head < manifest_pinned`. Should be impossible: the manifest
/// pin can only advance after a successful Lance commit. Surface
/// loudly and abort recovery.
@ -341,6 +429,58 @@ pub(crate) async fn write_sidecar(
})
}
/// Phase-B confirmation: stamp each pin with the exact Lance HEAD its publish
/// reached, then re-write the sidecar in place (same object). Called once, after
/// the writer's whole multi-commit publish completed and before the manifest
/// publish (Phase C). Recovery then rolls forward ONLY to these confirmed
/// versions; a sidecar still missing them is a partial Phase B that rolls back.
///
/// Overwriting the same object is atomic (same contract as [`write_sidecar`]):
/// a torn rewrite is never observed, so recovery reads either the pre-confirm
/// sidecar (→ roll back, safe) or the confirmed one (→ roll forward). A failure
/// here leaves the pre-confirm sidecar, so the operation rolls back — correct.
///
/// SURVIVES the fragment-adopt work (unlike the row-level merge it currently
/// serves — see `AdoptDelta` in `exec/merge.rs`). The recovery sidecar is the
/// cross-table write-ahead log that makes a fast-forward-main commit
/// all-or-nothing across N tables, which a fragment graft still needs. What
/// narrows is the *within-table* reason for confirmation: once each table's
/// merge is a single graft commit, the multi-step partial window shrinks to one
/// commit, so the `BranchMerge` arm of `classify_table` could fold back into the
/// strict single-commit path and `IncompletePhaseB` retire. Do NOT delete this
/// with the row path — keep the sidecar; only simplify the classifier.
pub(crate) async fn confirm_sidecar_phase_b(
root_uri: &str,
storage: &dyn StorageAdapter,
sidecar: &mut RecoverySidecar,
confirmed_versions: &HashMap<String, u64>,
) -> Result<()> {
// Failpoint: models a storage failure on the confirmation write — the
// pre-confirm sidecar stays on disk, so recovery rolls the operation back.
crate::failpoints::maybe_fail("recovery.sidecar_confirm")?;
for pin in &mut sidecar.tables {
// Every pinned table MUST have an achieved version. A miss means the
// pin set and the publish `updates` diverged — fail loudly at the
// producer rather than leave the pin unconfirmed, which recovery would
// read as a partial Phase B and silently roll the whole (complete) merge
// back. Today the two are kept in lockstep by construction; this guards
// the invariant against a future edit to either filter.
let version = confirmed_versions.get(&pin.table_key).ok_or_else(|| {
OmniError::manifest_internal(format!(
"confirm_sidecar_phase_b: no achieved version for pinned table '{}' \
(pins and publish updates diverged)",
pin.table_key
))
})?;
pin.confirmed_version = Some(*version);
}
let uri = sidecar_uri(root_uri, &sidecar.operation_id);
let json = serde_json::to_string_pretty(sidecar).map_err(|err| {
OmniError::manifest_internal(format!("failed to serialize recovery sidecar: {}", err))
})?;
storage.write_text(&uri, &json).await
}
/// Delete a sidecar after Phase C succeeded. Idempotent (safe to retry).
pub(crate) async fn delete_sidecar(
handle: &RecoverySidecarHandle,
@ -408,11 +548,15 @@ pub(crate) fn parse_sidecar(sidecar_uri: &str, body: &str) -> Result<RecoverySid
sidecar_uri, err
))
})?;
if peek.schema_version != SIDECAR_SCHEMA_VERSION {
// Accept every version we were built to understand (`<= max`); refuse only
// versions NEWER than us. Interpreting older generations with their original
// semantics (rather than refusing them) is what avoids billing operators to
// drain pre-upgrade sidecars; classification then dispatches by version.
if peek.schema_version > SIDECAR_SCHEMA_VERSION {
return Err(SidecarSchemaError {
sidecar_uri: sidecar_uri.to_string(),
found_version: peek.schema_version,
supported_version: SIDECAR_SCHEMA_VERSION,
max_supported_version: SIDECAR_SCHEMA_VERSION,
}
.into());
}
@ -427,26 +571,38 @@ pub(crate) fn parse_sidecar(sidecar_uri: &str, body: &str) -> Result<RecoverySid
/// Classify one table's observed state vs. the sidecar's intent.
///
/// `kind` adjusts the precision of the `RolledPastExpected` predicate:
/// - **Confirmation** (`BranchMerge`): the writer's per-table publish is
/// several HEAD advances (append → upsert → delete), so a bare
/// `lance_head > manifest_pinned` is ambiguous — it may be a *complete*
/// publish or a *partial* one crashed mid-sequence. The writer resolves
/// the ambiguity by recording the exact achieved version
/// (`confirmed_version`) only after the whole publish finished. So roll
/// forward ONLY to that confirmed version; a missing confirmation is a
/// partial commit (`IncompletePhaseB`) and rolls back. This is the safe
/// form of the loose match for writers where a partial would publish an
/// incomplete delta.
/// - **Strict** (`Mutation`, `Load`): exactly one `commit_staged` per
/// table, so `lance_head == manifest_pinned + 1` AND
/// `post_commit_pin == lance_head` is required.
/// - **Loose** (`SchemaApply`, `EnsureIndices`, `BranchMerge`,
/// `Optimize`): the writer advances the Lance HEAD by N ≥ 1 commits
/// per table (one per index built + one for the overwrite, etc.;
/// merge tables run merge_insert + delete_where + index rebuilds;
/// `Optimize` runs `compact_files`, which commits reserve-fragments +
/// rewrite) and the exact N is hard to compute at sidecar-write time.
/// The loose match accepts
/// - **Loose** (`SchemaApply`, `EnsureIndices`, `Optimize`): the writer
/// advances the Lance HEAD by N ≥ 1 commits per table (one per index
/// built + one for the overwrite, etc.; `Optimize` runs `compact_files`,
/// which commits reserve-fragments + rewrite) and the exact N is hard to
/// compute at sidecar-write time. The loose match accepts
/// any `lance_head > manifest_pinned` as `RolledPastExpected` when
/// `pin.expected_version == manifest_pinned` (the writer's CAS
/// target matches what the manifest currently shows). The risk this
/// admits — an external agent advancing HEAD between sidecar write
/// and recovery — is out of scope for the single-coordinator model.
/// target matches what the manifest currently shows). This is safe for
/// these writers because their drift is derived state (index coverage,
/// compaction) the reconciler reproduces — a partial roll-forward loses
/// no logical rows. The risk it admits — an external agent advancing HEAD
/// between sidecar write and recovery — is out of scope for the
/// single-coordinator model.
pub(crate) fn classify_table(
pin: &SidecarTablePin,
lance_head: u64,
manifest_pinned: u64,
kind: SidecarKind,
schema_version: u32,
) -> TableClassification {
use TableClassification::*;
if lance_head < manifest_pinned {
@ -457,27 +613,49 @@ pub(crate) fn classify_table(
if lance_head == manifest_pinned {
return NoMovement;
}
// lance_head > manifest_pinned
let strict = matches!(kind, SidecarKind::Mutation | SidecarKind::Load);
if strict {
if lance_head == manifest_pinned + 1 {
if pin.expected_version == manifest_pinned && pin.post_commit_pin == lance_head {
RolledPastExpected
} else {
UnexpectedAtP1
// lance_head > manifest_pinned. The "which semantics" decision is resolved
// once from (kind, schema_version); dispatch on it.
match kind.classification_mode(schema_version) {
ClassificationMode::Confirmed => {
// Two-phase confirmation: roll forward only to the exact version the
// writer recorded after its whole multi-commit publish completed. No
// confirmation ⇒ the publish crashed mid-sequence ⇒ partial ⇒ roll
// back. A confirmation that doesn't match the observed HEAD means a
// foreign writer advanced the table — don't roll a surprise forward.
match pin.confirmed_version {
Some(confirmed)
if lance_head == confirmed && pin.expected_version == manifest_pinned =>
{
RolledPastExpected
}
Some(_) => UnexpectedMultistep,
None => IncompletePhaseB,
}
} else {
// lance_head > manifest_pinned + 1
UnexpectedMultistep
}
} else {
// Loose match for multi-commit writers (SchemaApply, EnsureIndices).
if pin.expected_version == manifest_pinned {
RolledPastExpected
} else if lance_head == manifest_pinned + 1 {
UnexpectedAtP1
} else {
UnexpectedMultistep
ClassificationMode::Strict => {
if lance_head == manifest_pinned + 1 {
if pin.expected_version == manifest_pinned && pin.post_commit_pin == lance_head {
RolledPastExpected
} else {
UnexpectedAtP1
}
} else {
// lance_head > manifest_pinned + 1
UnexpectedMultistep
}
}
ClassificationMode::Loose => {
// Multi-commit writers whose drift is content-preserving / derived
// state (and pre-confirmation BranchMerge sidecars): any
// `lance_head > manifest_pinned` rolls forward when the CAS target
// matches what the manifest currently shows.
if pin.expected_version == manifest_pinned {
RolledPastExpected
} else if lance_head == manifest_pinned + 1 {
UnexpectedAtP1
} else {
UnexpectedMultistep
}
}
}
}
@ -496,7 +674,7 @@ pub(crate) fn decide(classifications: &[TableClassification]) -> SidecarDecision
}
if classifications
.iter()
.any(|c| matches!(c, NoMovement | UnexpectedAtP1 | UnexpectedMultistep))
.any(|c| matches!(c, NoMovement | UnexpectedAtP1 | UnexpectedMultistep | IncompletePhaseB))
{
return RollBack;
}
@ -891,7 +1069,13 @@ async fn process_sidecar(
.map(|e| e.table_version)
.unwrap_or(0);
states.push(ClassifiedTable {
classification: classify_table(pin, lance_head, manifest_pinned, sidecar.writer_kind),
classification: classify_table(
pin,
lance_head,
manifest_pinned,
sidecar.writer_kind,
sidecar.schema_version,
),
manifest_pinned,
lance_head,
});
@ -1028,7 +1212,7 @@ async fn process_sidecar(
Phase C did not land)"
);
let (new_manifest_version, published_versions) =
roll_forward_all(root_uri, sidecar, snapshot).await?;
roll_forward_all(root_uri, sidecar, &states, snapshot).await?;
// `to_version` records the ACTUAL Lance HEAD published for
// each table (not pin.post_commit_pin, which is a lower bound
// for loose-match writers like SchemaApply / EnsureIndices /
@ -1112,6 +1296,7 @@ async fn roll_back_sidecar(
TableClassification::RolledPastExpected
| TableClassification::UnexpectedAtP1
| TableClassification::UnexpectedMultistep
| TableClassification::IncompletePhaseB
) {
restore_table_to_version(
&pin.table_path,
@ -1119,14 +1304,17 @@ async fn roll_back_sidecar(
state.manifest_pinned,
)
.await?;
// Publish the post-restore HEAD, CAS against the current (unmoved)
// manifest pin — the same helper roll-forward uses.
push_table_update_at_head(
// Publish the post-restore HEAD (the restore commit we just made),
// CAS against the current (unmoved) manifest pin — the same helper
// roll-forward uses. `None` target: there is no prior observation to
// pin to; the version to publish is the HEAD the restore produced.
push_table_update(
root_uri,
&pin.table_key,
&pin.table_path,
pin.table_branch.as_deref(),
state.manifest_pinned,
None,
&mut updates,
&mut expected,
)
@ -1227,6 +1415,7 @@ async fn record_audit_recovery_rollforward(
async fn roll_forward_all(
root_uri: &str,
sidecar: &RecoverySidecar,
states: &[ClassifiedTable],
snapshot: &Snapshot,
) -> Result<(u64, HashMap<String, u64>)> {
let total_changes =
@ -1236,22 +1425,25 @@ async fn roll_forward_all(
let mut published_versions: HashMap<String, u64> =
HashMap::with_capacity(sidecar.tables.len() + sidecar.additional_registrations.len());
for pin in &sidecar.tables {
// Publish to the table's CURRENT Lance HEAD on the pin's branch (not the
// sidecar's `post_commit_pin`, a lower bound for loose-match writers that
// run multiple commit_staged calls per table). CAS against the pin's
// pre-write `expected_version`.
let head_version = push_table_update_at_head(
for (pin, state) in sidecar.tables.iter().zip(states.iter()) {
// Publish the version classification OBSERVED (`state.lance_head`), not a
// fresh HEAD re-read. For a `Confirmed` pin classify already validated
// `lance_head == confirmed_version`, so this publishes the recorded WAL
// intent by construction; for loose/strict pins it's the multi-commit
// HEAD classify saw. Single observation, no classify→publish TOCTOU. CAS
// against the pin's pre-write `expected_version`.
let published = push_table_update(
root_uri,
&pin.table_key,
&pin.table_path,
pin.table_branch.as_deref(),
pin.expected_version,
Some(state.lance_head),
&mut updates,
&mut expected,
)
.await?;
published_versions.insert(pin.table_key.clone(), head_version);
published_versions.insert(pin.table_key.clone(), published);
}
// SchemaApply-only: register added tables (and renamed targets) and
@ -1351,45 +1543,61 @@ async fn roll_forward_all(
/// version the table was just restored to). The HEAD is read AFTER any restore
/// in the same single-threaded sweep, so no concurrent writer can have advanced
/// it.
async fn push_table_update_at_head(
/// Stage a manifest `Update` for one table.
///
/// `target_version` selects WHICH Lance version's state to publish:
/// - `Some(v)` — pin the dataset at version `v` and publish it. Roll-FORWARD
/// passes the version classification observed (and, for a `Confirmed` pin,
/// validated equals `confirmed_version`), so recovery publishes the version it
/// *decided* on rather than re-reading a HEAD a concurrent writer may have
/// advanced since classification — one observation, used for both the decision
/// and the publish (invariant 15).
/// - `None` — publish the dataset's current HEAD. Roll-BACK uses this: it just
/// created the restore commit, so HEAD *is* the version to publish.
async fn push_table_update(
root_uri: &str,
table_key: &str,
table_path: &str,
branch: Option<&str>,
expected_version: u64,
target_version: Option<u64>,
updates: &mut Vec<ManifestChange>,
expected: &mut HashMap<String, u64>,
) -> Result<u64> {
let head_ds = Dataset::open(table_path)
let ds = Dataset::open(table_path)
.await
.map_err(|e| OmniError::Lance(e.to_string()))?;
let head_ds = match branch {
Some(b) if b != "main" => head_ds
let ds = match branch {
Some(b) if b != "main" => ds
.checkout_branch(b)
.await
.map_err(|e| OmniError::Lance(e.to_string()))?,
_ => head_ds,
_ => ds,
};
let head_version = head_ds.version().version;
let row_count = head_ds
let ds = match target_version {
Some(v) => ds
.checkout_version(v)
.await
.map_err(|e| OmniError::Lance(e.to_string()))?,
None => ds,
};
let published_version = ds.version().version;
let row_count = ds
.count_rows(None)
.await
.map_err(|e| OmniError::Lance(e.to_string()))? as u64;
let table_relative_path = super::table_path_for_table_key(table_key)?;
let version_metadata = super::metadata::TableVersionMetadata::from_dataset(
root_uri,
&table_relative_path,
&head_ds,
)?;
let version_metadata =
super::metadata::TableVersionMetadata::from_dataset(root_uri, &table_relative_path, &ds)?;
updates.push(ManifestChange::Update(SubTableUpdate {
table_key: table_key.to_string(),
table_version: head_version,
table_version: published_version,
table_branch: branch.map(str::to_string),
row_count,
version_metadata,
}));
expected.insert(table_key.to_string(), expected_version);
Ok(head_version)
Ok(published_version)
}
/// Append the audit row describing this recovery action.
@ -1573,6 +1781,7 @@ mod tests {
table_path: table_path.to_string(),
expected_version: expected,
post_commit_pin: post,
confirmed_version: None,
table_branch: None,
}
}
@ -1597,30 +1806,39 @@ mod tests {
}
#[test]
fn parse_sidecar_refuses_unknown_schema_version() {
let body = r#"{
"schema_version": 99,
"operation_id": "01H000000000000000000000XX",
"started_at": "0",
"branch": null,
"actor_id": null,
"writer_kind": "Mutation",
"tables": []
}"#;
let err = parse_sidecar("file:///tmp/__recovery/x.json", body).unwrap_err();
fn parse_sidecar_refuses_future_but_accepts_older_schema_version() {
let body = |version: u32| {
format!(
r#"{{
"schema_version": {version},
"operation_id": "01H000000000000000000000XX",
"started_at": "0",
"branch": null,
"actor_id": null,
"writer_kind": "BranchMerge",
"tables": []
}}"#
)
};
// A version NEWER than this binary's max → refuse (can't guess the future).
let err = parse_sidecar("file:///tmp/__recovery/x.json", &body(99)).unwrap_err();
let msg = err.to_string();
assert!(
msg.contains("schema_version=99") && msg.contains("supports only schema_version=1"),
"expected SidecarSchemaError mentioning the version mismatch, got: {}",
msg,
msg.contains("schema_version=99") && msg.contains("newer than the maximum"),
"expected a future-version refusal, got: {msg}",
);
// An OLDER version (pre-confirmation v1) → accept and interpret with its
// original semantics; never refuse a version we were built to understand.
let parsed = parse_sidecar("file:///tmp/__recovery/x.json", &body(1))
.expect("a v1 (older) sidecar must parse, not be refused");
assert_eq!(parsed.schema_version, 1);
}
#[test]
fn classify_no_movement_when_head_equals_pinned() {
let pin = make_pin("node:Person", "irrelevant", 5, 6);
assert_eq!(
classify_table(&pin, 5, 5, SidecarKind::Mutation),
classify_table(&pin, 5, 5, SidecarKind::Mutation, SIDECAR_SCHEMA_VERSION),
TableClassification::NoMovement,
);
}
@ -1629,7 +1847,7 @@ mod tests {
fn classify_rolled_past_expected_when_sidecar_matches_strict() {
let pin = make_pin("node:Person", "irrelevant", 5, 6);
assert_eq!(
classify_table(&pin, 6, 5, SidecarKind::Mutation),
classify_table(&pin, 6, 5, SidecarKind::Mutation, SIDECAR_SCHEMA_VERSION),
TableClassification::RolledPastExpected,
);
}
@ -1639,7 +1857,7 @@ mod tests {
// Same +1 drift but post_commit_pin says it should be 7, not 6.
let pin = make_pin("node:Person", "irrelevant", 5, 7);
assert_eq!(
classify_table(&pin, 6, 5, SidecarKind::Mutation),
classify_table(&pin, 6, 5, SidecarKind::Mutation, SIDECAR_SCHEMA_VERSION),
TableClassification::UnexpectedAtP1,
);
}
@ -1648,7 +1866,7 @@ mod tests {
fn classify_unexpected_multistep_when_head_jumped_more_than_one_strict() {
let pin = make_pin("node:Person", "irrelevant", 5, 6);
assert_eq!(
classify_table(&pin, 8, 5, SidecarKind::Mutation),
classify_table(&pin, 8, 5, SidecarKind::Mutation, SIDECAR_SCHEMA_VERSION),
TableClassification::UnexpectedMultistep,
);
}
@ -1657,7 +1875,7 @@ mod tests {
fn classify_invariant_violation_when_head_below_pinned() {
let pin = make_pin("node:Person", "irrelevant", 5, 6);
assert_eq!(
classify_table(&pin, 3, 5, SidecarKind::Mutation),
classify_table(&pin, 3, 5, SidecarKind::Mutation, SIDECAR_SCHEMA_VERSION),
TableClassification::InvariantViolation { observed: 3 },
);
}
@ -1673,7 +1891,7 @@ mod tests {
// built two indices). Strict would say UnexpectedMultistep; loose
// accepts it as RolledPastExpected.
assert_eq!(
classify_table(&pin, 8, 5, SidecarKind::SchemaApply),
classify_table(&pin, 8, 5, SidecarKind::SchemaApply, SIDECAR_SCHEMA_VERSION),
TableClassification::RolledPastExpected,
);
}
@ -1682,7 +1900,7 @@ mod tests {
fn classify_loose_match_accepts_multi_commit_drift_for_ensure_indices() {
let pin = make_pin("node:Person", "irrelevant", 5, 6);
assert_eq!(
classify_table(&pin, 9, 5, SidecarKind::EnsureIndices),
classify_table(&pin, 9, 5, SidecarKind::EnsureIndices, SIDECAR_SCHEMA_VERSION),
TableClassification::RolledPastExpected,
);
}
@ -1691,7 +1909,7 @@ mod tests {
fn classify_loose_match_no_movement_unchanged() {
let pin = make_pin("node:Person", "irrelevant", 5, 6);
assert_eq!(
classify_table(&pin, 5, 5, SidecarKind::SchemaApply),
classify_table(&pin, 5, 5, SidecarKind::SchemaApply, SIDECAR_SCHEMA_VERSION),
TableClassification::NoMovement,
);
}
@ -1700,31 +1918,65 @@ mod tests {
fn classify_loose_match_invariant_violation_unchanged() {
let pin = make_pin("node:Person", "irrelevant", 5, 6);
assert_eq!(
classify_table(&pin, 3, 5, SidecarKind::SchemaApply),
classify_table(&pin, 3, 5, SidecarKind::SchemaApply, SIDECAR_SCHEMA_VERSION),
TableClassification::InvariantViolation { observed: 3 },
);
}
/// BranchMerge must be loose-matched, not strict: while the strict
/// classifier expects exactly one `commit_staged` per table,
/// `publish_rewritten_merge_table` runs multiple per table
/// (merge_insert + delete_where + index rebuilds — the comment in
/// `merge.rs` explicitly says so). Strict classification would roll
/// back valid completed Phase B work as `UnexpectedMultistep`.
/// BranchMerge advances each table by several commits per table
/// (adopt: append + upsert + delete; three-way: merge_insert + delete +
/// index), so a bare "HEAD moved" is ambiguous between a complete and a
/// partial publish. At a confirmation-aware version the two-phase
/// confirmation resolves it: roll forward ONLY to the recorded
/// `confirmed_version`; an unconfirmed moved HEAD is a partial publish
/// (`IncompletePhaseB` ⇒ roll back), and a confirmed version that doesn't
/// match the observed HEAD is a foreign advance (`UnexpectedMultistep` ⇒
/// roll back). A *pre-confirmation* (v1) sidecar carries no confirmation and
/// must keep the original loose roll-forward — reading it as strict would
/// roll a completed pre-upgrade merge back (silent discard).
#[test]
fn classify_loose_match_accepts_multi_commit_drift_for_branch_merge() {
let pin = make_pin("node:Person", "irrelevant", 5, 6);
fn classify_branch_merge_requires_phase_b_confirmation() {
// Unconfirmed multi-commit drift at a confirmation-aware version →
// partial Phase B → roll back.
let unconfirmed = make_pin("node:Person", "irrelevant", 5, 6);
assert_eq!(
classify_table(&pin, 8, 5, SidecarKind::BranchMerge),
classify_table(&unconfirmed, 8, 5, SidecarKind::BranchMerge, SIDECAR_SCHEMA_VERSION),
TableClassification::IncompletePhaseB,
);
// Backward-compat: the SAME unconfirmed pin in a PRE-confirmation (v1)
// sidecar → loose roll-forward (the regression fix — a completed
// pre-upgrade merge must not be discarded).
assert_eq!(
classify_table(
&unconfirmed,
8,
5,
SidecarKind::BranchMerge,
CONFIRMATION_SCHEMA_VERSION - 1,
),
TableClassification::RolledPastExpected,
);
// Confirmed to the observed HEAD → complete Phase B → roll forward.
let confirmed = SidecarTablePin {
confirmed_version: Some(8),
..make_pin("node:Person", "irrelevant", 5, 6)
};
assert_eq!(
classify_table(&confirmed, 8, 5, SidecarKind::BranchMerge, SIDECAR_SCHEMA_VERSION),
TableClassification::RolledPastExpected,
);
// Confirmed, but HEAD drifted past it (foreign writer) → roll back.
assert_eq!(
classify_table(&confirmed, 9, 5, SidecarKind::BranchMerge, SIDECAR_SCHEMA_VERSION),
TableClassification::UnexpectedMultistep,
);
}
#[test]
fn classify_loose_match_branch_merge_no_movement_unchanged() {
let pin = make_pin("node:Person", "irrelevant", 5, 6);
assert_eq!(
classify_table(&pin, 5, 5, SidecarKind::BranchMerge),
classify_table(&pin, 5, 5, SidecarKind::BranchMerge, SIDECAR_SCHEMA_VERSION),
TableClassification::NoMovement,
);
}
@ -1733,7 +1985,7 @@ mod tests {
fn classify_loose_match_branch_merge_invariant_violation_unchanged() {
let pin = make_pin("node:Person", "irrelevant", 5, 6);
assert_eq!(
classify_table(&pin, 3, 5, SidecarKind::BranchMerge),
classify_table(&pin, 3, 5, SidecarKind::BranchMerge, SIDECAR_SCHEMA_VERSION),
TableClassification::InvariantViolation { observed: 3 },
);
}
@ -1888,6 +2140,37 @@ mod tests {
assert!(after.is_empty());
}
#[tokio::test]
async fn confirm_sidecar_phase_b_errors_when_pin_missing_from_updates() {
// A pinned table with no achieved version in the publish `updates` must
// be a loud producer error, NOT a silent skip that leaves the pin
// unconfirmed (which recovery would read as a partial Phase B and roll
// the whole complete merge back). Guards the implicit `pins ⊆ updates`
// invariant against a future divergence between the two filters.
let dir = tempfile::tempdir().unwrap();
let storage = ObjectStorageAdapter::local();
let mut sidecar = new_sidecar(
SidecarKind::BranchMerge,
Some("main".to_string()),
None,
vec![make_pin("node:Person", "file:///tmp/x.lance", 5, 6)],
);
// The confirmed-versions map does NOT cover the pinned table.
let confirmed: HashMap<String, u64> = HashMap::new();
let err = confirm_sidecar_phase_b(
dir.path().to_str().unwrap(),
&storage,
&mut sidecar,
&confirmed,
)
.await
.expect_err("a pinned table with no achieved version must be a loud error");
assert!(
err.to_string().contains("pins and publish updates diverged"),
"expected a pin/updates divergence error, got: {err}",
);
}
#[tokio::test]
async fn list_sidecars_skips_non_json_files() {
let dir = tempfile::tempdir().unwrap();

View file

@ -420,6 +420,9 @@ async fn optimize_one_table(
// Lower bound — compaction commits N≥1 versions (reserve + rewrite);
// the classifier loose-matches SidecarKind::Optimize.
post_commit_pin: expected_version + 1,
// Optimize uses the loose match (drift is derived state), not
// BranchMerge's Phase-B confirmation — left None.
confirmed_version: None,
table_branch: None,
}],
);

View file

@ -362,6 +362,9 @@ where
table_path: db.storage().dataset_uri(&entry.table_path),
expected_version: entry.table_version,
post_commit_pin: entry.table_version + 1,
// SchemaApply uses the loose match, not BranchMerge's Phase-B
// confirmation — left None.
confirmed_version: None,
table_branch: entry.table_branch.clone(),
})
})

View file

@ -125,6 +125,9 @@ pub(super) async fn ensure_indices_for_branch(
table_path: full_path,
expected_version: entry.table_version,
post_commit_pin: entry.table_version + 1,
// EnsureIndices uses the loose match (index coverage is derived
// state), not BranchMerge's Phase-B confirmation — left None.
confirmed_version: None,
// Use active_branch (where commits actually land), NOT
// entry.table_branch (where the table currently lives).
// open_owned_dataset_for_branch_write forks a feature
@ -150,6 +153,9 @@ pub(super) async fn ensure_indices_for_branch(
table_path: full_path,
expected_version: entry.table_version,
post_commit_pin: entry.table_version + 1,
// EnsureIndices uses the loose match (index coverage is derived
// state), not BranchMerge's Phase-B confirmation — left None.
confirmed_version: None,
// Use active_branch (where commits actually land), NOT
// entry.table_branch (where the table currently lives).
// open_owned_dataset_for_branch_write forks a feature

View file

@ -5,7 +5,14 @@ const MERGE_STAGE_DIR_ENV: &str = "OMNIGRAPH_MERGE_STAGING_DIR";
#[derive(Debug)]
enum CandidateTableState {
/// Adopt the source's table state via a pointer switch or a branch fork —
/// no data HEAD advance, so nothing to pin for recovery.
AdoptSourceState,
/// Adopt the source's state by applying a non-empty delta onto the target's
/// lineage (append new + upsert changed + delete removed). The delta is
/// pre-computed at classification so this candidate can be recovery-pinned:
/// its publish advances Lance HEAD before the manifest commit.
AdoptWithDelta(AdoptDelta),
RewriteMerged(StagedMergeResult),
}
@ -22,6 +29,38 @@ struct StagedMergeResult {
deleted_ids: Vec<String>,
}
/// Delta for an adopted-source merge (the fast-forward / target-owns path):
/// the new + changed rows to apply onto the target's base lineage, plus the ids
/// removed on source. Distinct from [`StagedMergeResult`] (the three-way path),
/// which also carries a `full_staged` table for validation — the adopt path
/// validates against the source snapshot directly (`candidate_dataset`), so it
/// needs no `full_staged` and never builds it.
///
/// TRANSITIONAL — fragment-adopt excision point. This whole row-level adopt
/// (`AdoptDelta`, [`compute_adopt_delta`], [`publish_adopted_delta`], and the
/// streaming append it drives) re-derives the source branch row-by-row because
/// today's Lance offers no fragment-level branch merge. When Lance ships
/// branch-merge/rebase ([#7263]) + UUID branch paths ([#7185]), a fast-forward
/// merge becomes a *fragment graft* — adopt the source table version's
/// fragments (and their already-built indexes) by reference, no rows scanned,
/// re-appended, upserted, or deleted. At that point this struct and its two
/// functions are removed wholesale; the merge collapses to ~one ref/metadata
/// op per table. Keep them self-contained so that excision stays a clean delete.
///
/// [#7263]: https://github.com/lance-format/lance/issues/7263
/// [#7185]: https://github.com/lance-format/lance/issues/7185
#[derive(Debug)]
struct AdoptDelta {
/// New-on-source rows → `stage_append` (a streaming `Operation::Append`, no
/// hash join). The connector's dominant case and the OOM fix: appending new
/// rows never buffers the whole delta in a full-outer hash join.
appends: Option<StagedTable>,
/// Changed-on-source rows → `stage_merge_insert` (a hash join bounded to the
/// genuinely-changed set, not the whole delta).
upserts: Option<StagedTable>,
deleted_ids: Vec<String>,
}
#[derive(Debug, Clone)]
struct CursorRow {
id: String,
@ -31,24 +70,48 @@ struct CursorRow {
row_index: usize,
}
impl CursorRow {
/// Compute this row's signature on demand. Used by the lazy adopt cursor,
/// where `signature` is left empty; the value is identical to the eager
/// `signature` field the three-way cursor populates.
fn compute_signature(&self) -> Result<String> {
row_signature(&self.batch, self.row_index)
}
}
struct OrderedTableCursor {
stream: Option<std::pin::Pin<Box<DatasetRecordBatchStream>>>,
dataset: Option<Dataset>,
current_batch: Option<RecordBatch>,
current_row: usize,
peeked: Option<CursorRow>,
/// When false, `next_row` leaves `CursorRow::signature` empty and callers
/// compute it on demand via `CursorRow::compute_signature`. The adopt path
/// uses this: new/deleted rows never need a signature comparison and would
/// otherwise eagerly stringify their embedding for nothing.
eager_signatures: bool,
}
impl OrderedTableCursor {
async fn from_snapshot(snapshot: &Snapshot, table_key: &str) -> Result<Self> {
Self::open(snapshot, table_key, true).await
}
/// Like `from_snapshot` but leaves row signatures uncomputed (callers use
/// `CursorRow::compute_signature` on demand). See `eager_signatures`.
async fn from_snapshot_lazy(snapshot: &Snapshot, table_key: &str) -> Result<Self> {
Self::open(snapshot, table_key, false).await
}
async fn open(snapshot: &Snapshot, table_key: &str, eager_signatures: bool) -> Result<Self> {
let dataset = match snapshot.entry(table_key) {
Some(_) => Some(snapshot.open(table_key).await?),
None => None,
};
Self::from_dataset(dataset).await
Self::from_dataset(dataset, eager_signatures).await
}
async fn from_dataset(dataset: Option<Dataset>) -> Result<Self> {
async fn from_dataset(dataset: Option<Dataset>, eager_signatures: bool) -> Result<Self> {
let stream = if let Some(ds) = &dataset {
Some(Box::pin(
crate::table_store::TableStore::scan_stream_with(
@ -71,6 +134,7 @@ impl OrderedTableCursor {
current_batch: None,
current_row: 0,
peeked: None,
eager_signatures,
})
}
@ -97,9 +161,14 @@ impl OrderedTableCursor {
let dataset = self.dataset.clone().ok_or_else(|| {
OmniError::manifest("cursor row missing source dataset".to_string())
})?;
let signature = if self.eager_signatures {
row_signature(batch, row_index)?
} else {
String::new()
};
return Ok(Some(CursorRow {
id: row_id_at(batch, row_index)?,
signature: row_signature(batch, row_index)?,
signature,
dataset,
batch: batch.clone(),
row_index,
@ -258,20 +327,30 @@ fn sanitize_table_key(table_key: &str) -> String {
}
/// Computes the delta between base and source for an adopted-source merge.
/// Returns the changed/new rows (for merge_insert) and deleted IDs (for delete).
async fn compute_source_delta(
/// Returns the new + changed rows and the ids deleted on source.
///
/// Unchanged rows are dropped: the adopt path validates against the source
/// snapshot directly (`candidate_dataset`), so no `full_staged` table is built
/// — saving the O(rows) temp write that `compute_source_delta` used to produce
/// and then discard.
///
/// TRANSITIONAL — removed by the fragment-adopt work (see [`AdoptDelta`]): a
/// fragment graft adopts the source's fragments by reference, so there is no
/// row-level delta to compute.
async fn compute_adopt_delta(
table_key: &str,
catalog: &Catalog,
base_snapshot: &Snapshot,
source_snapshot: &Snapshot,
) -> Result<Option<StagedMergeResult>> {
) -> Result<Option<AdoptDelta>> {
let schema = schema_for_table_key(catalog, table_key)?;
let mut full_writer =
StagedTableWriter::new(&format!("{}_adopt_full", table_key), schema.clone())?;
let mut delta_writer = StagedTableWriter::new(&format!("{}_adopt_delta", table_key), schema)?;
let mut append_writer =
StagedTableWriter::new(&format!("{}_adopt_append", table_key), schema.clone())?;
let mut upsert_writer =
StagedTableWriter::new(&format!("{}_adopt_upsert", table_key), schema)?;
let mut deleted_ids: Vec<String> = Vec::new();
let mut base = OrderedTableCursor::from_snapshot(base_snapshot, table_key).await?;
let mut source = OrderedTableCursor::from_snapshot(source_snapshot, table_key).await?;
let mut base = OrderedTableCursor::from_snapshot_lazy(base_snapshot, table_key).await?;
let mut source = OrderedTableCursor::from_snapshot_lazy(source_snapshot, table_key).await?;
let mut needs_update = false;
@ -297,9 +376,6 @@ async fn compute_source_delta(
None
};
let base_sig = base_row.as_ref().map(|r| r.signature.as_str());
let source_sig = source_row.as_ref().map(|r| r.signature.as_str());
match (&base_row, &source_row) {
(Some(_), None) => {
// Deleted on source
@ -307,20 +383,21 @@ async fn compute_source_delta(
needs_update = true;
}
(None, Some(src)) => {
// New on source
full_writer.push_row(src).await?;
delta_writer.push_row(src).await?;
// New on source → append (streaming, no hash join). No signature
// needed — a new id is absent from base by construction.
append_writer.push_row(src).await?;
needs_update = true;
}
(Some(_), Some(src)) if source_sig != base_sig => {
// Changed on source
full_writer.push_row(src).await?;
delta_writer.push_row(src).await?;
needs_update = true;
}
(Some(base), Some(_)) => {
// Unchanged — write to full (for validation), skip delta
full_writer.push_row(base).await?;
(Some(base), Some(src)) => {
// Present on both — compute signatures lazily (the only case
// that needs them) to tell a changed row from an unchanged one.
// New/deleted rows above skip the embedding stringify entirely.
if src.compute_signature()? != base.compute_signature()? {
// Changed on source → upsert.
upsert_writer.push_row(src).await?;
needs_update = true;
}
// else unchanged — already on the target's base lineage; drop.
}
(None, None) => unreachable!(),
}
@ -330,15 +407,20 @@ async fn compute_source_delta(
return Ok(None);
}
let delta_staged = if delta_writer.row_count > 0 {
Some(delta_writer.finish().await?)
let appends = if append_writer.row_count > 0 {
Some(append_writer.finish().await?)
} else {
None
};
let upserts = if upsert_writer.row_count > 0 {
Some(upsert_writer.finish().await?)
} else {
None
};
Ok(Some(StagedMergeResult {
full_staged: full_writer.finish().await?,
delta_staged,
Ok(Some(AdoptDelta {
appends,
upserts,
deleted_ids,
}))
}
@ -651,10 +733,12 @@ async fn candidate_dataset(
) -> Result<Option<Dataset>> {
if let Some(candidate) = candidates.get(table_key) {
return match candidate {
CandidateTableState::AdoptSourceState => match source_snapshot.entry(table_key) {
Some(_) => Ok(Some(source_snapshot.open(table_key).await?)),
None => Ok(None),
},
CandidateTableState::AdoptSourceState | CandidateTableState::AdoptWithDelta(_) => {
match source_snapshot.entry(table_key) {
Some(_) => Ok(Some(source_snapshot.open(table_key).await?)),
None => Ok(None),
}
}
CandidateTableState::RewriteMerged(staged) => {
Ok(Some(staged.full_staged.dataset.clone()))
}
@ -840,13 +924,62 @@ fn row_id_at(batch: &RecordBatch, row: usize) -> Result<String> {
Ok(ids.value(row).to_string())
}
async fn publish_adopted_source_state(
/// Classify a table whose target state equals base (the adopt / fast-forward
/// case). Returns [`CandidateTableState::AdoptWithDelta`] — with the delta
/// pre-computed so it can be recovery-pinned — when the adopt applies a
/// non-empty delta onto the target's lineage (a HEAD-advancing publish via
/// [`publish_adopted_delta`]); otherwise [`CandidateTableState::AdoptSourceState`]
/// (a pointer switch or fork, which does not advance the data HEAD).
///
/// The HEAD-advancing subcases mirror [`publish_adopted_source_state`]: source
/// on a branch with the target either on main or owning the table. Computing the
/// delta here (rather than inside the publish) is what closes the recovery gap —
/// the classifier knows whether the publish will move Lance HEAD.
async fn classify_adopt(
target_db: &Omnigraph,
catalog: &Catalog,
base_snapshot: &Snapshot,
source_snapshot: &Snapshot,
target_snapshot: &Snapshot,
table_key: &str,
) -> Result<CandidateTableState> {
let Some(source_entry) = source_snapshot.entry(table_key) else {
return Ok(CandidateTableState::AdoptSourceState);
};
let target_entry = target_snapshot.entry(table_key);
let target_active = target_db.active_branch().await;
let advances_head = match (
target_active.as_deref(),
source_entry.table_branch.as_deref(),
) {
// Source on a branch, target on main — delta applied onto main's lineage.
(None, Some(_)) => true,
// Both on branches, target owns this table — delta applied onto it.
(Some(target_branch), Some(_)) => {
target_entry.and_then(|e| e.table_branch.as_deref()) == Some(target_branch)
}
// Source on main (pointer switch) or target doesn't own (fork): no advance.
_ => false,
};
if !advances_head {
return Ok(CandidateTableState::AdoptSourceState);
}
match compute_adopt_delta(table_key, catalog, base_snapshot, source_snapshot).await? {
Some(delta) => Ok(CandidateTableState::AdoptWithDelta(delta)),
None => Ok(CandidateTableState::AdoptSourceState),
}
}
/// Adopt the source's table state without applying a row delta: a pointer
/// switch (source/target share lineage) or a branch fork. The HEAD-advancing
/// delta case is classified [`CandidateTableState::AdoptWithDelta`] and
/// published by [`publish_adopted_delta`], so reaching the branch-bearing arms
/// here means the delta was empty.
async fn publish_adopted_source_state(
target_db: &Omnigraph,
source_snapshot: &Snapshot,
target_snapshot: &Snapshot,
table_key: &str,
) -> Result<crate::db::SubTableUpdate> {
let source_entry = source_snapshot
.entry(table_key)
@ -875,44 +1008,31 @@ async fn publish_adopted_source_state(
row_count: source_entry.row_count,
version_metadata: source_entry.version_metadata.clone(),
}),
// Source on branch, target on main — apply delta to preserve version metadata
(None, Some(_source_branch)) => {
let delta =
compute_source_delta(table_key, catalog, base_snapshot, source_snapshot).await?;
match delta {
Some(staged) => publish_rewritten_merge_table(target_db, table_key, &staged).await,
None => Ok(crate::db::SubTableUpdate {
table_key: table_key.to_string(),
table_version: target_entry
.map(|e| e.table_version)
.unwrap_or(source_entry.table_version),
table_branch: None,
row_count: source_entry.row_count,
version_metadata: target_entry
.map(|entry| entry.version_metadata.clone())
.unwrap_or_else(|| source_entry.version_metadata.clone()),
}),
}
}
// Source on branch, target on main, empty delta — adopt source's
// version by a pointer switch (the non-empty case is `AdoptWithDelta`).
(None, Some(_source_branch)) => Ok(crate::db::SubTableUpdate {
table_key: table_key.to_string(),
table_version: target_entry
.map(|e| e.table_version)
.unwrap_or(source_entry.table_version),
table_branch: None,
row_count: source_entry.row_count,
version_metadata: target_entry
.map(|entry| entry.version_metadata.clone())
.unwrap_or_else(|| source_entry.version_metadata.clone()),
}),
// Both on branches
(Some(target_branch), Some(source_branch)) => {
if target_entry.and_then(|entry| entry.table_branch.as_deref()) == Some(target_branch) {
// Target already owns this table — apply delta onto its lineage
let delta =
compute_source_delta(table_key, catalog, base_snapshot, source_snapshot)
.await?;
match delta {
Some(staged) => {
publish_rewritten_merge_table(target_db, table_key, &staged).await
}
None => Ok(crate::db::SubTableUpdate {
table_key: table_key.to_string(),
table_version: target_entry.unwrap().table_version,
table_branch: Some(target_branch.to_string()),
row_count: source_entry.row_count,
version_metadata: target_entry.unwrap().version_metadata.clone(),
}),
}
// Target already owns this table, empty delta — pointer switch
// onto its own lineage (the non-empty case is `AdoptWithDelta`).
Ok(crate::db::SubTableUpdate {
table_key: table_key.to_string(),
table_version: target_entry.unwrap().table_version,
table_branch: Some(target_branch.to_string()),
row_count: source_entry.row_count,
version_metadata: target_entry.unwrap().version_metadata.clone(),
})
} else {
// Target doesn't own this table yet — fork from source state.
// This creates the target branch on the sub-table dataset.
@ -1000,6 +1120,13 @@ async fn publish_rewritten_merge_table(
}
}
// Failpoint: crash after the Phase 1 merge_insert commit, before the delete.
// Models a partial Phase B on the three-way path — the merged constructive
// rows are on Lance HEAD but the delete has not committed and the
// achieved-version intent has not been recorded, so recovery must roll BACK.
// See tests/failpoints.rs::branch_merge_rewrite_partial_after_merge_rolls_back.
crate::failpoints::maybe_fail("branch_merge.rewrite_after_merge_pre_delete")?;
// Phase 2: delete removed rows via deletion vectors.
//
// INLINE-COMMIT RESIDUAL: lance-6.0.1 does not expose a public
@ -1023,6 +1150,14 @@ async fn publish_rewritten_merge_table(
current_ds = new_ds;
}
// Failpoint: crash after the Phase 2 delete commit, before the index build.
// Models a partial Phase B on the three-way path — constructive rows +
// deletes are on Lance HEAD but the achieved-version intent has not been
// recorded, so recovery must roll BACK (the index is reconciler-owned derived
// state, but the merge itself never reached its commit boundary). See
// tests/failpoints.rs::branch_merge_rewrite_partial_after_delete_rolls_back.
crate::failpoints::maybe_fail("branch_merge.rewrite_after_delete_pre_index")?;
// Phase 3: rebuild indices.
//
// `build_indices_on_dataset` uses `stage_create_btree_index` /
@ -1054,6 +1189,160 @@ async fn publish_rewritten_merge_table(
})
}
/// Scan a staged temp table and concat its non-empty batches into the single
/// batch that `stage_append` / `stage_merge_insert` consume. Returns `None` when
/// the table has no rows (both staged primitives reject an empty batch).
async fn scan_staged_combined(
target_db: &Omnigraph,
table: &StagedTable,
) -> Result<Option<RecordBatch>> {
crate::instrumentation::record_scan_staged_combined();
let snapshot = SnapshotHandle::new(table.dataset.clone());
let batches: Vec<RecordBatch> = target_db
.storage()
.scan_batches_for_rewrite(&snapshot)
.await?
.into_iter()
.filter(|batch| batch.num_rows() > 0)
.collect();
if batches.is_empty() {
return Ok(None);
}
let combined = if batches.len() == 1 {
batches.into_iter().next().unwrap()
} else {
let schema = batches[0].schema();
arrow_select::concat::concat_batches(&schema, &batches)
.map_err(|e| OmniError::Lance(e.to_string()))?
};
Ok(Some(combined))
}
/// Apply an [`AdoptDelta`] onto the target's base lineage (the fast-forward /
/// target-owns path). Kept separate from `publish_rewritten_merge_table` (the
/// three-way path) because the two paths diverge: commit 3 splits this Phase 1
/// into append (new) + merge_insert (changed), and commit 6 makes its index
/// coverage incremental — neither of which the three-way path takes.
///
/// `open_for_mutation(Merge)` opens the target's own table lineage (active
/// branch is the merge target after the caller's swap), so every write lands on
/// the target and survives source-branch deletion — GC-safe.
///
/// TRANSITIONAL — removed by the fragment-adopt work (see [`AdoptDelta`]): the
/// multi-commit append → upsert → delete publish here (the source of the
/// partial-Phase-B recovery window the sidecar confirmation guards) collapses to
/// a single fragment-graft commit per table, so this whole function goes away.
async fn publish_adopted_delta(
target_db: &Omnigraph,
table_key: &str,
delta: &AdoptDelta,
) -> Result<crate::db::SubTableUpdate> {
let (ds, full_path, table_branch) = target_db
.open_for_mutation(table_key, crate::db::MutationOpKind::Merge)
.await?;
let mut current_ds = ds;
// Phase 1a: append the NEW rows. `stage_append_stream` is a streaming
// `Operation::Append` — no hash join — so it never buffers the delta and
// cannot exhaust the DataFusion memory pool (the OOM fix). It streams the
// staged rows straight into the target (Lance rolls fragments at
// `max_rows_per_file`), so memory is bounded regardless of how many rows the
// connector appended — never the whole set in one batch. New ids are absent
// from base by construction (the ordered walk only classifies a row
// `(None, Some)` when base lacks it), so they never collide on `id`. Routed
// through the staged primitive so a failure between writing fragments and
// committing leaves no Lance-HEAD drift. `appends` is `Some` only when the
// staged table is non-empty (`compute_adopt_delta`).
if let Some(append_table) = &delta.appends {
let source = SnapshotHandle::new(append_table.dataset.clone());
let staged = target_db
.storage()
.stage_append_stream(&current_ds, &source, &[])
.await?;
current_ds = target_db
.storage()
.commit_staged(current_ds, staged)
.await?;
}
// Failpoint: crash after the Phase 1a append commit, before the upsert.
// Models a partial Phase B — appends are on Lance HEAD but the upserts/deletes
// have not committed and the achieved-version intent has not been recorded, so
// recovery must roll BACK (not publish the appends-only state). See
// tests/failpoints.rs::branch_merge_adopt_partial_after_append_rolls_back.
crate::failpoints::maybe_fail("branch_merge.adopt_after_append_pre_upsert")?;
// Phase 1b: upsert the CHANGED rows. The merge_insert hash join is now
// bounded to the genuinely-changed set, not the whole delta. It runs against
// the committed view that already includes the appends; the changed ids are
// disjoint from the appended ids (each id is classified into exactly one of
// new / changed / deleted / unchanged in the single ordered walk), so the
// join never collides with an appended row.
if let Some(upsert_table) = &delta.upserts {
if let Some(combined) = scan_staged_combined(target_db, upsert_table).await? {
let staged_merge = target_db
.storage()
.stage_merge_insert(
current_ds.clone(),
combined,
vec!["id".to_string()],
lance::dataset::WhenMatched::UpdateAll,
lance::dataset::WhenNotMatched::InsertAll,
)
.await?;
current_ds = target_db
.storage()
.commit_staged(current_ds, staged_merge)
.await?;
}
}
// Failpoint: crash after the Phase 1b upsert commit, before the delete.
// Models a partial Phase B — appends + upserts on Lance HEAD but the delete
// has not committed and the achieved-version intent has not been recorded, so
// recovery must roll BACK. See
// tests/failpoints.rs::branch_merge_adopt_partial_after_upsert_rolls_back.
crate::failpoints::maybe_fail("branch_merge.adopt_after_upsert_pre_delete")?;
// Phase 2: delete removed rows via deletion vectors (inline-commit residual,
// same as the three-way path until Lance ships a public two-phase delete).
if !delta.deleted_ids.is_empty() {
let escaped: Vec<String> = delta
.deleted_ids
.iter()
.map(|id| format!("'{}'", id.replace('\'', "''")))
.collect();
let filter = format!("id IN ({})", escaped.join(", "));
let (new_ds, _) = target_db
.storage_inline_residual()
.delete_where(&full_path, current_ds, &filter)
.await?;
current_ds = new_ds;
}
// Phase 4: index coverage is reconciler-owned on the adopt path. Unlike the
// three-way `RewriteMerged` path, this does NOT build indices inline: the
// appended/upserted rows are left uncovered (reads stay correct via
// brute-force — indexes are derived state, invariant 7) and
// `optimize` / `ensure_indices` folds them in. This keeps even the first
// merge into a freshly schema-applied (unindexed) table fast — no inline IVF
// retrain on the publish path — and is the row-level approximation of Layer
// 2's fragment-adopt, where the source branch's already-built indices carry
// over by reference. See docs/user/branching/merge.md.
let final_state = target_db
.storage()
.table_state(&full_path, &current_ds)
.await?;
Ok(crate::db::SubTableUpdate {
table_key: table_key.to_string(),
table_version: final_state.version,
table_branch,
row_count: final_state.row_count,
version_metadata: final_state.version_metadata,
})
}
impl Omnigraph {
pub async fn branch_merge(&self, source: &str, target: &str) -> Result<MergeOutcome> {
self.branch_merge_as(source, target, None).await
@ -1262,7 +1551,16 @@ impl Omnigraph {
continue;
}
if same_manifest_state(base_entry, target_entry) {
candidates.insert(table_key.clone(), CandidateTableState::AdoptSourceState);
let candidate = classify_adopt(
self,
&self.catalog(),
base_snapshot,
source_snapshot,
&target_snapshot,
table_key,
)
.await?;
candidates.insert(table_key.clone(), candidate);
continue;
}
@ -1290,31 +1588,24 @@ impl Omnigraph {
validate_merge_candidates(self, source_snapshot, &target_snapshot, &candidates).await?;
// Recovery sidecar: protect the per-table commit_staged loop.
// Pin only `RewriteMerged` candidates because they always
// advance Lance HEAD through `publish_rewritten_merge_table`
// (which runs stage_merge_insert + delete_where + index
// rebuilds — multiple commit_staged calls per table; loose
// classification handles the multi-step drift).
// Pin `RewriteMerged` and `AdoptWithDelta` candidates — both advance
// Lance HEAD before the manifest publish (RewriteMerged via
// publish_rewritten_merge_table; AdoptWithDelta via publish_adopted_delta:
// stage_append + stage_merge_insert + delete_where + index — multiple
// commit_staged calls per table, which the loose classification handles
// as multi-step drift).
//
// `AdoptSourceState` candidates are NOT pinned: their publish
// path is `publish_adopted_source_state`, whose subcases mostly
// don't advance Lance HEAD (pure manifest pointer switch, or
// fork via `fork_dataset_from_entry_state` which only adds a
// Lance branch ref). If those subcases were pinned, recovery
// would classify them as NoMovement and the all-or-nothing
// decision would force a rollback that destroys legitimately-
// committed work on sibling RewriteMerged tables.
// (`publish_adopted_source_state`) is a pure pointer switch or a fork
// (`fork_dataset_from_entry_state` only adds a Lance branch ref), neither
// of which advances the data HEAD. Pinning them would classify as
// NoMovement and force an all-or-nothing rollback that destroys sibling
// tables' committed work.
//
// Residual: two `AdoptSourceState` subcases (when source has a
// table_branch AND the source delta is non-empty) internally
// call `publish_rewritten_merge_table` and DO advance HEAD.
// Those are not covered by this sidecar — if they fail mid-
// commit, the residual persists until the next ReadWrite open
// detects it via a subsequent ExpectedVersionMismatch from a
// later writer that touches the same table. Closing this gap
// requires pre-computing source deltas during candidate
// classification (a structural change to `CandidateTableState`)
// and is left as follow-up work.
// The former gap — adopt subcases that applied a non-empty delta advanced
// HEAD unpinned — is closed: `classify_adopt` pre-computes the delta, so a
// HEAD-advancing adopt is `AdoptWithDelta` (pinned here) and an empty-delta
// adopt stays `AdoptSourceState`.
// Acquire per-(table_key, target_branch) queues for every table
// touched by the merge plan. Sorted-order acquisition prevents
// lock-order inversion against concurrent multi-table writers.
@ -1334,6 +1625,7 @@ impl Omnigraph {
candidates.get(*table_key),
Some(CandidateTableState::RewriteMerged(_))
| Some(CandidateTableState::AdoptSourceState)
| Some(CandidateTableState::AdoptWithDelta(_))
)
})
.map(|table_key| (table_key.clone(), active_branch_for_keys.clone()))
@ -1347,7 +1639,9 @@ impl Omnigraph {
};
if !matches!(
candidate,
CandidateTableState::RewriteMerged(_) | CandidateTableState::AdoptSourceState
CandidateTableState::RewriteMerged(_)
| CandidateTableState::AdoptSourceState
| CandidateTableState::AdoptWithDelta(_)
) {
continue;
}
@ -1368,7 +1662,10 @@ impl Omnigraph {
.iter()
.filter_map(|table_key| {
let candidate = candidates.get(table_key)?;
if !matches!(candidate, CandidateTableState::RewriteMerged(_)) {
if !matches!(
candidate,
CandidateTableState::RewriteMerged(_) | CandidateTableState::AdoptWithDelta(_)
) {
return None;
}
let entry = target_snapshot.entry(table_key)?;
@ -1377,6 +1674,11 @@ impl Omnigraph {
table_path: self.storage().dataset_uri(&entry.table_path),
expected_version: entry.table_version,
post_commit_pin: entry.table_version + 1,
// Stamped after the whole per-table publish completes
// (Phase-B confirmation, just before the manifest publish).
// Until then `None` marks an unfinished publish that
// recovery must roll back, not roll forward.
confirmed_version: None,
// Use the merge target branch (where commits actually
// land), NOT entry.table_branch (where the table
// currently lives). publish_rewritten_merge_table calls
@ -1393,7 +1695,14 @@ impl Omnigraph {
})
})
.collect();
let recovery_handle = if recovery_pins.is_empty() {
// Keep the sidecar alongside its handle: after the per-table publish
// loop completes (Phase B), we re-write it with each table's confirmed
// version before the manifest publish, so recovery can tell a finished
// publish (roll forward) from a partial one (roll back).
let mut recovery: Option<(
crate::db::manifest::RecoverySidecar,
crate::db::manifest::RecoverySidecarHandle,
)> = if recovery_pins.is_empty() {
None
} else {
// Use the merge target branch directly, NOT a heuristic
@ -1418,14 +1727,13 @@ impl Omnigraph {
// this, future merges between the same pair lose
// already-up-to-date detection and merge-base correctness.
sidecar.merge_source_commit_id = Some(source_head_commit_id.to_string());
Some(
crate::db::manifest::write_sidecar(
self.root_uri(),
self.storage_adapter(),
&sidecar,
)
.await?,
let handle = crate::db::manifest::write_sidecar(
self.root_uri(),
self.storage_adapter(),
&sidecar,
)
.await?;
Some((sidecar, handle))
};
let mut updates = Vec::new();
@ -1436,15 +1744,11 @@ impl Omnigraph {
};
let update = match candidate_state {
CandidateTableState::AdoptSourceState => {
publish_adopted_source_state(
self,
&self.catalog(),
base_snapshot,
source_snapshot,
&target_snapshot,
table_key,
)
.await?
publish_adopted_source_state(self, source_snapshot, &target_snapshot, table_key)
.await?
}
CandidateTableState::AdoptWithDelta(delta) => {
publish_adopted_delta(self, table_key, delta).await?
}
CandidateTableState::RewriteMerged(staged) => {
publish_rewritten_merge_table(self, table_key, staged).await?
@ -1456,10 +1760,33 @@ impl Omnigraph {
updates.push(update);
}
// Phase-B confirmation: every table's publish finished, so stamp the
// sidecar with each table's exact achieved version before the manifest
// publish. This is the commit point of the recovery WAL: a crash from
// here on rolls FORWARD to these versions, while a crash anywhere in the
// publish loop above left the sidecar unconfirmed and rolls BACK. The
// `updates` carry the real per-table final versions (multiple
// commit_staged calls per table, so not derivable from `post_commit_pin`
// alone). A failure here leaves the unconfirmed sidecar → roll back.
if let Some((sidecar, _)) = recovery.as_mut() {
let confirmed_versions: std::collections::HashMap<String, u64> = updates
.iter()
.map(|u| (u.table_key.clone(), u.table_version))
.collect();
crate::db::manifest::confirm_sidecar_phase_b(
self.root_uri(),
self.storage_adapter(),
sidecar,
&confirmed_versions,
)
.await?;
}
// Failpoint: pin the per-writer Phase B → Phase C residual for
// branch_merge. Lance HEAD has advanced on every touched table
// (publish_*) but the manifest publish below hasn't run. Used
// by `tests/failpoints.rs::branch_merge_phase_b_failure_recovered_on_next_open`.
// (publish_*) AND the sidecar is confirmed, but the manifest publish
// below hasn't run — so recovery rolls FORWARD. Used by
// `tests/failpoints.rs::branch_merge_phase_b_failure_recovered_on_next_open`.
crate::failpoints::maybe_fail("branch_merge.post_phase_b_pre_manifest_commit")?;
let manifest_version = if updates.is_empty() {
@ -1471,7 +1798,7 @@ impl Omnigraph {
// Recovery sidecar lifecycle: delete after manifest publish.
// Best-effort cleanup; the merge already landed durably so
// failing the user here is undesirable.
if let Some(handle) = recovery_handle {
if let Some((_, handle)) = recovery {
if let Err(err) =
crate::db::manifest::delete_sidecar(&handle, self.storage_adapter()).await
{

View file

@ -712,6 +712,9 @@ impl StagedMutation {
table_path: entry.path.full_path.clone(),
expected_version: entry.expected_version,
post_commit_pin: entry.expected_version + 1,
// Mutation/Load use strict single-commit classification, not
// BranchMerge's Phase-B confirmation — left None.
confirmed_version: None,
table_branch: entry.path.table_branch.clone(),
});
}
@ -738,6 +741,7 @@ impl StagedMutation {
// can advance HEAD by more than one version (e.g.,
// when Lance internally compacts deletion vectors).
post_commit_pin: update.table_version,
confirmed_version: None,
table_branch: path.table_branch.clone(),
});
}

View file

@ -80,6 +80,95 @@ pub(crate) fn record_probe() {
let _ = current(|p| p.probe_count.fetch_add(1, Ordering::Relaxed));
}
/// Per-operation staged-write counts, installed for a task via
/// [`with_merge_write_probes`]. Lets a cost-budget test assert WHICH staged-write
/// primitive an operation invokes — e.g. that an append-only fast-forward merge
/// routes new rows through `stage_append` and does **zero** `stage_merge_insert`
/// (the full-outer hash join). Counts the publish-path primitives only;
/// merge-staging temp tables use `append_or_create_batch`, not these.
#[derive(Clone, Default)]
pub struct MergeWriteProbes {
pub stage_append_calls: Arc<AtomicU64>,
pub stage_append_rows: Arc<AtomicU64>,
pub stage_merge_insert_calls: Arc<AtomicU64>,
pub stage_merge_insert_rows: Arc<AtomicU64>,
/// Inline vector-index (IVF) builds. The fast-forward adopt path defers
/// index coverage to the reconciler, so an adopt merge must do 0 of these.
pub create_vector_index_calls: Arc<AtomicU64>,
/// Times the merge materialized a staged delta into one in-memory batch
/// (`scan_staged_combined`). The append path streams instead, so an
/// append-only fast-forward merge must do 0 of these.
pub scan_staged_combined_calls: Arc<AtomicU64>,
}
impl MergeWriteProbes {
pub fn stage_append_calls(&self) -> u64 {
self.stage_append_calls.load(Ordering::Relaxed)
}
pub fn stage_append_rows(&self) -> u64 {
self.stage_append_rows.load(Ordering::Relaxed)
}
pub fn stage_merge_insert_calls(&self) -> u64 {
self.stage_merge_insert_calls.load(Ordering::Relaxed)
}
pub fn stage_merge_insert_rows(&self) -> u64 {
self.stage_merge_insert_rows.load(Ordering::Relaxed)
}
pub fn create_vector_index_calls(&self) -> u64 {
self.create_vector_index_calls.load(Ordering::Relaxed)
}
pub fn scan_staged_combined_calls(&self) -> u64 {
self.scan_staged_combined_calls.load(Ordering::Relaxed)
}
}
tokio::task_local! {
static MERGE_WRITE_PROBES: MergeWriteProbes;
}
/// Run `fut` with staged-write probes installed. Test-only entry point; nothing
/// in production sets the probes, so `record_stage_*` below are no-ops.
pub async fn with_merge_write_probes<F>(probes: MergeWriteProbes, fut: F) -> F::Output
where
F: std::future::Future,
{
MERGE_WRITE_PROBES.scope(probes, fut).await
}
/// Record one `stage_append` of `rows` rows against the active probes. No-op in
/// production (no probes installed).
pub(crate) fn record_stage_append(rows: u64) {
let _ = MERGE_WRITE_PROBES.try_with(|p| {
p.stage_append_calls.fetch_add(1, Ordering::Relaxed);
p.stage_append_rows.fetch_add(rows, Ordering::Relaxed);
});
}
/// Record one `stage_merge_insert` of `rows` rows against the active probes.
/// No-op in production (no probes installed).
pub(crate) fn record_stage_merge_insert(rows: u64) {
let _ = MERGE_WRITE_PROBES.try_with(|p| {
p.stage_merge_insert_calls.fetch_add(1, Ordering::Relaxed);
p.stage_merge_insert_rows.fetch_add(rows, Ordering::Relaxed);
});
}
/// Record one inline vector-index build against the active probes. No-op in
/// production (no probes installed).
pub(crate) fn record_create_vector_index() {
let _ = MERGE_WRITE_PROBES.try_with(|p| {
p.create_vector_index_calls.fetch_add(1, Ordering::Relaxed);
});
}
/// Record one `scan_staged_combined` materialization against the active probes.
/// No-op in production (no probes installed).
pub(crate) fn record_scan_staged_combined() {
let _ = MERGE_WRITE_PROBES.try_with(|p| {
p.scan_staged_combined_calls.fetch_add(1, Ordering::Relaxed);
});
}
/// Open a Lance dataset at `uri`, attaching `wrapper` (for IO counting) when
/// present. With no wrapper this is exactly `Dataset::open(uri)`. The wrapper is
/// set via `ObjectStoreParams` on the builder so the open itself is counted

View file

@ -353,6 +353,15 @@ pub trait TableStorage: sealed::Sealed + Send + Sync + Debug {
prior_stages: &[StagedHandle],
) -> Result<StagedHandle>;
/// Append `source`'s rows into `snapshot`'s table, streaming so the whole
/// row set is never materialized in memory (see `TableStore::stage_append_stream`).
async fn stage_append_stream(
&self,
snapshot: &SnapshotHandle,
source: &SnapshotHandle,
prior_stages: &[StagedHandle],
) -> Result<StagedHandle>;
async fn stage_merge_insert(
&self,
snapshot: SnapshotHandle,
@ -684,6 +693,18 @@ impl TableStorage for TableStore {
.map(StagedHandle::new)
}
async fn stage_append_stream(
&self,
snapshot: &SnapshotHandle,
source: &SnapshotHandle,
prior_stages: &[StagedHandle],
) -> Result<StagedHandle> {
let staged_writes = staged_handles_as_writes(prior_stages);
TableStore::stage_append_stream(self, snapshot.dataset(), source.dataset(), &staged_writes)
.await
.map(StagedHandle::new)
}
async fn stage_merge_insert(
&self,
snapshot: SnapshotHandle,

View file

@ -2,6 +2,7 @@ use arrow_array::{
Array, ArrayRef, RecordBatch, StringArray, StructArray, UInt8Array, UInt32Array, UInt64Array,
};
use arrow_schema::SchemaRef;
use datafusion::physical_plan::SendableRecordBatchStream;
use futures::TryStreamExt;
use lance::Dataset;
use lance::blob::BlobArrayBuilder;
@ -362,6 +363,29 @@ impl TableStore {
Ok(materialized)
}
/// Streaming, blob-aware sibling of [`Self::scan_batches_for_rewrite`].
/// Yields the dataset's rows lazily as a `SendableRecordBatchStream` so a
/// downstream writer (`stage_append_stream`) never materializes the whole
/// table in memory. Blob columns still need per-row rebuild, so those tables
/// fall back to the materialized path and are re-streamed from the `Vec`
/// (rare — only tables with a `Blob` property; bounded-memory blob streaming
/// is a follow-up). The non-blob path is a true lazy scan.
pub async fn scan_stream_for_rewrite(&self, ds: &Dataset) -> Result<SendableRecordBatchStream> {
let has_blob_columns = ds.schema().fields_pre_order().any(|field| field.is_blob());
if has_blob_columns {
let arrow_schema: SchemaRef = Arc::new(ds.schema().into());
let batches = self.scan_batches_for_rewrite(ds).await?;
let reader = arrow_array::RecordBatchIterator::new(
batches.into_iter().map(Ok),
arrow_schema,
);
return Ok(lance_datafusion::utils::reader_to_stream(Box::new(reader)));
}
// Non-blob: a true lazy scan. `DatasetRecordBatchStream` converts to the
// `SendableRecordBatchStream` that `execute_uncommitted_stream` consumes.
Ok(Self::scan_stream(ds, None, None, None, false).await?.into())
}
pub(crate) async fn materialize_blob_batch(
ds: &Dataset,
batch: RecordBatch,
@ -919,6 +943,7 @@ impl TableStore {
"stage_append called with empty batch".to_string(),
));
}
let appended_rows = batch.num_rows() as u64;
let params = WriteParams {
mode: WriteMode::Append,
allow_external_blob_outside_bases: true,
@ -931,6 +956,9 @@ impl TableStore {
.execute_uncommitted(vec![batch])
.await
.map_err(|e| OmniError::Lance(e.to_string()))?;
// Record only after the staging write succeeds, so a failed write does
// not inflate the probe (matches `stage_append_stream`'s ordering).
crate::instrumentation::record_stage_append(appended_rows);
let mut new_fragments = match &transaction.operation {
Operation::Append { fragments } => fragments.clone(),
Operation::Overwrite { fragments, .. } => fragments.clone(),
@ -971,6 +999,71 @@ impl TableStore {
})
}
/// Streaming variant of [`Self::stage_append`]: appends the rows of `source`
/// into `ds` without materializing them in memory. It scans `source` lazily
/// (`scan_stream_for_rewrite`) and hands the stream to Lance's
/// `execute_uncommitted_stream`, which rolls fragments at `max_rows_per_file`
/// — bounded memory, one Append transaction. This is the substrate-blessed
/// bulk-append path (the same one LanceDB's `Table::add` uses). Identical
/// fragment-id / stable-row-id staging as `stage_append`.
///
/// TRANSITIONAL caller — its only caller is the row-level merge append
/// (`publish_adopted_delta`, see `AdoptDelta`), which the fragment-adopt work
/// (Lance #7263/#7185) removes: a fragment graft re-appends no rows. This
/// primitive and `scan_stream_for_rewrite` are then dead unless re-adopted as
/// a general bulk-append path (the `Table::add` shape makes that plausible).
pub async fn stage_append_stream(
&self,
ds: &Dataset,
source: &Dataset,
prior_stages: &[StagedWrite],
) -> Result<StagedWrite> {
let stream = self.scan_stream_for_rewrite(source).await?;
let params = WriteParams {
mode: WriteMode::Append,
allow_external_blob_outside_bases: true,
auto_cleanup: None,
skip_auto_cleanup: true,
..Default::default()
};
let transaction = InsertBuilder::new(Arc::new(ds.clone()))
.with_params(&params)
.execute_uncommitted_stream(stream)
.await
.map_err(|e| OmniError::Lance(e.to_string()))?;
let mut new_fragments = match &transaction.operation {
Operation::Append { fragments } => fragments.clone(),
Operation::Overwrite { fragments, .. } => fragments.clone(),
other => {
return Err(OmniError::manifest_internal(format!(
"stage_append_stream: unexpected Lance operation {:?}",
std::mem::discriminant(other)
)));
}
};
let appended_rows: u64 = new_fragments
.iter()
.filter_map(|f| f.physical_rows)
.map(|r| r as u64)
.sum();
crate::instrumentation::record_stage_append(appended_rows);
// Same commit-time fragment-id / row-id renumbering as `stage_append`.
let next_id_base = ds.manifest.max_fragment_id.unwrap_or(0) as u64
+ 1
+ prior_stages_fragment_count(prior_stages);
assign_fragment_ids(&mut new_fragments, next_id_base);
if ds.manifest.uses_stable_row_ids() {
let prior_rows = prior_stages_row_count(prior_stages)?;
let start_row_id = ds.manifest.next_row_id + prior_rows;
assign_row_id_meta(&mut new_fragments, start_row_id)?;
}
Ok(StagedWrite {
transaction,
new_fragments,
removed_fragment_ids: Vec::new(),
})
}
/// Stage a merge_insert (upsert): write fragment files describing the
/// merge result, return the uncommitted transaction plus the new
/// fragments. The transaction's `Operation::Update` carries the
@ -1012,6 +1105,7 @@ impl TableStore {
"stage_merge_insert called with empty batch".to_string(),
));
}
let merged_rows = batch.num_rows() as u64;
// Precondition for the FirstSeen workaround below: every call path that
// reaches stage_merge_insert (load, MutationStaging::finalize,
@ -1052,6 +1146,9 @@ impl TableStore {
.execute_uncommitted(stream)
.await
.map_err(|e| OmniError::Lance(e.to_string()))?;
// Record only after the staging write succeeds, so a failed write does
// not inflate the probe (matches `stage_append`/`stage_append_stream`).
crate::instrumentation::record_stage_merge_insert(merged_rows);
// Operation::Update { removed_fragment_ids, updated_fragments, new_fragments, .. } —
// `new_fragments` are the freshly inserted rows; `updated_fragments`
// are rewrites of existing fragments that include both retained and
@ -1541,8 +1638,11 @@ impl TableStore {
ds.create_index_builder(&[column], IndexType::Vector, &params)
.replace(true)
.await
.map(|_| ())
.map_err(|e| OmniError::Lance(e.to_string()))
.map_err(|e| OmniError::Lance(e.to_string()))?;
// Record only after the index build succeeds, so a failed build does not
// inflate the probe (matches the `stage_*` probes).
crate::instrumentation::record_create_vector_index();
Ok(())
}
pub async fn create_empty_dataset(dataset_uri: &str, schema: &SchemaRef) -> Result<Dataset> {

View file

@ -8,12 +8,15 @@ use omnigraph::db::Omnigraph;
use omnigraph::error::{ManifestErrorKind, OmniError};
use omnigraph::failpoints::ScopedFailPoint;
use omnigraph::loader::LoadMode;
use serial_test::serial;
use helpers::recovery::{
FollowUpMutation, RecoveryExpectation, TableExpectation, assert_post_recovery_invariants,
branch_head_commit_id, single_sidecar_operation_id,
};
use helpers::{MUTATION_QUERIES, mixed_params, mutate_main, version_main};
use helpers::{
MUTATION_QUERIES, collect_column_strings, mixed_params, mutate_main, read_table, version_main,
};
const SCHEMA_V1: &str = "node Person { name: String @key }\n";
const SCHEMA_V2_ADDED_TYPE: &str =
@ -3176,6 +3179,7 @@ async fn optimize_phase_b_failure_recovered_on_next_open() {
}
#[tokio::test]
#[serial(branch_merge_phase_b)]
async fn branch_merge_phase_b_failure_recovered_on_next_open() {
use omnigraph::loader::{LoadMode, load_jsonl};
@ -3337,6 +3341,352 @@ async fn branch_merge_phase_b_failure_recovered_on_next_open() {
drop(db);
}
/// AdoptWithDelta recovery (the gap closure): a fast-forward merge — main has
/// NOT advanced since the branch forked, so the touched table is classified
/// `AdoptWithDelta`, not `RewriteMerged` — that fails after Phase B must still
/// recover on the next open. Before the recovery-pin closure this drifted
/// silently: the adopt path advanced Lance HEAD but was unpinned, so the sweep
/// found no sidecar and the merge was lost.
#[tokio::test]
#[serial(branch_merge_phase_b)]
async fn branch_merge_adopt_with_delta_phase_b_failure_recovered_on_next_open() {
use omnigraph::loader::{LoadMode, load_jsonl};
let _scenario = FailScenario::setup();
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap().to_string();
// Seed main, branch off, mutate ONLY the branch. main stays at base, so the
// merge is a fast-forward and Person classifies `AdoptWithDelta` (forked
// source, target == base, non-empty delta) — NOT `RewriteMerged`.
{
let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap();
load_jsonl(
&mut db,
r#"{"type":"Person","data":{"name":"alice","age":30}}
"#,
LoadMode::Append,
)
.await
.unwrap();
db.branch_create("feature").await.unwrap();
db.mutate(
"feature",
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", "Bob")], &[("$age", 40)]),
)
.await
.unwrap();
// main intentionally NOT mutated → fast-forward → AdoptWithDelta.
}
let pre_failure_version = {
let db = Omnigraph::open(&uri).await.unwrap();
version_main(&db).await.unwrap()
};
// Fail after the per-table publish loop, before commit_manifest_updates.
{
let db = Omnigraph::open(&uri).await.unwrap();
let _failpoint =
ScopedFailPoint::new("branch_merge.post_phase_b_pre_manifest_commit", "return");
let err = db.branch_merge("feature", "main").await.unwrap_err();
assert!(
err.to_string().contains(
"injected failpoint triggered: branch_merge.post_phase_b_pre_manifest_commit"
),
"unexpected error: {err}"
);
// The gap closure: an AdoptWithDelta merge must persist a sidecar.
let recovery_dir = dir.path().join("__recovery");
let sidecars: Vec<_> = std::fs::read_dir(&recovery_dir)
.unwrap()
.filter_map(|e| e.ok())
.collect();
assert_eq!(
sidecars.len(),
1,
"AdoptWithDelta merge must persist exactly one recovery sidecar (the closed gap)"
);
}
// Reopen → the recovery sweep rolls the AdoptWithDelta merge forward.
let db = Omnigraph::open(&uri).await.unwrap();
let recovery_dir = dir.path().join("__recovery");
if recovery_dir.exists() {
let remaining: Vec<_> = std::fs::read_dir(&recovery_dir)
.unwrap()
.filter_map(|e| e.ok())
.collect();
assert!(
remaining.is_empty(),
"sidecar must be deleted post-recovery; remaining: {remaining:?}"
);
}
let post_recovery_version = version_main(&db).await.unwrap();
assert!(
post_recovery_version > pre_failure_version,
"manifest must advance post-recovery; pre={pre_failure_version} post={post_recovery_version}"
);
let names = collect_column_strings(&read_table(&db, "node:Person").await, "name");
assert!(
names.contains(&"Bob".to_string()),
"recovered AdoptWithDelta merge must include Bob; have {names:?}"
);
drop(db);
}
/// Which branch-merge publish path a partial-Phase-B test exercises.
enum MergeScenario {
/// main stays at base → the touched table is `AdoptWithDelta`
/// (`publish_adopted_delta`: append → upsert → delete).
Adopt,
/// main advances past base → the touched table is `RewriteMerged`
/// (`publish_rewritten_merge_table`: merge_insert → delete → index).
Rewrite,
}
async fn sorted_person_names(db: &Omnigraph) -> Vec<String> {
let mut names = collect_column_strings(&read_table(db, "node:Person").await, "name");
names.sort();
names
}
/// THE recovery-atomicity regression gate. A branch merge whose per-table publish
/// is a multi-commit sequence (append → upsert → delete, or merge_insert → delete
/// → index) advances Lance HEAD step by step before the manifest publish. If the
/// process dies *mid*-sequence — after some commits but before the achieved-version
/// intent is recorded — recovery must roll the whole merge **back**, not publish
/// the partial and record the merge as complete.
///
/// The delta is deliberately MIXED — a fresh id (`bob`, append), a modified base id
/// (`carol`, upsert) and a removed base id (`dave`, delete) — so every partial
/// window leaves real work undone. Proof of rollback: after recovery the target is
/// back at its base name-set, and a *re-run* of the merge re-applies the full delta
/// (the partial was not silently recorded as "already merged").
///
/// RED before the fix: the loose `BranchMerge` classification rolls any
/// `lance_head > manifest_pinned` forward, so the partial is published (e.g. `bob`
/// present, `dave` kept) and the merge recorded — the first assert (back at base)
/// fails. GREEN after: `achieved_version == None` → `IncompletePhaseB` → roll back.
async fn assert_partial_merge_rolls_back(scenario: MergeScenario, failpoint: &str) {
use omnigraph::loader::load_jsonl;
let _scenario = FailScenario::setup();
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap().to_string();
// Seed main {alice, carol, dave}; on `feature` add bob (append), bump carol
// (upsert), remove dave (delete). For Rewrite, also move main past base so the
// table classifies RewriteMerged instead of a fast-forward AdoptWithDelta.
{
let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap();
load_jsonl(
&mut db,
"{\"type\":\"Person\",\"data\":{\"name\":\"alice\",\"age\":30}}\n\
{\"type\":\"Person\",\"data\":{\"name\":\"carol\",\"age\":50}}\n\
{\"type\":\"Person\",\"data\":{\"name\":\"dave\",\"age\":60}}\n",
LoadMode::Append,
)
.await
.unwrap();
db.branch_create("feature").await.unwrap();
db.mutate(
"feature",
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", "bob")], &[("$age", 40)]),
)
.await
.unwrap();
db.mutate(
"feature",
MUTATION_QUERIES,
"set_age",
&mixed_params(&[("$name", "carol")], &[("$age", 55)]),
)
.await
.unwrap();
db.mutate(
"feature",
MUTATION_QUERIES,
"remove_person",
&mixed_params(&[("$name", "dave")], &[]),
)
.await
.unwrap();
if matches!(scenario, MergeScenario::Rewrite) {
db.mutate(
"main",
MUTATION_QUERIES,
"set_age",
&mixed_params(&[("$name", "alice")], &[("$age", 35)]),
)
.await
.unwrap();
}
}
// Crash mid-Phase-B at the injected window.
{
let db = Omnigraph::open(&uri).await.unwrap();
let _fp = ScopedFailPoint::new(failpoint, "return");
let err = db.branch_merge("feature", "main").await.unwrap_err();
assert!(
err.to_string().contains(failpoint),
"expected the injected failpoint {failpoint}, got: {err}"
);
}
// Reopen → the open-time sweep must ROLL BACK to base (the merge never reached
// its commit boundary), and a re-run must then apply the FULL delta.
{
let db = Omnigraph::open(&uri).await.unwrap();
assert_eq!(
sorted_person_names(&db).await,
vec!["alice", "carol", "dave"],
"partial Phase B at {failpoint} must roll back to base \
(no bob, dave kept, carol's upsert reverted); the merge must NOT be recorded",
);
db.branch_merge("feature", "main").await.unwrap();
assert_eq!(
sorted_person_names(&db).await,
vec!["alice", "bob", "carol"],
"re-merge after rollback must re-apply the full delta \
(bob added, dave removed) proof the partial was not silently recorded",
);
}
}
#[tokio::test]
#[serial(branch_merge_phase_b)]
async fn branch_merge_adopt_partial_after_append_rolls_back() {
assert_partial_merge_rolls_back(
MergeScenario::Adopt,
"branch_merge.adopt_after_append_pre_upsert",
)
.await;
}
#[tokio::test]
#[serial(branch_merge_phase_b)]
async fn branch_merge_adopt_partial_after_upsert_rolls_back() {
assert_partial_merge_rolls_back(
MergeScenario::Adopt,
"branch_merge.adopt_after_upsert_pre_delete",
)
.await;
}
#[tokio::test]
#[serial(branch_merge_phase_b)]
async fn branch_merge_rewrite_partial_after_merge_rolls_back() {
assert_partial_merge_rolls_back(
MergeScenario::Rewrite,
"branch_merge.rewrite_after_merge_pre_delete",
)
.await;
}
#[tokio::test]
#[serial(branch_merge_phase_b)]
async fn branch_merge_rewrite_partial_after_delete_rolls_back() {
assert_partial_merge_rolls_back(
MergeScenario::Rewrite,
"branch_merge.rewrite_after_delete_pre_index",
)
.await;
}
/// Backward-compat: a `BranchMerge` sidecar written by a *pre-confirmation*
/// binary (schema_version 1, no `confirmed_version`) must NOT be misread as a
/// partial Phase B and rolled back. A pre-upgrade crash in the Phase-B→C gap can
/// leave such a sidecar over a *completed* merge; rolling it back would silently
/// discard a finished merge with no operator signal — the regression greptile /
/// Cursor flagged.
///
/// We synthesize the pre-upgrade sidecar realistically: crash after Phase B (a
/// real sidecar + advanced Lance HEAD), then downgrade the on-disk JSON to the
/// v1 shape (`schema_version` = 1, strip every pin's `confirmed_version`) before
/// reopening — exactly what an old binary would have left.
///
/// RED before the versioning fix: a v1 sidecar with no `confirmed_version`
/// classifies `IncompletePhaseB` → rolls back → `bob` is discarded. GREEN after:
/// the version-aware classifier reads v1 as the old loose generation → rolls
/// forward → `bob` preserved.
#[tokio::test]
#[serial(branch_merge_phase_b)]
async fn pre_upgrade_v1_branch_merge_sidecar_rolls_forward_not_back() {
use omnigraph::loader::{LoadMode, load_jsonl};
let _scenario = FailScenario::setup();
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap().to_string();
// main {alice}; feature adds bob → a fast-forward AdoptWithDelta merge, which
// writes a recovery sidecar.
{
let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap();
load_jsonl(
&mut db,
"{\"type\":\"Person\",\"data\":{\"name\":\"alice\",\"age\":30}}\n",
LoadMode::Append,
)
.await
.unwrap();
db.branch_create("feature").await.unwrap();
db.mutate(
"feature",
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", "bob")], &[("$age", 40)]),
)
.await
.unwrap();
}
// Crash after Phase B (Lance HEAD advanced, manifest not published) → a real
// sidecar lands on disk.
{
let db = Omnigraph::open(&uri).await.unwrap();
let _fp = ScopedFailPoint::new("branch_merge.post_phase_b_pre_manifest_commit", "return");
db.branch_merge("feature", "main").await.unwrap_err();
}
// Downgrade the sidecar to the pre-confirmation v1 shape an old binary writes.
{
let recovery_dir = std::path::Path::new(&uri).join("__recovery");
let path = std::fs::read_dir(&recovery_dir)
.unwrap()
.filter_map(Result::ok)
.map(|e| e.path())
.find(|p| p.extension().is_some_and(|x| x == "json"))
.expect("a recovery sidecar must exist after the post-Phase-B crash");
let mut v: serde_json::Value =
serde_json::from_str(&std::fs::read_to_string(&path).unwrap()).unwrap();
v["schema_version"] = serde_json::json!(1);
for table in v["tables"].as_array_mut().unwrap() {
table.as_object_mut().unwrap().remove("confirmed_version");
}
std::fs::write(&path, serde_json::to_string_pretty(&v).unwrap()).unwrap();
}
// Reopen → the pre-upgrade completed merge must roll FORWARD (bob kept), not
// be silently discarded.
{
let db = Omnigraph::open(&uri).await.unwrap();
assert_eq!(
sorted_person_names(&db).await,
vec!["alice", "bob"],
"a pre-confirmation (v1) BranchMerge sidecar over a completed merge must roll \
forward, not be misread as a partial and rolled back",
);
}
}
/// Branch-axis variant of the branch_merge recovery test: target is a
/// non-main branch. Catches the branch-specific commit-graph head bug
/// (D2) — without `CommitGraph::open_at_branch`, the recovery sweep
@ -3344,6 +3694,7 @@ async fn branch_merge_phase_b_failure_recovered_on_next_open() {
/// target, and future merges between the same pair would lose
/// already-up-to-date detection.
#[tokio::test]
#[serial(branch_merge_phase_b)]
async fn branch_merge_phase_b_failure_recovered_on_non_main_target() {
use omnigraph::loader::{LoadMode, load_jsonl};
@ -3468,6 +3819,7 @@ async fn branch_merge_phase_b_failure_recovered_on_non_main_target() {
/// keeps RewriteMerged tables on active_branch), the contract assertion
/// catches a regression that reverts to `entry.table_branch.clone()`.
#[tokio::test]
#[serial(branch_merge_phase_b)]
async fn branch_merge_sidecar_pins_table_branch_to_active_branch() {
use omnigraph::loader::{LoadMode, load_jsonl};

View file

@ -0,0 +1,213 @@
//! Fast-forward branch-merge cost + correctness.
//!
//! The data-path fix routes *new* rows of an adopted-source merge through
//! `stage_append` (a streaming `Operation::Append`) instead of lumping new +
//! changed rows into one `stage_merge_insert` (a full-outer hash join that
//! buffers the whole delta and exhausts the DataFusion memory pool on
//! embedding-bearing tables).
//!
//! The regression gate here is *structural*, not a brittle size threshold: it
//! asserts WHICH staged-write primitive the merge invokes, via the task-local
//! write probes in `omnigraph::instrumentation`. That is deterministic and
//! machine-independent — it cannot flake on a bigger memory pool.
// Wrapping `branch_merge` in `with_merge_write_probes` (a task-local scope)
// nests the already-deep merge future one layer deeper, overflowing rustc's
// default 128 layout-query depth. Bump it for this test crate.
#![recursion_limit = "512"]
mod helpers;
use omnigraph::db::{MergeOutcome, Omnigraph};
use omnigraph::instrumentation::{MergeWriteProbes, with_merge_write_probes};
use helpers::*;
/// Insert `n` brand-new persons (fresh ids) onto `branch`, forking the Person
/// table onto it. All rows are "new on source" — none collide with base ids.
async fn append_new_persons(db: &mut Omnigraph, branch: &str, n: usize) {
for i in 0..n {
mutate_branch(
db,
branch,
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", &format!("ff_new_{i}"))], &[("$age", 30)]),
)
.await
.unwrap();
}
}
/// THE cost-budget gate. An append-only fast-forward merge must append the new
/// rows and run **zero** `stage_merge_insert` (the full-outer hash join that is
/// the OOM). RED today (new + changed are lumped into one `stage_merge_insert`);
/// GREEN once the adopt path splits new→`stage_append`, changed→`stage_merge_insert`.
#[tokio::test]
async fn append_only_fast_forward_merge_does_no_merge_insert() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let main = init_and_load(&dir).await;
main.branch_create("feature").await.unwrap();
let mut feature = Omnigraph::open(uri).await.unwrap();
append_new_persons(&mut feature, "feature", 5).await;
let probes = MergeWriteProbes::default();
let outcome =
with_merge_write_probes(probes.clone(), main.branch_merge("feature", "main"))
.await
.unwrap();
assert_eq!(outcome, MergeOutcome::FastForward);
assert_eq!(
probes.stage_merge_insert_calls(),
0,
"append-only fast-forward merge must do 0 stage_merge_insert (the OOM hash join); did {}",
probes.stage_merge_insert_calls(),
);
assert!(
probes.stage_append_calls() >= 1,
"append-only fast-forward merge must append the new rows via stage_append; did {}",
probes.stage_append_calls(),
);
assert_eq!(
probes.scan_staged_combined_calls(),
0,
"append-only merge must stream the append (stage_append_stream), not materialize the \
whole delta into one batch via scan_staged_combined; did {}",
probes.scan_staged_combined_calls(),
);
}
/// Functional correctness: a fast-forward merge of an append-only branch leaves
/// main equal to the source branch. Independent of the cost-budget gate.
#[tokio::test]
async fn fast_forward_merge_yields_source_state() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let main = init_and_load(&dir).await;
let base_count = count_rows(&main, "node:Person").await;
main.branch_create("feature").await.unwrap();
let mut feature = Omnigraph::open(uri).await.unwrap();
append_new_persons(&mut feature, "feature", 5).await;
let source_count = count_rows_branch(&feature, "feature", "node:Person").await;
assert_eq!(source_count, base_count + 5);
let outcome = main.branch_merge("feature", "main").await.unwrap();
assert_eq!(outcome, MergeOutcome::FastForward);
// main now equals source: the 5 new persons are present, the base rows kept.
assert_eq!(count_rows(&main, "node:Person").await, source_count);
let names = collect_column_strings(&read_table(&main, "node:Person").await, "name");
for i in 0..5 {
assert!(
names.contains(&format!("ff_new_{i}")),
"merged main missing new person ff_new_{i}; have {names:?}"
);
}
}
const VEC_SCHEMA: &str = "node Chunk {\n slug: String @key\n embedding: Vector(8) @index\n}\n";
/// Commit 6 behavior: the fast-forward adopt path does NOT build indices inline
/// — index coverage is reconciler-owned (`optimize`/`ensure_indices`). A merge
/// into a freshly-initialized (unindexed) vector table must perform **0** inline
/// vector-index (IVF) builds; reads stay correct via brute-force until
/// `optimize` covers the new rows. RED before the change (the publish path built
/// the IVF inline); GREEN after.
#[tokio::test]
async fn fast_forward_merge_defers_vector_index_to_reconciler() {
use omnigraph::loader::LoadMode;
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
// Empty Chunk table → no vector index at init (KMeans can't train on 0 rows).
let main = Omnigraph::init(uri, VEC_SCHEMA).await.unwrap();
main.branch_create("feature").await.unwrap();
// Load embedding-bearing chunks onto the branch. The branch builds its own
// index here (outside the probe scope) — irrelevant to the merge's cost.
let mut rows = String::new();
for i in 0..24 {
let v: Vec<String> = (0..8).map(|j| format!("{}.0", (i + j) % 5)).collect();
rows.push_str(&format!(
"{{\"type\":\"Chunk\",\"data\":{{\"slug\":\"c{i}\",\"embedding\":[{}]}}}}\n",
v.join(",")
));
}
let feature = Omnigraph::open(uri).await.unwrap();
feature.load("feature", &rows, LoadMode::Merge).await.unwrap();
// Merge, counting inline vector-index builds the publish path performs.
let probes = MergeWriteProbes::default();
let outcome = with_merge_write_probes(probes.clone(), main.branch_merge("feature", "main"))
.await
.unwrap();
assert_eq!(outcome, MergeOutcome::FastForward);
assert_eq!(
probes.create_vector_index_calls(),
0,
"fast-forward adopt merge must defer vector-index coverage to the reconciler \
(0 inline IVF builds); did {}",
probes.create_vector_index_calls(),
);
// Correctness: the rows landed on main (reads brute-force until optimize).
assert_eq!(count_rows(&main, "node:Chunk").await, 24);
}
const BLOB_SCHEMA: &str = "node Document {\n title: String @key\n content: Blob?\n note: String?\n}\n";
const BLOB_INSERT: &str = r#"
query insert_doc($title: String, $content: Blob, $note: String) {
insert Document { title: $title, content: $content, note: $note }
}
"#;
/// A fast-forward merge of a branch with a Blob column exercises the blob
/// fallback in `scan_stream_for_rewrite` (materialize → re-stream) through the
/// streaming append. main is NOT mutated, so Document is `AdoptWithDelta` (the
/// adopt/append path), not `RewriteMerged`. The blob bytes must survive the
/// materialize → stream → append round-trip.
#[tokio::test]
async fn fast_forward_merge_streams_blob_columns() {
use omnigraph::loader::{LoadMode, load_jsonl};
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let mut main = Omnigraph::init(uri, BLOB_SCHEMA).await.unwrap();
load_jsonl(
&mut main,
"{\"type\":\"Document\",\"data\":{\"title\":\"seed\",\"content\":\"base64:U2VlZA==\",\"note\":\"base\"}}",
LoadMode::Overwrite,
)
.await
.unwrap();
main.branch_create("feature").await.unwrap();
// Only the branch is mutated → fast-forward → adopt/append path.
let mut feature = Omnigraph::open(uri).await.unwrap();
mutate_branch(
&mut feature,
"feature",
BLOB_INSERT,
"insert_doc",
&params(&[
("$title", "readme"),
("$content", "base64:SGVsbG8="),
("$note", "branch"),
]),
)
.await
.unwrap();
let outcome = main.branch_merge("feature", "main").await.unwrap();
assert_eq!(outcome, MergeOutcome::FastForward);
// The appended blob row's bytes survive the streaming append; the base row stays intact.
let readme = main.read_blob("Document", "readme", "content").await.unwrap();
assert_eq!(&readme.read().await.unwrap()[..], b"Hello");
let seed = main.read_blob("Document", "seed", "content").await.unwrap();
assert_eq!(&seed.read().await.unwrap()[..], b"Seed");
}

View file

@ -104,8 +104,10 @@ async fn recovery_refuses_unknown_schema_version_on_open() {
let _db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
drop(_db);
// A sidecar from a hypothetical future writer; the older binary must
// refuse to interpret it (resolved-decisions §3 in the design doc).
// A sidecar from a hypothetical future writer (version NEWER than this
// binary's max); the reader must refuse to interpret it — it cannot guess
// semantics a newer writer baked in. (Older versions are accepted and
// interpreted with their original semantics; see `parse_sidecar`.)
let sidecar_json = r#"{
"schema_version": 99,
"operation_id": "01H000000000000000000000ZZ",
@ -120,11 +122,11 @@ async fn recovery_refuses_unknown_schema_version_on_open() {
let err = Omnigraph::open(uri)
.await
.err()
.expect("expected open to fail because of unknown sidecar schema_version");
.expect("expected open to fail because of a future sidecar schema_version");
let msg = err.to_string();
assert!(
msg.contains("schema_version=99") && msg.contains("supports only schema_version=1"),
"expected SidecarSchemaError mentioning the version mismatch, got: {}",
msg.contains("schema_version=99") && msg.contains("newer than the maximum"),
"expected a future-version refusal, got: {}",
msg,
);
// Sidecar must still be on disk — we don't auto-delete unparseable files.