diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index bbe5893..1ea6c37 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -173,15 +173,18 @@ jobs: OMNIGRAPH_UPDATE_OPENAPI: ${{ (github.event_name == 'pull_request' && github.event.pull_request.head.repo.full_name == github.repository) && '1' || '' }} run: cargo test --workspace --locked - - name: Run failpoints feature test + - name: Run failpoints feature tests if: needs.classify_changes.outputs.run_full_ci == 'true' # Run after the workspace test so the build cache is warm — # enabling --features failpoints is just an incremental rebuild - # of omnigraph-engine + the small `fail` crate, not the full + # of the target crate + the small `fail` crate, not the full # dep tree (lance, datafusion). A separate job with its own # cache key would be a fresh ~20min build on first run; this - # is ~30s on a warm cache. - run: cargo test --locked -p omnigraph-engine --features failpoints --test failpoints + # is ~30s on a warm cache. The cluster feature does not enable + # omnigraph/failpoints, so each line rebuilds only its crate. + run: | + cargo test --locked -p omnigraph-engine --features failpoints --test failpoints + cargo test --locked -p omnigraph-cluster --features failpoints --test failpoints - name: Commit regenerated openapi.json to PR branch if: | diff --git a/crates/omnigraph-cluster/tests/failpoints.rs b/crates/omnigraph-cluster/tests/failpoints.rs index 3ede30c..05d2913 100644 --- a/crates/omnigraph-cluster/tests/failpoints.rs +++ b/crates/omnigraph-cluster/tests/failpoints.rs @@ -117,3 +117,102 @@ fn failpoint_wiring_returns_injected_diagnostic() { drop(_failpoint); scenario.teardown(); } + +/// Crash between the payload phase and the state write: blobs are on disk, +/// state.json is byte-identical, nothing is acknowledged — and a plain re-run +/// repairs by trusting the existing content-addressed blobs. +#[test] +fn apply_crash_after_payload_phase_leaves_state_unmoved_then_recovers() { + let scenario = FailScenario::setup(); + let dir = fixture(); + let digests = seed_applyable_state(dir.path()); + let state_before = fs::read(state_path(dir.path())).unwrap(); + + { + let _failpoint = ScopedFailPoint::new("cluster_apply.after_payload_phase", "return"); + let out = apply_config_dir(dir.path()); + assert!(!out.ok); + assert!(!out.state_written); + assert!(!out.converged); + assert_eq!(out.applied_count, 0); + // Persisted pre-apply snapshot: no phantom Applied statuses. + assert!( + !out.resource_statuses + .contains_key("query.knowledge.find_person"), + "{:?}", + out.resource_statuses + ); + // State has not moved; payloads are inert on disk; the lock released. + assert_eq!(fs::read(state_path(dir.path())).unwrap(), state_before); + assert!(query_blob(dir.path(), &digests).exists()); + assert!(!dir.path().join("__cluster/lock.json").exists()); + } + + // The repair is a plain re-run: existing blobs are trusted by digest. + let recovered = apply_config_dir(dir.path()); + assert!(recovered.ok, "{:?}", recovered.diagnostics); + assert!(recovered.converged); + assert!(recovered.state_written); + assert_eq!( + recovered.resource_statuses["query.knowledge.find_person"].status, + omnigraph_cluster::ResourceLifecycleStatus::Applied + ); + scenario.teardown(); +} + +/// A concurrent writer mutating state.json between apply's read and its write +/// (possible under `state.lock: false`) must surface `state_cas_mismatch`, +/// acknowledge nothing, and leave the concurrent writer's state on disk. +#[test] +fn apply_cas_race_surfaces_state_cas_mismatch() { + let scenario = FailScenario::setup(); + let dir = fixture(); + let digests = seed_applyable_state(dir.path()); + + // Simulate the concurrent writer at the exact race window: rewrite + // state.json (valid JSON, graph/schema digests preserved, revision 99) + // after apply read it but before apply writes. + let race_path = state_path(dir.path()); + fail::cfg_callback("cluster_apply.before_state_write", move || { + let mut state: serde_json::Value = + serde_json::from_str(&fs::read_to_string(&race_path).unwrap()).unwrap(); + state["state_revision"] = serde_json::json!(99); + fs::write(&race_path, serde_json::to_string_pretty(&state).unwrap()).unwrap(); + }) + .expect("configure callback failpoint"); + + let out = apply_config_dir(dir.path()); + fail::remove("cluster_apply.before_state_write"); + + assert!(!out.ok); + assert!(!out.state_written); + assert!( + out.diagnostics + .iter() + .any(|diagnostic| diagnostic.code == "state_cas_mismatch"), + "{:?}", + out.diagnostics + ); + // Persisted snapshot, not the unwritten in-memory mutations. + assert!( + !out.resource_statuses + .contains_key("query.knowledge.find_person") + ); + // The concurrent writer's state is what's on disk; apply's mutation never landed. + let state: serde_json::Value = + serde_json::from_str(&fs::read_to_string(state_path(dir.path())).unwrap()).unwrap(); + assert_eq!(state["state_revision"], 99); + assert!( + state["applied_revision"]["resources"] + .get("query.knowledge.find_person") + .is_none() + ); + // Blobs written before the race are inert. + assert!(query_blob(dir.path(), &digests).exists()); + + // Recovery is a plain re-run against the rewritten state. + let recovered = apply_config_dir(dir.path()); + assert!(recovered.ok, "{:?}", recovered.diagnostics); + assert!(recovered.converged); + scenario.teardown(); +}