From 83d77bcb16d10b6eb2d28f8c6e202ab3606311fd Mon Sep 17 00:00:00 2001 From: aaltshuler Date: Wed, 10 Jun 2026 04:59:48 +0300 Subject: [PATCH] test(cluster): failpoint coverage for graph-create crash windows - Crash before the init (row 1): sidecar survives, nothing moved, no ack; the next run's sweep removes the intent and the same run creates and converges. - Crash after the init, before the state CAS (row 4): the graph exists with the post-init manifest pin in the sidecar, state.json byte-identical; the next run's sweep rolls the ledger forward with a recovery_records audit entry and the run converges. Co-Authored-By: Claude Fable 5 --- crates/omnigraph-cluster/tests/failpoints.rs | 128 +++++++++++++++++++ 1 file changed, 128 insertions(+) diff --git a/crates/omnigraph-cluster/tests/failpoints.rs b/crates/omnigraph-cluster/tests/failpoints.rs index 743f1fe..ec8ddfb 100644 --- a/crates/omnigraph-cluster/tests/failpoints.rs +++ b/crates/omnigraph-cluster/tests/failpoints.rs @@ -217,3 +217,131 @@ async fn apply_cas_race_surfaces_state_cas_mismatch() { assert!(recovered.converged); scenario.teardown(); } + +fn seed_empty_state(config_dir: &Path) { + let state_dir = config_dir.join("__cluster"); + fs::create_dir_all(&state_dir).unwrap(); + fs::write( + state_dir.join("state.json"), + r#"{ + "version": 1, + "state_revision": 1, + "applied_revision": { "resources": {} } +} +"#, + ) + .unwrap(); +} + +fn recovery_sidecars(config_dir: &Path) -> Vec { + match fs::read_dir(config_dir.join("__cluster/recoveries")) { + Ok(entries) => { + let mut paths: Vec = entries + .flatten() + .map(|entry| entry.path()) + .filter(|path| path.extension().is_some_and(|ext| ext == "json")) + .collect(); + paths.sort(); + paths + } + Err(_) => Vec::new(), + } +} + +/// Crash before the init: the create-intent sidecar survives, nothing moved. +/// The next run's sweep removes the intent (row 1) and the same run creates +/// the graph and converges. +#[tokio::test] +async fn create_crash_before_init_recovers_via_sweep() { + let scenario = FailScenario::setup(); + let dir = fixture(); + seed_empty_state(dir.path()); + + { + let _failpoint = ScopedFailPoint::new("cluster_apply.before_graph_create", "return"); + let out = apply_config_dir(dir.path()).await; + assert!(!out.ok); + assert!(out.diagnostics.iter().any(|diagnostic| { + diagnostic.code == "injected_failpoint" + && diagnostic + .message + .contains("cluster_apply.before_graph_create") + })); + assert_eq!(recovery_sidecars(dir.path()).len(), 1); + assert!(!dir.path().join("graphs/knowledge.omni").exists()); + // No resource digest moved. + let state: serde_json::Value = serde_json::from_str( + &fs::read_to_string(dir.path().join("__cluster/state.json")).unwrap(), + ) + .unwrap(); + assert!( + state["applied_revision"]["resources"] + .as_object() + .unwrap() + .is_empty() + ); + } + + let recovered = apply_config_dir(dir.path()).await; + assert!(recovered.ok, "{:?}", recovered.diagnostics); + assert!(recovered.converged); + assert!(dir.path().join("graphs/knowledge.omni").exists()); + assert!(recovery_sidecars(dir.path()).is_empty()); + scenario.teardown(); +} + +/// Crash after the init but before the state CAS: the graph exists, the +/// ledger is stale, nothing was acknowledged. The next run's sweep rolls the +/// ledger forward (row 4) with an audit entry, and the run converges. +#[tokio::test] +async fn create_crash_after_init_rolls_state_forward() { + let scenario = FailScenario::setup(); + let dir = fixture(); + seed_empty_state(dir.path()); + let state_before = fs::read(dir.path().join("__cluster/state.json")).unwrap(); + + { + let _failpoint = ScopedFailPoint::new("cluster_apply.after_graph_create", "return"); + let out = apply_config_dir(dir.path()).await; + assert!(!out.ok); + assert!(!out.state_written); + // The graph exists; the cluster state is byte-identical (no ack). + assert!(dir.path().join("graphs/knowledge.omni").exists()); + assert_eq!( + fs::read(dir.path().join("__cluster/state.json")).unwrap(), + state_before + ); + // The sidecar carries the post-init manifest pin. + let sidecars = recovery_sidecars(dir.path()); + assert_eq!(sidecars.len(), 1); + let sidecar: serde_json::Value = + serde_json::from_str(&fs::read_to_string(&sidecars[0]).unwrap()).unwrap(); + assert!( + sidecar["expected_manifest_version"].is_number(), + "{sidecar}" + ); + } + + let recovered = apply_config_dir(dir.path()).await; + assert!(recovered.ok, "{:?}", recovered.diagnostics); + assert!( + recovered + .diagnostics + .iter() + .any(|diagnostic| diagnostic.code == "cluster_recovery_rolled_forward") + ); + assert!(recovered.converged); + assert!(recovery_sidecars(dir.path()).is_empty()); + let state: serde_json::Value = serde_json::from_str( + &fs::read_to_string(dir.path().join("__cluster/state.json")).unwrap(), + ) + .unwrap(); + assert!( + state["recovery_records"] + .as_object() + .unwrap() + .values() + .any(|record| record["outcome"] == "rolled_forward") + ); + scenario.teardown(); +}