test(cluster): failpoint tests for crash-mid-apply and state CAS race

The apply-side coverage the implementation spec's hard gate requires before
Phase 4 graph-moving apply:

- crash after the payload phase: state.json byte-identical, blobs inert on
  disk, lock released, no phantom statuses, nothing acknowledged; a plain
  re-run repairs via skip-if-exists blob reuse.
- CAS race: a cfg_callback rewrites state.json at the exact read->write
  window (the state.lock:false concurrent-writer scenario); apply surfaces
  state_cas_mismatch, acknowledges nothing, reports the persisted status
  snapshot, leaves the concurrent writer's state on disk; a re-run converges.

CI's failpoints step now runs both the engine and cluster suites.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
aaltshuler 2026-06-10 02:14:06 +03:00
parent 21b531605f
commit 211b37e6de
2 changed files with 106 additions and 4 deletions

View file

@ -173,15 +173,18 @@ jobs:
OMNIGRAPH_UPDATE_OPENAPI: ${{ (github.event_name == 'pull_request' && github.event.pull_request.head.repo.full_name == github.repository) && '1' || '' }}
run: cargo test --workspace --locked
- name: Run failpoints feature test
- name: Run failpoints feature tests
if: needs.classify_changes.outputs.run_full_ci == 'true'
# Run after the workspace test so the build cache is warm —
# enabling --features failpoints is just an incremental rebuild
# of omnigraph-engine + the small `fail` crate, not the full
# of the target crate + the small `fail` crate, not the full
# dep tree (lance, datafusion). A separate job with its own
# cache key would be a fresh ~20min build on first run; this
# is ~30s on a warm cache.
run: cargo test --locked -p omnigraph-engine --features failpoints --test failpoints
# is ~30s on a warm cache. The cluster feature does not enable
# omnigraph/failpoints, so each line rebuilds only its crate.
run: |
cargo test --locked -p omnigraph-engine --features failpoints --test failpoints
cargo test --locked -p omnigraph-cluster --features failpoints --test failpoints
- name: Commit regenerated openapi.json to PR branch
if: |

View file

@ -117,3 +117,102 @@ fn failpoint_wiring_returns_injected_diagnostic() {
drop(_failpoint);
scenario.teardown();
}
/// Simulated crash at the `cluster_apply.after_payload_phase` failpoint.
///
/// With the failpoint set to `return`, the apply must report failure without
/// writing state, converging, or counting anything as applied. On disk,
/// state.json must be byte-for-byte unchanged, the query blob must exist, and
/// no lock file may remain. A second apply with the failpoint gone must then
/// succeed, converge, write state, and report the query resource as `Applied`.
#[test]
fn apply_crash_after_payload_phase_leaves_state_unmoved_then_recovers() {
    const QUERY_RESOURCE: &str = "query.knowledge.find_person";

    let scenario = FailScenario::setup();
    let dir = fixture();
    let digests = seed_applyable_state(dir.path());
    let lock_file = dir.path().join("__cluster/lock.json");
    let pristine_state = fs::read(state_path(dir.path())).unwrap();

    {
        // The guard keeps the failpoint armed only for this inner scope.
        let _guard = ScopedFailPoint::new("cluster_apply.after_payload_phase", "return");
        let crashed = apply_config_dir(dir.path());

        assert!(!crashed.ok);
        assert!(!crashed.state_written);
        assert!(!crashed.converged);
        assert_eq!(crashed.applied_count, 0);

        // The reported statuses must not contain the query resource, i.e. no
        // phantom Applied entry for work that was never committed to state.
        let has_phantom_status = crashed.resource_statuses.contains_key(QUERY_RESOURCE);
        assert!(!has_phantom_status, "{:?}", crashed.resource_statuses);

        // On-disk view: state bytes unchanged, blob present, lock file absent.
        let state_after_crash = fs::read(state_path(dir.path())).unwrap();
        assert_eq!(state_after_crash, pristine_state);
        assert!(query_blob(dir.path(), &digests).exists());
        assert!(!lock_file.exists());
    }

    // Recovery needs no special step: run apply again without the failpoint.
    let recovered = apply_config_dir(dir.path());
    assert!(recovered.ok, "{:?}", recovered.diagnostics);
    assert!(recovered.converged);
    assert!(recovered.state_written);
    assert_eq!(
        recovered.resource_statuses[QUERY_RESOURCE].status,
        omnigraph_cluster::ResourceLifecycleStatus::Applied
    );

    scenario.teardown();
}
/// Simulated concurrent writer at the `cluster_apply.before_state_write`
/// failpoint.
///
/// A callback registered on that failpoint rewrites state.json with
/// `state_revision` set to 99 while apply is running. Apply must then fail
/// with a `state_cas_mismatch` diagnostic, write no state, and report no
/// status for the query resource. The file on disk must still carry the
/// callback's revision and no applied entry for the query resource, and the
/// query blob must exist. A second apply, with the failpoint removed, must
/// succeed and converge.
#[test]
fn apply_cas_race_surfaces_state_cas_mismatch() {
    const QUERY_RESOURCE: &str = "query.knowledge.find_person";

    let scenario = FailScenario::setup();
    let dir = fixture();
    let digests = seed_applyable_state(dir.path());

    // Stand-in for the racing writer: when the failpoint fires, load
    // state.json, bump only its revision to 99, and write it back as
    // pretty-printed JSON.
    let racing_state_file = state_path(dir.path());
    fail::cfg_callback("cluster_apply.before_state_write", move || {
        let raw = fs::read_to_string(&racing_state_file).unwrap();
        let mut doc: serde_json::Value = serde_json::from_str(&raw).unwrap();
        doc["state_revision"] = serde_json::json!(99);
        let rewritten = serde_json::to_string_pretty(&doc).unwrap();
        fs::write(&racing_state_file, rewritten).unwrap();
    })
    .expect("configure callback failpoint");

    let raced = apply_config_dir(dir.path());
    fail::remove("cluster_apply.before_state_write");

    assert!(!raced.ok);
    assert!(!raced.state_written);
    let saw_cas_mismatch = raced
        .diagnostics
        .iter()
        .any(|entry| entry.code == "state_cas_mismatch");
    assert!(saw_cas_mismatch, "{:?}", raced.diagnostics);

    // The reported statuses must not include the query resource that this
    // apply attempted but never persisted.
    assert!(!raced.resource_statuses.contains_key(QUERY_RESOURCE));

    // Disk holds what the callback wrote: revision 99 and no applied entry
    // for the query resource.
    let on_disk_raw = fs::read_to_string(state_path(dir.path())).unwrap();
    let on_disk: serde_json::Value = serde_json::from_str(&on_disk_raw).unwrap();
    assert_eq!(on_disk["state_revision"], 99);
    let applied_entry = on_disk["applied_revision"]["resources"].get(QUERY_RESOURCE);
    assert!(applied_entry.is_none());

    // The query blob is still on disk after the failed apply.
    assert!(query_blob(dir.path(), &digests).exists());

    // Re-running apply against the rewritten state must succeed.
    let recovered = apply_config_dir(dir.path());
    assert!(recovered.ok, "{:?}", recovered.diagnostics);
    assert!(recovered.converged);

    scenario.teardown();
}