test(cluster): failpoint coverage for schema-apply crash windows

- Crash before the engine call: sidecar (carrying the --as actor) survives,
  live schema and ledger untouched, no ack; the next run's sweep retires the
  stale intent and the same run applies and converges.
- Crash after the engine call, before the state CAS: the manifest moved with
  the post-op pin in the sidecar, state.json byte-identical; the next run's
  sweep rolls the ledger forward with a schema_apply audit entry and the run
  converges.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
aaltshuler 2026-06-10 13:13:15 +03:00
parent a1ba4dc413
commit 80cae4e8e1

View file

@ -14,7 +14,10 @@ use std::path::{Path, PathBuf};
use fail::FailScenario;
use omnigraph_cluster::failpoints::ScopedFailPoint;
use omnigraph_cluster::{apply_config_dir, validate_config_dir};
use omnigraph::db::Omnigraph;
use omnigraph_cluster::{
ApplyOptions, apply_config_dir, apply_config_dir_with_options, validate_config_dir,
};
use tempfile::tempdir;
const SCHEMA: &str = r#"
@ -345,3 +348,122 @@ async fn create_crash_after_init_rolls_state_forward() {
);
scenario.teardown();
}
const SCHEMA_V2: &str = r#"
node Person {
name: String @key
age: I32?
bio: String?
}
"#;
async fn converge_with_live_graph(dir: &Path) {
let graph_dir = dir.join("graphs");
fs::create_dir_all(&graph_dir).unwrap();
Omnigraph::init(
graph_dir.join("knowledge.omni").to_string_lossy().as_ref(),
SCHEMA,
)
.await
.unwrap();
seed_applyable_state(dir);
let out = apply_config_dir(dir).await;
assert!(out.ok && out.converged, "{:?}", out.diagnostics);
}
async fn live_schema_digest(dir: &Path) -> String {
let uri = dir.join("graphs/knowledge.omni");
let db = Omnigraph::open_read_only(uri.to_string_lossy().as_ref())
.await
.unwrap();
use sha2::{Digest, Sha256};
let digest = Sha256::digest(db.schema_source().as_bytes());
digest.iter().map(|byte| format!("{byte:02x}")).collect()
}
/// Crash before the engine schema apply: sidecar (with actor) survives, the
/// live schema and ledger are untouched; the next run's sweep retires the
/// stale intent and the same run applies and converges.
#[tokio::test]
async fn schema_crash_before_apply_recovers_via_sweep() {
let scenario = FailScenario::setup();
let dir = fixture();
converge_with_live_graph(dir.path()).await;
let pre_digest = live_schema_digest(dir.path()).await;
fs::write(dir.path().join("people.pg"), SCHEMA_V2).unwrap();
{
let _failpoint = ScopedFailPoint::new("cluster_apply.before_schema_apply", "return");
let out = apply_config_dir_with_options(
dir.path(),
ApplyOptions {
actor: Some("test-actor".to_string()),
},
)
.await;
assert!(!out.ok);
assert_eq!(out.actor.as_deref(), Some("test-actor"));
let sidecars = recovery_sidecars(dir.path());
assert_eq!(sidecars.len(), 1);
let sidecar: serde_json::Value =
serde_json::from_str(&fs::read_to_string(&sidecars[0]).unwrap()).unwrap();
assert_eq!(sidecar["kind"], "schema_apply");
assert_eq!(sidecar["actor"], "test-actor");
// Nothing moved.
assert_eq!(live_schema_digest(dir.path()).await, pre_digest);
}
let recovered = apply_config_dir(dir.path()).await;
assert!(recovered.ok, "{:?}", recovered.diagnostics);
assert!(recovered.converged);
assert!(recovery_sidecars(dir.path()).is_empty());
assert_ne!(live_schema_digest(dir.path()).await, pre_digest);
scenario.teardown();
}
/// Crash after the engine schema apply, before the state CAS: the manifest
/// moved, the ledger is stale, nothing acknowledged; the next run's sweep
/// rolls the ledger forward with an audit entry and the run converges.
#[tokio::test]
async fn schema_crash_after_apply_rolls_state_forward() {
let scenario = FailScenario::setup();
let dir = fixture();
converge_with_live_graph(dir.path()).await;
fs::write(dir.path().join("people.pg"), SCHEMA_V2).unwrap();
let state_before = fs::read(state_path(dir.path())).unwrap();
let desired = validate_config_dir(dir.path());
let v2_digest = desired.resource_digests["schema.knowledge"].clone();
{
let _failpoint = ScopedFailPoint::new("cluster_apply.after_schema_apply", "return");
let out = apply_config_dir(dir.path()).await;
assert!(!out.ok);
assert!(!out.state_written);
// The live schema moved; the ledger is byte-identical (no ack).
assert_eq!(live_schema_digest(dir.path()).await, v2_digest);
assert_eq!(fs::read(state_path(dir.path())).unwrap(), state_before);
let sidecars = recovery_sidecars(dir.path());
assert_eq!(sidecars.len(), 1);
let sidecar: serde_json::Value =
serde_json::from_str(&fs::read_to_string(&sidecars[0]).unwrap()).unwrap();
assert!(sidecar["expected_manifest_version"].is_number(), "{sidecar}");
}
let recovered = apply_config_dir(dir.path()).await;
assert!(recovered.ok, "{:?}", recovered.diagnostics);
assert!(
recovered
.diagnostics
.iter()
.any(|diagnostic| diagnostic.code == "cluster_recovery_rolled_forward")
);
assert!(recovered.converged);
assert!(recovery_sidecars(dir.path()).is_empty());
let state: serde_json::Value =
serde_json::from_str(&fs::read_to_string(state_path(dir.path())).unwrap()).unwrap();
assert_eq!(
state["applied_revision"]["resources"]["schema.knowledge"]["digest"],
v2_digest
);
scenario.teardown();
}