test(cluster): failpoint coverage for delete crash windows

- Crash before the removal: root intact, approval file unconsumed, sidecar
  survives, no ack; the next run retires the stale intent (row 8) and the
  still-approved delete completes in the same run.
- Crash after the removal, before the state CAS: root gone, ledger
  byte-identical, the sidecar carries the approval id; the next run's sweep
  rolls the tombstone forward, consumes the approval, audits the recovery,
  and converges (row 7b).

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
aaltshuler 2026-06-10 14:34:54 +03:00
parent d1d04217ab
commit 87691fe9c7

View file

@ -16,7 +16,8 @@ use fail::FailScenario;
use omnigraph_cluster::failpoints::ScopedFailPoint;
use omnigraph::db::Omnigraph;
use omnigraph_cluster::{
ApplyOptions, apply_config_dir, apply_config_dir_with_options, validate_config_dir,
ApplyOptions, apply_config_dir, apply_config_dir_with_options, approve_config_dir,
validate_config_dir,
};
use tempfile::tempdir;
@ -467,3 +468,126 @@ async fn schema_crash_after_apply_rolls_state_forward() {
);
scenario.teardown();
}
/// Prepares `dir` so that deleting the stale `old` graph is already approved.
///
/// Steps, in order:
/// 1. call `seed_applyable_state` and pick up the `knowledge` digests;
/// 2. overwrite `__cluster/state.json` with a ledger that lists the two
///    `knowledge` resources plus a `graph.old` / `schema.old` pair;
/// 3. create a real `graphs/old.omni` root holding a `_schema.pg` file;
/// 4. approve `graph.old` as `test-actor`.
///
/// Panics if a filesystem write fails or the approval is rejected.
/// Returns the id of the approval that was granted.
async fn seed_approved_delete(dir: &Path) -> String {
    let digests = seed_applyable_state(dir);
    let knowledge_graph = digests["graph.knowledge"].clone();
    let knowledge_schema = digests["schema.knowledge"].clone();

    // The `knowledge` entries reuse the seeded digests; the `old` entries
    // use fixed literal digests.
    let ledger = format!(
        r#"{{
"version": 1,
"state_revision": 1,
"applied_revision": {{
"resources": {{
"graph.knowledge": {{ "digest": "{graph_digest}" }},
"schema.knowledge": {{ "digest": "{schema_digest}" }},
"graph.old": {{ "digest": "3333" }},
"schema.old": {{ "digest": "4444" }}
}}
}}
}}
"#,
        graph_digest = knowledge_graph,
        schema_digest = knowledge_schema,
    );
    fs::write(dir.join("__cluster").join("state.json"), ledger).unwrap();

    // Give the stale graph an actual on-disk root for the delete to remove.
    let old_root = dir.join("graphs/old.omni");
    fs::create_dir_all(&old_root).unwrap();
    fs::write(old_root.join("_schema.pg"), "stale").unwrap();

    let approval = approve_config_dir(dir, "graph.old", "test-actor").await;
    assert!(approval.ok, "{:?}", approval.diagnostics);
    approval.approval_id.unwrap()
}
/// Failpoint `cluster_apply.before_graph_delete`: the apply fails before the
/// root is removed.
///
/// While the failpoint is armed the test expects a failed run, the
/// `graphs/old.omni` root still on disk, exactly one recovery sidecar, and an
/// approval file whose `consumed_at` is still null. The following unarmed run
/// must succeed, report `graph_delete_incomplete` (the stale intent being
/// retired, row 8), converge, remove the root, and leave no sidecars.
#[tokio::test]
async fn delete_crash_before_removal_reproposes() {
    let scenario = FailScenario::setup();
    let dir = fixture();
    let approval_id = seed_approved_delete(dir.path()).await;

    let old_root = dir.path().join("graphs/old.omni");
    let approval_file = dir
        .path()
        .join("__cluster/approvals")
        .join(format!("{approval_id}.json"));

    {
        // The guard keeps the failpoint armed until the end of this scope.
        let _guard = ScopedFailPoint::new("cluster_apply.before_graph_delete", "return");
        let crashed = apply_config_dir(dir.path()).await;
        assert!(!crashed.ok);
        assert!(old_root.exists());
        assert_eq!(recovery_sidecars(dir.path()).len(), 1);

        // The approval is untouched (file unconsumed).
        let raw_approval = fs::read_to_string(&approval_file).unwrap();
        let approval: serde_json::Value = serde_json::from_str(&raw_approval).unwrap();
        assert!(approval["consumed_at"].is_null());
    }

    let recovered = apply_config_dir(dir.path()).await;
    assert!(recovered.ok, "{:?}", recovered.diagnostics);
    let reported_incomplete = recovered
        .diagnostics
        .iter()
        .any(|entry| entry.code == "graph_delete_incomplete");
    assert!(reported_incomplete);
    assert!(recovered.converged);
    assert!(!old_root.exists());
    assert!(recovery_sidecars(dir.path()).is_empty());

    scenario.teardown();
}
/// Failpoint `cluster_apply.after_graph_delete`: the apply fails after the
/// root is removed but before the ledger is updated.
///
/// While the failpoint is armed the test expects a failed run with no state
/// written, the `graphs/old.omni` root gone, the ledger bytes unchanged, and
/// a single recovery sidecar carrying the approval id. The following unarmed
/// run must succeed, report `cluster_recovery_rolled_forward`, converge,
/// record a tombstone observation for `graph.old`, mark the approval as
/// consumed, and hold a `graph_delete` recovery record.
#[tokio::test]
async fn delete_crash_after_removal_rolls_forward() {
    let scenario = FailScenario::setup();
    let dir = fixture();
    let approval_id = seed_approved_delete(dir.path()).await;
    let ledger_before = fs::read(state_path(dir.path())).unwrap();

    {
        // The guard keeps the failpoint armed until the end of this scope.
        let _guard = ScopedFailPoint::new("cluster_apply.after_graph_delete", "return");
        let crashed = apply_config_dir(dir.path()).await;
        assert!(!crashed.ok);
        assert!(!crashed.state_written);
        assert!(!dir.path().join("graphs/old.omni").exists());

        let ledger_now = fs::read(state_path(dir.path())).unwrap();
        assert_eq!(ledger_now, ledger_before);

        let sidecars = recovery_sidecars(dir.path());
        assert_eq!(sidecars.len(), 1);
        let raw_sidecar = fs::read_to_string(&sidecars[0]).unwrap();
        let sidecar: serde_json::Value = serde_json::from_str(&raw_sidecar).unwrap();
        assert_eq!(sidecar["approval_id"], approval_id.as_str());
    }

    let recovered = apply_config_dir(dir.path()).await;
    assert!(recovered.ok, "{:?}", recovered.diagnostics);
    let rolled_forward = recovered
        .diagnostics
        .iter()
        .any(|entry| entry.code == "cluster_recovery_rolled_forward");
    assert!(rolled_forward);
    assert!(recovered.converged);

    let raw_state = fs::read_to_string(state_path(dir.path())).unwrap();
    let state: serde_json::Value = serde_json::from_str(&raw_state).unwrap();
    assert_eq!(state["observations"]["graph.old"]["kind"], "tombstone");
    assert!(state["approval_records"][&approval_id]["consumed_at"].is_string());

    let recovery_records = state["recovery_records"].as_object().unwrap();
    let delete_audited = recovery_records
        .values()
        .any(|entry| entry["kind"] == "graph_delete");
    assert!(delete_audited);

    scenario.teardown();
}