diff --git a/crates/omnigraph/src/db/omnigraph/schema_apply.rs b/crates/omnigraph/src/db/omnigraph/schema_apply.rs index 00d1da9..10cc7cf 100644 --- a/crates/omnigraph/src/db/omnigraph/schema_apply.rs +++ b/crates/omnigraph/src/db/omnigraph/schema_apply.rs @@ -176,9 +176,16 @@ pub(super) async fn apply_schema_with_lock( let recovery_handle = if recovery_pins.is_empty() { None } else { + // `branch=None` because schema_apply publishes against main — + // the `__schema_apply_lock__` branch is purely a serialization + // sentinel (acquire_schema_apply_lock creates it; the manifest + // publish via coordinator.commit_changes_with_actor below targets + // the coordinator's active branch, which is the pre-lock branch). + // If the lock release fires before recovery, the lock branch is + // gone — the sidecar must not reference it. let sidecar = crate::db::manifest::new_sidecar( crate::db::manifest::SidecarKind::SchemaApply, - Some("__schema_apply_lock__".to_string()), + None, db.audit_actor_id.clone(), recovery_pins, ); diff --git a/crates/omnigraph/src/db/omnigraph/table_ops.rs b/crates/omnigraph/src/db/omnigraph/table_ops.rs index a02b4b1..cbdeb1a 100644 --- a/crates/omnigraph/src/db/omnigraph/table_ops.rs +++ b/crates/omnigraph/src/db/omnigraph/table_ops.rs @@ -181,6 +181,13 @@ pub(super) async fn ensure_indices_for_branch( } } + // MR-847 failpoint: pin the per-writer Phase B → Phase C residual for + // ensure_indices. Lance HEAD has advanced on every touched table + // (one commit_staged per index built) but the manifest publish below + // hasn't run. Used by + // `tests/failpoints.rs::ensure_indices_phase_b_failure_recovered_on_next_open`. + crate::failpoints::maybe_fail("ensure_indices.post_phase_b_pre_manifest_commit")?; + if !updates.is_empty() { commit_prepared_updates_on_branch(db, branch, &updates).await?; } diff --git a/crates/omnigraph/src/exec/merge.rs b/crates/omnigraph/src/exec/merge.rs index 818be93..afef21a 100644 --- a/crates/omnigraph/src/exec/merge.rs +++ b/crates/omnigraph/src/exec/merge.rs @@ -1236,6 +1236,12 @@ impl Omnigraph { updates.push(update); } + // MR-847 failpoint: pin the per-writer Phase B → Phase C residual + // for branch_merge. Lance HEAD has advanced on every touched + // table (publish_*) but the manifest publish below hasn't run. + // Used by `tests/failpoints.rs::branch_merge_phase_b_failure_recovered_on_next_open`. + crate::failpoints::maybe_fail("branch_merge.post_phase_b_pre_manifest_commit")?; + let manifest_version = if updates.is_empty() { self.version() } else { diff --git a/crates/omnigraph/tests/failpoints.rs b/crates/omnigraph/tests/failpoints.rs index 9d9edb9..8961085 100644 --- a/crates/omnigraph/tests/failpoints.rs +++ b/crates/omnigraph/tests/failpoints.rs @@ -382,3 +382,274 @@ fn assert_no_staging_files(repo: &std::path::Path) { ); } } + +// ===================================================================== +// MR-847 Phase 9 — per-writer Phase B → Phase C recovery integration +// ===================================================================== +// +// Each of the four migrated writers writes a sidecar BEFORE its per-table +// commit_staged loop and deletes it AFTER the manifest publish. The +// `recovery_rolls_forward_after_finalize_publisher_failure` test above +// covers MutationStaging::finalize. The three tests below cover the +// other three writers: schema_apply, branch_merge, ensure_indices. +// +// Each follows the same shape: trigger the writer with a failpoint +// active in the Phase B → Phase C window, drop the engine, reopen, +// assert recovery rolled forward (manifest pin advanced, audit row +// recorded, sidecar deleted) and a follow-up operation succeeds without +// ExpectedVersionMismatch. + +#[tokio::test] +async fn schema_apply_phase_b_failure_recovered_on_next_open() { + use omnigraph::loader::{LoadMode, load_jsonl}; + + let _scenario = FailScenario::setup(); + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap().to_string(); + + // Seed: a Person table with one row so the schema-apply rewritten_tables + // loop has actual work to do. + { + let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap(); + load_jsonl( + &mut db, + r#"{"type":"Person","data":{"name":"alice","age":30}} +"#, + LoadMode::Append, + ) + .await + .unwrap(); + } + + // Phase A: trigger the residual via `schema_apply.after_staging_write`. + // This failpoint fires AFTER the rewritten_tables/indexed_tables loops + // (Lance HEAD advanced) AND AFTER the schema-state staging files are + // written, but BEFORE the manifest publish. The MR-847 sidecar persists. + { + let mut db = Omnigraph::open(&uri).await.unwrap(); + let _failpoint = ScopedFailPoint::new("schema_apply.after_staging_write", "return"); + // v2 schema: add a `city` property to Person AND add a new + // `Tag` node type. The new property triggers the rewritten_tables + // path (Phase B sidecar coverage). The new type changes the + // overall table set — required to keep `recover_schema_state_files` + // (which runs BEFORE recover_manifest_drift) happy: it can't + // disambiguate property-only migrations and would reject the + // open before the MR-847 sweep ever ran. + let v2_schema = r#"node Person { + name: String @key + age: I32? + city: String? +} + +node Company { + name: String @key +} + +node Tag { + label: String @key +} + +edge Knows: Person -> Person { + since: Date? +} + +edge WorksAt: Person -> Company +"#; + let err = db.apply_schema(v2_schema).await.unwrap_err(); + assert!( + err.to_string() + .contains("injected failpoint triggered: schema_apply.after_staging_write"), + "unexpected error: {err}" + ); + + // Sidecar must still exist. + let recovery_dir = dir.path().join("__recovery"); + let sidecars: Vec<_> = std::fs::read_dir(&recovery_dir) + .unwrap() + .filter_map(|e| e.ok()) + .collect(); + assert_eq!( + sidecars.len(), + 1, + "exactly one sidecar must persist after schema_apply failure" + ); + } + + // Phase B: reopen runs the recovery sweep. Sidecar's writer_kind is + // SchemaApply (loose-match) — classifier accepts the multi-commit + // drift on Person, decision is RollForward, manifest extends to the + // current Lance HEAD. + let _db = Omnigraph::open(&uri).await.unwrap(); + + // Sidecar gone, audit row recorded. + let recovery_dir = dir.path().join("__recovery"); + if recovery_dir.exists() { + let remaining: Vec<_> = std::fs::read_dir(&recovery_dir) + .unwrap() + .filter_map(|e| e.ok()) + .collect(); + assert!( + remaining.is_empty(), + "sidecar must be deleted; remaining: {:?}", + remaining, + ); + } + let audit_dir = dir.path().join("_graph_commit_recoveries.lance"); + assert!( + audit_dir.exists(), + "_graph_commit_recoveries.lance must exist after schema_apply recovery" + ); +} + +#[tokio::test] +async fn branch_merge_phase_b_failure_recovered_on_next_open() { + use omnigraph::loader::{LoadMode, load_jsonl}; + + let _scenario = FailScenario::setup(); + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap().to_string(); + + // Seed main with a row, branch off, mutate the branch, then attempt + // a merge with the failpoint active. + { + let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap(); + load_jsonl( + &mut db, + r#"{"type":"Person","data":{"name":"alice","age":30}} +"#, + LoadMode::Append, + ) + .await + .unwrap(); + db.branch_create("feature").await.unwrap(); + db.mutate( + "feature", + MUTATION_QUERIES, + "insert_person", + &mixed_params(&[("$name", "Bob")], &[("$age", 40)]), + ) + .await + .unwrap(); + } + + // Phase A: failpoint fires after the per-table publish loop completes + // but before commit_manifest_updates. Sidecar persists. + { + let mut db = Omnigraph::open(&uri).await.unwrap(); + let _failpoint = + ScopedFailPoint::new("branch_merge.post_phase_b_pre_manifest_commit", "return"); + let err = db.branch_merge("feature", "main").await.unwrap_err(); + assert!( + err.to_string().contains( + "injected failpoint triggered: branch_merge.post_phase_b_pre_manifest_commit" + ), + "unexpected error: {err}" + ); + + let recovery_dir = dir.path().join("__recovery"); + let sidecars: Vec<_> = std::fs::read_dir(&recovery_dir) + .unwrap() + .filter_map(|e| e.ok()) + .collect(); + assert_eq!( + sidecars.len(), + 1, + "exactly one sidecar must persist after branch_merge failure" + ); + } + + // Phase B: reopen runs the sweep. BranchMerge is strict-classified + // (single commit_staged per table for the merge_insert path), so the + // sidecar's post_commit_pin should match observed Lance HEAD. + let _db = Omnigraph::open(&uri).await.unwrap(); + + let recovery_dir = dir.path().join("__recovery"); + if recovery_dir.exists() { + let remaining: Vec<_> = std::fs::read_dir(&recovery_dir) + .unwrap() + .filter_map(|e| e.ok()) + .collect(); + assert!( + remaining.is_empty(), + "sidecar must be deleted; remaining: {:?}", + remaining, + ); + } + let audit_dir = dir.path().join("_graph_commit_recoveries.lance"); + assert!( + audit_dir.exists(), + "_graph_commit_recoveries.lance must exist after branch_merge recovery" + ); +} + +#[tokio::test] +async fn ensure_indices_phase_b_failure_recovered_on_next_open() { + use omnigraph::loader::{LoadMode, load_jsonl}; + + let _scenario = FailScenario::setup(); + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap().to_string(); + + // Seed: load some rows so ensure_indices has actual indices to build. + { + let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap(); + load_jsonl( + &mut db, + r#"{"type":"Person","data":{"name":"alice","age":30}} +{"type":"Person","data":{"name":"bob","age":25}} +"#, + LoadMode::Append, + ) + .await + .unwrap(); + } + + // Phase A: trigger the residual via the post-Phase-B failpoint. + { + let mut db = Omnigraph::open(&uri).await.unwrap(); + let _failpoint = ScopedFailPoint::new( + "ensure_indices.post_phase_b_pre_manifest_commit", + "return", + ); + let err = db.ensure_indices().await.unwrap_err(); + assert!( + err.to_string().contains( + "injected failpoint triggered: ensure_indices.post_phase_b_pre_manifest_commit" + ), + "unexpected error: {err}" + ); + + let recovery_dir = dir.path().join("__recovery"); + let sidecars: Vec<_> = std::fs::read_dir(&recovery_dir) + .unwrap() + .filter_map(|e| e.ok()) + .collect(); + assert_eq!( + sidecars.len(), + 1, + "exactly one sidecar must persist after ensure_indices failure" + ); + } + + // Phase B: reopen runs the sweep. EnsureIndices is loose-match + // (multiple commit_staged calls per table — one per built index). + let _db = Omnigraph::open(&uri).await.unwrap(); + + let recovery_dir = dir.path().join("__recovery"); + if recovery_dir.exists() { + let remaining: Vec<_> = std::fs::read_dir(&recovery_dir) + .unwrap() + .filter_map(|e| e.ok()) + .collect(); + assert!( + remaining.is_empty(), + "sidecar must be deleted; remaining: {:?}", + remaining, + ); + } + let audit_dir = dir.path().join("_graph_commit_recoveries.lance"); + assert!( + audit_dir.exists(), + "_graph_commit_recoveries.lance must exist after ensure_indices recovery" + ); +}