diff --git a/crates/omnigraph-cluster/tests/failpoints.rs b/crates/omnigraph-cluster/tests/failpoints.rs index cc91b85..5cdf2d4 100644 --- a/crates/omnigraph-cluster/tests/failpoints.rs +++ b/crates/omnigraph-cluster/tests/failpoints.rs @@ -16,7 +16,8 @@ use fail::FailScenario; use omnigraph_cluster::failpoints::ScopedFailPoint; use omnigraph::db::Omnigraph; use omnigraph_cluster::{ - ApplyOptions, apply_config_dir, apply_config_dir_with_options, validate_config_dir, + ApplyOptions, apply_config_dir, apply_config_dir_with_options, approve_config_dir, + validate_config_dir, }; use tempfile::tempdir; @@ -467,3 +468,126 @@ async fn schema_crash_after_apply_rolls_state_forward() { ); scenario.teardown(); } + +/// Seed: converged state + a stale `old` graph subtree with a real root and +/// a valid approval for its delete. Returns the approval id. +async fn seed_approved_delete(dir: &Path) -> String { + let digests = seed_applyable_state(dir); + let graph_digest = digests["graph.knowledge"].clone(); + let schema_digest = digests["schema.knowledge"].clone(); + let state_dir = dir.join("__cluster"); + fs::write( + state_dir.join("state.json"), + format!( + r#"{{ + "version": 1, + "state_revision": 1, + "applied_revision": {{ + "resources": {{ + "graph.knowledge": {{ "digest": "{graph_digest}" }}, + "schema.knowledge": {{ "digest": "{schema_digest}" }}, + "graph.old": {{ "digest": "3333" }}, + "schema.old": {{ "digest": "4444" }} + }} + }} +}} +"# + ), + ) + .unwrap(); + let root = dir.join("graphs/old.omni"); + fs::create_dir_all(&root).unwrap(); + fs::write(root.join("_schema.pg"), "stale").unwrap(); + let approved = approve_config_dir(dir, "graph.old", "test-actor").await; + assert!(approved.ok, "{:?}", approved.diagnostics); + approved.approval_id.unwrap() +} + +/// Crash before the removal: root intact, approval unconsumed, no ack; the +/// next run retires the stale intent (row 8) and the still-approved delete +/// completes in the same run. +#[tokio::test] +async fn delete_crash_before_removal_reproposes() { + let scenario = FailScenario::setup(); + let dir = fixture(); + let approval_id = seed_approved_delete(dir.path()).await; + + { + let _failpoint = ScopedFailPoint::new("cluster_apply.before_graph_delete", "return"); + let out = apply_config_dir(dir.path()).await; + assert!(!out.ok); + assert!(dir.path().join("graphs/old.omni").exists()); + assert_eq!(recovery_sidecars(dir.path()).len(), 1); + // The approval is untouched (file unconsumed). + let artifact: serde_json::Value = serde_json::from_str( + &fs::read_to_string( + dir.path() + .join("__cluster/approvals") + .join(format!("{approval_id}.json")), + ) + .unwrap(), + ) + .unwrap(); + assert!(artifact["consumed_at"].is_null()); + } + + let recovered = apply_config_dir(dir.path()).await; + assert!(recovered.ok, "{:?}", recovered.diagnostics); + assert!( + recovered + .diagnostics + .iter() + .any(|diagnostic| diagnostic.code == "graph_delete_incomplete") + ); + assert!(recovered.converged); + assert!(!dir.path().join("graphs/old.omni").exists()); + assert!(recovery_sidecars(dir.path()).is_empty()); + scenario.teardown(); +} + +/// Crash after the removal, before the state CAS: root gone, ledger stale, +/// nothing acknowledged; the next run's sweep rolls the tombstone forward, +/// consumes the approval the sidecar carries, and audits the recovery. +#[tokio::test] +async fn delete_crash_after_removal_rolls_forward() { + let scenario = FailScenario::setup(); + let dir = fixture(); + let approval_id = seed_approved_delete(dir.path()).await; + let state_before = fs::read(state_path(dir.path())).unwrap(); + + { + let _failpoint = ScopedFailPoint::new("cluster_apply.after_graph_delete", "return"); + let out = apply_config_dir(dir.path()).await; + assert!(!out.ok); + assert!(!out.state_written); + assert!(!dir.path().join("graphs/old.omni").exists()); + assert_eq!(fs::read(state_path(dir.path())).unwrap(), state_before); + let sidecars = recovery_sidecars(dir.path()); + assert_eq!(sidecars.len(), 1); + let sidecar: serde_json::Value = + serde_json::from_str(&fs::read_to_string(&sidecars[0]).unwrap()).unwrap(); + assert_eq!(sidecar["approval_id"], approval_id.as_str()); + } + + let recovered = apply_config_dir(dir.path()).await; + assert!(recovered.ok, "{:?}", recovered.diagnostics); + assert!( + recovered + .diagnostics + .iter() + .any(|diagnostic| diagnostic.code == "cluster_recovery_rolled_forward") + ); + assert!(recovered.converged); + let state: serde_json::Value = + serde_json::from_str(&fs::read_to_string(state_path(dir.path())).unwrap()).unwrap(); + assert_eq!(state["observations"]["graph.old"]["kind"], "tombstone"); + assert!(state["approval_records"][&approval_id]["consumed_at"].is_string()); + assert!( + state["recovery_records"] + .as_object() + .unwrap() + .values() + .any(|record| record["kind"] == "graph_delete") + ); + scenario.teardown(); +}