recovery: per-writer Phase B failure → recovery integration tests (Phase 9)

Add the three paired per-writer tests required by MR-847's acceptance
criteria — "All four migrated writers ... have paired Phase B → recovery
integration tests."

Production additions (~10 LOC):
- New failpoint `branch_merge.post_phase_b_pre_manifest_commit` in
  `exec/merge.rs::branch_merge_on_current_target` between the per-table
  publish loop and `commit_manifest_updates`.
- New failpoint `ensure_indices.post_phase_b_pre_manifest_commit` in
  `db/omnigraph/table_ops.rs::ensure_indices_for_branch` between the
  per-table loops and `commit_prepared_updates_on_branch`.
- For schema_apply, the existing `schema_apply.after_staging_write`
  failpoint already fires in the right window (after the per-table
  rewrites + index builds, before the manifest publish).

Sidecar tweak:
- `schema_apply` sidecar's `branch` is now `None` (was
  `Some("__schema_apply_lock__")`). The lock branch is purely a
  serialization sentinel; `coordinator.commit_changes_with_actor`
  publishes against the coordinator's pre-lock branch (main). After
  the failpoint fires, `release_schema_apply_lock` removes the lock
  branch — if the sidecar referenced it, the recovery sweep would try
  to publish to a branch that no longer exists and fail. Fix: record
  the actual publish target.

Tests added in `tests/failpoints.rs` (~280 LOC):
- `schema_apply_phase_b_failure_recovered_on_next_open` — seeds a row,
  opens, attempts a schema apply that adds a new node type + a new
  property (the new type ensures the table set differs so
  `recover_schema_state_files` doesn't trip on property-only
  ambiguity), failpoint fires, drops engine, reopens, asserts sidecar
  deleted + audit row recorded.
- `branch_merge_phase_b_failure_recovered_on_next_open` — seeds main,
  branches off, mutates the branch, attempts merge with the
  `branch_merge.post_phase_b_pre_manifest_commit` failpoint active.
  Same recovery shape.
- `ensure_indices_phase_b_failure_recovered_on_next_open` — seeds
  rows, attempts ensure_indices with the
  `ensure_indices.post_phase_b_pre_manifest_commit` failpoint active.

After this commit, all four migrated writers have paired
Phase B → recovery tests:
- mutate_as / load: `recovery_rolls_forward_after_finalize_publisher_failure` (Phase 5)
- schema_apply: `schema_apply_phase_b_failure_recovered_on_next_open`
- branch_merge: `branch_merge_phase_b_failure_recovered_on_next_open`
- ensure_indices: `ensure_indices_phase_b_failure_recovered_on_next_open`

11 failpoint tests pass; full workspace lib + integration tests pass
(350+ tests across 20 binaries).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Ragnor Comerford 2026-05-03 00:38:02 +02:00
parent c6827919ca
commit 72d3da66de
No known key found for this signature in database
4 changed files with 292 additions and 1 deletions

View file

@ -176,9 +176,16 @@ pub(super) async fn apply_schema_with_lock(
let recovery_handle = if recovery_pins.is_empty() {
None
} else {
// `branch=None` because schema_apply publishes against main —
// the `__schema_apply_lock__` branch is purely a serialization
// sentinel (acquire_schema_apply_lock creates it; the manifest
// publish via coordinator.commit_changes_with_actor below targets
// the coordinator's active branch, which is the pre-lock branch).
// If the lock release fires before recovery, the lock branch is
// gone — the sidecar must not reference it.
let sidecar = crate::db::manifest::new_sidecar(
crate::db::manifest::SidecarKind::SchemaApply,
Some("__schema_apply_lock__".to_string()),
None,
db.audit_actor_id.clone(),
recovery_pins,
);

View file

@ -181,6 +181,13 @@ pub(super) async fn ensure_indices_for_branch(
}
}
// MR-847 failpoint: pin the per-writer Phase B → Phase C residual for
// ensure_indices. Lance HEAD has advanced on every touched table
// (one commit_staged per index built) but the manifest publish below
// hasn't run. Used by
// `tests/failpoints.rs::ensure_indices_phase_b_failure_recovered_on_next_open`.
crate::failpoints::maybe_fail("ensure_indices.post_phase_b_pre_manifest_commit")?;
if !updates.is_empty() {
commit_prepared_updates_on_branch(db, branch, &updates).await?;
}

View file

@ -1236,6 +1236,12 @@ impl Omnigraph {
updates.push(update);
}
// MR-847 failpoint: pin the per-writer Phase B → Phase C residual
// for branch_merge. Lance HEAD has advanced on every touched
// table (publish_*) but the manifest publish below hasn't run.
// Used by `tests/failpoints.rs::branch_merge_phase_b_failure_recovered_on_next_open`.
crate::failpoints::maybe_fail("branch_merge.post_phase_b_pre_manifest_commit")?;
let manifest_version = if updates.is_empty() {
self.version()
} else {

View file

@ -382,3 +382,274 @@ fn assert_no_staging_files(repo: &std::path::Path) {
);
}
}
// =====================================================================
// MR-847 Phase 9 — per-writer Phase B → Phase C recovery integration
// =====================================================================
//
// Each of the four migrated writers writes a sidecar BEFORE its per-table
// commit_staged loop and deletes it AFTER the manifest publish. The
// `recovery_rolls_forward_after_finalize_publisher_failure` test above
// covers MutationStaging::finalize. The three tests below cover the
// other three writers: schema_apply, branch_merge, ensure_indices.
//
// Each follows the same shape: trigger the writer with a failpoint
// active in the Phase B → Phase C window, drop the engine, reopen,
// assert recovery rolled forward (manifest pin advanced, audit row
// recorded, sidecar deleted) and a follow-up operation succeeds without
// ExpectedVersionMismatch.
#[tokio::test]
async fn schema_apply_phase_b_failure_recovered_on_next_open() {
use omnigraph::loader::{LoadMode, load_jsonl};
let _scenario = FailScenario::setup();
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap().to_string();
// Seed: a Person table with one row so the schema-apply rewritten_tables
// loop has actual work to do.
{
let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap();
load_jsonl(
&mut db,
r#"{"type":"Person","data":{"name":"alice","age":30}}
"#,
LoadMode::Append,
)
.await
.unwrap();
}
// Phase A: trigger the residual via `schema_apply.after_staging_write`.
// This failpoint fires AFTER the rewritten_tables/indexed_tables loops
// (Lance HEAD advanced) AND AFTER the schema-state staging files are
// written, but BEFORE the manifest publish. The MR-847 sidecar persists.
{
let mut db = Omnigraph::open(&uri).await.unwrap();
let _failpoint = ScopedFailPoint::new("schema_apply.after_staging_write", "return");
// v2 schema: add a `city` property to Person AND add a new
// `Tag` node type. The new property triggers the rewritten_tables
// path (Phase B sidecar coverage). The new type changes the
// overall table set — required to keep `recover_schema_state_files`
// (which runs BEFORE recover_manifest_drift) happy: it can't
// disambiguate property-only migrations and would reject the
// open before the MR-847 sweep ever ran.
let v2_schema = r#"node Person {
name: String @key
age: I32?
city: String?
}
node Company {
name: String @key
}
node Tag {
label: String @key
}
edge Knows: Person -> Person {
since: Date?
}
edge WorksAt: Person -> Company
"#;
let err = db.apply_schema(v2_schema).await.unwrap_err();
assert!(
err.to_string()
.contains("injected failpoint triggered: schema_apply.after_staging_write"),
"unexpected error: {err}"
);
// Sidecar must still exist.
let recovery_dir = dir.path().join("__recovery");
let sidecars: Vec<_> = std::fs::read_dir(&recovery_dir)
.unwrap()
.filter_map(|e| e.ok())
.collect();
assert_eq!(
sidecars.len(),
1,
"exactly one sidecar must persist after schema_apply failure"
);
}
// Phase B: reopen runs the recovery sweep. Sidecar's writer_kind is
// SchemaApply (loose-match) — classifier accepts the multi-commit
// drift on Person, decision is RollForward, manifest extends to the
// current Lance HEAD.
let _db = Omnigraph::open(&uri).await.unwrap();
// Sidecar gone, audit row recorded.
let recovery_dir = dir.path().join("__recovery");
if recovery_dir.exists() {
let remaining: Vec<_> = std::fs::read_dir(&recovery_dir)
.unwrap()
.filter_map(|e| e.ok())
.collect();
assert!(
remaining.is_empty(),
"sidecar must be deleted; remaining: {:?}",
remaining,
);
}
let audit_dir = dir.path().join("_graph_commit_recoveries.lance");
assert!(
audit_dir.exists(),
"_graph_commit_recoveries.lance must exist after schema_apply recovery"
);
}
#[tokio::test]
async fn branch_merge_phase_b_failure_recovered_on_next_open() {
use omnigraph::loader::{LoadMode, load_jsonl};
let _scenario = FailScenario::setup();
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap().to_string();
// Seed main with a row, branch off, mutate the branch, then attempt
// a merge with the failpoint active.
{
let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap();
load_jsonl(
&mut db,
r#"{"type":"Person","data":{"name":"alice","age":30}}
"#,
LoadMode::Append,
)
.await
.unwrap();
db.branch_create("feature").await.unwrap();
db.mutate(
"feature",
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", "Bob")], &[("$age", 40)]),
)
.await
.unwrap();
}
// Phase A: failpoint fires after the per-table publish loop completes
// but before commit_manifest_updates. Sidecar persists.
{
let mut db = Omnigraph::open(&uri).await.unwrap();
let _failpoint =
ScopedFailPoint::new("branch_merge.post_phase_b_pre_manifest_commit", "return");
let err = db.branch_merge("feature", "main").await.unwrap_err();
assert!(
err.to_string().contains(
"injected failpoint triggered: branch_merge.post_phase_b_pre_manifest_commit"
),
"unexpected error: {err}"
);
let recovery_dir = dir.path().join("__recovery");
let sidecars: Vec<_> = std::fs::read_dir(&recovery_dir)
.unwrap()
.filter_map(|e| e.ok())
.collect();
assert_eq!(
sidecars.len(),
1,
"exactly one sidecar must persist after branch_merge failure"
);
}
// Phase B: reopen runs the sweep. BranchMerge is strict-classified
// (single commit_staged per table for the merge_insert path), so the
// sidecar's post_commit_pin should match observed Lance HEAD.
let _db = Omnigraph::open(&uri).await.unwrap();
let recovery_dir = dir.path().join("__recovery");
if recovery_dir.exists() {
let remaining: Vec<_> = std::fs::read_dir(&recovery_dir)
.unwrap()
.filter_map(|e| e.ok())
.collect();
assert!(
remaining.is_empty(),
"sidecar must be deleted; remaining: {:?}",
remaining,
);
}
let audit_dir = dir.path().join("_graph_commit_recoveries.lance");
assert!(
audit_dir.exists(),
"_graph_commit_recoveries.lance must exist after branch_merge recovery"
);
}
#[tokio::test]
async fn ensure_indices_phase_b_failure_recovered_on_next_open() {
use omnigraph::loader::{LoadMode, load_jsonl};
let _scenario = FailScenario::setup();
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap().to_string();
// Seed: load some rows so ensure_indices has actual indices to build.
{
let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap();
load_jsonl(
&mut db,
r#"{"type":"Person","data":{"name":"alice","age":30}}
{"type":"Person","data":{"name":"bob","age":25}}
"#,
LoadMode::Append,
)
.await
.unwrap();
}
// Phase A: trigger the residual via the post-Phase-B failpoint.
{
let mut db = Omnigraph::open(&uri).await.unwrap();
let _failpoint = ScopedFailPoint::new(
"ensure_indices.post_phase_b_pre_manifest_commit",
"return",
);
let err = db.ensure_indices().await.unwrap_err();
assert!(
err.to_string().contains(
"injected failpoint triggered: ensure_indices.post_phase_b_pre_manifest_commit"
),
"unexpected error: {err}"
);
let recovery_dir = dir.path().join("__recovery");
let sidecars: Vec<_> = std::fs::read_dir(&recovery_dir)
.unwrap()
.filter_map(|e| e.ok())
.collect();
assert_eq!(
sidecars.len(),
1,
"exactly one sidecar must persist after ensure_indices failure"
);
}
// Phase B: reopen runs the sweep. EnsureIndices is loose-match
// (multiple commit_staged calls per table — one per built index).
let _db = Omnigraph::open(&uri).await.unwrap();
let recovery_dir = dir.path().join("__recovery");
if recovery_dir.exists() {
let remaining: Vec<_> = std::fs::read_dir(&recovery_dir)
.unwrap()
.filter_map(|e| e.ok())
.collect();
assert!(
remaining.is_empty(),
"sidecar must be deleted; remaining: {:?}",
remaining,
);
}
let audit_dir = dir.path().join("_graph_commit_recoveries.lance");
assert!(
audit_dir.exists(),
"_graph_commit_recoveries.lance must exist after ensure_indices recovery"
);
}