recovery: four review-round-4 fixes + branch-axis test matrix

D1. roll_forward_all now returns the version actually published for
    each table, and the audit row's `to_version` records that value
    instead of pin.post_commit_pin. The pin is only a lower bound for
    the loose-match writers (SchemaApply / EnsureIndices / BranchMerge):
    pin.post_commit_pin = expected + 1, while the published HEAD can be
    expected + N.

D2. Branch-merge recovery audit uses CommitGraph::open_at_branch when
    sidecar.branch is Some, so the merge parent is the TARGET BRANCH's
    tip (not the global head). Without this, a recovered branch_merge
    on a non-main target records the wrong merged_parent_commit_id and
    future merges between the same pair lose already-up-to-date
    detection / merge-base correctness.

D3. Omnigraph::refresh now mirrors open's recovery composition: it runs
    recover_schema_state_files BEFORE recover_manifest_drift. Without
    this, a SchemaApply sidecar processed via refresh would publish
    the manifest + delete the sidecar without renaming the staging
    schema files, leaving the repo with new-schema data and old
    `_schema.pg` (real corruption). Refresh's docstring now enumerates
    each open-time recovery step it maintains, so the next maintainer's
    diff between open() and refresh() is trivial.

D4. ensure_indices sidecar pin records `active_branch` (where commits
    actually land), not `entry.table_branch` (where the table currently
    lives). On first fork-on-write, the processing loop's
    open_owned_dataset_for_branch_write forks to active_branch and the
    commit lands there — recovery's open_lance_head must check the
    same branch. Without this, recovery checks the wrong ref and
    misses Phase B drift entirely.

D5. Two new branch-axis tests:
    * recovery_rolls_back_feature_branch_sidecar_against_feature_branch
      — feature-branch rollback variant; asserts post-recovery audit
      kind == RolledBack and the actual restore commit landed on the
      feature ref.
    * branch_merge_phase_b_failure_recovered_on_non_main_target
      — non-main merge target variant; reads the target branch's
      commit graph (Lance ref) and asserts the recovery commit has
      a non-null merged_parent_commit_id (pins D2).
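
Illustration for D1 (not part of this commit): a minimal sketch of the `to_version` selection, assuming a trimmed-down `TablePin` and a plain map of published versions. `TablePin` and `audit_to_version` are hypothetical names; the real sidecar pin carries more fields. It shows that the pin is only the fallback when no published version is known.

```rust
use std::collections::HashMap;

/// Hypothetical, trimmed-down stand-in for one sidecar table pin.
#[derive(Debug, Clone)]
pub struct TablePin {
    pub table_key: String,
    pub expected_version: u64,
    /// Lower bound only: loose-match writers may publish past this.
    pub post_commit_pin: u64,
}

/// Choose the audit row's `to_version` for one table: the HEAD that was
/// actually published when it is known, otherwise the pin as a fallback.
pub fn audit_to_version(pin: &TablePin, published: &HashMap<String, u64>) -> u64 {
    published
        .get(&pin.table_key)
        .copied()
        .unwrap_or(pin.post_commit_pin)
}
```

With expected = 4 and post_commit_pin = 5, a writer that published version 7 is audited as 7; the pin value 5 is reported only when the map has no entry for the table.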

Bug pattern: all four fixes (D1-D4) sit at composition seams between
concepts that were each tested only in isolation (writer-precision ×
actual-Lance-HEAD; branch-context × commit-graph-API; recovery-path ×
writer-kind; pin-time-branch × commit-time-branch). The branch-axis
matrix is the cheapest mechanical guard against D2/D4-class regressions.
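
Illustration of the branch-axis matrix (not part of this commit): a minimal sketch that enumerates every writer-kind × branch cell, so a missing non-main variant shows up as a missing test case. The enums are hypothetical and list only the writer kinds named in this commit; the real `SidecarKind` may have more variants.

```rust
/// Writer kinds named in this commit (hypothetical mirror of the real enum).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WriterKind {
    Mutation,
    SchemaApply,
    EnsureIndices,
    BranchMerge,
}

/// The branch axis: does the operation target main or a non-main branch?
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BranchAxis {
    Main,
    NonMain,
}

/// Enumerate every (writer kind, branch axis) cell of the test matrix.
pub fn branch_axis_matrix() -> Vec<(WriterKind, BranchAxis)> {
    let kinds = [
        WriterKind::Mutation,
        WriterKind::SchemaApply,
        WriterKind::EnsureIndices,
        WriterKind::BranchMerge,
    ];
    let axes = [BranchAxis::Main, BranchAxis::NonMain];
    let mut cells = Vec::new();
    for kind in kinds {
        for axis in axes {
            cells.push((kind, axis));
        }
    }
    cells
}
```

A test suite could iterate over this list and fail when a cell has no registered recovery test. That registry is left out here because the commit does not describe one.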

All workspace tests pass with --features failpoints.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Ragnor Comerford 2026-05-04 11:34:18 +02:00
parent aaa031e834
commit 2ce4efc450
5 changed files with 355 additions and 24 deletions

@@ -656,14 +656,22 @@ async fn process_sidecar(
 "recovery: rolling forward sidecar (Phase B completed; \
 Phase C did not land)"
 );
-let new_manifest_version = roll_forward_all(root_uri, sidecar).await?;
+let (new_manifest_version, published_versions) =
+roll_forward_all(root_uri, sidecar).await?;
+// `to_version` records the ACTUAL Lance HEAD published for
+// each table (not pin.post_commit_pin, which is a lower bound
+// for loose-match writers like SchemaApply / EnsureIndices /
+// BranchMerge that run multiple commit_staged calls per table).
 let outcomes: Vec<TableOutcome> = sidecar
 .tables
 .iter()
 .map(|pin| TableOutcome {
 table_key: pin.table_key.clone(),
 from_version: pin.expected_version,
-to_version: pin.post_commit_pin,
+to_version: published_versions
+.get(&pin.table_key)
+.copied()
+.unwrap_or(pin.post_commit_pin),
 })
 .collect();
 record_audit(
@@ -691,9 +699,19 @@ async fn process_sidecar(
 /// contention; persistent contention surfaces the typed conflict error to
 /// the recovery sweep, which leaves the sidecar in place for the next
 /// open's retry.
-async fn roll_forward_all(root_uri: &str, sidecar: &RecoverySidecar) -> Result<u64> {
+/// Returns `(new_manifest_version, per_table_published_versions)`. The
+/// per-table map is what the audit row's `to_version` should record —
+/// for loose-match writers the actual Lance HEAD can be higher than the
+/// sidecar's `post_commit_pin` (which is a lower bound), so the pin is
+/// the wrong source of truth for an operator-facing audit field.
+async fn roll_forward_all(
+root_uri: &str,
+sidecar: &RecoverySidecar,
+) -> Result<(u64, HashMap<String, u64>)> {
 let mut updates: Vec<ManifestChange> = Vec::with_capacity(sidecar.tables.len());
 let mut expected: HashMap<String, u64> = HashMap::with_capacity(sidecar.tables.len());
+let mut published_versions: HashMap<String, u64> =
+HashMap::with_capacity(sidecar.tables.len());
 for pin in &sidecar.tables {
 // Open the dataset at its CURRENT Lance HEAD on the pin's branch
@@ -735,11 +753,12 @@ async fn roll_forward_all(root_uri: &str, sidecar: &RecoverySidecar) -> Result<u
 version_metadata,
 }));
 expected.insert(pin.table_key.clone(), pin.expected_version);
+published_versions.insert(pin.table_key.clone(), head_version);
 }
 let publisher = GraphNamespacePublisher::new(root_uri, sidecar.branch.as_deref());
 let new_dataset = publisher.publish(&updates, &expected).await?;
-Ok(new_dataset.version().version)
+Ok((new_dataset.version().version, published_versions))
 }
 /// Append the audit row describing this recovery action.
@@ -760,24 +779,31 @@ async fn record_audit(
 kind: RecoveryKind,
 outcomes: Vec<TableOutcome>,
 ) -> Result<()> {
-let mut graph = CommitGraph::open(root_uri).await?;
 // BranchMerge sidecars carry the source branch's HEAD commit id so
 // recovery can record this as a MERGE commit (with parent linkage)
 // instead of a plain commit. Without the merge parent, future
 // `branch_merge feature → main` between the same pair would not
 // recognize "already up-to-date" and merge-base computations break.
+//
+// For BranchMerge on a non-main target, the parent commit id is the
+// TARGET BRANCH's tip — `CommitGraph::open()` returns the global
+// commit graph whose `head_commit_id()` is the global head and would
+// record the wrong parent. Open the per-branch instance instead.
 let graph_commit_id = match (
 sidecar.writer_kind,
 sidecar.merge_source_commit_id.as_deref(),
 kind,
 ) {
 (SidecarKind::BranchMerge, Some(source_id), RecoveryKind::RolledForward) => {
-// For BranchMerge roll-forward, fetch the current branch
-// tip as the parent — at open-time recovery this is the
-// pre-merge tip (no other writers have run yet).
+let mut branch_graph = match sidecar.branch.as_deref() {
+Some(target_branch) => {
+CommitGraph::open_at_branch(root_uri, target_branch).await?
+}
+None => CommitGraph::open(root_uri).await?,
+};
 let parent_commit_id =
-graph.head_commit_id().await?.unwrap_or_default();
-graph
+branch_graph.head_commit_id().await?.unwrap_or_default();
+branch_graph
 .append_merge_commit(
 sidecar.branch.as_deref(),
 manifest_version,
@@ -788,6 +814,7 @@ async fn record_audit(
 .await?
 }
 _ => {
+let mut graph = CommitGraph::open(root_uri).await?;
 graph
 .append_commit(
 sidecar.branch.as_deref(),
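
Illustration for D2 (not part of this commit): a minimal sketch of the parent selection, assuming a toy commit graph that tracks one global head plus per-branch tips. `ToyCommitGraph` and `merge_parent` are hypothetical and do not reproduce the real `CommitGraph` API; they only show which head the recovered merge commit should take as its parent.

```rust
use std::collections::HashMap;

/// Toy model: one global head plus a tip per named branch.
/// This is NOT the real `CommitGraph`; it only models head selection.
pub struct ToyCommitGraph {
    pub global_head: Option<String>,
    pub branch_tips: HashMap<String, String>,
}

impl ToyCommitGraph {
    /// Parent for a recovered merge commit: the target branch's tip when
    /// the sidecar names a branch, otherwise the global head.
    pub fn merge_parent(&self, sidecar_branch: Option<&str>) -> Option<&str> {
        match sidecar_branch {
            Some(branch) => self.branch_tips.get(branch).map(String::as_str),
            None => self.global_head.as_deref(),
        }
    }
}
```

If the global head is `c3` and `target_branch` sits at `c2`, a merge into `target_branch` should record `c2` as its parent. Reading the global head would record `c3`, which is the wrong-parent failure D2 describes.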

@@ -376,16 +376,32 @@ impl Omnigraph {
 }
 /// Re-read the handle-local coordinator state from storage AND run
-/// roll-forward-only recovery: closes the in-process Phase B → Phase C
-/// residual (e.g. `MutationStaging::finalize` crash mid-publish in a
-/// long-running server) without restart. Roll-forward uses
-/// `ManifestBatchPublisher::publish`'s row-level CAS — safe under
-/// concurrency. Sidecars that would require `Dataset::restore` are
-/// deferred to the next ReadWrite open (restore can silently orphan
-/// a concurrent writer's commit if invoked under concurrency).
+/// in-process recovery. Closes the Phase B → Phase C residual (e.g.
+/// `MutationStaging::finalize` crash mid-publish in a long-running
+/// server) without restart.
+///
+/// Composition mirrors `Omnigraph::open_with_storage_and_mode`'s
+/// recovery sequence, in the same order, with one restriction: the
+/// manifest-drift sweep runs in `RollForwardOnly` mode (rollback /
+/// abort cases defer to the next ReadWrite open because
+/// `Dataset::restore` is unsafe under concurrency). Each step:
+///
+/// 1. `coordinator.refresh()` — re-read manifest.
+/// 2. `recover_schema_state_files` — complete an in-flight
+/// schema_apply's staging→final rename if a SchemaApply sidecar
+/// is on disk; idempotent + early-returns when no staging files
+/// exist. Required BEFORE manifest-drift recovery so a
+/// SchemaApply roll-forward doesn't publish the manifest while
+/// the staging files remain unrenamed (which would corrupt the
+/// repo: data on new schema, catalog on old).
+/// 3. `recover_manifest_drift(... RollForwardOnly)` — close the
+/// finalize→publisher residual via roll-forward; defer rollback
+/// work to next ReadWrite open.
+/// 4. `runtime_cache.invalidate_all` — drop stale per-snapshot caches.
 ///
 /// Steady state cost: one `list_dir` of `__recovery/` (typically
-/// returns empty → early return). No additional Lance reads.
+/// returns empty → early return for both passes). No additional
+/// Lance reads.
 ///
 /// Engine-internal callers that already hold an in-flight sidecar
 /// (e.g. `schema_apply` mid-write) MUST use
@@ -393,6 +409,12 @@ impl Omnigraph {
 /// avoid the recovery sweep racing their own sidecar.
 pub async fn refresh(&mut self) -> Result<()> {
 self.coordinator.refresh().await?;
+recover_schema_state_files(
+&self.root_uri,
+Arc::clone(&self.storage),
+&self.coordinator.snapshot(),
+)
+.await?;
 crate::db::manifest::recover_manifest_drift(
 &self.root_uri,
 Arc::clone(&self.storage),
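
Illustration for D3 (not part of this commit): a minimal sketch of why the schema-file step has to run before the manifest-drift step, assuming a toy repo state with four flags. `ToyRepo` and the `toy_*` functions are hypothetical stand-ins, not the real `recover_schema_state_files` / `recover_manifest_drift` signatures.

```rust
/// Toy on-disk state after a SchemaApply crash between Phase B and Phase C.
#[derive(Debug, Clone, PartialEq)]
pub struct ToyRepo {
    pub sidecar_on_disk: bool,
    pub staging_schema_on_disk: bool,
    pub catalog_is_new: bool,
    pub data_is_new: bool,
}

impl ToyRepo {
    pub fn crashed_schema_apply() -> Self {
        ToyRepo {
            sidecar_on_disk: true,
            staging_schema_on_disk: true,
            catalog_is_new: false,
            data_is_new: false,
        }
    }

    /// Consistent means catalog and data agree on the schema generation.
    pub fn is_consistent(&self) -> bool {
        self.catalog_is_new == self.data_is_new
    }
}

/// Stand-in for the schema-file step: finish the staging-to-final rename,
/// but only while a sidecar still marks the schema apply as in flight.
pub fn toy_recover_schema_files(repo: &mut ToyRepo) {
    if repo.sidecar_on_disk && repo.staging_schema_on_disk {
        repo.staging_schema_on_disk = false;
        repo.catalog_is_new = true;
    }
}

/// Stand-in for the manifest-drift step: publish the manifest, then
/// delete the sidecar.
pub fn toy_recover_manifest_drift(repo: &mut ToyRepo) {
    if repo.sidecar_on_disk {
        repo.data_is_new = true;
        repo.sidecar_on_disk = false;
    }
}
```

Running the schema step first leaves catalog and data on the new schema. Running only the drift step (or running it first) deletes the sidecar while the staging files are still unrenamed, so a later schema step finds nothing to act on and the toy repo stays inconsistent.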

@@ -81,7 +81,13 @@ pub(super) async fn ensure_indices_for_branch(
 table_path: full_path,
 expected_version: entry.table_version,
 post_commit_pin: entry.table_version + 1,
-table_branch: entry.table_branch.clone(),
+// Use active_branch (where commits actually land), NOT
+// entry.table_branch (where the table currently lives).
+// open_owned_dataset_for_branch_write forks a feature
+// branch from a main-branch table on first write — the
+// resulting commit lands on active_branch. Recovery's
+// open_lance_head must check the same branch.
+table_branch: active_branch.clone(),
 });
 }
 }
@@ -102,7 +108,13 @@ pub(super) async fn ensure_indices_for_branch(
 table_path: full_path,
 expected_version: entry.table_version,
 post_commit_pin: entry.table_version + 1,
-table_branch: entry.table_branch.clone(),
+// Use active_branch (where commits actually land), NOT
+// entry.table_branch (where the table currently lives).
+// open_owned_dataset_for_branch_write forks a feature
+// branch from a main-branch table on first write — the
+// resulting commit lands on active_branch. Recovery's
+// open_lance_head must check the same branch.
+table_branch: active_branch.clone(),
 });
 }
 }
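
Illustration for D4 (not part of this commit): a minimal sketch of why the pin must record the branch the commit lands on, assuming a toy map of per-branch HEAD versions for one table. `Branch`, `Heads`, `toy_commit`, and `toy_has_drift` are hypothetical. The version numbering of a forked branch is also an assumption here, not a statement about Lance.

```rust
use std::collections::HashMap;

/// Branch name as a sidecar would store it; `None` stands for main.
pub type Branch = Option<String>;

/// Toy per-branch HEAD versions for a single table.
pub type Heads = HashMap<Branch, u64>;

/// Toy fork-on-write commit: the first write on `active_branch` forks from
/// `table_branch`, and the new version lands on `active_branch`.
pub fn toy_commit(heads: &mut Heads, table_branch: &Branch, active_branch: &Branch) -> u64 {
    let base = heads.get(table_branch).copied().unwrap_or(0);
    let next = heads.get(active_branch).copied().unwrap_or(base) + 1;
    heads.insert(active_branch.clone(), next);
    next
}

/// Toy drift check: has HEAD on the pinned branch moved past `expected`?
pub fn toy_has_drift(heads: &Heads, pinned_branch: &Branch, expected: u64) -> bool {
    heads.get(pinned_branch).map_or(false, |head| *head > expected)
}
```

With the table at version 3 on main and a first write on `feature`, the new version lands on `feature`. A pin that recorded main sees no movement past 3, while a pin that recorded `feature` sees the drift, which is the case the old `entry.table_branch` pin missed.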

@@ -926,6 +926,146 @@ async fn branch_merge_phase_b_failure_recovered_on_next_open() {
 drop(db);
 }
+/// Branch-axis variant of the branch_merge recovery test: target is a
+/// non-main branch. Catches the branch-specific commit-graph head bug
+/// (D2) — without `CommitGraph::open_at_branch`, the recovery sweep
+/// would record the global head as the merge parent on a non-main
+/// target, and future merges between the same pair would lose
+/// already-up-to-date detection.
+#[tokio::test]
+async fn branch_merge_phase_b_failure_recovered_on_non_main_target() {
+use omnigraph::loader::{LoadMode, load_jsonl};
+let _scenario = FailScenario::setup();
+let dir = tempfile::tempdir().unwrap();
+let uri = dir.path().to_str().unwrap().to_string();
+// Setup:
+// main: alice
+// target_branch (off main): + bob (target moved past base)
+// source_branch (off main): + carol (source moved past base)
+// Merge: source_branch → target_branch
+{
+let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap();
+load_jsonl(
+&mut db,
+r#"{"type":"Person","data":{"name":"alice","age":30}}
+"#,
+LoadMode::Append,
+)
+.await
+.unwrap();
+db.branch_create("target_branch").await.unwrap();
+db.mutate(
+"target_branch",
+MUTATION_QUERIES,
+"insert_person",
+&mixed_params(&[("$name", "Bob")], &[("$age", 40)]),
+)
+.await
+.unwrap();
+db.branch_create("source_branch").await.unwrap();
+db.mutate(
+"source_branch",
+MUTATION_QUERIES,
+"insert_person",
+&mixed_params(&[("$name", "Carol")], &[("$age", 50)]),
+)
+.await
+.unwrap();
+}
+// Phase A: failpoint fires after the per-table publish loop completes
+// but before commit_manifest_updates. Sidecar persists with
+// branch=Some("target_branch").
+{
+let mut db = Omnigraph::open(&uri).await.unwrap();
+let _failpoint =
+ScopedFailPoint::new("branch_merge.post_phase_b_pre_manifest_commit", "return");
+let err = db
+.branch_merge("source_branch", "target_branch")
+.await
+.unwrap_err();
+assert!(
+err.to_string().contains(
+"injected failpoint triggered: branch_merge.post_phase_b_pre_manifest_commit"
+),
+"unexpected error: {err}"
+);
+let recovery_dir = dir.path().join("__recovery");
+let sidecar_count = std::fs::read_dir(&recovery_dir).unwrap().count();
+assert_eq!(
+sidecar_count,
+1,
+"exactly one sidecar must persist after non-main branch_merge failure"
+);
+}
+// Phase B: reopen runs full sweep. The BranchMerge sidecar's branch
+// = Some("target_branch"); D2 fix opens a per-branch CommitGraph
+// for the audit append so the merge-parent linkage is correct.
+let _db = Omnigraph::open(&uri).await.unwrap();
+let recovery_dir = dir.path().join("__recovery");
+if recovery_dir.exists() {
+let remaining: Vec<_> = std::fs::read_dir(&recovery_dir)
+.unwrap()
+.filter_map(|e| e.ok())
+.collect();
+assert!(
+remaining.is_empty(),
+"sidecar must be deleted; remaining: {:?}",
+remaining,
+);
+}
+// Audit row for a merge commit was recorded with a non-null
+// merged_parent_commit_id — proves the recovery sweep used the
+// branch-specific commit-graph head as parent (not the global
+// head). Also assert the recovery audit's recovery_kind ==
+// RolledForward.
+use arrow_array::{Array, StringArray};
+use futures::TryStreamExt;
+let commits_dir = dir.path().join("_graph_commits.lance");
+// Recovery wrote the merge commit to the target_branch's Lance ref
+// on the commit_graph dataset (per CommitGraph::open_at_branch).
+// Open at that ref to find the merge commit.
+let ds = lance::Dataset::open(commits_dir.to_str().unwrap())
+.await
+.unwrap()
+.checkout_branch("target_branch")
+.await
+.unwrap();
+let batches: Vec<arrow_array::RecordBatch> = ds
+.scan()
+.try_into_stream()
+.await
+.unwrap()
+.try_collect()
+.await
+.unwrap();
+let mut found_recovery_merge = false;
+for batch in batches {
+let merged = batch
+.column_by_name("merged_parent_commit_id")
+.expect("merged_parent_commit_id column present")
+.as_any()
+.downcast_ref::<StringArray>()
+.expect("merged_parent_commit_id is Utf8");
+for i in 0..merged.len() {
+if !merged.is_null(i) {
+found_recovery_merge = true;
+break;
+}
+}
+}
+assert!(
+found_recovery_merge,
+"non-main branch_merge recovery must record `merged_parent_commit_id` on the \
+target branch's commit graph",
+);
+}
 /// `ensure_indices` only writes a sidecar when at least one table
 /// genuinely needs index work (per `needs_index_work_*` helpers in
 /// `db/omnigraph/table_ops.rs`). When all tables are steady-state

@@ -1121,11 +1121,141 @@ async fn recovery_classifies_feature_branch_sidecar_against_feature_branch() {
 v_pin, v_head, post_entry.table_version,
 );
-// Audit row recorded for the recovery action.
+// Audit row recorded for the recovery action — and the row's
+// recovery_kind == RolledForward (proves the branch-aware classifier
+// got us through the eligible path; without it, the snapshot lookup
+// against main's pin would have produced NoMovement → RollBack).
+let kinds = list_recovery_audit_kinds(dir.path()).await;
 assert_eq!(
-count_recovery_audit_rows(dir.path()).await,
-1,
-"feature-branch sidecar recovery must record one audit row",
+kinds, vec!["RolledForward".to_string()],
+"feature-branch sidecar recovery must record exactly one RolledForward audit row; got {:?}",
+kinds,
 );
 }
+/// Companion to the roll-forward feature-branch test: branch-axis
+/// rollback. Synthesize a feature-branch sidecar that classifies as
+/// rollback-eligible (UnexpectedAtP1) and assert the recovery sweep
+/// processes the sidecar against the FEATURE branch (not main) and runs
+/// the rollback. Without branch-aware recovery, classification would
+/// happen against main's snapshot/HEAD — likely NoMovement → no-op
+/// rollback that doesn't touch the actually-drifted feature ref.
+#[tokio::test]
+async fn recovery_rolls_back_feature_branch_sidecar_against_feature_branch() {
+use omnigraph::loader::{LoadMode, load_jsonl};
+use omnigraph::table_store::TableStore;
+let dir = tempfile::tempdir().unwrap();
+let uri = dir.path().to_str().unwrap();
+let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
+load_jsonl(
+&mut db,
+r#"{"type":"Person","data":{"name":"alice","age":30}}
+"#,
+LoadMode::Append,
+)
+.await
+.unwrap();
+db.branch_create("feature").await.unwrap();
+db.mutate(
+"feature",
+helpers::MUTATION_QUERIES,
+"insert_person",
+&helpers::mixed_params(&[("$name", "bob")], &[("$age", 40)]),
+)
+.await
+.unwrap();
+let feature_snapshot = db
+.snapshot_of(omnigraph::db::ReadTarget::branch("feature"))
+.await
+.unwrap();
+let feature_entry = feature_snapshot
+.entry("node:Person")
+.expect("feature snapshot must have Person entry");
+let v_pin = feature_entry.table_version;
+let feature_branch_name = feature_entry.table_branch.clone();
+drop(db);
+// Bypass the manifest: append on the feature ref to advance HEAD past
+// the manifest pin.
+let person_uri = node_table_uri(uri, "Person");
+let store = TableStore::new(uri);
+let mut ds = store
+.open_dataset_head(&person_uri, feature_branch_name.as_deref())
+.await
+.unwrap();
+store
+.append_batch(
+&person_uri,
+&mut ds,
+person_batch(&[("dave-id", "dave", Some(50))]),
+)
+.await
+.unwrap();
+let v_head = ds.version().version;
+assert_eq!(v_head, v_pin + 1);
+// Sidecar with bogus expected (mismatch) AND post_commit_pin == v_head.
+// Strict Mutation classifier sees lance_head == manifest_pinned + 1
+// but expected != manifest_pinned → UnexpectedAtP1 → RollBack.
+let bogus_expected = v_pin.saturating_sub(1);
+let sidecar_json = format!(
+r#"{{
+"schema_version": 1,
+"operation_id": "01H0000000000000000000FRB1",
+"started_at": "0",
+"branch": "feature",
+"actor_id": "act-feature-rb",
+"writer_kind": "Mutation",
+"tables": [
+{{
+"table_key":"node:Person",
+"table_path":"{}",
+"expected_version":{},
+"post_commit_pin":{},
+"table_branch":{}
+}}
+]
+}}"#,
+person_uri,
+bogus_expected,
+v_head,
+match &feature_branch_name {
+Some(b) => format!("\"{}\"", b),
+None => "null".to_string(),
+},
+);
+write_sidecar_file(dir.path(), "01H0000000000000000000FRB1", &sidecar_json);
+// Reopen with full sweep — RollBack is allowed at open time.
+let _db = Omnigraph::open(uri).await.unwrap();
+assert!(
+list_recovery_dir(dir.path()).is_empty(),
+"feature-branch rollback sidecar must be deleted after recovery"
+);
+// Audit kind == RolledBack (proves classifier saw feature's HEAD,
+// not main's; main's view of Person would be NoMovement → no audit
+// row attribution).
+let kinds = list_recovery_audit_kinds(dir.path()).await;
+assert_eq!(
+kinds, vec!["RolledBack".to_string()],
+"feature-branch rollback must record one RolledBack audit row; got {:?}",
+kinds,
+);
+// Lance HEAD on the feature ref must have advanced (real restore ran).
+let post = store
+.open_dataset_head(&person_uri, feature_branch_name.as_deref())
+.await
+.unwrap();
+assert!(
+post.version().version > v_head,
+"real restore must have appended a commit on feature; v_head={}, post={}",
+v_head,
+post.version().version,
+);
+}