From 80cae4e8e1cf947b17f57826d6cd43b1bf71c54a Mon Sep 17 00:00:00 2001 From: aaltshuler Date: Wed, 10 Jun 2026 13:13:15 +0300 Subject: [PATCH] test(cluster): failpoint coverage for schema-apply crash windows - Crash before the engine call: sidecar (carrying the --as actor) survives, live schema and ledger untouched, no ack; the next run's sweep retires the stale intent and the same run applies and converges. - Crash after the engine call, before the state CAS: the manifest moved with the post-op pin in the sidecar, state.json byte-identical; the next run's sweep rolls the ledger forward with a schema_apply audit entry and the run converges. Co-Authored-By: Claude Fable 5 --- crates/omnigraph-cluster/tests/failpoints.rs | 124 ++++++++++++++++++- 1 file changed, 123 insertions(+), 1 deletion(-) diff --git a/crates/omnigraph-cluster/tests/failpoints.rs b/crates/omnigraph-cluster/tests/failpoints.rs index ec8ddfb..cc91b85 100644 --- a/crates/omnigraph-cluster/tests/failpoints.rs +++ b/crates/omnigraph-cluster/tests/failpoints.rs @@ -14,7 +14,10 @@ use std::path::{Path, PathBuf}; use fail::FailScenario; use omnigraph_cluster::failpoints::ScopedFailPoint; -use omnigraph_cluster::{apply_config_dir, validate_config_dir}; +use omnigraph::db::Omnigraph; +use omnigraph_cluster::{ + ApplyOptions, apply_config_dir, apply_config_dir_with_options, validate_config_dir, +}; use tempfile::tempdir; const SCHEMA: &str = r#" @@ -345,3 +348,122 @@ async fn create_crash_after_init_rolls_state_forward() { ); scenario.teardown(); } + +const SCHEMA_V2: &str = r#" +node Person { + name: String @key + age: I32? + bio: String? +} +"#; + +async fn converge_with_live_graph(dir: &Path) { + let graph_dir = dir.join("graphs"); + fs::create_dir_all(&graph_dir).unwrap(); + Omnigraph::init( + graph_dir.join("knowledge.omni").to_string_lossy().as_ref(), + SCHEMA, + ) + .await + .unwrap(); + seed_applyable_state(dir); + let out = apply_config_dir(dir).await; + assert!(out.ok && out.converged, "{:?}", out.diagnostics); +} + +async fn live_schema_digest(dir: &Path) -> String { + let uri = dir.join("graphs/knowledge.omni"); + let db = Omnigraph::open_read_only(uri.to_string_lossy().as_ref()) + .await + .unwrap(); + use sha2::{Digest, Sha256}; + let digest = Sha256::digest(db.schema_source().as_bytes()); + digest.iter().map(|byte| format!("{byte:02x}")).collect() +} + +/// Crash before the engine schema apply: sidecar (with actor) survives, the +/// live schema and ledger are untouched; the next run's sweep retires the +/// stale intent and the same run applies and converges. +#[tokio::test] +async fn schema_crash_before_apply_recovers_via_sweep() { + let scenario = FailScenario::setup(); + let dir = fixture(); + converge_with_live_graph(dir.path()).await; + let pre_digest = live_schema_digest(dir.path()).await; + fs::write(dir.path().join("people.pg"), SCHEMA_V2).unwrap(); + + { + let _failpoint = ScopedFailPoint::new("cluster_apply.before_schema_apply", "return"); + let out = apply_config_dir_with_options( + dir.path(), + ApplyOptions { + actor: Some("test-actor".to_string()), + }, + ) + .await; + assert!(!out.ok); + assert_eq!(out.actor.as_deref(), Some("test-actor")); + let sidecars = recovery_sidecars(dir.path()); + assert_eq!(sidecars.len(), 1); + let sidecar: serde_json::Value = + serde_json::from_str(&fs::read_to_string(&sidecars[0]).unwrap()).unwrap(); + assert_eq!(sidecar["kind"], "schema_apply"); + assert_eq!(sidecar["actor"], "test-actor"); + // Nothing moved. + assert_eq!(live_schema_digest(dir.path()).await, pre_digest); + } + + let recovered = apply_config_dir(dir.path()).await; + assert!(recovered.ok, "{:?}", recovered.diagnostics); + assert!(recovered.converged); + assert!(recovery_sidecars(dir.path()).is_empty()); + assert_ne!(live_schema_digest(dir.path()).await, pre_digest); + scenario.teardown(); +} + +/// Crash after the engine schema apply, before the state CAS: the manifest +/// moved, the ledger is stale, nothing acknowledged; the next run's sweep +/// rolls the ledger forward with an audit entry and the run converges. +#[tokio::test] +async fn schema_crash_after_apply_rolls_state_forward() { + let scenario = FailScenario::setup(); + let dir = fixture(); + converge_with_live_graph(dir.path()).await; + fs::write(dir.path().join("people.pg"), SCHEMA_V2).unwrap(); + let state_before = fs::read(state_path(dir.path())).unwrap(); + let desired = validate_config_dir(dir.path()); + let v2_digest = desired.resource_digests["schema.knowledge"].clone(); + + { + let _failpoint = ScopedFailPoint::new("cluster_apply.after_schema_apply", "return"); + let out = apply_config_dir(dir.path()).await; + assert!(!out.ok); + assert!(!out.state_written); + // The live schema moved; the ledger is byte-identical (no ack). + assert_eq!(live_schema_digest(dir.path()).await, v2_digest); + assert_eq!(fs::read(state_path(dir.path())).unwrap(), state_before); + let sidecars = recovery_sidecars(dir.path()); + assert_eq!(sidecars.len(), 1); + let sidecar: serde_json::Value = + serde_json::from_str(&fs::read_to_string(&sidecars[0]).unwrap()).unwrap(); + assert!(sidecar["expected_manifest_version"].is_number(), "{sidecar}"); + } + + let recovered = apply_config_dir(dir.path()).await; + assert!(recovered.ok, "{:?}", recovered.diagnostics); + assert!( + recovered + .diagnostics + .iter() + .any(|diagnostic| diagnostic.code == "cluster_recovery_rolled_forward") + ); + assert!(recovered.converged); + assert!(recovery_sidecars(dir.path()).is_empty()); + let state: serde_json::Value = + serde_json::from_str(&fs::read_to_string(state_path(dir.path())).unwrap()).unwrap(); + assert_eq!( + state["applied_revision"]["resources"]["schema.knowledge"]["digest"], + v2_digest + ); + scenario.teardown(); +}