test(cluster): failpoint coverage for graph-create crash windows

- Crash before the init (row 1): the sidecar survives, nothing has moved, and
  there is no ack; the next run's sweep removes the intent, and that same run
  creates the graph and converges.
- Crash after the init, before the state CAS (row 4): the graph exists with
  the post-init manifest pin in the sidecar, state.json byte-identical; the
  next run's sweep rolls the ledger forward with a recovery_records audit
  entry and the run converges.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
aaltshuler 2026-06-10 04:59:48 +03:00
parent c3007369cd
commit 83d77bcb16

View file

@ -217,3 +217,131 @@ async fn apply_cas_race_surfaces_state_cas_mismatch() {
assert!(recovered.converged);
scenario.teardown();
}
/// Writes a minimal cluster ledger to `<config_dir>/__cluster/state.json`.
///
/// The seeded document has `version` 1, `state_revision` 1 and an empty
/// `applied_revision.resources` map. The `__cluster` directory is created if
/// it does not exist yet. Any I/O failure panics, since this is a test
/// fixture helper.
fn seed_empty_state(config_dir: &Path) {
    // Kept as a raw string so the on-disk bytes are exactly what is spelled here.
    const EMPTY_STATE: &str = r#"{
"version": 1,
"state_revision": 1,
"applied_revision": { "resources": {} }
}
"#;

    let cluster_dir = config_dir.join("__cluster");
    fs::create_dir_all(&cluster_dir).unwrap();

    let state_path = cluster_dir.join("state.json");
    fs::write(state_path, EMPTY_STATE).unwrap();
}
/// Collects the recovery sidecar files under `<config_dir>/__cluster/recoveries`.
///
/// Only paths whose extension is `json` are returned, sorted ascending.
/// If the directory cannot be read (for example because it does not exist)
/// the result is empty; individual entries that fail to read are skipped.
fn recovery_sidecars(config_dir: &Path) -> Vec<PathBuf> {
    let Ok(entries) = fs::read_dir(config_dir.join("__cluster/recoveries")) else {
        return Vec::new();
    };

    let mut sidecars: Vec<PathBuf> = Vec::new();
    for entry in entries {
        // Unreadable entries are ignored rather than treated as failures.
        let Ok(entry) = entry else { continue };
        let path = entry.path();
        if path.extension().is_some_and(|ext| ext == "json") {
            sidecars.push(path);
        }
    }

    sidecars.sort();
    sidecars
}
/// Crash injected ahead of the graph init, through the
/// `cluster_apply.before_graph_create` failpoint.
///
/// After the failed run exactly one create-intent sidecar is left behind,
/// the graph directory does not exist, and no resource digest has been
/// recorded in the ledger. A second run with the failpoint released is
/// expected to sweep the intent away (row 1), create the graph, and converge.
#[tokio::test]
async fn create_crash_before_init_recovers_via_sweep() {
    let scenario = FailScenario::setup();
    let dir = fixture();
    let config_dir = dir.path();
    let graph_path = config_dir.join("graphs/knowledge.omni");
    let state_path = config_dir.join("__cluster/state.json");
    seed_empty_state(config_dir);

    // The failpoint guard lives only for this scope, so the recovery run
    // below executes without any injection.
    {
        let _failpoint = ScopedFailPoint::new("cluster_apply.before_graph_create", "return");
        let crashed = apply_config_dir(config_dir).await;
        assert!(!crashed.ok);

        let reported_injection = crashed
            .diagnostics
            .iter()
            .filter(|diagnostic| diagnostic.code == "injected_failpoint")
            .any(|diagnostic| {
                diagnostic
                    .message
                    .contains("cluster_apply.before_graph_create")
            });
        assert!(reported_injection);

        let leftover = recovery_sidecars(config_dir);
        assert_eq!(leftover.len(), 1);
        assert!(!graph_path.exists());

        // No resource digest moved.
        let ledger_text = fs::read_to_string(&state_path).unwrap();
        let ledger: serde_json::Value = serde_json::from_str(&ledger_text).unwrap();
        let resources = ledger["applied_revision"]["resources"]
            .as_object()
            .unwrap();
        assert!(resources.is_empty());
    }

    let recovered = apply_config_dir(config_dir).await;
    assert!(recovered.ok, "{:?}", recovered.diagnostics);
    assert!(recovered.converged);
    assert!(graph_path.exists());
    assert!(recovery_sidecars(config_dir).is_empty());
    scenario.teardown();
}
/// Crash injected between the graph init and the state CAS, through the
/// `cluster_apply.after_graph_create` failpoint.
///
/// After the failed run the graph exists on disk, `state.json` is
/// byte-identical to its seeded content (nothing was acknowledged), and the
/// single sidecar carries a numeric `expected_manifest_version`. A second run
/// with the failpoint released is expected to roll the ledger forward (row 4),
/// emit a `cluster_recovery_rolled_forward` diagnostic, record a
/// `rolled_forward` entry under `recovery_records`, and converge.
#[tokio::test]
async fn create_crash_after_init_rolls_state_forward() {
    let scenario = FailScenario::setup();
    let dir = fixture();
    let config_dir = dir.path();
    let graph_path = config_dir.join("graphs/knowledge.omni");
    let state_path = config_dir.join("__cluster/state.json");
    seed_empty_state(config_dir);
    let state_before = fs::read(&state_path).unwrap();

    // The failpoint guard lives only for this scope, so the recovery run
    // below executes without any injection.
    {
        let _failpoint = ScopedFailPoint::new("cluster_apply.after_graph_create", "return");
        let crashed = apply_config_dir(config_dir).await;
        assert!(!crashed.ok);
        assert!(!crashed.state_written);

        // The graph exists; the cluster state is byte-identical (no ack).
        assert!(graph_path.exists());
        let state_after = fs::read(&state_path).unwrap();
        assert_eq!(state_after, state_before);

        // The sidecar carries the post-init manifest pin.
        let sidecars = recovery_sidecars(config_dir);
        assert_eq!(sidecars.len(), 1);
        let sidecar_text = fs::read_to_string(&sidecars[0]).unwrap();
        let sidecar: serde_json::Value = serde_json::from_str(&sidecar_text).unwrap();
        assert!(
            sidecar["expected_manifest_version"].is_number(),
            "{sidecar}"
        );
    }

    let recovered = apply_config_dir(config_dir).await;
    assert!(recovered.ok, "{:?}", recovered.diagnostics);

    let rolled_forward_reported = recovered
        .diagnostics
        .iter()
        .any(|diagnostic| diagnostic.code == "cluster_recovery_rolled_forward");
    assert!(rolled_forward_reported);
    assert!(recovered.converged);
    assert!(recovery_sidecars(config_dir).is_empty());

    // The roll-forward must leave an audit entry in the ledger.
    let ledger_text = fs::read_to_string(&state_path).unwrap();
    let ledger: serde_json::Value = serde_json::from_str(&ledger_text).unwrap();
    let audit_entries = ledger["recovery_records"].as_object().unwrap();
    let has_rolled_forward_record = audit_entries
        .values()
        .any(|record| record["outcome"] == "rolled_forward");
    assert!(has_rolled_forward_record);
    scenario.teardown();
}