recovery: address five outstanding review findings

A1. tests/recovery.rs: rewrite recovery_multi_sidecar_requires_fresh_snapshot_for_correctness
    to use real `append_batch` instead of fragment-preserving `delete_where("1 = 2")`.
    The previous setup made restore_table_to_version's fragment-set short-circuit
    no-op the bug path, so the load-bearing `HEAD == v3` assertion passed in both
    bug and fix paths. Real appends produce different fragment-id sets across v1,
    v2, v3 so a real restore actually runs in the bug path (HEAD becomes v4).
    Added a person_batch helper matching the post-init Lance schema (id, age, name).

A2. exec/merge.rs: filter recovery sidecar pins to `RewriteMerged` candidates
    only. `AdoptSourceState`'s pure-pointer-switch and fork subcases don't
    advance Lance HEAD; pinning them would force NoMovement on recovery and
    trigger an all-or-nothing rollback that destroys legit RewriteMerged work.
    Documented residual: AdoptSourceState subcases that internally call
    publish_rewritten_merge_table aren't covered by the sidecar; closing that
    requires pre-computing source deltas during candidate classification (a
    structural change to CandidateTableState) — left as follow-up.

A3. db/omnigraph/table_ops.rs: add the same branch filter
    (`active_branch.is_some() && entry.table_branch.is_none() => continue`)
    to the ensure_indices sidecar pin loop that the processing loop already
    has. Without this, main-branch tables that need index work get pinned but
    never committed when ensure_indices runs on a feature branch → NoMovement
    → all-or-nothing rollback destroys feature-branch work.

A4. tests/failpoints.rs: deepen schema_apply_phase_b_failure and
    branch_merge_phase_b_failure tests with post-recovery manifest-pin advance
    assertions. branch_merge test setup also mutates main so the merge
    produces at least one RewriteMerged candidate (required after A2's pin
    filter — a no-op merge with all-AdoptSourceState would write no sidecar).
    Fixed stale "BranchMerge is strict-classified" comment to reflect current
    loose classification.

A5. tests/composite_flow.rs: remove duplicate back-to-back `total_people`
    query in step 12.

Full workspace test sweep with --features failpoints passes: no regressions.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Ragnor Comerford 2026-05-03 15:09:58 +02:00
parent 05e52f2ee0
commit 35c4b16e91
No known key found for this signature in database
5 changed files with 152 additions and 37 deletions

View file

@ -57,6 +57,15 @@ pub(super) async fn ensure_indices_for_branch(
let Some(entry) = snapshot.entry(&table_key) else {
continue;
};
// Match the processing loop's branch filter: when running on a
// feature branch, main-branch tables (table_branch = None) are
// skipped (`None => continue` at ~line 118). Pinning them here
// would force NoMovement on recovery and trigger an all-or-
// nothing rollback of legitimately-committed work on the
// feature-branch tables.
if active_branch.is_some() && entry.table_branch.is_none() {
continue;
}
let full_path = format!("{}/{}", db.root_uri, entry.table_path);
if needs_index_work_node(
db,
@ -80,6 +89,9 @@ pub(super) async fn ensure_indices_for_branch(
let Some(entry) = snapshot.entry(&table_key) else {
continue;
};
if active_branch.is_some() && entry.table_branch.is_none() {
continue;
}
let full_path = format!("{}/{}", db.root_uri, entry.table_path);
if needs_index_work_edge(db, &table_key, &full_path, entry.table_branch.as_deref())
.await?

View file

@ -1168,17 +1168,38 @@ impl Omnigraph {
validate_merge_candidates(self, source_snapshot, &target_snapshot, &candidates).await?;
// Recovery sidecar: protect the per-table commit_staged loop.
// Pins every table that will be touched by
// `publish_adopted_source_state` or `publish_rewritten_merge_table`.
// BranchMerge uses loose classification — the publish path may
// run multiple commit_staged calls per table
// (publish_rewritten_merge_table does stage_merge_insert +
// delete_where + index rebuilds per the existing branch-merge
// code path).
// Pin only `RewriteMerged` candidates because they always
// advance Lance HEAD through `publish_rewritten_merge_table`
// (which runs stage_merge_insert + delete_where + index
// rebuilds — multiple commit_staged calls per table; loose
// classification handles the multi-step drift).
//
// `AdoptSourceState` candidates are NOT pinned: their publish
// path is `publish_adopted_source_state`, whose subcases mostly
// don't advance Lance HEAD (pure manifest pointer switch, or
// fork via `fork_dataset_from_entry_state` which only adds a
// Lance branch ref). If those subcases were pinned, recovery
// would classify them as NoMovement and the all-or-nothing
// decision would force a rollback that destroys legitimately-
// committed work on sibling RewriteMerged tables.
//
// Residual: two `AdoptSourceState` subcases (when source has a
// table_branch AND the source delta is non-empty) internally
// call `publish_rewritten_merge_table` and DO advance HEAD.
// Those are not covered by this sidecar — if they fail mid-
// commit, the residual persists until the next ReadWrite open
// detects it via a subsequent ExpectedVersionMismatch from a
// later writer that touches the same table. Closing this gap
// requires pre-computing source deltas during candidate
// classification (a structural change to `CandidateTableState`)
// and is left as follow-up work.
let recovery_pins: Vec<crate::db::manifest::SidecarTablePin> = ordered_table_keys
.iter()
.filter(|tk| candidates.contains_key(*tk))
.filter_map(|table_key| {
let candidate = candidates.get(table_key)?;
if !matches!(candidate, CandidateTableState::RewriteMerged(_)) {
return None;
}
let entry = target_snapshot.entry(table_key)?;
Some(crate::db::manifest::SidecarTablePin {
table_key: table_key.clone(),

View file

@ -368,14 +368,4 @@ async fn composite_flow_init_load_branch_merge_time_travel_optimize_cleanup() {
.await
.unwrap();
assert!(!final_total.batches().is_empty());
let final_total = query_main(
&mut db,
TEST_QUERIES,
"total_people",
&ParamMap::default(),
)
.await
.unwrap();
assert!(!final_total.batches().is_empty());
}

View file

@ -6,7 +6,7 @@ use fail::FailScenario;
use omnigraph::db::Omnigraph;
use omnigraph::failpoints::ScopedFailPoint;
use helpers::{MUTATION_QUERIES, mixed_params, mutate_main};
use helpers::{MUTATION_QUERIES, mixed_params, mutate_main, version_main};
const SCHEMA_V1: &str = "node Person { name: String @key }\n";
const SCHEMA_V2_ADDED_TYPE: &str =
@ -421,6 +421,13 @@ async fn schema_apply_phase_b_failure_recovered_on_next_open() {
.unwrap();
}
// Capture pre-failure manifest version so we can assert the recovery
// sweep advances it.
let pre_failure_version = {
let db = Omnigraph::open(&uri).await.unwrap();
version_main(&db).await.unwrap()
};
// Phase A: trigger the residual via `schema_apply.after_staging_write`.
// This failpoint fires AFTER the rewritten_tables/indexed_tables loops
// (Lance HEAD advanced) AND AFTER the schema-state staging files are
@ -479,7 +486,7 @@ edge WorksAt: Person -> Company
// SchemaApply (loose-match) — classifier accepts the multi-commit
// drift on Person, decision is RollForward, manifest extends to the
// current Lance HEAD.
let _db = Omnigraph::open(&uri).await.unwrap();
let db = Omnigraph::open(&uri).await.unwrap();
// Sidecar gone, audit row recorded.
let recovery_dir = dir.path().join("__recovery");
@ -499,6 +506,16 @@ edge WorksAt: Person -> Company
audit_dir.exists(),
"_graph_commit_recoveries.lance must exist after schema_apply recovery"
);
// Recovery sweep must have advanced the manifest pin on the rewritten
// table: roll-forward published the post-failure Lance HEAD.
let post_recovery_version = version_main(&db).await.unwrap();
assert!(
post_recovery_version > pre_failure_version,
"manifest version must advance post-recovery; pre={pre_failure_version}, \
post={post_recovery_version}",
);
drop(db);
}
#[tokio::test]
@ -509,8 +526,12 @@ async fn branch_merge_phase_b_failure_recovered_on_next_open() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap().to_string();
// Seed main with a row, branch off, mutate the branch, then attempt
// a merge with the failpoint active.
// Seed main with a row, branch off, mutate BOTH sides so the merge
// produces at least one `RewriteMerged` candidate (target moved past
// base too — required for the recovery sidecar to pin anything; the
// sidecar only pins RewriteMerged candidates because they're the
// only path that always advances Lance HEAD via
// `publish_rewritten_merge_table`).
{
let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap();
load_jsonl(
@ -530,8 +551,24 @@ async fn branch_merge_phase_b_failure_recovered_on_next_open() {
)
.await
.unwrap();
// Mutate main too so the merge sees target ≠ base for Person —
// forces RewriteMerged classification.
mutate_main(
&mut db,
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", "Carol")], &[("$age", 50)]),
)
.await
.unwrap();
}
// Capture pre-failure state on main for post-recovery comparison.
let pre_failure_version = {
let db = Omnigraph::open(&uri).await.unwrap();
version_main(&db).await.unwrap()
};
// Phase A: failpoint fires after the per-table publish loop completes
// but before commit_manifest_updates. Sidecar persists.
{
@ -558,10 +595,13 @@ async fn branch_merge_phase_b_failure_recovered_on_next_open() {
);
}
// Phase B: reopen runs the sweep. BranchMerge is strict-classified
// (single commit_staged per table for the merge_insert path), so the
// sidecar's post_commit_pin should match observed Lance HEAD.
let _db = Omnigraph::open(&uri).await.unwrap();
// Phase B: reopen runs the sweep. BranchMerge uses LOOSE
// classification — `publish_rewritten_merge_table` runs multiple
// commit_staged calls per table (stage_merge_insert + delete_where +
// index rebuilds), so post_commit_pin in the sidecar is a lower
// bound; the loose-match classifier accepts any HEAD > expected_version
// when expected_version == manifest_pinned.
let db = Omnigraph::open(&uri).await.unwrap();
let recovery_dir = dir.path().join("__recovery");
if recovery_dir.exists() {
@ -580,6 +620,15 @@ async fn branch_merge_phase_b_failure_recovered_on_next_open() {
audit_dir.exists(),
"_graph_commit_recoveries.lance must exist after branch_merge recovery"
);
// Recovery must have advanced main's manifest pin (the merge published).
let post_recovery_version = version_main(&db).await.unwrap();
assert!(
post_recovery_version > pre_failure_version,
"manifest version must advance post-recovery; pre={pre_failure_version}, \
post={post_recovery_version}",
);
drop(db);
}
/// `ensure_indices` only writes a sidecar when at least one table

View file

@ -10,7 +10,10 @@
//! row recording.
use std::path::Path;
use std::sync::Arc;
use arrow_array::{Int32Array, RecordBatch, StringArray};
use arrow_schema::{DataType, Field, Schema};
use lance::Dataset;
use omnigraph::db::Omnigraph;
@ -51,6 +54,30 @@ fn fnv1a64(bytes: &[u8]) -> u64 {
hash
}
/// Build a Person RecordBatch matching the post-init Lance schema:
/// `id: Utf8, age: Int32?, name: Utf8`. Used by tests that need to advance
/// Lance HEAD with real fragment changes (not no-op deletes) bypassing
/// `__manifest`.
fn person_batch(rows: &[(&str, &str, Option<i32>)]) -> RecordBatch {
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Utf8, false),
Field::new("age", DataType::Int32, true),
Field::new("name", DataType::Utf8, false),
]));
let ids: Vec<&str> = rows.iter().map(|(id, _, _)| *id).collect();
let names: Vec<&str> = rows.iter().map(|(_, name, _)| *name).collect();
let ages: Vec<Option<i32>> = rows.iter().map(|(_, _, age)| *age).collect();
RecordBatch::try_new(
schema,
vec![
Arc::new(StringArray::from(ids)),
Arc::new(Int32Array::from(ages)),
Arc::new(StringArray::from(names)),
],
)
.unwrap()
}
#[tokio::test]
async fn recovery_does_not_run_on_clean_open() {
let dir = tempfile::tempdir().unwrap();
@ -780,8 +807,11 @@ async fn recovery_ensure_indices_handles_empty_tables() {
/// expected=v1, post=v2.
/// - Sidecar B: kind=EnsureIndices (loose), refers to Person at
/// expected=v2, post=v3.
/// - Lance HEAD for Person sits at v3 (both writers' Phase B fragments
/// chained but neither's Phase C landed).
/// - Lance HEAD for Person sits at v3, and v1, v2, v3 have DIFFERENT
/// fragment-id sets (each version added a real row via append_batch).
/// This means `restore_table_to_version`'s fragment-set short-circuit
/// does NOT fire under the bug path — a real `Dataset::restore`
/// actually runs there, producing HEAD=v4.
///
/// Outcome paths:
///
@ -839,18 +869,31 @@ async fn recovery_multi_sidecar_requires_fresh_snapshot_for_correctness() {
let mut ds = Dataset::open(&person_uri).await.unwrap();
let v1 = ds.version().version;
// Advance Lance HEAD twice to mimic two consecutive
// would-be-publishes that didn't land:
// - First "writer" advanced HEAD v1 → v2.
// - Second "writer" advanced HEAD v2 → v3.
// Manifest stays at v1 throughout because we're synthesizing.
let _ = store
.delete_where(&person_uri, &mut ds, "1 = 2")
// Advance Lance HEAD twice via raw append_batch to mimic two
// consecutive would-be-publishes that didn't land. Each append adds
// a new fragment, so v1, v2, v3 have DIFFERENT fragment-id sets —
// restore_table_to_version's fragment-set short-circuit will not
// fire when classifier dispatches to rollback (the
// differentiator we rely on).
//
// Bypassing __manifest is what `delete_where` and `append_batch`
// both do (direct on Lance); using append_batch (instead of no-op
// deletes) is what makes the fragment-set differ across versions.
store
.append_batch(
&person_uri,
&mut ds,
person_batch(&[("bob-id", "bob", Some(25))]),
)
.await
.unwrap();
let v2 = ds.version().version;
let _ = store
.delete_where(&person_uri, &mut ds, "1 = 2")
store
.append_batch(
&person_uri,
&mut ds,
person_batch(&[("carol-id", "carol", Some(40))]),
)
.await
.unwrap();
let v3 = ds.version().version;