From 7168ee0ed0fbbbb8bb38a1e41add1b6c77d7e791 Mon Sep 17 00:00:00 2001 From: Ragnor Comerford Date: Fri, 19 Jun 2026 00:15:06 +0200 Subject: [PATCH] fix(engine): stop branch-merge fast-forward OOM on embedding tables (#277) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix(engine): stop branch-merge fast-forward OOM on embedding tables A branch→main fast-forward merge of a forked, embedding-bearing table re-derived the whole branch row-by-row: it lumped new + changed rows into one Lance `merge_insert`, i.e. a full-outer hash join over the entire delta that exhausts the DataFusion memory pool (8k rows × 3072-dim → `Resources exhausted: 188MB HashJoinInput, 100MB pool`), so the merge hung/failed instead of completing. Fix the data path on existing, substrate-supported primitives: - Adopt-with-delta split: new rows → `stage_append` (a streaming `Operation::Append`, no hash join), only genuinely-changed rows → a bounded `stage_merge_insert`, deletes inline. New `AdoptDelta` / `compute_adopt_delta` / `publish_adopted_delta` replace the combined `compute_source_delta` path; the three-way merge path is untouched. - Stream the append via `stage_append_stream` → `execute_uncommitted_stream` (the substrate-blessed bulk-append path), removing the `Vec`+`concat` full-delta materialization. Blob-aware via `scan_stream_for_rewrite`. Exposed on the sealed `TableStorage` trait. - Lazy row-signature: stop stringifying every row's embedding eagerly; compute the signature only for the `(Some,Some)` changed-candidate arm. - Index coverage is reconciler-owned: the adopt path no longer rebuilds vector/FTS indexes inline; `optimize`/`ensure_indices` folds the new rows in (reads stay correct via brute-force tail). Post-merge index-coverage contract documented in docs/user/branching/merge.md. - Recovery pin: new `CandidateTableState::AdoptWithDelta` is classified and pinned so the append's HEAD advance is sidecar-covered (invariant 5); the `BranchMerge` sidecar's loose classification covers the two-commit shape. The regression gate is structural, not a brittle size threshold: task-local write probes assert an append-only fast-forward merge does 0 `stage_merge_insert` (the OOM hash join), appends via `stage_append`, and streams (0 whole-delta materialization). Plus functional correctness, blob round-trip, index-defer, and a Phase-B failpoint recovery test. Residual: the classify-time staging round-trip is still O(N) in memory (architecturally required for the all-or-nothing multi-table publish); bounding it fully is the fragment-adopt follow-up. * test(engine): partial branch-merge Phase B must roll back (RED regression) A branch-merge per-table publish is a multi-commit sequence — adopt: append → upsert → delete; three-way: merge_insert → delete → index — each step advancing Lance HEAD before the single manifest publish. Add four failpoint sites at those windows and four regression tests (mixed delta: a fresh id, a modified base id, a removed base id) asserting that a crash mid-sequence rolls the whole merge BACK on the next open and a re-run re-applies the full delta. RED against current code: the loose `BranchMerge` classification rolls any `lance_head > manifest_pinned` forward, so the partial is published and the merge recorded — the rolled-back-to-base assertion fails with the partial state visible (e.g. bob appended, dave not deleted). The fix lands next. The failpoint sites are no-ops unless the `failpoints` feature activates them. * fix(engine): roll back partial branch-merge Phase B (recovery WAL confirmation) A branch-merge publishes each table with several Lance commits (adopt: append → upsert → delete; three-way: merge_insert → delete → index), then one manifest publish makes them atomic. Recovery classified `BranchMerge` loosely: any `lance_head > manifest_pinned` with a matching CAS pin rolled *forward* to the observed HEAD. So a crash mid-sequence published a partial delta (e.g. the append without its sibling upsert/delete) and recorded the merge as complete — silent data loss; a re-merge sees "already up to date" and never repairs it. Fix: make the recovery sidecar a two-phase WAL for `BranchMerge`. After the whole per-table publish loop completes, stamp each pin's `confirmed_version` with its exact achieved Lance version (a second sidecar write), then publish the manifest. Recovery now: - rolls FORWARD only to a pin's `confirmed_version` (set ⇒ Phase B finished); - rolls BACK (`TableClassification::IncompletePhaseB`) when the HEAD moved but no confirmation was recorded ⇒ a partial publish ⇒ all-or-nothing restore to the manifest pin, so a re-run re-applies the full delta. Scope: `BranchMerge` only. Other loose writers (`SchemaApply`, `EnsureIndices`, `Optimize`) keep the loose roll-forward — their drift is derived state (index coverage, compaction) a partial roll-forward never corrupts, so confirmation would be cost without benefit. This is the write-ahead intent record + idempotent roll-forward that the fast-forward-main commit model requires to be crash-atomic across N tables; version-recorded (not phase-count-derived), so it survives later changes to the per-table commit sequence. Regression tests (failpoints): four partial-window crashes — adopt after-append / after-upsert, three-way after-merge / after-delete — each with a mixed delta (new id, modified id, removed id) now roll the whole merge back; the existing complete-Phase-B tests still roll forward. * fix(engine): scope merge index docs to fast-forward; record append probe after write Two PR-review fixes: - docs(merge): the "a merge does not build indexes inline" note only holds for the fast-forward / adopt path (deferred to the reconciler). The three-way `Merged` path still rebuilds indexes inline in its publish, so a Merged-outcome merge of an embedding table pays the build up front. Scope the doc so a Merged-outcome user isn't surprised or led to skip a post-merge optimize. - `stage_append` recorded its instrumentation probe before the fallible `execute_uncommitted`, so a failed staging write left the call/row counters inflated — and diverged from `stage_append_stream`, which records after the transaction is built. Record after the write succeeds. * fix(engine): record stage_merge_insert / vector-index probes after write too The prior commit moved `stage_append`'s instrumentation probe to after the write, but left the two sibling write primitives with the identical ordering bug: `stage_merge_insert` recorded before `execute_uncommitted`, and `create_vector_index` before the index build. A failed write on either would inflate the probe counter. Move both to record only after the write succeeds, so all write-primitive probes share one rule (record-after-success) — closing the class rather than the single instance the review flagged. * docs(engine): mark the fragment-adopt excision boundary in the merge code Comment the transitional row-level merge code so a future fragment-adopt implementation (Lance branch-merge/rebase #7263 + UUID branch paths #7185) knows exactly what it deletes and what it keeps: - `AdoptDelta` / `compute_adopt_delta` / `publish_adopted_delta` — the row-level re-derivation; removed wholesale when a fast-forward merge becomes a fragment graft (adopt the source table version's fragments + indexes by reference). - `stage_append_stream` — its only caller is that merge append; dead with it unless re-adopted as a general bulk-append path. - `confirm_sidecar_phase_b` — the boundary marker: this SURVIVES. The recovery sidecar is the cross-table WAL a fast-forward-main commit still needs; only the within-table multi-commit reason for `IncompletePhaseB` narrows once each table is a single graft commit. Keep the sidecar; only simplify the classifier. Comments only; no behavior change. * test(engine): pre-upgrade v1 branch-merge sidecar must roll forward (RED) Phase-B confirmation made the recovery classifier strict for every BranchMerge sidecar — including ones written by a binary that predates confirmation. A pre-upgrade crash in the Phase-B→C gap can leave such a sidecar over a COMPLETED merge; the new classifier reads its absent confirmed_version as a partial and rolls it back, silently discarding the finished merge (greptile P1 / Cursor High). This regression test synthesizes that sidecar realistically: crash after Phase B (real sidecar + advanced Lance HEAD), downgrade the on-disk JSON to the pre-confirmation v1 shape (schema_version=1, strip confirmed_version), reopen. RED: the merge rolls back, `bob` is discarded (left ["alice"], want ["alice","bob"]). The versioning fix lands next. * fix(engine): version the recovery sidecar; read pre-confirmation merges as loose Phase-B confirmation changed how a BranchMerge sidecar's absent confirmed_version is interpreted (roll forward → roll back) without versioning the artifact, so the new classifier silently discarded completed pre-upgrade merges (greptile P1 / Cursor High). A capability flag would not fix the symmetric direction — keeping schema_version=1, an OLD binary reading a NEW sidecar sails through its already-shipped strict gate, ignores the unknown flag, and applies loose semantics to a new partial → the same data loss on downgrade. Use the versioning system instead. - Bump SIDECAR_SCHEMA_VERSION 1 → 2; add a fixed CONFIRMATION_SCHEMA_VERSION = 2 (the generation at which confirmation shipped — pinned, so a later v3 keeps v2 confirmation-aware). - Make the read gate version-aware (`parse_sidecar`): refuse only versions NEWER than this binary; accept and interpret older ones with their original semantics — no operator toil draining pre-upgrade sidecars. Rename `SidecarSchemaError.supported_version` → `max_supported_version` and reword. - Dispatch classification by version: the strict BranchMerge confirmation path is gated on `schema_version >= CONFIRMATION_SCHEMA_VERSION`; a v1 BranchMerge sidecar falls through to the existing loose roll-forward. Thread `sidecar.schema_version` from `process_sidecar`. This is bidirectionally safe: a new binary interprets v1 (loose) and v2 (strict) and refuses the future; an old binary's `!= 1` gate already refuses v2, so it never misreads a new sidecar. The flag was an additive-field pattern misapplied to a semantics change; versioning is the correct mechanism. Honest residual (any approach): an old *partial* sidecar still rolls forward — v1 carries no confirmation, so partialness is undetectable in it. The fix stops us from interpreting old sidecars under new rules; it can't retrofit information they never had. * fix(engine): harden recovery — mode resolver, loud divergence check, publish classified version Three correct-by-design fixes from the holistic review of the recovery path, all in recovery.rs (each closes a class, not an instance): A. Resolve the classification mode from `(kind, schema_version)` once, instead of a kind×version match accreting fall-through guards in `classify_table`. New `ClassificationMode { Strict, Loose, Confirmed }` + an exhaustive `SidecarKind::classification_mode` — adding a writer kind or version floor is now one arm in the resolver (the compiler forces it), not a guard threaded through the classifier. No behavior change; existing classify/decide tests are the guard. B. `confirm_sidecar_phase_b` now errors loudly when a pinned table has no achieved version in the publish `updates`, instead of silently skipping it (which left the pin unconfirmed → `IncompletePhaseB` → a silent rollback of a COMPLETE merge). Guards the implicit `pins ⊆ updates` invariant against a future divergence between the two filters (invariants 9/13). + a unit test. C. Recovery roll-forward publishes the version classification OBSERVED (`state.lance_head`), not a fresh HEAD re-read at publish time. For a Confirmed pin classify already validated `lance_head == confirmed_version`, so this publishes the recorded WAL intent by construction and closes the classify→publish re-derivation/TOCTOU for every writer (invariant 15). `push_table_update_at_head` → `push_table_update(target_version: Option)`: roll-forward pins the classified version; roll-back keeps `None` (publishes the restore commit it just made). In-scope behavior is preserved, so the existing roll-forward integration tests are the guard; the drift-hardening is correct-by-construction (deterministic mid-sweep drift injection isn't feasible — a sync failpoint can't do an async Lance write). --- crates/omnigraph/src/db/manifest.rs | 8 +- crates/omnigraph/src/db/manifest/recovery.rs | 493 +++++++++++---- crates/omnigraph/src/db/omnigraph/optimize.rs | 3 + .../src/db/omnigraph/schema_apply.rs | 3 + .../omnigraph/src/db/omnigraph/table_ops.rs | 6 + crates/omnigraph/src/exec/merge.rs | 559 ++++++++++++++---- crates/omnigraph/src/exec/staging.rs | 4 + crates/omnigraph/src/instrumentation.rs | 89 +++ crates/omnigraph/src/storage_layer.rs | 21 + crates/omnigraph/src/table_store.rs | 104 +++- crates/omnigraph/tests/failpoints.rs | 354 ++++++++++- crates/omnigraph/tests/merge_fast_forward.rs | 213 +++++++ crates/omnigraph/tests/recovery.rs | 12 +- docs/dev/writes.md | 16 +- docs/user/branching/merge.md | 19 + 15 files changed, 1670 insertions(+), 234 deletions(-) create mode 100644 crates/omnigraph/tests/merge_fast_forward.rs diff --git a/crates/omnigraph/src/db/manifest.rs b/crates/omnigraph/src/db/manifest.rs index ce91513..19f25a3 100644 --- a/crates/omnigraph/src/db/manifest.rs +++ b/crates/omnigraph/src/db/manifest.rs @@ -33,10 +33,10 @@ pub(crate) use namespace::open_table_head_for_write; use namespace::{branch_manifest_namespace, staged_table_namespace}; use publisher::{GraphNamespacePublisher, ManifestBatchPublisher}; pub(crate) use recovery::{ - RecoveryMode, RecoverySidecarHandle, SidecarKind, SidecarTablePin, SidecarTableRegistration, - SidecarTombstone, delete_sidecar, has_schema_apply_sidecar, heal_pending_sidecars_roll_forward, - list_sidecars, new_sidecar, recover_manifest_drift, schema_apply_serial_queue_key, - write_sidecar, + RecoveryMode, RecoverySidecar, RecoverySidecarHandle, SidecarKind, SidecarTablePin, + SidecarTableRegistration, SidecarTombstone, confirm_sidecar_phase_b, delete_sidecar, + has_schema_apply_sidecar, heal_pending_sidecars_roll_forward, list_sidecars, new_sidecar, + recover_manifest_drift, schema_apply_serial_queue_key, write_sidecar, }; pub use state::SubTableEntry; #[cfg(test)] diff --git a/crates/omnigraph/src/db/manifest/recovery.rs b/crates/omnigraph/src/db/manifest/recovery.rs index 4b0f870..d21e0fd 100644 --- a/crates/omnigraph/src/db/manifest/recovery.rs +++ b/crates/omnigraph/src/db/manifest/recovery.rs @@ -62,10 +62,26 @@ pub(crate) const RECOVERY_ACTOR: &str = "omnigraph:recovery"; /// Subdirectory under the graph root holding sidecar files. pub(crate) const RECOVERY_DIR_NAME: &str = "__recovery"; -/// Current sidecar JSON shape version. Bumping this is a breaking change: -/// older binaries will refuse to interpret newer sidecars (intentional — -/// see [`SidecarSchemaError`]). -pub(crate) const SIDECAR_SCHEMA_VERSION: u32 = 1; +/// Max sidecar JSON shape/semantics version this binary writes and understands. +/// The reader accepts every version `<= ` this and refuses only versions ABOVE +/// it (an older binary cannot guess semantics a newer writer baked in — see +/// [`SidecarSchemaError`] and [`parse_sidecar`]). Bump this whenever a change +/// alters how an existing field is *interpreted* (not merely adds an optional +/// one), and add a fixed `*_SCHEMA_VERSION` floor like the one below so older +/// generations keep their original semantics. +/// +/// v1 → v2: Phase-B confirmation. A `BranchMerge` sidecar at v2 carries +/// `confirmed_version` and is classified strictly (unconfirmed ⇒ partial ⇒ roll +/// back); at v1 it predates confirmation and keeps the loose roll-forward. The +/// reader must distinguish the two, so this is a real version bump, not an +/// additive field. +pub(crate) const SIDECAR_SCHEMA_VERSION: u32 = 2; + +/// The version at which Phase-B confirmation shipped. A `BranchMerge` sidecar is +/// confirmation-aware (strict classification) iff `schema_version >=` this. +/// FIXED at 2 — NOT derived from [`SIDECAR_SCHEMA_VERSION`] — so a future bump to +/// v3+ still treats v2 sidecars as confirmation-aware. +pub(crate) const CONFIRMATION_SCHEMA_VERSION: u32 = 2; /// Selects which recovery actions are allowed in a sweep. /// @@ -115,6 +131,54 @@ pub(crate) enum SidecarKind { Optimize, } +/// Which recovery-classification semantics a sidecar's tables use. Resolved once +/// from `(writer_kind, schema_version)` — see [`SidecarKind::classification_mode`] +/// — so [`classify_table`] dispatches on the mode instead of re-deriving it from +/// a kind×version match. Adding a writer kind or a version floor is then one arm +/// in the resolver, not a guard threaded through `classify_table`. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(crate) enum ClassificationMode { + /// Exactly one `commit_staged` per table (`Mutation`, `Load`): require + /// `lance_head == manifest_pinned + 1` and the pin to match. + Strict, + /// N ≥ 1 commits per table whose drift is content-preserving / derived + /// state (`SchemaApply`, `EnsureIndices`, `Optimize`, and pre-confirmation + /// `BranchMerge`): any `lance_head > manifest_pinned` rolls forward. + Loose, + /// Multi-commit publish of *distinct logical rows* with a recorded + /// `confirmed_version` (`BranchMerge` at `schema_version >= + /// CONFIRMATION_SCHEMA_VERSION`): roll forward ONLY to the confirmed + /// version; an unconfirmed moved HEAD is a partial publish and rolls back. + Confirmed, +} + +impl SidecarKind { + /// Resolve the classification mode for this writer at a given sidecar + /// `schema_version`. Exhaustive over `SidecarKind`, so adding a variant is a + /// compile error here until its recovery semantics are declared. + pub(crate) fn classification_mode(self, schema_version: u32) -> ClassificationMode { + match self { + SidecarKind::Mutation | SidecarKind::Load => ClassificationMode::Strict, + // BranchMerge gained two-phase confirmation at + // `CONFIRMATION_SCHEMA_VERSION`. A sidecar written before that + // carries no `confirmed_version` and must keep the prior loose + // roll-forward — classifying it strictly would misread a *completed* + // pre-upgrade merge as a partial and roll it back. (The read gate + // already refused any version newer than this binary.) + SidecarKind::BranchMerge => { + if schema_version >= CONFIRMATION_SCHEMA_VERSION { + ClassificationMode::Confirmed + } else { + ClassificationMode::Loose + } + } + SidecarKind::SchemaApply | SidecarKind::EnsureIndices | SidecarKind::Optimize => { + ClassificationMode::Loose + } + } + } +} + /// One table's contribution to a sidecar's intended commit. The classifier /// uses these to decide per-table state at recovery time. #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] @@ -126,8 +190,22 @@ pub(crate) struct SidecarTablePin { /// Manifest-pinned version at writer start (CAS expectation). pub expected_version: u64, /// Lance HEAD that the writer's `commit_staged` would produce - /// (typically `expected_version + 1`). + /// (typically `expected_version + 1`). For multi-commit writers this is + /// only a *lower bound* — see `confirmed_version`. pub post_commit_pin: u64, + /// Phase-B confirmation: the exact Lance HEAD this table reached once the + /// writer's *entire* multi-commit publish for it finished, recorded by a + /// second sidecar write immediately before the manifest publish (Phase C). + /// `None` means Phase B did not complete (the writer crashed mid-publish), + /// so the on-disk drift is a *partial* commit and recovery must roll the + /// whole operation BACK rather than publish an incomplete state. Only the + /// `BranchMerge` writer records this today (its per-table publish is + /// append → upsert → delete, several HEAD advances that the manifest + /// publish makes atomic); other writers leave it `None` and keep their + /// existing loose roll-forward. Backward-compatible: absent on older + /// sidecars. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub confirmed_version: Option, /// Lance branch ref this table lives on (mirrors /// `SubTableEntry::table_branch`). Required for the recovery sweep /// to open the dataset at the correct ref — `Dataset::open(path)` @@ -218,25 +296,27 @@ pub(crate) struct RecoverySidecarHandle { pub(crate) sidecar_uri: String, } -/// Error returned when the sidecar's `schema_version` is unknown to this -/// binary. We refuse-and-error rather than read-and-warn: an old binary -/// cannot guess what semantics a newer writer baked into a future shape. -/// Operator action is required (typically: upgrade the binary). +/// Error returned when the sidecar's `schema_version` is NEWER than this binary +/// understands. We refuse-and-error rather than read-and-warn: an old binary +/// cannot guess what semantics a newer writer baked into a future shape. (Older +/// versions are accepted and interpreted with their original semantics — see +/// [`parse_sidecar`].) Operator action is required (typically: upgrade the +/// binary). #[derive(Debug)] pub(crate) struct SidecarSchemaError { pub sidecar_uri: String, pub found_version: u32, - pub supported_version: u32, + pub max_supported_version: u32, } impl std::fmt::Display for SidecarSchemaError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( f, - "recovery sidecar at '{}' declares schema_version={}, but this \ - binary supports only schema_version={}; refusing to interpret \ + "recovery sidecar at '{}' declares schema_version={}, newer than the \ + maximum this binary supports (schema_version={}); refusing to interpret \ — upgrade omnigraph or remove the sidecar with operator review", - self.sidecar_uri, self.found_version, self.supported_version, + self.sidecar_uri, self.found_version, self.max_supported_version, ) } } @@ -271,6 +351,14 @@ pub(crate) enum TableClassification { /// previous restore attempt or an external mutation. Roll back to /// the manifest pin. UnexpectedMultistep, + /// A confirmation-using writer (`BranchMerge`) advanced this table's HEAD + /// (`lance_head > manifest_pinned`) but the sidecar carries no + /// `confirmed_version` — its multi-commit publish crashed mid-flight, so + /// the drift is a *partial* commit (e.g. an append without its sibling + /// upsert/delete). Roll back to the manifest pin; the whole operation is + /// re-run from scratch. Distinct from `UnexpectedMultistep` so the audit + /// records a partial Phase B, not a foreign mutation. + IncompletePhaseB, /// `lance_head < manifest_pinned`. Should be impossible: the manifest /// pin can only advance after a successful Lance commit. Surface /// loudly and abort recovery. @@ -341,6 +429,58 @@ pub(crate) async fn write_sidecar( }) } +/// Phase-B confirmation: stamp each pin with the exact Lance HEAD its publish +/// reached, then re-write the sidecar in place (same object). Called once, after +/// the writer's whole multi-commit publish completed and before the manifest +/// publish (Phase C). Recovery then rolls forward ONLY to these confirmed +/// versions; a sidecar still missing them is a partial Phase B that rolls back. +/// +/// Overwriting the same object is atomic (same contract as [`write_sidecar`]): +/// a torn rewrite is never observed, so recovery reads either the pre-confirm +/// sidecar (→ roll back, safe) or the confirmed one (→ roll forward). A failure +/// here leaves the pre-confirm sidecar, so the operation rolls back — correct. +/// +/// SURVIVES the fragment-adopt work (unlike the row-level merge it currently +/// serves — see `AdoptDelta` in `exec/merge.rs`). The recovery sidecar is the +/// cross-table write-ahead log that makes a fast-forward-main commit +/// all-or-nothing across N tables, which a fragment graft still needs. What +/// narrows is the *within-table* reason for confirmation: once each table's +/// merge is a single graft commit, the multi-step partial window shrinks to one +/// commit, so the `BranchMerge` arm of `classify_table` could fold back into the +/// strict single-commit path and `IncompletePhaseB` retire. Do NOT delete this +/// with the row path — keep the sidecar; only simplify the classifier. +pub(crate) async fn confirm_sidecar_phase_b( + root_uri: &str, + storage: &dyn StorageAdapter, + sidecar: &mut RecoverySidecar, + confirmed_versions: &HashMap, +) -> Result<()> { + // Failpoint: models a storage failure on the confirmation write — the + // pre-confirm sidecar stays on disk, so recovery rolls the operation back. + crate::failpoints::maybe_fail("recovery.sidecar_confirm")?; + for pin in &mut sidecar.tables { + // Every pinned table MUST have an achieved version. A miss means the + // pin set and the publish `updates` diverged — fail loudly at the + // producer rather than leave the pin unconfirmed, which recovery would + // read as a partial Phase B and silently roll the whole (complete) merge + // back. Today the two are kept in lockstep by construction; this guards + // the invariant against a future edit to either filter. + let version = confirmed_versions.get(&pin.table_key).ok_or_else(|| { + OmniError::manifest_internal(format!( + "confirm_sidecar_phase_b: no achieved version for pinned table '{}' \ + (pins and publish updates diverged)", + pin.table_key + )) + })?; + pin.confirmed_version = Some(*version); + } + let uri = sidecar_uri(root_uri, &sidecar.operation_id); + let json = serde_json::to_string_pretty(sidecar).map_err(|err| { + OmniError::manifest_internal(format!("failed to serialize recovery sidecar: {}", err)) + })?; + storage.write_text(&uri, &json).await +} + /// Delete a sidecar after Phase C succeeded. Idempotent (safe to retry). pub(crate) async fn delete_sidecar( handle: &RecoverySidecarHandle, @@ -408,11 +548,15 @@ pub(crate) fn parse_sidecar(sidecar_uri: &str, body: &str) -> Result SIDECAR_SCHEMA_VERSION { return Err(SidecarSchemaError { sidecar_uri: sidecar_uri.to_string(), found_version: peek.schema_version, - supported_version: SIDECAR_SCHEMA_VERSION, + max_supported_version: SIDECAR_SCHEMA_VERSION, } .into()); } @@ -427,26 +571,38 @@ pub(crate) fn parse_sidecar(sidecar_uri: &str, body: &str) -> Result manifest_pinned` is ambiguous — it may be a *complete* +/// publish or a *partial* one crashed mid-sequence. The writer resolves +/// the ambiguity by recording the exact achieved version +/// (`confirmed_version`) only after the whole publish finished. So roll +/// forward ONLY to that confirmed version; a missing confirmation is a +/// partial commit (`IncompletePhaseB`) and rolls back. This is the safe +/// form of the loose match for writers where a partial would publish an +/// incomplete delta. /// - **Strict** (`Mutation`, `Load`): exactly one `commit_staged` per /// table, so `lance_head == manifest_pinned + 1` AND /// `post_commit_pin == lance_head` is required. -/// - **Loose** (`SchemaApply`, `EnsureIndices`, `BranchMerge`, -/// `Optimize`): the writer advances the Lance HEAD by N ≥ 1 commits -/// per table (one per index built + one for the overwrite, etc.; -/// merge tables run merge_insert + delete_where + index rebuilds; -/// `Optimize` runs `compact_files`, which commits reserve-fragments + -/// rewrite) and the exact N is hard to compute at sidecar-write time. -/// The loose match accepts +/// - **Loose** (`SchemaApply`, `EnsureIndices`, `Optimize`): the writer +/// advances the Lance HEAD by N ≥ 1 commits per table (one per index +/// built + one for the overwrite, etc.; `Optimize` runs `compact_files`, +/// which commits reserve-fragments + rewrite) and the exact N is hard to +/// compute at sidecar-write time. The loose match accepts /// any `lance_head > manifest_pinned` as `RolledPastExpected` when /// `pin.expected_version == manifest_pinned` (the writer's CAS -/// target matches what the manifest currently shows). The risk this -/// admits — an external agent advancing HEAD between sidecar write -/// and recovery — is out of scope for the single-coordinator model. +/// target matches what the manifest currently shows). This is safe for +/// these writers because their drift is derived state (index coverage, +/// compaction) the reconciler reproduces — a partial roll-forward loses +/// no logical rows. The risk it admits — an external agent advancing HEAD +/// between sidecar write and recovery — is out of scope for the +/// single-coordinator model. pub(crate) fn classify_table( pin: &SidecarTablePin, lance_head: u64, manifest_pinned: u64, kind: SidecarKind, + schema_version: u32, ) -> TableClassification { use TableClassification::*; if lance_head < manifest_pinned { @@ -457,27 +613,49 @@ pub(crate) fn classify_table( if lance_head == manifest_pinned { return NoMovement; } - // lance_head > manifest_pinned - let strict = matches!(kind, SidecarKind::Mutation | SidecarKind::Load); - if strict { - if lance_head == manifest_pinned + 1 { - if pin.expected_version == manifest_pinned && pin.post_commit_pin == lance_head { - RolledPastExpected - } else { - UnexpectedAtP1 + // lance_head > manifest_pinned. The "which semantics" decision is resolved + // once from (kind, schema_version); dispatch on it. + match kind.classification_mode(schema_version) { + ClassificationMode::Confirmed => { + // Two-phase confirmation: roll forward only to the exact version the + // writer recorded after its whole multi-commit publish completed. No + // confirmation ⇒ the publish crashed mid-sequence ⇒ partial ⇒ roll + // back. A confirmation that doesn't match the observed HEAD means a + // foreign writer advanced the table — don't roll a surprise forward. + match pin.confirmed_version { + Some(confirmed) + if lance_head == confirmed && pin.expected_version == manifest_pinned => + { + RolledPastExpected + } + Some(_) => UnexpectedMultistep, + None => IncompletePhaseB, } - } else { - // lance_head > manifest_pinned + 1 - UnexpectedMultistep } - } else { - // Loose match for multi-commit writers (SchemaApply, EnsureIndices). - if pin.expected_version == manifest_pinned { - RolledPastExpected - } else if lance_head == manifest_pinned + 1 { - UnexpectedAtP1 - } else { - UnexpectedMultistep + ClassificationMode::Strict => { + if lance_head == manifest_pinned + 1 { + if pin.expected_version == manifest_pinned && pin.post_commit_pin == lance_head { + RolledPastExpected + } else { + UnexpectedAtP1 + } + } else { + // lance_head > manifest_pinned + 1 + UnexpectedMultistep + } + } + ClassificationMode::Loose => { + // Multi-commit writers whose drift is content-preserving / derived + // state (and pre-confirmation BranchMerge sidecars): any + // `lance_head > manifest_pinned` rolls forward when the CAS target + // matches what the manifest currently shows. + if pin.expected_version == manifest_pinned { + RolledPastExpected + } else if lance_head == manifest_pinned + 1 { + UnexpectedAtP1 + } else { + UnexpectedMultistep + } } } } @@ -496,7 +674,7 @@ pub(crate) fn decide(classifications: &[TableClassification]) -> SidecarDecision } if classifications .iter() - .any(|c| matches!(c, NoMovement | UnexpectedAtP1 | UnexpectedMultistep)) + .any(|c| matches!(c, NoMovement | UnexpectedAtP1 | UnexpectedMultistep | IncompletePhaseB)) { return RollBack; } @@ -891,7 +1069,13 @@ async fn process_sidecar( .map(|e| e.table_version) .unwrap_or(0); states.push(ClassifiedTable { - classification: classify_table(pin, lance_head, manifest_pinned, sidecar.writer_kind), + classification: classify_table( + pin, + lance_head, + manifest_pinned, + sidecar.writer_kind, + sidecar.schema_version, + ), manifest_pinned, lance_head, }); @@ -1028,7 +1212,7 @@ async fn process_sidecar( Phase C did not land)" ); let (new_manifest_version, published_versions) = - roll_forward_all(root_uri, sidecar, snapshot).await?; + roll_forward_all(root_uri, sidecar, &states, snapshot).await?; // `to_version` records the ACTUAL Lance HEAD published for // each table (not pin.post_commit_pin, which is a lower bound // for loose-match writers like SchemaApply / EnsureIndices / @@ -1112,6 +1296,7 @@ async fn roll_back_sidecar( TableClassification::RolledPastExpected | TableClassification::UnexpectedAtP1 | TableClassification::UnexpectedMultistep + | TableClassification::IncompletePhaseB ) { restore_table_to_version( &pin.table_path, @@ -1119,14 +1304,17 @@ async fn roll_back_sidecar( state.manifest_pinned, ) .await?; - // Publish the post-restore HEAD, CAS against the current (unmoved) - // manifest pin — the same helper roll-forward uses. - push_table_update_at_head( + // Publish the post-restore HEAD (the restore commit we just made), + // CAS against the current (unmoved) manifest pin — the same helper + // roll-forward uses. `None` target: there is no prior observation to + // pin to; the version to publish is the HEAD the restore produced. + push_table_update( root_uri, &pin.table_key, &pin.table_path, pin.table_branch.as_deref(), state.manifest_pinned, + None, &mut updates, &mut expected, ) @@ -1227,6 +1415,7 @@ async fn record_audit_recovery_rollforward( async fn roll_forward_all( root_uri: &str, sidecar: &RecoverySidecar, + states: &[ClassifiedTable], snapshot: &Snapshot, ) -> Result<(u64, HashMap)> { let total_changes = @@ -1236,22 +1425,25 @@ async fn roll_forward_all( let mut published_versions: HashMap = HashMap::with_capacity(sidecar.tables.len() + sidecar.additional_registrations.len()); - for pin in &sidecar.tables { - // Publish to the table's CURRENT Lance HEAD on the pin's branch (not the - // sidecar's `post_commit_pin`, a lower bound for loose-match writers that - // run multiple commit_staged calls per table). CAS against the pin's - // pre-write `expected_version`. - let head_version = push_table_update_at_head( + for (pin, state) in sidecar.tables.iter().zip(states.iter()) { + // Publish the version classification OBSERVED (`state.lance_head`), not a + // fresh HEAD re-read. For a `Confirmed` pin classify already validated + // `lance_head == confirmed_version`, so this publishes the recorded WAL + // intent by construction; for loose/strict pins it's the multi-commit + // HEAD classify saw. Single observation, no classify→publish TOCTOU. CAS + // against the pin's pre-write `expected_version`. + let published = push_table_update( root_uri, &pin.table_key, &pin.table_path, pin.table_branch.as_deref(), pin.expected_version, + Some(state.lance_head), &mut updates, &mut expected, ) .await?; - published_versions.insert(pin.table_key.clone(), head_version); + published_versions.insert(pin.table_key.clone(), published); } // SchemaApply-only: register added tables (and renamed targets) and @@ -1351,45 +1543,61 @@ async fn roll_forward_all( /// version the table was just restored to). The HEAD is read AFTER any restore /// in the same single-threaded sweep, so no concurrent writer can have advanced /// it. -async fn push_table_update_at_head( +/// Stage a manifest `Update` for one table. +/// +/// `target_version` selects WHICH Lance version's state to publish: +/// - `Some(v)` — pin the dataset at version `v` and publish it. Roll-FORWARD +/// passes the version classification observed (and, for a `Confirmed` pin, +/// validated equals `confirmed_version`), so recovery publishes the version it +/// *decided* on rather than re-reading a HEAD a concurrent writer may have +/// advanced since classification — one observation, used for both the decision +/// and the publish (invariant 15). +/// - `None` — publish the dataset's current HEAD. Roll-BACK uses this: it just +/// created the restore commit, so HEAD *is* the version to publish. +async fn push_table_update( root_uri: &str, table_key: &str, table_path: &str, branch: Option<&str>, expected_version: u64, + target_version: Option, updates: &mut Vec, expected: &mut HashMap, ) -> Result { - let head_ds = Dataset::open(table_path) + let ds = Dataset::open(table_path) .await .map_err(|e| OmniError::Lance(e.to_string()))?; - let head_ds = match branch { - Some(b) if b != "main" => head_ds + let ds = match branch { + Some(b) if b != "main" => ds .checkout_branch(b) .await .map_err(|e| OmniError::Lance(e.to_string()))?, - _ => head_ds, + _ => ds, }; - let head_version = head_ds.version().version; - let row_count = head_ds + let ds = match target_version { + Some(v) => ds + .checkout_version(v) + .await + .map_err(|e| OmniError::Lance(e.to_string()))?, + None => ds, + }; + let published_version = ds.version().version; + let row_count = ds .count_rows(None) .await .map_err(|e| OmniError::Lance(e.to_string()))? as u64; let table_relative_path = super::table_path_for_table_key(table_key)?; - let version_metadata = super::metadata::TableVersionMetadata::from_dataset( - root_uri, - &table_relative_path, - &head_ds, - )?; + let version_metadata = + super::metadata::TableVersionMetadata::from_dataset(root_uri, &table_relative_path, &ds)?; updates.push(ManifestChange::Update(SubTableUpdate { table_key: table_key.to_string(), - table_version: head_version, + table_version: published_version, table_branch: branch.map(str::to_string), row_count, version_metadata, })); expected.insert(table_key.to_string(), expected_version); - Ok(head_version) + Ok(published_version) } /// Append the audit row describing this recovery action. @@ -1573,6 +1781,7 @@ mod tests { table_path: table_path.to_string(), expected_version: expected, post_commit_pin: post, + confirmed_version: None, table_branch: None, } } @@ -1597,30 +1806,39 @@ mod tests { } #[test] - fn parse_sidecar_refuses_unknown_schema_version() { - let body = r#"{ - "schema_version": 99, - "operation_id": "01H000000000000000000000XX", - "started_at": "0", - "branch": null, - "actor_id": null, - "writer_kind": "Mutation", - "tables": [] - }"#; - let err = parse_sidecar("file:///tmp/__recovery/x.json", body).unwrap_err(); + fn parse_sidecar_refuses_future_but_accepts_older_schema_version() { + let body = |version: u32| { + format!( + r#"{{ + "schema_version": {version}, + "operation_id": "01H000000000000000000000XX", + "started_at": "0", + "branch": null, + "actor_id": null, + "writer_kind": "BranchMerge", + "tables": [] + }}"# + ) + }; + // A version NEWER than this binary's max → refuse (can't guess the future). + let err = parse_sidecar("file:///tmp/__recovery/x.json", &body(99)).unwrap_err(); let msg = err.to_string(); assert!( - msg.contains("schema_version=99") && msg.contains("supports only schema_version=1"), - "expected SidecarSchemaError mentioning the version mismatch, got: {}", - msg, + msg.contains("schema_version=99") && msg.contains("newer than the maximum"), + "expected a future-version refusal, got: {msg}", ); + // An OLDER version (pre-confirmation v1) → accept and interpret with its + // original semantics; never refuse a version we were built to understand. + let parsed = parse_sidecar("file:///tmp/__recovery/x.json", &body(1)) + .expect("a v1 (older) sidecar must parse, not be refused"); + assert_eq!(parsed.schema_version, 1); } #[test] fn classify_no_movement_when_head_equals_pinned() { let pin = make_pin("node:Person", "irrelevant", 5, 6); assert_eq!( - classify_table(&pin, 5, 5, SidecarKind::Mutation), + classify_table(&pin, 5, 5, SidecarKind::Mutation, SIDECAR_SCHEMA_VERSION), TableClassification::NoMovement, ); } @@ -1629,7 +1847,7 @@ mod tests { fn classify_rolled_past_expected_when_sidecar_matches_strict() { let pin = make_pin("node:Person", "irrelevant", 5, 6); assert_eq!( - classify_table(&pin, 6, 5, SidecarKind::Mutation), + classify_table(&pin, 6, 5, SidecarKind::Mutation, SIDECAR_SCHEMA_VERSION), TableClassification::RolledPastExpected, ); } @@ -1639,7 +1857,7 @@ mod tests { // Same +1 drift but post_commit_pin says it should be 7, not 6. let pin = make_pin("node:Person", "irrelevant", 5, 7); assert_eq!( - classify_table(&pin, 6, 5, SidecarKind::Mutation), + classify_table(&pin, 6, 5, SidecarKind::Mutation, SIDECAR_SCHEMA_VERSION), TableClassification::UnexpectedAtP1, ); } @@ -1648,7 +1866,7 @@ mod tests { fn classify_unexpected_multistep_when_head_jumped_more_than_one_strict() { let pin = make_pin("node:Person", "irrelevant", 5, 6); assert_eq!( - classify_table(&pin, 8, 5, SidecarKind::Mutation), + classify_table(&pin, 8, 5, SidecarKind::Mutation, SIDECAR_SCHEMA_VERSION), TableClassification::UnexpectedMultistep, ); } @@ -1657,7 +1875,7 @@ mod tests { fn classify_invariant_violation_when_head_below_pinned() { let pin = make_pin("node:Person", "irrelevant", 5, 6); assert_eq!( - classify_table(&pin, 3, 5, SidecarKind::Mutation), + classify_table(&pin, 3, 5, SidecarKind::Mutation, SIDECAR_SCHEMA_VERSION), TableClassification::InvariantViolation { observed: 3 }, ); } @@ -1673,7 +1891,7 @@ mod tests { // built two indices). Strict would say UnexpectedMultistep; loose // accepts it as RolledPastExpected. assert_eq!( - classify_table(&pin, 8, 5, SidecarKind::SchemaApply), + classify_table(&pin, 8, 5, SidecarKind::SchemaApply, SIDECAR_SCHEMA_VERSION), TableClassification::RolledPastExpected, ); } @@ -1682,7 +1900,7 @@ mod tests { fn classify_loose_match_accepts_multi_commit_drift_for_ensure_indices() { let pin = make_pin("node:Person", "irrelevant", 5, 6); assert_eq!( - classify_table(&pin, 9, 5, SidecarKind::EnsureIndices), + classify_table(&pin, 9, 5, SidecarKind::EnsureIndices, SIDECAR_SCHEMA_VERSION), TableClassification::RolledPastExpected, ); } @@ -1691,7 +1909,7 @@ mod tests { fn classify_loose_match_no_movement_unchanged() { let pin = make_pin("node:Person", "irrelevant", 5, 6); assert_eq!( - classify_table(&pin, 5, 5, SidecarKind::SchemaApply), + classify_table(&pin, 5, 5, SidecarKind::SchemaApply, SIDECAR_SCHEMA_VERSION), TableClassification::NoMovement, ); } @@ -1700,31 +1918,65 @@ mod tests { fn classify_loose_match_invariant_violation_unchanged() { let pin = make_pin("node:Person", "irrelevant", 5, 6); assert_eq!( - classify_table(&pin, 3, 5, SidecarKind::SchemaApply), + classify_table(&pin, 3, 5, SidecarKind::SchemaApply, SIDECAR_SCHEMA_VERSION), TableClassification::InvariantViolation { observed: 3 }, ); } - /// BranchMerge must be loose-matched, not strict: while the strict - /// classifier expects exactly one `commit_staged` per table, - /// `publish_rewritten_merge_table` runs multiple per table - /// (merge_insert + delete_where + index rebuilds — the comment in - /// `merge.rs` explicitly says so). Strict classification would roll - /// back valid completed Phase B work as `UnexpectedMultistep`. + /// BranchMerge advances each table by several commits per table + /// (adopt: append + upsert + delete; three-way: merge_insert + delete + + /// index), so a bare "HEAD moved" is ambiguous between a complete and a + /// partial publish. At a confirmation-aware version the two-phase + /// confirmation resolves it: roll forward ONLY to the recorded + /// `confirmed_version`; an unconfirmed moved HEAD is a partial publish + /// (`IncompletePhaseB` ⇒ roll back), and a confirmed version that doesn't + /// match the observed HEAD is a foreign advance (`UnexpectedMultistep` ⇒ + /// roll back). A *pre-confirmation* (v1) sidecar carries no confirmation and + /// must keep the original loose roll-forward — reading it as strict would + /// roll a completed pre-upgrade merge back (silent discard). #[test] - fn classify_loose_match_accepts_multi_commit_drift_for_branch_merge() { - let pin = make_pin("node:Person", "irrelevant", 5, 6); + fn classify_branch_merge_requires_phase_b_confirmation() { + // Unconfirmed multi-commit drift at a confirmation-aware version → + // partial Phase B → roll back. + let unconfirmed = make_pin("node:Person", "irrelevant", 5, 6); assert_eq!( - classify_table(&pin, 8, 5, SidecarKind::BranchMerge), + classify_table(&unconfirmed, 8, 5, SidecarKind::BranchMerge, SIDECAR_SCHEMA_VERSION), + TableClassification::IncompletePhaseB, + ); + // Backward-compat: the SAME unconfirmed pin in a PRE-confirmation (v1) + // sidecar → loose roll-forward (the regression fix — a completed + // pre-upgrade merge must not be discarded). + assert_eq!( + classify_table( + &unconfirmed, + 8, + 5, + SidecarKind::BranchMerge, + CONFIRMATION_SCHEMA_VERSION - 1, + ), TableClassification::RolledPastExpected, ); + // Confirmed to the observed HEAD → complete Phase B → roll forward. + let confirmed = SidecarTablePin { + confirmed_version: Some(8), + ..make_pin("node:Person", "irrelevant", 5, 6) + }; + assert_eq!( + classify_table(&confirmed, 8, 5, SidecarKind::BranchMerge, SIDECAR_SCHEMA_VERSION), + TableClassification::RolledPastExpected, + ); + // Confirmed, but HEAD drifted past it (foreign writer) → roll back. + assert_eq!( + classify_table(&confirmed, 9, 5, SidecarKind::BranchMerge, SIDECAR_SCHEMA_VERSION), + TableClassification::UnexpectedMultistep, + ); } #[test] fn classify_loose_match_branch_merge_no_movement_unchanged() { let pin = make_pin("node:Person", "irrelevant", 5, 6); assert_eq!( - classify_table(&pin, 5, 5, SidecarKind::BranchMerge), + classify_table(&pin, 5, 5, SidecarKind::BranchMerge, SIDECAR_SCHEMA_VERSION), TableClassification::NoMovement, ); } @@ -1733,7 +1985,7 @@ mod tests { fn classify_loose_match_branch_merge_invariant_violation_unchanged() { let pin = make_pin("node:Person", "irrelevant", 5, 6); assert_eq!( - classify_table(&pin, 3, 5, SidecarKind::BranchMerge), + classify_table(&pin, 3, 5, SidecarKind::BranchMerge, SIDECAR_SCHEMA_VERSION), TableClassification::InvariantViolation { observed: 3 }, ); } @@ -1888,6 +2140,37 @@ mod tests { assert!(after.is_empty()); } + #[tokio::test] + async fn confirm_sidecar_phase_b_errors_when_pin_missing_from_updates() { + // A pinned table with no achieved version in the publish `updates` must + // be a loud producer error, NOT a silent skip that leaves the pin + // unconfirmed (which recovery would read as a partial Phase B and roll + // the whole complete merge back). Guards the implicit `pins ⊆ updates` + // invariant against a future divergence between the two filters. + let dir = tempfile::tempdir().unwrap(); + let storage = ObjectStorageAdapter::local(); + let mut sidecar = new_sidecar( + SidecarKind::BranchMerge, + Some("main".to_string()), + None, + vec![make_pin("node:Person", "file:///tmp/x.lance", 5, 6)], + ); + // The confirmed-versions map does NOT cover the pinned table. + let confirmed: HashMap = HashMap::new(); + let err = confirm_sidecar_phase_b( + dir.path().to_str().unwrap(), + &storage, + &mut sidecar, + &confirmed, + ) + .await + .expect_err("a pinned table with no achieved version must be a loud error"); + assert!( + err.to_string().contains("pins and publish updates diverged"), + "expected a pin/updates divergence error, got: {err}", + ); + } + #[tokio::test] async fn list_sidecars_skips_non_json_files() { let dir = tempfile::tempdir().unwrap(); diff --git a/crates/omnigraph/src/db/omnigraph/optimize.rs b/crates/omnigraph/src/db/omnigraph/optimize.rs index 498f9ae..9a0a17f 100644 --- a/crates/omnigraph/src/db/omnigraph/optimize.rs +++ b/crates/omnigraph/src/db/omnigraph/optimize.rs @@ -420,6 +420,9 @@ async fn optimize_one_table( // Lower bound — compaction commits N≥1 versions (reserve + rewrite); // the classifier loose-matches SidecarKind::Optimize. post_commit_pin: expected_version + 1, + // Optimize uses the loose match (drift is derived state), not + // BranchMerge's Phase-B confirmation — left None. + confirmed_version: None, table_branch: None, }], ); diff --git a/crates/omnigraph/src/db/omnigraph/schema_apply.rs b/crates/omnigraph/src/db/omnigraph/schema_apply.rs index d013eb2..3089641 100644 --- a/crates/omnigraph/src/db/omnigraph/schema_apply.rs +++ b/crates/omnigraph/src/db/omnigraph/schema_apply.rs @@ -362,6 +362,9 @@ where table_path: db.storage().dataset_uri(&entry.table_path), expected_version: entry.table_version, post_commit_pin: entry.table_version + 1, + // SchemaApply uses the loose match, not BranchMerge's Phase-B + // confirmation — left None. + confirmed_version: None, table_branch: entry.table_branch.clone(), }) }) diff --git a/crates/omnigraph/src/db/omnigraph/table_ops.rs b/crates/omnigraph/src/db/omnigraph/table_ops.rs index c325931..ed5d082 100644 --- a/crates/omnigraph/src/db/omnigraph/table_ops.rs +++ b/crates/omnigraph/src/db/omnigraph/table_ops.rs @@ -125,6 +125,9 @@ pub(super) async fn ensure_indices_for_branch( table_path: full_path, expected_version: entry.table_version, post_commit_pin: entry.table_version + 1, + // EnsureIndices uses the loose match (index coverage is derived + // state), not BranchMerge's Phase-B confirmation — left None. + confirmed_version: None, // Use active_branch (where commits actually land), NOT // entry.table_branch (where the table currently lives). // open_owned_dataset_for_branch_write forks a feature @@ -150,6 +153,9 @@ pub(super) async fn ensure_indices_for_branch( table_path: full_path, expected_version: entry.table_version, post_commit_pin: entry.table_version + 1, + // EnsureIndices uses the loose match (index coverage is derived + // state), not BranchMerge's Phase-B confirmation — left None. + confirmed_version: None, // Use active_branch (where commits actually land), NOT // entry.table_branch (where the table currently lives). // open_owned_dataset_for_branch_write forks a feature diff --git a/crates/omnigraph/src/exec/merge.rs b/crates/omnigraph/src/exec/merge.rs index 5d0be74..600fdf1 100644 --- a/crates/omnigraph/src/exec/merge.rs +++ b/crates/omnigraph/src/exec/merge.rs @@ -5,7 +5,14 @@ const MERGE_STAGE_DIR_ENV: &str = "OMNIGRAPH_MERGE_STAGING_DIR"; #[derive(Debug)] enum CandidateTableState { + /// Adopt the source's table state via a pointer switch or a branch fork — + /// no data HEAD advance, so nothing to pin for recovery. AdoptSourceState, + /// Adopt the source's state by applying a non-empty delta onto the target's + /// lineage (append new + upsert changed + delete removed). The delta is + /// pre-computed at classification so this candidate can be recovery-pinned: + /// its publish advances Lance HEAD before the manifest commit. + AdoptWithDelta(AdoptDelta), RewriteMerged(StagedMergeResult), } @@ -22,6 +29,38 @@ struct StagedMergeResult { deleted_ids: Vec, } +/// Delta for an adopted-source merge (the fast-forward / target-owns path): +/// the new + changed rows to apply onto the target's base lineage, plus the ids +/// removed on source. Distinct from [`StagedMergeResult`] (the three-way path), +/// which also carries a `full_staged` table for validation — the adopt path +/// validates against the source snapshot directly (`candidate_dataset`), so it +/// needs no `full_staged` and never builds it. +/// +/// TRANSITIONAL — fragment-adopt excision point. This whole row-level adopt +/// (`AdoptDelta`, [`compute_adopt_delta`], [`publish_adopted_delta`], and the +/// streaming append it drives) re-derives the source branch row-by-row because +/// today's Lance offers no fragment-level branch merge. When Lance ships +/// branch-merge/rebase ([#7263]) + UUID branch paths ([#7185]), a fast-forward +/// merge becomes a *fragment graft* — adopt the source table version's +/// fragments (and their already-built indexes) by reference, no rows scanned, +/// re-appended, upserted, or deleted. At that point this struct and its two +/// functions are removed wholesale; the merge collapses to ~one ref/metadata +/// op per table. Keep them self-contained so that excision stays a clean delete. +/// +/// [#7263]: https://github.com/lance-format/lance/issues/7263 +/// [#7185]: https://github.com/lance-format/lance/issues/7185 +#[derive(Debug)] +struct AdoptDelta { + /// New-on-source rows → `stage_append` (a streaming `Operation::Append`, no + /// hash join). The connector's dominant case and the OOM fix: appending new + /// rows never buffers the whole delta in a full-outer hash join. + appends: Option, + /// Changed-on-source rows → `stage_merge_insert` (a hash join bounded to the + /// genuinely-changed set, not the whole delta). + upserts: Option, + deleted_ids: Vec, +} + #[derive(Debug, Clone)] struct CursorRow { id: String, @@ -31,24 +70,48 @@ struct CursorRow { row_index: usize, } +impl CursorRow { + /// Compute this row's signature on demand. Used by the lazy adopt cursor, + /// where `signature` is left empty; the value is identical to the eager + /// `signature` field the three-way cursor populates. + fn compute_signature(&self) -> Result { + row_signature(&self.batch, self.row_index) + } +} + struct OrderedTableCursor { stream: Option>>, dataset: Option, current_batch: Option, current_row: usize, peeked: Option, + /// When false, `next_row` leaves `CursorRow::signature` empty and callers + /// compute it on demand via `CursorRow::compute_signature`. The adopt path + /// uses this: new/deleted rows never need a signature comparison and would + /// otherwise eagerly stringify their embedding for nothing. + eager_signatures: bool, } impl OrderedTableCursor { async fn from_snapshot(snapshot: &Snapshot, table_key: &str) -> Result { + Self::open(snapshot, table_key, true).await + } + + /// Like `from_snapshot` but leaves row signatures uncomputed (callers use + /// `CursorRow::compute_signature` on demand). See `eager_signatures`. + async fn from_snapshot_lazy(snapshot: &Snapshot, table_key: &str) -> Result { + Self::open(snapshot, table_key, false).await + } + + async fn open(snapshot: &Snapshot, table_key: &str, eager_signatures: bool) -> Result { let dataset = match snapshot.entry(table_key) { Some(_) => Some(snapshot.open(table_key).await?), None => None, }; - Self::from_dataset(dataset).await + Self::from_dataset(dataset, eager_signatures).await } - async fn from_dataset(dataset: Option) -> Result { + async fn from_dataset(dataset: Option, eager_signatures: bool) -> Result { let stream = if let Some(ds) = &dataset { Some(Box::pin( crate::table_store::TableStore::scan_stream_with( @@ -71,6 +134,7 @@ impl OrderedTableCursor { current_batch: None, current_row: 0, peeked: None, + eager_signatures, }) } @@ -97,9 +161,14 @@ impl OrderedTableCursor { let dataset = self.dataset.clone().ok_or_else(|| { OmniError::manifest("cursor row missing source dataset".to_string()) })?; + let signature = if self.eager_signatures { + row_signature(batch, row_index)? + } else { + String::new() + }; return Ok(Some(CursorRow { id: row_id_at(batch, row_index)?, - signature: row_signature(batch, row_index)?, + signature, dataset, batch: batch.clone(), row_index, @@ -258,20 +327,30 @@ fn sanitize_table_key(table_key: &str) -> String { } /// Computes the delta between base and source for an adopted-source merge. -/// Returns the changed/new rows (for merge_insert) and deleted IDs (for delete). -async fn compute_source_delta( +/// Returns the new + changed rows and the ids deleted on source. +/// +/// Unchanged rows are dropped: the adopt path validates against the source +/// snapshot directly (`candidate_dataset`), so no `full_staged` table is built +/// — saving the O(rows) temp write that `compute_source_delta` used to produce +/// and then discard. +/// +/// TRANSITIONAL — removed by the fragment-adopt work (see [`AdoptDelta`]): a +/// fragment graft adopts the source's fragments by reference, so there is no +/// row-level delta to compute. +async fn compute_adopt_delta( table_key: &str, catalog: &Catalog, base_snapshot: &Snapshot, source_snapshot: &Snapshot, -) -> Result> { +) -> Result> { let schema = schema_for_table_key(catalog, table_key)?; - let mut full_writer = - StagedTableWriter::new(&format!("{}_adopt_full", table_key), schema.clone())?; - let mut delta_writer = StagedTableWriter::new(&format!("{}_adopt_delta", table_key), schema)?; + let mut append_writer = + StagedTableWriter::new(&format!("{}_adopt_append", table_key), schema.clone())?; + let mut upsert_writer = + StagedTableWriter::new(&format!("{}_adopt_upsert", table_key), schema)?; let mut deleted_ids: Vec = Vec::new(); - let mut base = OrderedTableCursor::from_snapshot(base_snapshot, table_key).await?; - let mut source = OrderedTableCursor::from_snapshot(source_snapshot, table_key).await?; + let mut base = OrderedTableCursor::from_snapshot_lazy(base_snapshot, table_key).await?; + let mut source = OrderedTableCursor::from_snapshot_lazy(source_snapshot, table_key).await?; let mut needs_update = false; @@ -297,9 +376,6 @@ async fn compute_source_delta( None }; - let base_sig = base_row.as_ref().map(|r| r.signature.as_str()); - let source_sig = source_row.as_ref().map(|r| r.signature.as_str()); - match (&base_row, &source_row) { (Some(_), None) => { // Deleted on source @@ -307,20 +383,21 @@ async fn compute_source_delta( needs_update = true; } (None, Some(src)) => { - // New on source - full_writer.push_row(src).await?; - delta_writer.push_row(src).await?; + // New on source → append (streaming, no hash join). No signature + // needed — a new id is absent from base by construction. + append_writer.push_row(src).await?; needs_update = true; } - (Some(_), Some(src)) if source_sig != base_sig => { - // Changed on source - full_writer.push_row(src).await?; - delta_writer.push_row(src).await?; - needs_update = true; - } - (Some(base), Some(_)) => { - // Unchanged — write to full (for validation), skip delta - full_writer.push_row(base).await?; + (Some(base), Some(src)) => { + // Present on both — compute signatures lazily (the only case + // that needs them) to tell a changed row from an unchanged one. + // New/deleted rows above skip the embedding stringify entirely. + if src.compute_signature()? != base.compute_signature()? { + // Changed on source → upsert. + upsert_writer.push_row(src).await?; + needs_update = true; + } + // else unchanged — already on the target's base lineage; drop. } (None, None) => unreachable!(), } @@ -330,15 +407,20 @@ async fn compute_source_delta( return Ok(None); } - let delta_staged = if delta_writer.row_count > 0 { - Some(delta_writer.finish().await?) + let appends = if append_writer.row_count > 0 { + Some(append_writer.finish().await?) + } else { + None + }; + let upserts = if upsert_writer.row_count > 0 { + Some(upsert_writer.finish().await?) } else { None }; - Ok(Some(StagedMergeResult { - full_staged: full_writer.finish().await?, - delta_staged, + Ok(Some(AdoptDelta { + appends, + upserts, deleted_ids, })) } @@ -651,10 +733,12 @@ async fn candidate_dataset( ) -> Result> { if let Some(candidate) = candidates.get(table_key) { return match candidate { - CandidateTableState::AdoptSourceState => match source_snapshot.entry(table_key) { - Some(_) => Ok(Some(source_snapshot.open(table_key).await?)), - None => Ok(None), - }, + CandidateTableState::AdoptSourceState | CandidateTableState::AdoptWithDelta(_) => { + match source_snapshot.entry(table_key) { + Some(_) => Ok(Some(source_snapshot.open(table_key).await?)), + None => Ok(None), + } + } CandidateTableState::RewriteMerged(staged) => { Ok(Some(staged.full_staged.dataset.clone())) } @@ -840,13 +924,62 @@ fn row_id_at(batch: &RecordBatch, row: usize) -> Result { Ok(ids.value(row).to_string()) } -async fn publish_adopted_source_state( +/// Classify a table whose target state equals base (the adopt / fast-forward +/// case). Returns [`CandidateTableState::AdoptWithDelta`] — with the delta +/// pre-computed so it can be recovery-pinned — when the adopt applies a +/// non-empty delta onto the target's lineage (a HEAD-advancing publish via +/// [`publish_adopted_delta`]); otherwise [`CandidateTableState::AdoptSourceState`] +/// (a pointer switch or fork, which does not advance the data HEAD). +/// +/// The HEAD-advancing subcases mirror [`publish_adopted_source_state`]: source +/// on a branch with the target either on main or owning the table. Computing the +/// delta here (rather than inside the publish) is what closes the recovery gap — +/// the classifier knows whether the publish will move Lance HEAD. +async fn classify_adopt( target_db: &Omnigraph, catalog: &Catalog, base_snapshot: &Snapshot, source_snapshot: &Snapshot, target_snapshot: &Snapshot, table_key: &str, +) -> Result { + let Some(source_entry) = source_snapshot.entry(table_key) else { + return Ok(CandidateTableState::AdoptSourceState); + }; + let target_entry = target_snapshot.entry(table_key); + let target_active = target_db.active_branch().await; + let advances_head = match ( + target_active.as_deref(), + source_entry.table_branch.as_deref(), + ) { + // Source on a branch, target on main — delta applied onto main's lineage. + (None, Some(_)) => true, + // Both on branches, target owns this table — delta applied onto it. + (Some(target_branch), Some(_)) => { + target_entry.and_then(|e| e.table_branch.as_deref()) == Some(target_branch) + } + // Source on main (pointer switch) or target doesn't own (fork): no advance. + _ => false, + }; + if !advances_head { + return Ok(CandidateTableState::AdoptSourceState); + } + match compute_adopt_delta(table_key, catalog, base_snapshot, source_snapshot).await? { + Some(delta) => Ok(CandidateTableState::AdoptWithDelta(delta)), + None => Ok(CandidateTableState::AdoptSourceState), + } +} + +/// Adopt the source's table state without applying a row delta: a pointer +/// switch (source/target share lineage) or a branch fork. The HEAD-advancing +/// delta case is classified [`CandidateTableState::AdoptWithDelta`] and +/// published by [`publish_adopted_delta`], so reaching the branch-bearing arms +/// here means the delta was empty. +async fn publish_adopted_source_state( + target_db: &Omnigraph, + source_snapshot: &Snapshot, + target_snapshot: &Snapshot, + table_key: &str, ) -> Result { let source_entry = source_snapshot .entry(table_key) @@ -875,44 +1008,31 @@ async fn publish_adopted_source_state( row_count: source_entry.row_count, version_metadata: source_entry.version_metadata.clone(), }), - // Source on branch, target on main — apply delta to preserve version metadata - (None, Some(_source_branch)) => { - let delta = - compute_source_delta(table_key, catalog, base_snapshot, source_snapshot).await?; - match delta { - Some(staged) => publish_rewritten_merge_table(target_db, table_key, &staged).await, - None => Ok(crate::db::SubTableUpdate { - table_key: table_key.to_string(), - table_version: target_entry - .map(|e| e.table_version) - .unwrap_or(source_entry.table_version), - table_branch: None, - row_count: source_entry.row_count, - version_metadata: target_entry - .map(|entry| entry.version_metadata.clone()) - .unwrap_or_else(|| source_entry.version_metadata.clone()), - }), - } - } + // Source on branch, target on main, empty delta — adopt source's + // version by a pointer switch (the non-empty case is `AdoptWithDelta`). + (None, Some(_source_branch)) => Ok(crate::db::SubTableUpdate { + table_key: table_key.to_string(), + table_version: target_entry + .map(|e| e.table_version) + .unwrap_or(source_entry.table_version), + table_branch: None, + row_count: source_entry.row_count, + version_metadata: target_entry + .map(|entry| entry.version_metadata.clone()) + .unwrap_or_else(|| source_entry.version_metadata.clone()), + }), // Both on branches (Some(target_branch), Some(source_branch)) => { if target_entry.and_then(|entry| entry.table_branch.as_deref()) == Some(target_branch) { - // Target already owns this table — apply delta onto its lineage - let delta = - compute_source_delta(table_key, catalog, base_snapshot, source_snapshot) - .await?; - match delta { - Some(staged) => { - publish_rewritten_merge_table(target_db, table_key, &staged).await - } - None => Ok(crate::db::SubTableUpdate { - table_key: table_key.to_string(), - table_version: target_entry.unwrap().table_version, - table_branch: Some(target_branch.to_string()), - row_count: source_entry.row_count, - version_metadata: target_entry.unwrap().version_metadata.clone(), - }), - } + // Target already owns this table, empty delta — pointer switch + // onto its own lineage (the non-empty case is `AdoptWithDelta`). + Ok(crate::db::SubTableUpdate { + table_key: table_key.to_string(), + table_version: target_entry.unwrap().table_version, + table_branch: Some(target_branch.to_string()), + row_count: source_entry.row_count, + version_metadata: target_entry.unwrap().version_metadata.clone(), + }) } else { // Target doesn't own this table yet — fork from source state. // This creates the target branch on the sub-table dataset. @@ -1000,6 +1120,13 @@ async fn publish_rewritten_merge_table( } } + // Failpoint: crash after the Phase 1 merge_insert commit, before the delete. + // Models a partial Phase B on the three-way path — the merged constructive + // rows are on Lance HEAD but the delete has not committed and the + // achieved-version intent has not been recorded, so recovery must roll BACK. + // See tests/failpoints.rs::branch_merge_rewrite_partial_after_merge_rolls_back. + crate::failpoints::maybe_fail("branch_merge.rewrite_after_merge_pre_delete")?; + // Phase 2: delete removed rows via deletion vectors. // // INLINE-COMMIT RESIDUAL: lance-6.0.1 does not expose a public @@ -1023,6 +1150,14 @@ async fn publish_rewritten_merge_table( current_ds = new_ds; } + // Failpoint: crash after the Phase 2 delete commit, before the index build. + // Models a partial Phase B on the three-way path — constructive rows + + // deletes are on Lance HEAD but the achieved-version intent has not been + // recorded, so recovery must roll BACK (the index is reconciler-owned derived + // state, but the merge itself never reached its commit boundary). See + // tests/failpoints.rs::branch_merge_rewrite_partial_after_delete_rolls_back. + crate::failpoints::maybe_fail("branch_merge.rewrite_after_delete_pre_index")?; + // Phase 3: rebuild indices. // // `build_indices_on_dataset` uses `stage_create_btree_index` / @@ -1054,6 +1189,160 @@ async fn publish_rewritten_merge_table( }) } +/// Scan a staged temp table and concat its non-empty batches into the single +/// batch that `stage_append` / `stage_merge_insert` consume. Returns `None` when +/// the table has no rows (both staged primitives reject an empty batch). +async fn scan_staged_combined( + target_db: &Omnigraph, + table: &StagedTable, +) -> Result> { + crate::instrumentation::record_scan_staged_combined(); + let snapshot = SnapshotHandle::new(table.dataset.clone()); + let batches: Vec = target_db + .storage() + .scan_batches_for_rewrite(&snapshot) + .await? + .into_iter() + .filter(|batch| batch.num_rows() > 0) + .collect(); + if batches.is_empty() { + return Ok(None); + } + let combined = if batches.len() == 1 { + batches.into_iter().next().unwrap() + } else { + let schema = batches[0].schema(); + arrow_select::concat::concat_batches(&schema, &batches) + .map_err(|e| OmniError::Lance(e.to_string()))? + }; + Ok(Some(combined)) +} + +/// Apply an [`AdoptDelta`] onto the target's base lineage (the fast-forward / +/// target-owns path). Kept separate from `publish_rewritten_merge_table` (the +/// three-way path) because the two paths diverge: commit 3 splits this Phase 1 +/// into append (new) + merge_insert (changed), and commit 6 makes its index +/// coverage incremental — neither of which the three-way path takes. +/// +/// `open_for_mutation(Merge)` opens the target's own table lineage (active +/// branch is the merge target after the caller's swap), so every write lands on +/// the target and survives source-branch deletion — GC-safe. +/// +/// TRANSITIONAL — removed by the fragment-adopt work (see [`AdoptDelta`]): the +/// multi-commit append → upsert → delete publish here (the source of the +/// partial-Phase-B recovery window the sidecar confirmation guards) collapses to +/// a single fragment-graft commit per table, so this whole function goes away. +async fn publish_adopted_delta( + target_db: &Omnigraph, + table_key: &str, + delta: &AdoptDelta, +) -> Result { + let (ds, full_path, table_branch) = target_db + .open_for_mutation(table_key, crate::db::MutationOpKind::Merge) + .await?; + let mut current_ds = ds; + + // Phase 1a: append the NEW rows. `stage_append_stream` is a streaming + // `Operation::Append` — no hash join — so it never buffers the delta and + // cannot exhaust the DataFusion memory pool (the OOM fix). It streams the + // staged rows straight into the target (Lance rolls fragments at + // `max_rows_per_file`), so memory is bounded regardless of how many rows the + // connector appended — never the whole set in one batch. New ids are absent + // from base by construction (the ordered walk only classifies a row + // `(None, Some)` when base lacks it), so they never collide on `id`. Routed + // through the staged primitive so a failure between writing fragments and + // committing leaves no Lance-HEAD drift. `appends` is `Some` only when the + // staged table is non-empty (`compute_adopt_delta`). + if let Some(append_table) = &delta.appends { + let source = SnapshotHandle::new(append_table.dataset.clone()); + let staged = target_db + .storage() + .stage_append_stream(¤t_ds, &source, &[]) + .await?; + current_ds = target_db + .storage() + .commit_staged(current_ds, staged) + .await?; + } + + // Failpoint: crash after the Phase 1a append commit, before the upsert. + // Models a partial Phase B — appends are on Lance HEAD but the upserts/deletes + // have not committed and the achieved-version intent has not been recorded, so + // recovery must roll BACK (not publish the appends-only state). See + // tests/failpoints.rs::branch_merge_adopt_partial_after_append_rolls_back. + crate::failpoints::maybe_fail("branch_merge.adopt_after_append_pre_upsert")?; + + // Phase 1b: upsert the CHANGED rows. The merge_insert hash join is now + // bounded to the genuinely-changed set, not the whole delta. It runs against + // the committed view that already includes the appends; the changed ids are + // disjoint from the appended ids (each id is classified into exactly one of + // new / changed / deleted / unchanged in the single ordered walk), so the + // join never collides with an appended row. + if let Some(upsert_table) = &delta.upserts { + if let Some(combined) = scan_staged_combined(target_db, upsert_table).await? { + let staged_merge = target_db + .storage() + .stage_merge_insert( + current_ds.clone(), + combined, + vec!["id".to_string()], + lance::dataset::WhenMatched::UpdateAll, + lance::dataset::WhenNotMatched::InsertAll, + ) + .await?; + current_ds = target_db + .storage() + .commit_staged(current_ds, staged_merge) + .await?; + } + } + + // Failpoint: crash after the Phase 1b upsert commit, before the delete. + // Models a partial Phase B — appends + upserts on Lance HEAD but the delete + // has not committed and the achieved-version intent has not been recorded, so + // recovery must roll BACK. See + // tests/failpoints.rs::branch_merge_adopt_partial_after_upsert_rolls_back. + crate::failpoints::maybe_fail("branch_merge.adopt_after_upsert_pre_delete")?; + + // Phase 2: delete removed rows via deletion vectors (inline-commit residual, + // same as the three-way path until Lance ships a public two-phase delete). + if !delta.deleted_ids.is_empty() { + let escaped: Vec = delta + .deleted_ids + .iter() + .map(|id| format!("'{}'", id.replace('\'', "''"))) + .collect(); + let filter = format!("id IN ({})", escaped.join(", ")); + let (new_ds, _) = target_db + .storage_inline_residual() + .delete_where(&full_path, current_ds, &filter) + .await?; + current_ds = new_ds; + } + + // Phase 4: index coverage is reconciler-owned on the adopt path. Unlike the + // three-way `RewriteMerged` path, this does NOT build indices inline: the + // appended/upserted rows are left uncovered (reads stay correct via + // brute-force — indexes are derived state, invariant 7) and + // `optimize` / `ensure_indices` folds them in. This keeps even the first + // merge into a freshly schema-applied (unindexed) table fast — no inline IVF + // retrain on the publish path — and is the row-level approximation of Layer + // 2's fragment-adopt, where the source branch's already-built indices carry + // over by reference. See docs/user/branching/merge.md. + let final_state = target_db + .storage() + .table_state(&full_path, ¤t_ds) + .await?; + + Ok(crate::db::SubTableUpdate { + table_key: table_key.to_string(), + table_version: final_state.version, + table_branch, + row_count: final_state.row_count, + version_metadata: final_state.version_metadata, + }) +} + impl Omnigraph { pub async fn branch_merge(&self, source: &str, target: &str) -> Result { self.branch_merge_as(source, target, None).await @@ -1262,7 +1551,16 @@ impl Omnigraph { continue; } if same_manifest_state(base_entry, target_entry) { - candidates.insert(table_key.clone(), CandidateTableState::AdoptSourceState); + let candidate = classify_adopt( + self, + &self.catalog(), + base_snapshot, + source_snapshot, + &target_snapshot, + table_key, + ) + .await?; + candidates.insert(table_key.clone(), candidate); continue; } @@ -1290,31 +1588,24 @@ impl Omnigraph { validate_merge_candidates(self, source_snapshot, &target_snapshot, &candidates).await?; // Recovery sidecar: protect the per-table commit_staged loop. - // Pin only `RewriteMerged` candidates because they always - // advance Lance HEAD through `publish_rewritten_merge_table` - // (which runs stage_merge_insert + delete_where + index - // rebuilds — multiple commit_staged calls per table; loose - // classification handles the multi-step drift). + // Pin `RewriteMerged` and `AdoptWithDelta` candidates — both advance + // Lance HEAD before the manifest publish (RewriteMerged via + // publish_rewritten_merge_table; AdoptWithDelta via publish_adopted_delta: + // stage_append + stage_merge_insert + delete_where + index — multiple + // commit_staged calls per table, which the loose classification handles + // as multi-step drift). // // `AdoptSourceState` candidates are NOT pinned: their publish - // path is `publish_adopted_source_state`, whose subcases mostly - // don't advance Lance HEAD (pure manifest pointer switch, or - // fork via `fork_dataset_from_entry_state` which only adds a - // Lance branch ref). If those subcases were pinned, recovery - // would classify them as NoMovement and the all-or-nothing - // decision would force a rollback that destroys legitimately- - // committed work on sibling RewriteMerged tables. + // (`publish_adopted_source_state`) is a pure pointer switch or a fork + // (`fork_dataset_from_entry_state` only adds a Lance branch ref), neither + // of which advances the data HEAD. Pinning them would classify as + // NoMovement and force an all-or-nothing rollback that destroys sibling + // tables' committed work. // - // Residual: two `AdoptSourceState` subcases (when source has a - // table_branch AND the source delta is non-empty) internally - // call `publish_rewritten_merge_table` and DO advance HEAD. - // Those are not covered by this sidecar — if they fail mid- - // commit, the residual persists until the next ReadWrite open - // detects it via a subsequent ExpectedVersionMismatch from a - // later writer that touches the same table. Closing this gap - // requires pre-computing source deltas during candidate - // classification (a structural change to `CandidateTableState`) - // and is left as follow-up work. + // The former gap — adopt subcases that applied a non-empty delta advanced + // HEAD unpinned — is closed: `classify_adopt` pre-computes the delta, so a + // HEAD-advancing adopt is `AdoptWithDelta` (pinned here) and an empty-delta + // adopt stays `AdoptSourceState`. // Acquire per-(table_key, target_branch) queues for every table // touched by the merge plan. Sorted-order acquisition prevents // lock-order inversion against concurrent multi-table writers. @@ -1334,6 +1625,7 @@ impl Omnigraph { candidates.get(*table_key), Some(CandidateTableState::RewriteMerged(_)) | Some(CandidateTableState::AdoptSourceState) + | Some(CandidateTableState::AdoptWithDelta(_)) ) }) .map(|table_key| (table_key.clone(), active_branch_for_keys.clone())) @@ -1347,7 +1639,9 @@ impl Omnigraph { }; if !matches!( candidate, - CandidateTableState::RewriteMerged(_) | CandidateTableState::AdoptSourceState + CandidateTableState::RewriteMerged(_) + | CandidateTableState::AdoptSourceState + | CandidateTableState::AdoptWithDelta(_) ) { continue; } @@ -1368,7 +1662,10 @@ impl Omnigraph { .iter() .filter_map(|table_key| { let candidate = candidates.get(table_key)?; - if !matches!(candidate, CandidateTableState::RewriteMerged(_)) { + if !matches!( + candidate, + CandidateTableState::RewriteMerged(_) | CandidateTableState::AdoptWithDelta(_) + ) { return None; } let entry = target_snapshot.entry(table_key)?; @@ -1377,6 +1674,11 @@ impl Omnigraph { table_path: self.storage().dataset_uri(&entry.table_path), expected_version: entry.table_version, post_commit_pin: entry.table_version + 1, + // Stamped after the whole per-table publish completes + // (Phase-B confirmation, just before the manifest publish). + // Until then `None` marks an unfinished publish that + // recovery must roll back, not roll forward. + confirmed_version: None, // Use the merge target branch (where commits actually // land), NOT entry.table_branch (where the table // currently lives). publish_rewritten_merge_table calls @@ -1393,7 +1695,14 @@ impl Omnigraph { }) }) .collect(); - let recovery_handle = if recovery_pins.is_empty() { + // Keep the sidecar alongside its handle: after the per-table publish + // loop completes (Phase B), we re-write it with each table's confirmed + // version before the manifest publish, so recovery can tell a finished + // publish (roll forward) from a partial one (roll back). + let mut recovery: Option<( + crate::db::manifest::RecoverySidecar, + crate::db::manifest::RecoverySidecarHandle, + )> = if recovery_pins.is_empty() { None } else { // Use the merge target branch directly, NOT a heuristic @@ -1418,14 +1727,13 @@ impl Omnigraph { // this, future merges between the same pair lose // already-up-to-date detection and merge-base correctness. sidecar.merge_source_commit_id = Some(source_head_commit_id.to_string()); - Some( - crate::db::manifest::write_sidecar( - self.root_uri(), - self.storage_adapter(), - &sidecar, - ) - .await?, + let handle = crate::db::manifest::write_sidecar( + self.root_uri(), + self.storage_adapter(), + &sidecar, ) + .await?; + Some((sidecar, handle)) }; let mut updates = Vec::new(); @@ -1436,15 +1744,11 @@ impl Omnigraph { }; let update = match candidate_state { CandidateTableState::AdoptSourceState => { - publish_adopted_source_state( - self, - &self.catalog(), - base_snapshot, - source_snapshot, - &target_snapshot, - table_key, - ) - .await? + publish_adopted_source_state(self, source_snapshot, &target_snapshot, table_key) + .await? + } + CandidateTableState::AdoptWithDelta(delta) => { + publish_adopted_delta(self, table_key, delta).await? } CandidateTableState::RewriteMerged(staged) => { publish_rewritten_merge_table(self, table_key, staged).await? @@ -1456,10 +1760,33 @@ impl Omnigraph { updates.push(update); } + // Phase-B confirmation: every table's publish finished, so stamp the + // sidecar with each table's exact achieved version before the manifest + // publish. This is the commit point of the recovery WAL: a crash from + // here on rolls FORWARD to these versions, while a crash anywhere in the + // publish loop above left the sidecar unconfirmed and rolls BACK. The + // `updates` carry the real per-table final versions (multiple + // commit_staged calls per table, so not derivable from `post_commit_pin` + // alone). A failure here leaves the unconfirmed sidecar → roll back. + if let Some((sidecar, _)) = recovery.as_mut() { + let confirmed_versions: std::collections::HashMap = updates + .iter() + .map(|u| (u.table_key.clone(), u.table_version)) + .collect(); + crate::db::manifest::confirm_sidecar_phase_b( + self.root_uri(), + self.storage_adapter(), + sidecar, + &confirmed_versions, + ) + .await?; + } + // Failpoint: pin the per-writer Phase B → Phase C residual for // branch_merge. Lance HEAD has advanced on every touched table - // (publish_*) but the manifest publish below hasn't run. Used - // by `tests/failpoints.rs::branch_merge_phase_b_failure_recovered_on_next_open`. + // (publish_*) AND the sidecar is confirmed, but the manifest publish + // below hasn't run — so recovery rolls FORWARD. Used by + // `tests/failpoints.rs::branch_merge_phase_b_failure_recovered_on_next_open`. crate::failpoints::maybe_fail("branch_merge.post_phase_b_pre_manifest_commit")?; let manifest_version = if updates.is_empty() { @@ -1471,7 +1798,7 @@ impl Omnigraph { // Recovery sidecar lifecycle: delete after manifest publish. // Best-effort cleanup; the merge already landed durably so // failing the user here is undesirable. - if let Some(handle) = recovery_handle { + if let Some((_, handle)) = recovery { if let Err(err) = crate::db::manifest::delete_sidecar(&handle, self.storage_adapter()).await { diff --git a/crates/omnigraph/src/exec/staging.rs b/crates/omnigraph/src/exec/staging.rs index 464ec34..31d5ce8 100644 --- a/crates/omnigraph/src/exec/staging.rs +++ b/crates/omnigraph/src/exec/staging.rs @@ -712,6 +712,9 @@ impl StagedMutation { table_path: entry.path.full_path.clone(), expected_version: entry.expected_version, post_commit_pin: entry.expected_version + 1, + // Mutation/Load use strict single-commit classification, not + // BranchMerge's Phase-B confirmation — left None. + confirmed_version: None, table_branch: entry.path.table_branch.clone(), }); } @@ -738,6 +741,7 @@ impl StagedMutation { // can advance HEAD by more than one version (e.g., // when Lance internally compacts deletion vectors). post_commit_pin: update.table_version, + confirmed_version: None, table_branch: path.table_branch.clone(), }); } diff --git a/crates/omnigraph/src/instrumentation.rs b/crates/omnigraph/src/instrumentation.rs index 98249c0..de5b7d3 100644 --- a/crates/omnigraph/src/instrumentation.rs +++ b/crates/omnigraph/src/instrumentation.rs @@ -80,6 +80,95 @@ pub(crate) fn record_probe() { let _ = current(|p| p.probe_count.fetch_add(1, Ordering::Relaxed)); } +/// Per-operation staged-write counts, installed for a task via +/// [`with_merge_write_probes`]. Lets a cost-budget test assert WHICH staged-write +/// primitive an operation invokes — e.g. that an append-only fast-forward merge +/// routes new rows through `stage_append` and does **zero** `stage_merge_insert` +/// (the full-outer hash join). Counts the publish-path primitives only; +/// merge-staging temp tables use `append_or_create_batch`, not these. +#[derive(Clone, Default)] +pub struct MergeWriteProbes { + pub stage_append_calls: Arc, + pub stage_append_rows: Arc, + pub stage_merge_insert_calls: Arc, + pub stage_merge_insert_rows: Arc, + /// Inline vector-index (IVF) builds. The fast-forward adopt path defers + /// index coverage to the reconciler, so an adopt merge must do 0 of these. + pub create_vector_index_calls: Arc, + /// Times the merge materialized a staged delta into one in-memory batch + /// (`scan_staged_combined`). The append path streams instead, so an + /// append-only fast-forward merge must do 0 of these. + pub scan_staged_combined_calls: Arc, +} + +impl MergeWriteProbes { + pub fn stage_append_calls(&self) -> u64 { + self.stage_append_calls.load(Ordering::Relaxed) + } + pub fn stage_append_rows(&self) -> u64 { + self.stage_append_rows.load(Ordering::Relaxed) + } + pub fn stage_merge_insert_calls(&self) -> u64 { + self.stage_merge_insert_calls.load(Ordering::Relaxed) + } + pub fn stage_merge_insert_rows(&self) -> u64 { + self.stage_merge_insert_rows.load(Ordering::Relaxed) + } + pub fn create_vector_index_calls(&self) -> u64 { + self.create_vector_index_calls.load(Ordering::Relaxed) + } + pub fn scan_staged_combined_calls(&self) -> u64 { + self.scan_staged_combined_calls.load(Ordering::Relaxed) + } +} + +tokio::task_local! { + static MERGE_WRITE_PROBES: MergeWriteProbes; +} + +/// Run `fut` with staged-write probes installed. Test-only entry point; nothing +/// in production sets the probes, so `record_stage_*` below are no-ops. +pub async fn with_merge_write_probes(probes: MergeWriteProbes, fut: F) -> F::Output +where + F: std::future::Future, +{ + MERGE_WRITE_PROBES.scope(probes, fut).await +} + +/// Record one `stage_append` of `rows` rows against the active probes. No-op in +/// production (no probes installed). +pub(crate) fn record_stage_append(rows: u64) { + let _ = MERGE_WRITE_PROBES.try_with(|p| { + p.stage_append_calls.fetch_add(1, Ordering::Relaxed); + p.stage_append_rows.fetch_add(rows, Ordering::Relaxed); + }); +} + +/// Record one `stage_merge_insert` of `rows` rows against the active probes. +/// No-op in production (no probes installed). +pub(crate) fn record_stage_merge_insert(rows: u64) { + let _ = MERGE_WRITE_PROBES.try_with(|p| { + p.stage_merge_insert_calls.fetch_add(1, Ordering::Relaxed); + p.stage_merge_insert_rows.fetch_add(rows, Ordering::Relaxed); + }); +} + +/// Record one inline vector-index build against the active probes. No-op in +/// production (no probes installed). +pub(crate) fn record_create_vector_index() { + let _ = MERGE_WRITE_PROBES.try_with(|p| { + p.create_vector_index_calls.fetch_add(1, Ordering::Relaxed); + }); +} + +/// Record one `scan_staged_combined` materialization against the active probes. +/// No-op in production (no probes installed). +pub(crate) fn record_scan_staged_combined() { + let _ = MERGE_WRITE_PROBES.try_with(|p| { + p.scan_staged_combined_calls.fetch_add(1, Ordering::Relaxed); + }); +} + /// Open a Lance dataset at `uri`, attaching `wrapper` (for IO counting) when /// present. With no wrapper this is exactly `Dataset::open(uri)`. The wrapper is /// set via `ObjectStoreParams` on the builder so the open itself is counted diff --git a/crates/omnigraph/src/storage_layer.rs b/crates/omnigraph/src/storage_layer.rs index 7c7685d..3ea9647 100644 --- a/crates/omnigraph/src/storage_layer.rs +++ b/crates/omnigraph/src/storage_layer.rs @@ -353,6 +353,15 @@ pub trait TableStorage: sealed::Sealed + Send + Sync + Debug { prior_stages: &[StagedHandle], ) -> Result; + /// Append `source`'s rows into `snapshot`'s table, streaming so the whole + /// row set is never materialized in memory (see `TableStore::stage_append_stream`). + async fn stage_append_stream( + &self, + snapshot: &SnapshotHandle, + source: &SnapshotHandle, + prior_stages: &[StagedHandle], + ) -> Result; + async fn stage_merge_insert( &self, snapshot: SnapshotHandle, @@ -684,6 +693,18 @@ impl TableStorage for TableStore { .map(StagedHandle::new) } + async fn stage_append_stream( + &self, + snapshot: &SnapshotHandle, + source: &SnapshotHandle, + prior_stages: &[StagedHandle], + ) -> Result { + let staged_writes = staged_handles_as_writes(prior_stages); + TableStore::stage_append_stream(self, snapshot.dataset(), source.dataset(), &staged_writes) + .await + .map(StagedHandle::new) + } + async fn stage_merge_insert( &self, snapshot: SnapshotHandle, diff --git a/crates/omnigraph/src/table_store.rs b/crates/omnigraph/src/table_store.rs index 5c99b01..0325e1e 100644 --- a/crates/omnigraph/src/table_store.rs +++ b/crates/omnigraph/src/table_store.rs @@ -2,6 +2,7 @@ use arrow_array::{ Array, ArrayRef, RecordBatch, StringArray, StructArray, UInt8Array, UInt32Array, UInt64Array, }; use arrow_schema::SchemaRef; +use datafusion::physical_plan::SendableRecordBatchStream; use futures::TryStreamExt; use lance::Dataset; use lance::blob::BlobArrayBuilder; @@ -362,6 +363,29 @@ impl TableStore { Ok(materialized) } + /// Streaming, blob-aware sibling of [`Self::scan_batches_for_rewrite`]. + /// Yields the dataset's rows lazily as a `SendableRecordBatchStream` so a + /// downstream writer (`stage_append_stream`) never materializes the whole + /// table in memory. Blob columns still need per-row rebuild, so those tables + /// fall back to the materialized path and are re-streamed from the `Vec` + /// (rare — only tables with a `Blob` property; bounded-memory blob streaming + /// is a follow-up). The non-blob path is a true lazy scan. + pub async fn scan_stream_for_rewrite(&self, ds: &Dataset) -> Result { + let has_blob_columns = ds.schema().fields_pre_order().any(|field| field.is_blob()); + if has_blob_columns { + let arrow_schema: SchemaRef = Arc::new(ds.schema().into()); + let batches = self.scan_batches_for_rewrite(ds).await?; + let reader = arrow_array::RecordBatchIterator::new( + batches.into_iter().map(Ok), + arrow_schema, + ); + return Ok(lance_datafusion::utils::reader_to_stream(Box::new(reader))); + } + // Non-blob: a true lazy scan. `DatasetRecordBatchStream` converts to the + // `SendableRecordBatchStream` that `execute_uncommitted_stream` consumes. + Ok(Self::scan_stream(ds, None, None, None, false).await?.into()) + } + pub(crate) async fn materialize_blob_batch( ds: &Dataset, batch: RecordBatch, @@ -919,6 +943,7 @@ impl TableStore { "stage_append called with empty batch".to_string(), )); } + let appended_rows = batch.num_rows() as u64; let params = WriteParams { mode: WriteMode::Append, allow_external_blob_outside_bases: true, @@ -931,6 +956,9 @@ impl TableStore { .execute_uncommitted(vec![batch]) .await .map_err(|e| OmniError::Lance(e.to_string()))?; + // Record only after the staging write succeeds, so a failed write does + // not inflate the probe (matches `stage_append_stream`'s ordering). + crate::instrumentation::record_stage_append(appended_rows); let mut new_fragments = match &transaction.operation { Operation::Append { fragments } => fragments.clone(), Operation::Overwrite { fragments, .. } => fragments.clone(), @@ -971,6 +999,71 @@ impl TableStore { }) } + /// Streaming variant of [`Self::stage_append`]: appends the rows of `source` + /// into `ds` without materializing them in memory. It scans `source` lazily + /// (`scan_stream_for_rewrite`) and hands the stream to Lance's + /// `execute_uncommitted_stream`, which rolls fragments at `max_rows_per_file` + /// — bounded memory, one Append transaction. This is the substrate-blessed + /// bulk-append path (the same one LanceDB's `Table::add` uses). Identical + /// fragment-id / stable-row-id staging as `stage_append`. + /// + /// TRANSITIONAL caller — its only caller is the row-level merge append + /// (`publish_adopted_delta`, see `AdoptDelta`), which the fragment-adopt work + /// (Lance #7263/#7185) removes: a fragment graft re-appends no rows. This + /// primitive and `scan_stream_for_rewrite` are then dead unless re-adopted as + /// a general bulk-append path (the `Table::add` shape makes that plausible). + pub async fn stage_append_stream( + &self, + ds: &Dataset, + source: &Dataset, + prior_stages: &[StagedWrite], + ) -> Result { + let stream = self.scan_stream_for_rewrite(source).await?; + let params = WriteParams { + mode: WriteMode::Append, + allow_external_blob_outside_bases: true, + auto_cleanup: None, + skip_auto_cleanup: true, + ..Default::default() + }; + let transaction = InsertBuilder::new(Arc::new(ds.clone())) + .with_params(¶ms) + .execute_uncommitted_stream(stream) + .await + .map_err(|e| OmniError::Lance(e.to_string()))?; + let mut new_fragments = match &transaction.operation { + Operation::Append { fragments } => fragments.clone(), + Operation::Overwrite { fragments, .. } => fragments.clone(), + other => { + return Err(OmniError::manifest_internal(format!( + "stage_append_stream: unexpected Lance operation {:?}", + std::mem::discriminant(other) + ))); + } + }; + let appended_rows: u64 = new_fragments + .iter() + .filter_map(|f| f.physical_rows) + .map(|r| r as u64) + .sum(); + crate::instrumentation::record_stage_append(appended_rows); + // Same commit-time fragment-id / row-id renumbering as `stage_append`. + let next_id_base = ds.manifest.max_fragment_id.unwrap_or(0) as u64 + + 1 + + prior_stages_fragment_count(prior_stages); + assign_fragment_ids(&mut new_fragments, next_id_base); + if ds.manifest.uses_stable_row_ids() { + let prior_rows = prior_stages_row_count(prior_stages)?; + let start_row_id = ds.manifest.next_row_id + prior_rows; + assign_row_id_meta(&mut new_fragments, start_row_id)?; + } + Ok(StagedWrite { + transaction, + new_fragments, + removed_fragment_ids: Vec::new(), + }) + } + /// Stage a merge_insert (upsert): write fragment files describing the /// merge result, return the uncommitted transaction plus the new /// fragments. The transaction's `Operation::Update` carries the @@ -1012,6 +1105,7 @@ impl TableStore { "stage_merge_insert called with empty batch".to_string(), )); } + let merged_rows = batch.num_rows() as u64; // Precondition for the FirstSeen workaround below: every call path that // reaches stage_merge_insert (load, MutationStaging::finalize, @@ -1052,6 +1146,9 @@ impl TableStore { .execute_uncommitted(stream) .await .map_err(|e| OmniError::Lance(e.to_string()))?; + // Record only after the staging write succeeds, so a failed write does + // not inflate the probe (matches `stage_append`/`stage_append_stream`). + crate::instrumentation::record_stage_merge_insert(merged_rows); // Operation::Update { removed_fragment_ids, updated_fragments, new_fragments, .. } — // `new_fragments` are the freshly inserted rows; `updated_fragments` // are rewrites of existing fragments that include both retained and @@ -1541,8 +1638,11 @@ impl TableStore { ds.create_index_builder(&[column], IndexType::Vector, ¶ms) .replace(true) .await - .map(|_| ()) - .map_err(|e| OmniError::Lance(e.to_string())) + .map_err(|e| OmniError::Lance(e.to_string()))?; + // Record only after the index build succeeds, so a failed build does not + // inflate the probe (matches the `stage_*` probes). + crate::instrumentation::record_create_vector_index(); + Ok(()) } pub async fn create_empty_dataset(dataset_uri: &str, schema: &SchemaRef) -> Result { diff --git a/crates/omnigraph/tests/failpoints.rs b/crates/omnigraph/tests/failpoints.rs index 38a60ae..9d65bc1 100644 --- a/crates/omnigraph/tests/failpoints.rs +++ b/crates/omnigraph/tests/failpoints.rs @@ -8,12 +8,15 @@ use omnigraph::db::Omnigraph; use omnigraph::error::{ManifestErrorKind, OmniError}; use omnigraph::failpoints::ScopedFailPoint; use omnigraph::loader::LoadMode; +use serial_test::serial; use helpers::recovery::{ FollowUpMutation, RecoveryExpectation, TableExpectation, assert_post_recovery_invariants, branch_head_commit_id, single_sidecar_operation_id, }; -use helpers::{MUTATION_QUERIES, mixed_params, mutate_main, version_main}; +use helpers::{ + MUTATION_QUERIES, collect_column_strings, mixed_params, mutate_main, read_table, version_main, +}; const SCHEMA_V1: &str = "node Person { name: String @key }\n"; const SCHEMA_V2_ADDED_TYPE: &str = @@ -3176,6 +3179,7 @@ async fn optimize_phase_b_failure_recovered_on_next_open() { } #[tokio::test] +#[serial(branch_merge_phase_b)] async fn branch_merge_phase_b_failure_recovered_on_next_open() { use omnigraph::loader::{LoadMode, load_jsonl}; @@ -3337,6 +3341,352 @@ async fn branch_merge_phase_b_failure_recovered_on_next_open() { drop(db); } +/// AdoptWithDelta recovery (the gap closure): a fast-forward merge — main has +/// NOT advanced since the branch forked, so the touched table is classified +/// `AdoptWithDelta`, not `RewriteMerged` — that fails after Phase B must still +/// recover on the next open. Before the recovery-pin closure this drifted +/// silently: the adopt path advanced Lance HEAD but was unpinned, so the sweep +/// found no sidecar and the merge was lost. +#[tokio::test] +#[serial(branch_merge_phase_b)] +async fn branch_merge_adopt_with_delta_phase_b_failure_recovered_on_next_open() { + use omnigraph::loader::{LoadMode, load_jsonl}; + + let _scenario = FailScenario::setup(); + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap().to_string(); + + // Seed main, branch off, mutate ONLY the branch. main stays at base, so the + // merge is a fast-forward and Person classifies `AdoptWithDelta` (forked + // source, target == base, non-empty delta) — NOT `RewriteMerged`. + { + let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap(); + load_jsonl( + &mut db, + r#"{"type":"Person","data":{"name":"alice","age":30}} +"#, + LoadMode::Append, + ) + .await + .unwrap(); + db.branch_create("feature").await.unwrap(); + db.mutate( + "feature", + MUTATION_QUERIES, + "insert_person", + &mixed_params(&[("$name", "Bob")], &[("$age", 40)]), + ) + .await + .unwrap(); + // main intentionally NOT mutated → fast-forward → AdoptWithDelta. + } + + let pre_failure_version = { + let db = Omnigraph::open(&uri).await.unwrap(); + version_main(&db).await.unwrap() + }; + + // Fail after the per-table publish loop, before commit_manifest_updates. + { + let db = Omnigraph::open(&uri).await.unwrap(); + let _failpoint = + ScopedFailPoint::new("branch_merge.post_phase_b_pre_manifest_commit", "return"); + let err = db.branch_merge("feature", "main").await.unwrap_err(); + assert!( + err.to_string().contains( + "injected failpoint triggered: branch_merge.post_phase_b_pre_manifest_commit" + ), + "unexpected error: {err}" + ); + + // The gap closure: an AdoptWithDelta merge must persist a sidecar. + let recovery_dir = dir.path().join("__recovery"); + let sidecars: Vec<_> = std::fs::read_dir(&recovery_dir) + .unwrap() + .filter_map(|e| e.ok()) + .collect(); + assert_eq!( + sidecars.len(), + 1, + "AdoptWithDelta merge must persist exactly one recovery sidecar (the closed gap)" + ); + } + + // Reopen → the recovery sweep rolls the AdoptWithDelta merge forward. + let db = Omnigraph::open(&uri).await.unwrap(); + let recovery_dir = dir.path().join("__recovery"); + if recovery_dir.exists() { + let remaining: Vec<_> = std::fs::read_dir(&recovery_dir) + .unwrap() + .filter_map(|e| e.ok()) + .collect(); + assert!( + remaining.is_empty(), + "sidecar must be deleted post-recovery; remaining: {remaining:?}" + ); + } + + let post_recovery_version = version_main(&db).await.unwrap(); + assert!( + post_recovery_version > pre_failure_version, + "manifest must advance post-recovery; pre={pre_failure_version} post={post_recovery_version}" + ); + let names = collect_column_strings(&read_table(&db, "node:Person").await, "name"); + assert!( + names.contains(&"Bob".to_string()), + "recovered AdoptWithDelta merge must include Bob; have {names:?}" + ); + drop(db); +} + +/// Which branch-merge publish path a partial-Phase-B test exercises. +enum MergeScenario { + /// main stays at base → the touched table is `AdoptWithDelta` + /// (`publish_adopted_delta`: append → upsert → delete). + Adopt, + /// main advances past base → the touched table is `RewriteMerged` + /// (`publish_rewritten_merge_table`: merge_insert → delete → index). + Rewrite, +} + +async fn sorted_person_names(db: &Omnigraph) -> Vec { + let mut names = collect_column_strings(&read_table(db, "node:Person").await, "name"); + names.sort(); + names +} + +/// THE recovery-atomicity regression gate. A branch merge whose per-table publish +/// is a multi-commit sequence (append → upsert → delete, or merge_insert → delete +/// → index) advances Lance HEAD step by step before the manifest publish. If the +/// process dies *mid*-sequence — after some commits but before the achieved-version +/// intent is recorded — recovery must roll the whole merge **back**, not publish +/// the partial and record the merge as complete. +/// +/// The delta is deliberately MIXED — a fresh id (`bob`, append), a modified base id +/// (`carol`, upsert) and a removed base id (`dave`, delete) — so every partial +/// window leaves real work undone. Proof of rollback: after recovery the target is +/// back at its base name-set, and a *re-run* of the merge re-applies the full delta +/// (the partial was not silently recorded as "already merged"). +/// +/// RED before the fix: the loose `BranchMerge` classification rolls any +/// `lance_head > manifest_pinned` forward, so the partial is published (e.g. `bob` +/// present, `dave` kept) and the merge recorded — the first assert (back at base) +/// fails. GREEN after: `achieved_version == None` → `IncompletePhaseB` → roll back. +async fn assert_partial_merge_rolls_back(scenario: MergeScenario, failpoint: &str) { + use omnigraph::loader::load_jsonl; + + let _scenario = FailScenario::setup(); + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap().to_string(); + + // Seed main {alice, carol, dave}; on `feature` add bob (append), bump carol + // (upsert), remove dave (delete). For Rewrite, also move main past base so the + // table classifies RewriteMerged instead of a fast-forward AdoptWithDelta. + { + let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap(); + load_jsonl( + &mut db, + "{\"type\":\"Person\",\"data\":{\"name\":\"alice\",\"age\":30}}\n\ + {\"type\":\"Person\",\"data\":{\"name\":\"carol\",\"age\":50}}\n\ + {\"type\":\"Person\",\"data\":{\"name\":\"dave\",\"age\":60}}\n", + LoadMode::Append, + ) + .await + .unwrap(); + db.branch_create("feature").await.unwrap(); + db.mutate( + "feature", + MUTATION_QUERIES, + "insert_person", + &mixed_params(&[("$name", "bob")], &[("$age", 40)]), + ) + .await + .unwrap(); + db.mutate( + "feature", + MUTATION_QUERIES, + "set_age", + &mixed_params(&[("$name", "carol")], &[("$age", 55)]), + ) + .await + .unwrap(); + db.mutate( + "feature", + MUTATION_QUERIES, + "remove_person", + &mixed_params(&[("$name", "dave")], &[]), + ) + .await + .unwrap(); + if matches!(scenario, MergeScenario::Rewrite) { + db.mutate( + "main", + MUTATION_QUERIES, + "set_age", + &mixed_params(&[("$name", "alice")], &[("$age", 35)]), + ) + .await + .unwrap(); + } + } + + // Crash mid-Phase-B at the injected window. + { + let db = Omnigraph::open(&uri).await.unwrap(); + let _fp = ScopedFailPoint::new(failpoint, "return"); + let err = db.branch_merge("feature", "main").await.unwrap_err(); + assert!( + err.to_string().contains(failpoint), + "expected the injected failpoint {failpoint}, got: {err}" + ); + } + + // Reopen → the open-time sweep must ROLL BACK to base (the merge never reached + // its commit boundary), and a re-run must then apply the FULL delta. + { + let db = Omnigraph::open(&uri).await.unwrap(); + assert_eq!( + sorted_person_names(&db).await, + vec!["alice", "carol", "dave"], + "partial Phase B at {failpoint} must roll back to base \ + (no bob, dave kept, carol's upsert reverted); the merge must NOT be recorded", + ); + db.branch_merge("feature", "main").await.unwrap(); + assert_eq!( + sorted_person_names(&db).await, + vec!["alice", "bob", "carol"], + "re-merge after rollback must re-apply the full delta \ + (bob added, dave removed) — proof the partial was not silently recorded", + ); + } +} + +#[tokio::test] +#[serial(branch_merge_phase_b)] +async fn branch_merge_adopt_partial_after_append_rolls_back() { + assert_partial_merge_rolls_back( + MergeScenario::Adopt, + "branch_merge.adopt_after_append_pre_upsert", + ) + .await; +} + +#[tokio::test] +#[serial(branch_merge_phase_b)] +async fn branch_merge_adopt_partial_after_upsert_rolls_back() { + assert_partial_merge_rolls_back( + MergeScenario::Adopt, + "branch_merge.adopt_after_upsert_pre_delete", + ) + .await; +} + +#[tokio::test] +#[serial(branch_merge_phase_b)] +async fn branch_merge_rewrite_partial_after_merge_rolls_back() { + assert_partial_merge_rolls_back( + MergeScenario::Rewrite, + "branch_merge.rewrite_after_merge_pre_delete", + ) + .await; +} + +#[tokio::test] +#[serial(branch_merge_phase_b)] +async fn branch_merge_rewrite_partial_after_delete_rolls_back() { + assert_partial_merge_rolls_back( + MergeScenario::Rewrite, + "branch_merge.rewrite_after_delete_pre_index", + ) + .await; +} + +/// Backward-compat: a `BranchMerge` sidecar written by a *pre-confirmation* +/// binary (schema_version 1, no `confirmed_version`) must NOT be misread as a +/// partial Phase B and rolled back. A pre-upgrade crash in the Phase-B→C gap can +/// leave such a sidecar over a *completed* merge; rolling it back would silently +/// discard a finished merge with no operator signal — the regression greptile / +/// Cursor flagged. +/// +/// We synthesize the pre-upgrade sidecar realistically: crash after Phase B (a +/// real sidecar + advanced Lance HEAD), then downgrade the on-disk JSON to the +/// v1 shape (`schema_version` = 1, strip every pin's `confirmed_version`) before +/// reopening — exactly what an old binary would have left. +/// +/// RED before the versioning fix: a v1 sidecar with no `confirmed_version` +/// classifies `IncompletePhaseB` → rolls back → `bob` is discarded. GREEN after: +/// the version-aware classifier reads v1 as the old loose generation → rolls +/// forward → `bob` preserved. +#[tokio::test] +#[serial(branch_merge_phase_b)] +async fn pre_upgrade_v1_branch_merge_sidecar_rolls_forward_not_back() { + use omnigraph::loader::{LoadMode, load_jsonl}; + + let _scenario = FailScenario::setup(); + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap().to_string(); + + // main {alice}; feature adds bob → a fast-forward AdoptWithDelta merge, which + // writes a recovery sidecar. + { + let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap(); + load_jsonl( + &mut db, + "{\"type\":\"Person\",\"data\":{\"name\":\"alice\",\"age\":30}}\n", + LoadMode::Append, + ) + .await + .unwrap(); + db.branch_create("feature").await.unwrap(); + db.mutate( + "feature", + MUTATION_QUERIES, + "insert_person", + &mixed_params(&[("$name", "bob")], &[("$age", 40)]), + ) + .await + .unwrap(); + } + + // Crash after Phase B (Lance HEAD advanced, manifest not published) → a real + // sidecar lands on disk. + { + let db = Omnigraph::open(&uri).await.unwrap(); + let _fp = ScopedFailPoint::new("branch_merge.post_phase_b_pre_manifest_commit", "return"); + db.branch_merge("feature", "main").await.unwrap_err(); + } + + // Downgrade the sidecar to the pre-confirmation v1 shape an old binary writes. + { + let recovery_dir = std::path::Path::new(&uri).join("__recovery"); + let path = std::fs::read_dir(&recovery_dir) + .unwrap() + .filter_map(Result::ok) + .map(|e| e.path()) + .find(|p| p.extension().is_some_and(|x| x == "json")) + .expect("a recovery sidecar must exist after the post-Phase-B crash"); + let mut v: serde_json::Value = + serde_json::from_str(&std::fs::read_to_string(&path).unwrap()).unwrap(); + v["schema_version"] = serde_json::json!(1); + for table in v["tables"].as_array_mut().unwrap() { + table.as_object_mut().unwrap().remove("confirmed_version"); + } + std::fs::write(&path, serde_json::to_string_pretty(&v).unwrap()).unwrap(); + } + + // Reopen → the pre-upgrade completed merge must roll FORWARD (bob kept), not + // be silently discarded. + { + let db = Omnigraph::open(&uri).await.unwrap(); + assert_eq!( + sorted_person_names(&db).await, + vec!["alice", "bob"], + "a pre-confirmation (v1) BranchMerge sidecar over a completed merge must roll \ + forward, not be misread as a partial and rolled back", + ); + } +} + /// Branch-axis variant of the branch_merge recovery test: target is a /// non-main branch. Catches the branch-specific commit-graph head bug /// (D2) — without `CommitGraph::open_at_branch`, the recovery sweep @@ -3344,6 +3694,7 @@ async fn branch_merge_phase_b_failure_recovered_on_next_open() { /// target, and future merges between the same pair would lose /// already-up-to-date detection. #[tokio::test] +#[serial(branch_merge_phase_b)] async fn branch_merge_phase_b_failure_recovered_on_non_main_target() { use omnigraph::loader::{LoadMode, load_jsonl}; @@ -3468,6 +3819,7 @@ async fn branch_merge_phase_b_failure_recovered_on_non_main_target() { /// keeps RewriteMerged tables on active_branch), the contract assertion /// catches a regression that reverts to `entry.table_branch.clone()`. #[tokio::test] +#[serial(branch_merge_phase_b)] async fn branch_merge_sidecar_pins_table_branch_to_active_branch() { use omnigraph::loader::{LoadMode, load_jsonl}; diff --git a/crates/omnigraph/tests/merge_fast_forward.rs b/crates/omnigraph/tests/merge_fast_forward.rs new file mode 100644 index 0000000..185f45d --- /dev/null +++ b/crates/omnigraph/tests/merge_fast_forward.rs @@ -0,0 +1,213 @@ +//! Fast-forward branch-merge cost + correctness. +//! +//! The data-path fix routes *new* rows of an adopted-source merge through +//! `stage_append` (a streaming `Operation::Append`) instead of lumping new + +//! changed rows into one `stage_merge_insert` (a full-outer hash join that +//! buffers the whole delta and exhausts the DataFusion memory pool on +//! embedding-bearing tables). +//! +//! The regression gate here is *structural*, not a brittle size threshold: it +//! asserts WHICH staged-write primitive the merge invokes, via the task-local +//! write probes in `omnigraph::instrumentation`. That is deterministic and +//! machine-independent — it cannot flake on a bigger memory pool. + +// Wrapping `branch_merge` in `with_merge_write_probes` (a task-local scope) +// nests the already-deep merge future one layer deeper, overflowing rustc's +// default 128 layout-query depth. Bump it for this test crate. +#![recursion_limit = "512"] + +mod helpers; + +use omnigraph::db::{MergeOutcome, Omnigraph}; +use omnigraph::instrumentation::{MergeWriteProbes, with_merge_write_probes}; + +use helpers::*; + +/// Insert `n` brand-new persons (fresh ids) onto `branch`, forking the Person +/// table onto it. All rows are "new on source" — none collide with base ids. +async fn append_new_persons(db: &mut Omnigraph, branch: &str, n: usize) { + for i in 0..n { + mutate_branch( + db, + branch, + MUTATION_QUERIES, + "insert_person", + &mixed_params(&[("$name", &format!("ff_new_{i}"))], &[("$age", 30)]), + ) + .await + .unwrap(); + } +} + +/// THE cost-budget gate. An append-only fast-forward merge must append the new +/// rows and run **zero** `stage_merge_insert` (the full-outer hash join that is +/// the OOM). RED today (new + changed are lumped into one `stage_merge_insert`); +/// GREEN once the adopt path splits new→`stage_append`, changed→`stage_merge_insert`. +#[tokio::test] +async fn append_only_fast_forward_merge_does_no_merge_insert() { + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap(); + let main = init_and_load(&dir).await; + main.branch_create("feature").await.unwrap(); + + let mut feature = Omnigraph::open(uri).await.unwrap(); + append_new_persons(&mut feature, "feature", 5).await; + + let probes = MergeWriteProbes::default(); + let outcome = + with_merge_write_probes(probes.clone(), main.branch_merge("feature", "main")) + .await + .unwrap(); + assert_eq!(outcome, MergeOutcome::FastForward); + + assert_eq!( + probes.stage_merge_insert_calls(), + 0, + "append-only fast-forward merge must do 0 stage_merge_insert (the OOM hash join); did {}", + probes.stage_merge_insert_calls(), + ); + assert!( + probes.stage_append_calls() >= 1, + "append-only fast-forward merge must append the new rows via stage_append; did {}", + probes.stage_append_calls(), + ); + assert_eq!( + probes.scan_staged_combined_calls(), + 0, + "append-only merge must stream the append (stage_append_stream), not materialize the \ + whole delta into one batch via scan_staged_combined; did {}", + probes.scan_staged_combined_calls(), + ); +} + +/// Functional correctness: a fast-forward merge of an append-only branch leaves +/// main equal to the source branch. Independent of the cost-budget gate. +#[tokio::test] +async fn fast_forward_merge_yields_source_state() { + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap(); + let main = init_and_load(&dir).await; + let base_count = count_rows(&main, "node:Person").await; + + main.branch_create("feature").await.unwrap(); + let mut feature = Omnigraph::open(uri).await.unwrap(); + append_new_persons(&mut feature, "feature", 5).await; + let source_count = count_rows_branch(&feature, "feature", "node:Person").await; + assert_eq!(source_count, base_count + 5); + + let outcome = main.branch_merge("feature", "main").await.unwrap(); + assert_eq!(outcome, MergeOutcome::FastForward); + + // main now equals source: the 5 new persons are present, the base rows kept. + assert_eq!(count_rows(&main, "node:Person").await, source_count); + let names = collect_column_strings(&read_table(&main, "node:Person").await, "name"); + for i in 0..5 { + assert!( + names.contains(&format!("ff_new_{i}")), + "merged main missing new person ff_new_{i}; have {names:?}" + ); + } +} + +const VEC_SCHEMA: &str = "node Chunk {\n slug: String @key\n embedding: Vector(8) @index\n}\n"; + +/// Commit 6 behavior: the fast-forward adopt path does NOT build indices inline +/// — index coverage is reconciler-owned (`optimize`/`ensure_indices`). A merge +/// into a freshly-initialized (unindexed) vector table must perform **0** inline +/// vector-index (IVF) builds; reads stay correct via brute-force until +/// `optimize` covers the new rows. RED before the change (the publish path built +/// the IVF inline); GREEN after. +#[tokio::test] +async fn fast_forward_merge_defers_vector_index_to_reconciler() { + use omnigraph::loader::LoadMode; + + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap(); + // Empty Chunk table → no vector index at init (KMeans can't train on 0 rows). + let main = Omnigraph::init(uri, VEC_SCHEMA).await.unwrap(); + main.branch_create("feature").await.unwrap(); + + // Load embedding-bearing chunks onto the branch. The branch builds its own + // index here (outside the probe scope) — irrelevant to the merge's cost. + let mut rows = String::new(); + for i in 0..24 { + let v: Vec = (0..8).map(|j| format!("{}.0", (i + j) % 5)).collect(); + rows.push_str(&format!( + "{{\"type\":\"Chunk\",\"data\":{{\"slug\":\"c{i}\",\"embedding\":[{}]}}}}\n", + v.join(",") + )); + } + let feature = Omnigraph::open(uri).await.unwrap(); + feature.load("feature", &rows, LoadMode::Merge).await.unwrap(); + + // Merge, counting inline vector-index builds the publish path performs. + let probes = MergeWriteProbes::default(); + let outcome = with_merge_write_probes(probes.clone(), main.branch_merge("feature", "main")) + .await + .unwrap(); + assert_eq!(outcome, MergeOutcome::FastForward); + + assert_eq!( + probes.create_vector_index_calls(), + 0, + "fast-forward adopt merge must defer vector-index coverage to the reconciler \ + (0 inline IVF builds); did {}", + probes.create_vector_index_calls(), + ); + // Correctness: the rows landed on main (reads brute-force until optimize). + assert_eq!(count_rows(&main, "node:Chunk").await, 24); +} + +const BLOB_SCHEMA: &str = "node Document {\n title: String @key\n content: Blob?\n note: String?\n}\n"; +const BLOB_INSERT: &str = r#" +query insert_doc($title: String, $content: Blob, $note: String) { + insert Document { title: $title, content: $content, note: $note } +} +"#; + +/// A fast-forward merge of a branch with a Blob column exercises the blob +/// fallback in `scan_stream_for_rewrite` (materialize → re-stream) through the +/// streaming append. main is NOT mutated, so Document is `AdoptWithDelta` (the +/// adopt/append path), not `RewriteMerged`. The blob bytes must survive the +/// materialize → stream → append round-trip. +#[tokio::test] +async fn fast_forward_merge_streams_blob_columns() { + use omnigraph::loader::{LoadMode, load_jsonl}; + + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap(); + let mut main = Omnigraph::init(uri, BLOB_SCHEMA).await.unwrap(); + load_jsonl( + &mut main, + "{\"type\":\"Document\",\"data\":{\"title\":\"seed\",\"content\":\"base64:U2VlZA==\",\"note\":\"base\"}}", + LoadMode::Overwrite, + ) + .await + .unwrap(); + main.branch_create("feature").await.unwrap(); + + // Only the branch is mutated → fast-forward → adopt/append path. + let mut feature = Omnigraph::open(uri).await.unwrap(); + mutate_branch( + &mut feature, + "feature", + BLOB_INSERT, + "insert_doc", + ¶ms(&[ + ("$title", "readme"), + ("$content", "base64:SGVsbG8="), + ("$note", "branch"), + ]), + ) + .await + .unwrap(); + + let outcome = main.branch_merge("feature", "main").await.unwrap(); + assert_eq!(outcome, MergeOutcome::FastForward); + + // The appended blob row's bytes survive the streaming append; the base row stays intact. + let readme = main.read_blob("Document", "readme", "content").await.unwrap(); + assert_eq!(&readme.read().await.unwrap()[..], b"Hello"); + let seed = main.read_blob("Document", "seed", "content").await.unwrap(); + assert_eq!(&seed.read().await.unwrap()[..], b"Seed"); +} diff --git a/crates/omnigraph/tests/recovery.rs b/crates/omnigraph/tests/recovery.rs index b5ca58f..ed47811 100644 --- a/crates/omnigraph/tests/recovery.rs +++ b/crates/omnigraph/tests/recovery.rs @@ -104,8 +104,10 @@ async fn recovery_refuses_unknown_schema_version_on_open() { let _db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap(); drop(_db); - // A sidecar from a hypothetical future writer; the older binary must - // refuse to interpret it (resolved-decisions §3 in the design doc). + // A sidecar from a hypothetical future writer (version NEWER than this + // binary's max); the reader must refuse to interpret it — it cannot guess + // semantics a newer writer baked in. (Older versions are accepted and + // interpreted with their original semantics; see `parse_sidecar`.) let sidecar_json = r#"{ "schema_version": 99, "operation_id": "01H000000000000000000000ZZ", @@ -120,11 +122,11 @@ async fn recovery_refuses_unknown_schema_version_on_open() { let err = Omnigraph::open(uri) .await .err() - .expect("expected open to fail because of unknown sidecar schema_version"); + .expect("expected open to fail because of a future sidecar schema_version"); let msg = err.to_string(); assert!( - msg.contains("schema_version=99") && msg.contains("supports only schema_version=1"), - "expected SidecarSchemaError mentioning the version mismatch, got: {}", + msg.contains("schema_version=99") && msg.contains("newer than the maximum"), + "expected a future-version refusal, got: {}", msg, ); // Sidecar must still be on disk — we don't auto-delete unparseable files. diff --git a/docs/dev/writes.md b/docs/dev/writes.md index 01c166e..c4e174c 100644 --- a/docs/dev/writes.md +++ b/docs/dev/writes.md @@ -178,6 +178,17 @@ are left at `Lance HEAD = manifest_pinned + 1`. post_commit_pin)` it intends to commit + the writer kind + actor_id. 2. **Phase B**: writer's per-table `commit_staged` loop runs. + - **Phase-B confirmation (`BranchMerge` only)**: a `BranchMerge` writer + advances each table's HEAD by *several* commits (append → upsert → + delete), so a bare "HEAD moved" is ambiguous — it could be a complete + publish or one crashed mid-sequence. After the whole per-table loop + finishes, the writer re-writes the sidecar stamping each pin's + `confirmed_version` with the exact achieved version, then proceeds to + Phase C. This is the commit point of the recovery WAL: a crash *after* + confirmation rolls forward to those versions; a crash *during* Phase B + (sidecar still unconfirmed) rolls back. Other writers don't confirm — + their drift is derived state (index coverage, compaction) that a partial + roll-forward never corrupts. 3. **Phase C**: publisher commits the manifest. 4. **Phase D**: writer deletes the sidecar. @@ -197,7 +208,10 @@ recovery sweep in `crates/omnigraph/src/db/manifest/recovery.rs`: - For each sidecar in `__recovery/`, compare every named table's Lance HEAD to the manifest pin. Classify per the all-or-nothing decision tree (RolledPastExpected / NoMovement / UnexpectedAtP1 / - UnexpectedMultistep / InvariantViolation). + UnexpectedMultistep / IncompletePhaseB / InvariantViolation). For a + `BranchMerge` sidecar, a moved HEAD with no `confirmed_version` classifies + as `IncompletePhaseB` (a partial multi-commit publish) and forces roll-back; + with a `confirmed_version`, roll-forward targets exactly that version. - If any table is `InvariantViolation` (Lance HEAD < manifest pinned — should be impossible), **abort** with a loud error and leave the sidecar on disk for operator review. diff --git a/docs/user/branching/merge.md b/docs/user/branching/merge.md index fde2fab..cb54ed6 100644 --- a/docs/user/branching/merge.md +++ b/docs/user/branching/merge.md @@ -22,6 +22,25 @@ A merge resolves to one of three outcomes: simply advances to the source. - **Merged** — both sides diverged; a new merge commit is created with two parents. +## Indexes after a merge + +A **fast-forward** merge (the common case — the target had no conflicting +changes, so the source's rows are adopted) does not build or rebuild indexes on +the rows it brings into the target. Newly merged rows (and any index a table does +not yet have) are covered the next time `optimize` runs — indexes are derived +state, and reads stay correct in the meantime via brute-force scan over the +not-yet-covered rows. This keeps a fast-forward merge fast (it never pays an +inline vector/FTS rebuild on the publish path), at the cost of brute-force search +latency on freshly merged rows until the next `optimize`. + +A **three-way** merge (the `Merged` outcome — both branches changed the table and +the rows were reconciled) still rebuilds the table's indexes inline today, as part +of the publish. So a Merged-outcome merge of an embedding-bearing table pays the +index-build cost up front. + +Either way, run `omnigraph optimize` after a large merge to restore (or, for the +fast-forward path, establish) full index coverage. + ## Conflicts When both branches changed the same data incompatibly, the merge fails with a