From 35c4b16e915c1ce86342c1e5522edcb9dc3e07d2 Mon Sep 17 00:00:00 2001 From: Ragnor Comerford Date: Sun, 3 May 2026 15:09:58 +0200 Subject: [PATCH] recovery: address five outstanding review findings MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A1. tests/recovery.rs: rewrite recovery_multi_sidecar_requires_fresh_snapshot_for_correctness to use real `append_batch` instead of fragment-preserving `delete_where("1 = 2")`. With the previous setup, restore_table_to_version's fragment-set short-circuit turned the bug path into a no-op, so the load-bearing `HEAD == v3` assertion passed in both the bug and the fix paths. Real appends produce different fragment-id sets across v1, v2, v3, so a real restore actually runs in the bug path (HEAD becomes v4). Added a person_batch helper matching the post-init Lance schema (id, age, name). A2. exec/merge.rs: filter recovery sidecar pins to `RewriteMerged` candidates only. `AdoptSourceState`'s pure-pointer-switch and fork subcases don't advance Lance HEAD; pinning them would make recovery classify them as NoMovement and trigger an all-or-nothing rollback that destroys legitimate RewriteMerged work. Documented residual: AdoptSourceState subcases that internally call publish_rewritten_merge_table aren't covered by the sidecar; closing that requires pre-computing source deltas during candidate classification (a structural change to CandidateTableState) — left as follow-up. A3. db/omnigraph/table_ops.rs: add to the ensure_indices sidecar pin loops the same branch filter (`active_branch.is_some() && entry.table_branch.is_none() => continue`) that the processing loop already has. Without this, main-branch tables that need index work get pinned but never committed when ensure_indices runs on a feature branch → NoMovement → all-or-nothing rollback destroys feature-branch work. A4. tests/failpoints.rs: deepen schema_apply_phase_b_failure and branch_merge_phase_b_failure tests with post-recovery manifest-pin advance assertions. 
branch_merge test setup also mutates main so the merge produces at least one RewriteMerged candidate (required after A2's pin filter — a no-op merge with all-AdoptSourceState would write no sidecar). Fixed stale "BranchMerge is strict-classified" comment to reflect current loose classification. A5. tests/composite_flow.rs: remove duplicate back-to-back `total_people` query in step 12. Full workspace test sweep with --features failpoints passes: no regressions. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../omnigraph/src/db/omnigraph/table_ops.rs | 12 ++++ crates/omnigraph/src/exec/merge.rs | 37 ++++++++--- crates/omnigraph/tests/composite_flow.rs | 10 --- crates/omnigraph/tests/failpoints.rs | 65 ++++++++++++++++--- crates/omnigraph/tests/recovery.rs | 65 +++++++++++++++---- 5 files changed, 152 insertions(+), 37 deletions(-) diff --git a/crates/omnigraph/src/db/omnigraph/table_ops.rs b/crates/omnigraph/src/db/omnigraph/table_ops.rs index 138d43d..343b666 100644 --- a/crates/omnigraph/src/db/omnigraph/table_ops.rs +++ b/crates/omnigraph/src/db/omnigraph/table_ops.rs @@ -57,6 +57,15 @@ pub(super) async fn ensure_indices_for_branch( let Some(entry) = snapshot.entry(&table_key) else { continue; }; + // Match the processing loop's branch filter: when running on a + // feature branch, main-branch tables (table_branch = None) are + // skipped (`None => continue` at ~line 118). Pinning them here + // would force NoMovement on recovery and trigger an all-or- + // nothing rollback of legitimately-committed work on the + // feature-branch tables. 
+ if active_branch.is_some() && entry.table_branch.is_none() { + continue; + } let full_path = format!("{}/{}", db.root_uri, entry.table_path); if needs_index_work_node( db, @@ -80,6 +89,9 @@ pub(super) async fn ensure_indices_for_branch( let Some(entry) = snapshot.entry(&table_key) else { continue; }; + if active_branch.is_some() && entry.table_branch.is_none() { + continue; + } let full_path = format!("{}/{}", db.root_uri, entry.table_path); if needs_index_work_edge(db, &table_key, &full_path, entry.table_branch.as_deref()) .await? diff --git a/crates/omnigraph/src/exec/merge.rs b/crates/omnigraph/src/exec/merge.rs index e46a634..388ff92 100644 --- a/crates/omnigraph/src/exec/merge.rs +++ b/crates/omnigraph/src/exec/merge.rs @@ -1168,17 +1168,38 @@ impl Omnigraph { validate_merge_candidates(self, source_snapshot, &target_snapshot, &candidates).await?; // Recovery sidecar: protect the per-table commit_staged loop. - // Pins every table that will be touched by - // `publish_adopted_source_state` or `publish_rewritten_merge_table`. - // BranchMerge uses loose classification — the publish path may - // run multiple commit_staged calls per table - // (publish_rewritten_merge_table does stage_merge_insert + - // delete_where + index rebuilds per the existing branch-merge - // code path). + // Pin only `RewriteMerged` candidates because they always + // advance Lance HEAD through `publish_rewritten_merge_table` + // (which runs stage_merge_insert + delete_where + index + // rebuilds — multiple commit_staged calls per table; loose + // classification handles the multi-step drift). + // + // `AdoptSourceState` candidates are NOT pinned: their publish + // path is `publish_adopted_source_state`, whose subcases mostly + // don't advance Lance HEAD (pure manifest pointer switch, or + // fork via `fork_dataset_from_entry_state` which only adds a + // Lance branch ref). 
If those subcases were pinned, recovery + // would classify them as NoMovement and the all-or-nothing + // decision would force a rollback that destroys legitimately- + // committed work on sibling RewriteMerged tables. + // + // Residual: two `AdoptSourceState` subcases (when source has a + // table_branch AND the source delta is non-empty) internally + // call `publish_rewritten_merge_table` and DO advance HEAD. + // Those are not covered by this sidecar — if they fail mid- + // commit, the residual persists until the next ReadWrite open + // detects it via a subsequent ExpectedVersionMismatch from a + // later writer that touches the same table. Closing this gap + // requires pre-computing source deltas during candidate + // classification (a structural change to `CandidateTableState`) + // and is left as follow-up work. let recovery_pins: Vec = ordered_table_keys .iter() - .filter(|tk| candidates.contains_key(*tk)) .filter_map(|table_key| { + let candidate = candidates.get(table_key)?; + if !matches!(candidate, CandidateTableState::RewriteMerged(_)) { + return None; + } let entry = target_snapshot.entry(table_key)?; Some(crate::db::manifest::SidecarTablePin { table_key: table_key.clone(), diff --git a/crates/omnigraph/tests/composite_flow.rs b/crates/omnigraph/tests/composite_flow.rs index d7a8725..6ddab68 100644 --- a/crates/omnigraph/tests/composite_flow.rs +++ b/crates/omnigraph/tests/composite_flow.rs @@ -368,14 +368,4 @@ async fn composite_flow_init_load_branch_merge_time_travel_optimize_cleanup() { .await .unwrap(); assert!(!final_total.batches().is_empty()); - - let final_total = query_main( - &mut db, - TEST_QUERIES, - "total_people", - &ParamMap::default(), - ) - .await - .unwrap(); - assert!(!final_total.batches().is_empty()); } diff --git a/crates/omnigraph/tests/failpoints.rs b/crates/omnigraph/tests/failpoints.rs index f782419..f9cdfb5 100644 --- a/crates/omnigraph/tests/failpoints.rs +++ b/crates/omnigraph/tests/failpoints.rs @@ -6,7 +6,7 @@ use 
fail::FailScenario; use omnigraph::db::Omnigraph; use omnigraph::failpoints::ScopedFailPoint; -use helpers::{MUTATION_QUERIES, mixed_params, mutate_main}; +use helpers::{MUTATION_QUERIES, mixed_params, mutate_main, version_main}; const SCHEMA_V1: &str = "node Person { name: String @key }\n"; const SCHEMA_V2_ADDED_TYPE: &str = @@ -421,6 +421,13 @@ async fn schema_apply_phase_b_failure_recovered_on_next_open() { .unwrap(); } + // Capture pre-failure manifest version so we can assert the recovery + // sweep advances it. + let pre_failure_version = { + let db = Omnigraph::open(&uri).await.unwrap(); + version_main(&db).await.unwrap() + }; + // Phase A: trigger the residual via `schema_apply.after_staging_write`. // This failpoint fires AFTER the rewritten_tables/indexed_tables loops // (Lance HEAD advanced) AND AFTER the schema-state staging files are @@ -479,7 +486,7 @@ edge WorksAt: Person -> Company // SchemaApply (loose-match) — classifier accepts the multi-commit // drift on Person, decision is RollForward, manifest extends to the // current Lance HEAD. - let _db = Omnigraph::open(&uri).await.unwrap(); + let db = Omnigraph::open(&uri).await.unwrap(); // Sidecar gone, audit row recorded. let recovery_dir = dir.path().join("__recovery"); @@ -499,6 +506,16 @@ edge WorksAt: Person -> Company audit_dir.exists(), "_graph_commit_recoveries.lance must exist after schema_apply recovery" ); + + // Recovery sweep must have advanced the manifest pin on the rewritten + // table: roll-forward published the post-failure Lance HEAD. 
+ let post_recovery_version = version_main(&db).await.unwrap(); + assert!( + post_recovery_version > pre_failure_version, + "manifest version must advance post-recovery; pre={pre_failure_version}, \ + post={post_recovery_version}", + ); + drop(db); } #[tokio::test] @@ -509,8 +526,12 @@ async fn branch_merge_phase_b_failure_recovered_on_next_open() { let dir = tempfile::tempdir().unwrap(); let uri = dir.path().to_str().unwrap().to_string(); - // Seed main with a row, branch off, mutate the branch, then attempt - // a merge with the failpoint active. + // Seed main with a row, branch off, mutate BOTH sides so the merge + // produces at least one `RewriteMerged` candidate (target moved past + // base too — required for the recovery sidecar to pin anything; the + // sidecar only pins RewriteMerged candidates because they're the + // only path that always advances Lance HEAD via + // `publish_rewritten_merge_table`). { let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap(); load_jsonl( @@ -530,8 +551,24 @@ async fn branch_merge_phase_b_failure_recovered_on_next_open() { ) .await .unwrap(); + // Mutate main too so the merge sees target ≠ base for Person — + // forces RewriteMerged classification. + mutate_main( + &mut db, + MUTATION_QUERIES, + "insert_person", + &mixed_params(&[("$name", "Carol")], &[("$age", 50)]), + ) + .await + .unwrap(); } + // Capture pre-failure state on main for post-recovery comparison. + let pre_failure_version = { + let db = Omnigraph::open(&uri).await.unwrap(); + version_main(&db).await.unwrap() + }; + // Phase A: failpoint fires after the per-table publish loop completes // but before commit_manifest_updates. Sidecar persists. { @@ -558,10 +595,13 @@ async fn branch_merge_phase_b_failure_recovered_on_next_open() { ); } - // Phase B: reopen runs the sweep. BranchMerge is strict-classified - // (single commit_staged per table for the merge_insert path), so the - // sidecar's post_commit_pin should match observed Lance HEAD. 
- let _db = Omnigraph::open(&uri).await.unwrap(); + // Phase B: reopen runs the sweep. BranchMerge uses LOOSE + // classification — `publish_rewritten_merge_table` runs multiple + // commit_staged calls per table (stage_merge_insert + delete_where + + // index rebuilds), so post_commit_pin in the sidecar is a lower + // bound; the loose-match classifier accepts any HEAD > expected_version + // when expected_version == manifest_pinned. + let db = Omnigraph::open(&uri).await.unwrap(); let recovery_dir = dir.path().join("__recovery"); if recovery_dir.exists() { @@ -580,6 +620,15 @@ async fn branch_merge_phase_b_failure_recovered_on_next_open() { audit_dir.exists(), "_graph_commit_recoveries.lance must exist after branch_merge recovery" ); + + // Recovery must have advanced main's manifest pin (the merge published). + let post_recovery_version = version_main(&db).await.unwrap(); + assert!( + post_recovery_version > pre_failure_version, + "manifest version must advance post-recovery; pre={pre_failure_version}, \ + post={post_recovery_version}", + ); + drop(db); } /// `ensure_indices` only writes a sidecar when at least one table diff --git a/crates/omnigraph/tests/recovery.rs b/crates/omnigraph/tests/recovery.rs index edfbd16..cef1257 100644 --- a/crates/omnigraph/tests/recovery.rs +++ b/crates/omnigraph/tests/recovery.rs @@ -10,7 +10,10 @@ //! row recording. use std::path::Path; +use std::sync::Arc; +use arrow_array::{Int32Array, RecordBatch, StringArray}; +use arrow_schema::{DataType, Field, Schema}; use lance::Dataset; use omnigraph::db::Omnigraph; @@ -51,6 +54,30 @@ fn fnv1a64(bytes: &[u8]) -> u64 { hash } +/// Build a Person RecordBatch matching the post-init Lance schema: +/// `id: Utf8, age: Int32?, name: Utf8`. Used by tests that need to advance +/// Lance HEAD with real fragment changes (not no-op deletes) bypassing +/// `__manifest`. 
+fn person_batch(rows: &[(&str, &str, Option)]) -> RecordBatch { + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Utf8, false), + Field::new("age", DataType::Int32, true), + Field::new("name", DataType::Utf8, false), + ])); + let ids: Vec<&str> = rows.iter().map(|(id, _, _)| *id).collect(); + let names: Vec<&str> = rows.iter().map(|(_, name, _)| *name).collect(); + let ages: Vec> = rows.iter().map(|(_, _, age)| *age).collect(); + RecordBatch::try_new( + schema, + vec![ + Arc::new(StringArray::from(ids)), + Arc::new(Int32Array::from(ages)), + Arc::new(StringArray::from(names)), + ], + ) + .unwrap() +} + #[tokio::test] async fn recovery_does_not_run_on_clean_open() { let dir = tempfile::tempdir().unwrap(); @@ -780,8 +807,11 @@ async fn recovery_ensure_indices_handles_empty_tables() { /// expected=v1, post=v2. /// - Sidecar B: kind=EnsureIndices (loose), refers to Person at /// expected=v2, post=v3. -/// - Lance HEAD for Person sits at v3 (both writers' Phase B fragments -/// chained but neither's Phase C landed). +/// - Lance HEAD for Person sits at v3, and v1, v2, v3 have DIFFERENT +/// fragment-id sets (each version added a real row via append_batch). +/// This means `restore_table_to_version`'s fragment-set short-circuit +/// does NOT fire under the bug path — a real `Dataset::restore` +/// actually runs there, producing HEAD=v4. /// /// Outcome paths: /// @@ -839,18 +869,31 @@ async fn recovery_multi_sidecar_requires_fresh_snapshot_for_correctness() { let mut ds = Dataset::open(&person_uri).await.unwrap(); let v1 = ds.version().version; - // Advance Lance HEAD twice to mimic two consecutive - // would-be-publishes that didn't land: - // - First "writer" advanced HEAD v1 → v2. - // - Second "writer" advanced HEAD v2 → v3. - // Manifest stays at v1 throughout because we're synthesizing. 
- let _ = store - .delete_where(&person_uri, &mut ds, "1 = 2") + // Advance Lance HEAD twice via raw append_batch to mimic two + // consecutive would-be-publishes that didn't land. Each append adds + // a new fragment, so v1, v2, v3 have DIFFERENT fragment-id sets — + // restore_table_to_version's fragment-set short-circuit will not + // fire when classifier dispatches to rollback (the + // differentiator we rely on). + // + // Bypassing __manifest is what `delete_where` and `append_batch` + // both do (direct on Lance); using append_batch (instead of no-op + // deletes) is what makes the fragment-set differ across versions. + store + .append_batch( + &person_uri, + &mut ds, + person_batch(&[("bob-id", "bob", Some(25))]), + ) .await .unwrap(); let v2 = ds.version().version; - let _ = store - .delete_where(&person_uri, &mut ds, "1 = 2") + store + .append_batch( + &person_uri, + &mut ds, + person_batch(&[("carol-id", "carol", Some(40))]), + ) .await .unwrap(); let v3 = ds.version().version;