From 2ce4efc450889d34cf2d075c45d55bd7f4bcfcef Mon Sep 17 00:00:00 2001 From: Ragnor Comerford Date: Mon, 4 May 2026 11:34:18 +0200 Subject: [PATCH] recovery: four review-round-4 fixes + branch-axis test matrix MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit D1. roll_forward_all returns per-table actual published versions; the audit row's `to_version` records that, not pin.post_commit_pin (the latter is a lower bound for loose-match writers SchemaApply / EnsureIndices / BranchMerge — pin.post_commit_pin = expected + 1 while actual published HEAD can be expected + N). D2. Branch-merge recovery audit uses CommitGraph::open_at_branch when sidecar.branch is Some, so the merge parent is the TARGET BRANCH's tip (not the global head). Without this, recovered branch_merge on a non-main target records the wrong merged_parent_commit_id and future merges between the same pair lose already-up-to-date detection / merge-base correctness. D3. Omnigraph::refresh now mirrors open's recovery composition: runs recover_schema_state_files BEFORE recover_manifest_drift. Without this, a SchemaApply sidecar processed via refresh would publish the manifest + delete the sidecar without renaming the staging schema files, leaving the repo with new-schema data and old `_schema.pg` (real corruption). Refresh's docstring now enumerates each open-time recovery step it maintains, so the next maintainer's diff between open() and refresh() is trivial. D4. ensure_indices sidecar pin records `active_branch` (where commits actually land), not `entry.table_branch` (where the table currently lives). On first fork-on-write, the processing loop's open_owned_dataset_for_branch_write forks to active_branch and the commit lands there — recovery's open_lance_head must check the same branch. Without this, recovery checks the wrong ref and misses Phase B drift entirely. D5. 
Two new branch-axis tests: * recovery_rolls_back_feature_branch_sidecar_against_feature_branch — feature-branch rollback variant; asserts post-recovery audit kind == RolledBack and the actual restore commit landed on the feature ref. * branch_merge_phase_b_failure_recovered_on_non_main_target — non-main merge target variant; reads the target branch's commit graph (Lance ref) and asserts the recovery commit has a non-null merged_parent_commit_id (pins D2). Bug pattern: all four are at composition seams between concepts that were each tested individually (writer-precision × actual-Lance-HEAD; branch-context × commit-graph-API; recovery-path × writer-kind; pin-time-branch × commit-time-branch). The branch-axis matrix is the cheapest mechanical prevention for D2/D4-class regressions. All workspace tests pass with --features failpoints. Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/omnigraph/src/db/manifest/recovery.rs | 47 ++++-- crates/omnigraph/src/db/omnigraph.rs | 38 ++++- .../omnigraph/src/db/omnigraph/table_ops.rs | 16 +- crates/omnigraph/tests/failpoints.rs | 140 ++++++++++++++++++ crates/omnigraph/tests/recovery.rs | 138 ++++++++++++++++- 5 files changed, 355 insertions(+), 24 deletions(-) diff --git a/crates/omnigraph/src/db/manifest/recovery.rs b/crates/omnigraph/src/db/manifest/recovery.rs index 2ede7f0..bcbe769 100644 --- a/crates/omnigraph/src/db/manifest/recovery.rs +++ b/crates/omnigraph/src/db/manifest/recovery.rs @@ -656,14 +656,22 @@ async fn process_sidecar( "recovery: rolling forward sidecar (Phase B completed; \ Phase C did not land)" ); - let new_manifest_version = roll_forward_all(root_uri, sidecar).await?; + let (new_manifest_version, published_versions) = + roll_forward_all(root_uri, sidecar).await?; + // `to_version` records the ACTUAL Lance HEAD published for + // each table (not pin.post_commit_pin, which is a lower bound + // for loose-match writers like SchemaApply / EnsureIndices / + // BranchMerge that run multiple 
commit_staged calls per table). let outcomes: Vec = sidecar .tables .iter() .map(|pin| TableOutcome { table_key: pin.table_key.clone(), from_version: pin.expected_version, - to_version: pin.post_commit_pin, + to_version: published_versions + .get(&pin.table_key) + .copied() + .unwrap_or(pin.post_commit_pin), }) .collect(); record_audit( @@ -691,9 +699,19 @@ async fn process_sidecar( /// contention; persistent contention surfaces the typed conflict error to /// the recovery sweep, which leaves the sidecar in place for the next /// open's retry. -async fn roll_forward_all(root_uri: &str, sidecar: &RecoverySidecar) -> Result { +/// Returns `(new_manifest_version, per_table_published_versions)`. The +/// per-table map is what the audit row's `to_version` should record — +/// for loose-match writers the actual Lance HEAD can be higher than the +/// sidecar's `post_commit_pin` (which is a lower bound), so the pin is +/// the wrong source of truth for an operator-facing audit field. +async fn roll_forward_all( + root_uri: &str, + sidecar: &RecoverySidecar, +) -> Result<(u64, HashMap)> { let mut updates: Vec = Vec::with_capacity(sidecar.tables.len()); let mut expected: HashMap = HashMap::with_capacity(sidecar.tables.len()); + let mut published_versions: HashMap = + HashMap::with_capacity(sidecar.tables.len()); for pin in &sidecar.tables { // Open the dataset at its CURRENT Lance HEAD on the pin's branch @@ -735,11 +753,12 @@ async fn roll_forward_all(root_uri: &str, sidecar: &RecoverySidecar) -> Result, ) -> Result<()> { - let mut graph = CommitGraph::open(root_uri).await?; // BranchMerge sidecars carry the source branch's HEAD commit id so // recovery can record this as a MERGE commit (with parent linkage) // instead of a plain commit. Without the merge parent, future // `branch_merge feature → main` between the same pair would not // recognize "already up-to-date" and merge-base computations break. 
+ // + // For BranchMerge on a non-main target, the parent commit id is the + // TARGET BRANCH's tip — `CommitGraph::open()` returns the global + // commit graph whose `head_commit_id()` is the global head and would + // record the wrong parent. Open the per-branch instance instead. let graph_commit_id = match ( sidecar.writer_kind, sidecar.merge_source_commit_id.as_deref(), kind, ) { (SidecarKind::BranchMerge, Some(source_id), RecoveryKind::RolledForward) => { - // For BranchMerge roll-forward, fetch the current branch - // tip as the parent — at open-time recovery this is the - // pre-merge tip (no other writers have run yet). + let mut branch_graph = match sidecar.branch.as_deref() { + Some(target_branch) => { + CommitGraph::open_at_branch(root_uri, target_branch).await? + } + None => CommitGraph::open(root_uri).await?, + }; let parent_commit_id = - graph.head_commit_id().await?.unwrap_or_default(); - graph + branch_graph.head_commit_id().await?.unwrap_or_default(); + branch_graph .append_merge_commit( sidecar.branch.as_deref(), manifest_version, @@ -788,6 +814,7 @@ async fn record_audit( .await? } _ => { + let mut graph = CommitGraph::open(root_uri).await?; graph .append_commit( sidecar.branch.as_deref(), diff --git a/crates/omnigraph/src/db/omnigraph.rs b/crates/omnigraph/src/db/omnigraph.rs index 33f7f93..699d98f 100644 --- a/crates/omnigraph/src/db/omnigraph.rs +++ b/crates/omnigraph/src/db/omnigraph.rs @@ -376,16 +376,32 @@ impl Omnigraph { } /// Re-read the handle-local coordinator state from storage AND run - /// roll-forward-only recovery: closes the in-process Phase B → Phase C - /// residual (e.g. `MutationStaging::finalize` crash mid-publish in a - /// long-running server) without restart. Roll-forward uses - /// `ManifestBatchPublisher::publish`'s row-level CAS — safe under - /// concurrency. 
Sidecars that would require `Dataset::restore` are - /// deferred to the next ReadWrite open (restore can silently orphan - /// a concurrent writer's commit if invoked under concurrency). + /// in-process recovery. Closes the Phase B → Phase C residual (e.g. + /// `MutationStaging::finalize` crash mid-publish in a long-running + /// server) without restart. + /// + /// Composition mirrors `Omnigraph::open_with_storage_and_mode`'s + /// recovery sequence, in the same order, with one restriction: the + /// manifest-drift sweep runs in `RollForwardOnly` mode (rollback / + /// abort cases defer to the next ReadWrite open because + /// `Dataset::restore` is unsafe under concurrency). Each step: + /// + /// 1. `coordinator.refresh()` — re-read manifest. + /// 2. `recover_schema_state_files` — complete an in-flight + /// schema_apply's staging→final rename if a SchemaApply sidecar + /// is on disk; idempotent + early-returns when no staging files + /// exist. Required BEFORE manifest-drift recovery so a + /// SchemaApply roll-forward doesn't publish the manifest while + /// the staging files remain unrenamed (which would corrupt the + /// repo: data on new schema, catalog on old). + /// 3. `recover_manifest_drift(... RollForwardOnly)` — close the + /// finalize→publisher residual via roll-forward; defer rollback + /// work to next ReadWrite open. + /// 4. `runtime_cache.invalidate_all` — drop stale per-snapshot caches. /// /// Steady state cost: one `list_dir` of `__recovery/` (typically - /// returns empty → early return). No additional Lance reads. + /// returns empty → early return for both passes). No additional + /// Lance reads. /// /// Engine-internal callers that already hold an in-flight sidecar /// (e.g. `schema_apply` mid-write) MUST use @@ -393,6 +409,12 @@ impl Omnigraph { /// avoid the recovery sweep racing their own sidecar. 
pub async fn refresh(&mut self) -> Result<()> { self.coordinator.refresh().await?; + recover_schema_state_files( + &self.root_uri, + Arc::clone(&self.storage), + &self.coordinator.snapshot(), + ) + .await?; crate::db::manifest::recover_manifest_drift( &self.root_uri, Arc::clone(&self.storage), diff --git a/crates/omnigraph/src/db/omnigraph/table_ops.rs b/crates/omnigraph/src/db/omnigraph/table_ops.rs index 1e8843b..97ff84d 100644 --- a/crates/omnigraph/src/db/omnigraph/table_ops.rs +++ b/crates/omnigraph/src/db/omnigraph/table_ops.rs @@ -81,7 +81,13 @@ pub(super) async fn ensure_indices_for_branch( table_path: full_path, expected_version: entry.table_version, post_commit_pin: entry.table_version + 1, - table_branch: entry.table_branch.clone(), + // Use active_branch (where commits actually land), NOT + // entry.table_branch (where the table currently lives). + // open_owned_dataset_for_branch_write forks a feature + // branch from a main-branch table on first write — the + // resulting commit lands on active_branch. Recovery's + // open_lance_head must check the same branch. + table_branch: active_branch.clone(), }); } } @@ -102,7 +108,13 @@ pub(super) async fn ensure_indices_for_branch( table_path: full_path, expected_version: entry.table_version, post_commit_pin: entry.table_version + 1, - table_branch: entry.table_branch.clone(), + // Use active_branch (where commits actually land), NOT + // entry.table_branch (where the table currently lives). + // open_owned_dataset_for_branch_write forks a feature + // branch from a main-branch table on first write — the + // resulting commit lands on active_branch. Recovery's + // open_lance_head must check the same branch. 
+ table_branch: active_branch.clone(), }); } } diff --git a/crates/omnigraph/tests/failpoints.rs b/crates/omnigraph/tests/failpoints.rs index f1352d4..b5acedb 100644 --- a/crates/omnigraph/tests/failpoints.rs +++ b/crates/omnigraph/tests/failpoints.rs @@ -926,6 +926,146 @@ async fn branch_merge_phase_b_failure_recovered_on_next_open() { drop(db); } +/// Branch-axis variant of the branch_merge recovery test: target is a +/// non-main branch. Catches the branch-specific commit-graph head bug +/// (D2) — without `CommitGraph::open_at_branch`, the recovery sweep +/// would record the global head as the merge parent on a non-main +/// target, and future merges between the same pair would lose +/// already-up-to-date detection. +#[tokio::test] +async fn branch_merge_phase_b_failure_recovered_on_non_main_target() { + use omnigraph::loader::{LoadMode, load_jsonl}; + + let _scenario = FailScenario::setup(); + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap().to_string(); + + // Setup: + // main: alice + // target_branch (off main): + bob (target moved past base) + // source_branch (off main): + carol (source moved past base) + // Merge: source_branch → target_branch + { + let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap(); + load_jsonl( + &mut db, + r#"{"type":"Person","data":{"name":"alice","age":30}} +"#, + LoadMode::Append, + ) + .await + .unwrap(); + db.branch_create("target_branch").await.unwrap(); + db.mutate( + "target_branch", + MUTATION_QUERIES, + "insert_person", + &mixed_params(&[("$name", "Bob")], &[("$age", 40)]), + ) + .await + .unwrap(); + db.branch_create("source_branch").await.unwrap(); + db.mutate( + "source_branch", + MUTATION_QUERIES, + "insert_person", + &mixed_params(&[("$name", "Carol")], &[("$age", 50)]), + ) + .await + .unwrap(); + } + + // Phase A: failpoint fires after the per-table publish loop completes + // but before commit_manifest_updates. 
Sidecar persists with + // branch=Some("target_branch"). + { + let mut db = Omnigraph::open(&uri).await.unwrap(); + let _failpoint = + ScopedFailPoint::new("branch_merge.post_phase_b_pre_manifest_commit", "return"); + let err = db + .branch_merge("source_branch", "target_branch") + .await + .unwrap_err(); + assert!( + err.to_string().contains( + "injected failpoint triggered: branch_merge.post_phase_b_pre_manifest_commit" + ), + "unexpected error: {err}" + ); + let recovery_dir = dir.path().join("__recovery"); + let sidecar_count = std::fs::read_dir(&recovery_dir).unwrap().count(); + assert_eq!( + sidecar_count, + 1, + "exactly one sidecar must persist after non-main branch_merge failure" + ); + } + + // Phase B: reopen runs full sweep. The BranchMerge sidecar's branch + // = Some("target_branch"); D2 fix opens a per-branch CommitGraph + // for the audit append so the merge-parent linkage is correct. + let _db = Omnigraph::open(&uri).await.unwrap(); + + let recovery_dir = dir.path().join("__recovery"); + if recovery_dir.exists() { + let remaining: Vec<_> = std::fs::read_dir(&recovery_dir) + .unwrap() + .filter_map(|e| e.ok()) + .collect(); + assert!( + remaining.is_empty(), + "sidecar must be deleted; remaining: {:?}", + remaining, + ); + } + + // Audit row for a merge commit was recorded with a non-null + // merged_parent_commit_id — proves the recovery sweep used the + // branch-specific commit-graph head as parent (not the global + // head). Also assert the recovery audit's recovery_kind == + // RolledForward. + use arrow_array::{Array, StringArray}; + use futures::TryStreamExt; + let commits_dir = dir.path().join("_graph_commits.lance"); + // Recovery wrote the merge commit to the target_branch's Lance ref + // on the commit_graph dataset (per CommitGraph::open_at_branch). + // Open at that ref to find the merge commit. 
+ let ds = lance::Dataset::open(commits_dir.to_str().unwrap()) + .await + .unwrap() + .checkout_branch("target_branch") + .await + .unwrap(); + let batches: Vec = ds + .scan() + .try_into_stream() + .await + .unwrap() + .try_collect() + .await + .unwrap(); + let mut found_recovery_merge = false; + for batch in batches { + let merged = batch + .column_by_name("merged_parent_commit_id") + .expect("merged_parent_commit_id column present") + .as_any() + .downcast_ref::() + .expect("merged_parent_commit_id is Utf8"); + for i in 0..merged.len() { + if !merged.is_null(i) { + found_recovery_merge = true; + break; + } + } + } + assert!( + found_recovery_merge, + "non-main branch_merge recovery must record `merged_parent_commit_id` on the \ + target branch's commit graph", + ); +} + /// `ensure_indices` only writes a sidecar when at least one table /// genuinely needs index work (per `needs_index_work_*` helpers in /// `db/omnigraph/table_ops.rs`). When all tables are steady-state diff --git a/crates/omnigraph/tests/recovery.rs b/crates/omnigraph/tests/recovery.rs index e5f2388..2880692 100644 --- a/crates/omnigraph/tests/recovery.rs +++ b/crates/omnigraph/tests/recovery.rs @@ -1121,11 +1121,141 @@ async fn recovery_classifies_feature_branch_sidecar_against_feature_branch() { v_pin, v_head, post_entry.table_version, ); - // Audit row recorded for the recovery action. + // Audit row recorded for the recovery action — and the row's + // recovery_kind == RolledForward (proves the branch-aware classifier + // got us through the eligible path; without it, the snapshot lookup + // against main's pin would have produced NoMovement → RollBack). 
+ let kinds = list_recovery_audit_kinds(dir.path()).await; assert_eq!( - count_recovery_audit_rows(dir.path()).await, - 1, - "feature-branch sidecar recovery must record one audit row", + kinds, vec!["RolledForward".to_string()], + "feature-branch sidecar recovery must record exactly one RolledForward audit row; got {:?}", + kinds, + ); +} + +/// Companion to the roll-forward feature-branch test: branch-axis +/// rollback. Synthesize a feature-branch sidecar that classifies as +/// rollback-eligible (UnexpectedAtP1) and assert the recovery sweep +/// processes the sidecar against the FEATURE branch (not main) and runs +/// the rollback. Without branch-aware recovery, classification would +/// happen against main's snapshot/HEAD — likely NoMovement → no-op +/// rollback that doesn't touch the actually-drifted feature ref. +#[tokio::test] +async fn recovery_rolls_back_feature_branch_sidecar_against_feature_branch() { + use omnigraph::loader::{LoadMode, load_jsonl}; + use omnigraph::table_store::TableStore; + + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap(); + + let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap(); + load_jsonl( + &mut db, + r#"{"type":"Person","data":{"name":"alice","age":30}} +"#, + LoadMode::Append, + ) + .await + .unwrap(); + db.branch_create("feature").await.unwrap(); + db.mutate( + "feature", + helpers::MUTATION_QUERIES, + "insert_person", + &helpers::mixed_params(&[("$name", "bob")], &[("$age", 40)]), + ) + .await + .unwrap(); + + let feature_snapshot = db + .snapshot_of(omnigraph::db::ReadTarget::branch("feature")) + .await + .unwrap(); + let feature_entry = feature_snapshot + .entry("node:Person") + .expect("feature snapshot must have Person entry"); + let v_pin = feature_entry.table_version; + let feature_branch_name = feature_entry.table_branch.clone(); + drop(db); + + // Bypass the manifest: append on the feature ref to advance HEAD past + // the manifest pin. 
+ let person_uri = node_table_uri(uri, "Person"); + let store = TableStore::new(uri); + let mut ds = store + .open_dataset_head(&person_uri, feature_branch_name.as_deref()) + .await + .unwrap(); + store + .append_batch( + &person_uri, + &mut ds, + person_batch(&[("dave-id", "dave", Some(50))]), + ) + .await + .unwrap(); + let v_head = ds.version().version; + assert_eq!(v_head, v_pin + 1); + + // Sidecar with bogus expected (mismatch) AND post_commit_pin == v_head. + // Strict Mutation classifier sees lance_head == manifest_pinned + 1 + // but expected != manifest_pinned → UnexpectedAtP1 → RollBack. + let bogus_expected = v_pin.saturating_sub(1); + let sidecar_json = format!( + r#"{{ + "schema_version": 1, + "operation_id": "01H0000000000000000000FRB1", + "started_at": "0", + "branch": "feature", + "actor_id": "act-feature-rb", + "writer_kind": "Mutation", + "tables": [ + {{ + "table_key":"node:Person", + "table_path":"{}", + "expected_version":{}, + "post_commit_pin":{}, + "table_branch":{} + }} + ] + }}"#, + person_uri, + bogus_expected, + v_head, + match &feature_branch_name { + Some(b) => format!("\"{}\"", b), + None => "null".to_string(), + }, + ); + write_sidecar_file(dir.path(), "01H0000000000000000000FRB1", &sidecar_json); + + // Reopen with full sweep — RollBack is allowed at open time. + let _db = Omnigraph::open(uri).await.unwrap(); + assert!( + list_recovery_dir(dir.path()).is_empty(), + "feature-branch rollback sidecar must be deleted after recovery" + ); + + // Audit kind == RolledBack (proves classifier saw feature's HEAD, + // not main's; main's view of Person would be NoMovement → no audit + // row attribution). + let kinds = list_recovery_audit_kinds(dir.path()).await; + assert_eq!( + kinds, vec!["RolledBack".to_string()], + "feature-branch rollback must record one RolledBack audit row; got {:?}", + kinds, + ); + + // Lance HEAD on the feature ref must have advanced (real restore ran). 
+ let post = store + .open_dataset_head(&person_uri, feature_branch_name.as_deref()) + .await + .unwrap(); + assert!( + post.version().version > v_head, + "real restore must have appended a commit on feature; v_head={}, post={}", + v_head, + post.version().version, ); }