From 21b531605fed16b486280ba3e2944e7f15a1604e Mon Sep 17 00:00:00 2001 From: aaltshuler Date: Wed, 10 Jun 2026 02:12:59 +0300 Subject: [PATCH] feat(cluster): failpoint infrastructure mirroring the engine MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Optional failpoints feature (dep:fail + fail/failpoints, deliberately NOT enabling omnigraph/failpoints), a maybe_fail/ScopedFailPoint module returning Diagnostic-typed injected errors, and two call sites in apply_config_dir: cluster_apply.after_payload_phase (the crash point: blobs on disk, state untouched) and cluster_apply.before_state_write (routes through the persisted-statuses revert contract; a cfg_callback here can mutate state.json to make the CAS check fail organically). Feature off compiles to Ok(()) — zero behavior change. Tests live in a separate integration binary because the fail registry is process-global. Also refresh the crate description (stale 'read-only' since Stage 3A). Co-Authored-By: Claude Fable 5 --- crates/omnigraph-cluster/Cargo.toml | 8 +- crates/omnigraph-cluster/src/failpoints.rs | 42 +++++++ crates/omnigraph-cluster/src/lib.rs | 25 +++- crates/omnigraph-cluster/tests/failpoints.rs | 119 +++++++++++++++++++ 4 files changed, 192 insertions(+), 2 deletions(-) create mode 100644 crates/omnigraph-cluster/src/failpoints.rs create mode 100644 crates/omnigraph-cluster/tests/failpoints.rs diff --git a/crates/omnigraph-cluster/Cargo.toml b/crates/omnigraph-cluster/Cargo.toml index 9280c42..b5f99c9 100644 --- a/crates/omnigraph-cluster/Cargo.toml +++ b/crates/omnigraph-cluster/Cargo.toml @@ -2,15 +2,21 @@ name = "omnigraph-cluster" version = "0.6.2" edition = "2024" -description = "Read-only cluster configuration validation and planning for Omnigraph." +description = "Cluster configuration validation, planning, and config-only apply for Omnigraph." 
license = "MIT" repository = "https://github.com/ModernRelay/omnigraph" homepage = "https://github.com/ModernRelay/omnigraph" documentation = "https://docs.rs/omnigraph-cluster" +[features] +# Fault-injection hooks for the apply protocol (crash-mid-apply, CAS-race +# tests). Deliberately does NOT enable omnigraph/failpoints. +failpoints = ["dep:fail", "fail/failpoints"] + [dependencies] omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.6.2" } omnigraph = { package = "omnigraph-engine", path = "../omnigraph", version = "0.6.2" } +fail = { workspace = true, optional = true } serde = { workspace = true } serde_json = { workspace = true } serde_yaml = { workspace = true } diff --git a/crates/omnigraph-cluster/src/failpoints.rs b/crates/omnigraph-cluster/src/failpoints.rs new file mode 100644 index 0000000..c6d445b --- /dev/null +++ b/crates/omnigraph-cluster/src/failpoints.rs @@ -0,0 +1,42 @@ +//! Fault-injection hooks for the cluster apply protocol, mirroring the +//! engine's `omnigraph::failpoints` pattern. With the `failpoints` feature +//! off, every call site compiles to `Ok(())`. 
+ +use crate::Diagnostic; + +pub(crate) fn maybe_fail(_name: &str) -> Result<(), Diagnostic> { + #[cfg(feature = "failpoints")] + { + let name = _name; + fail::fail_point!(name, |_| { + return Err(Diagnostic::error( + "injected_failpoint", + name, + format!("injected failpoint triggered: {name}"), + )); + }); + } + Ok(()) +} + +#[cfg(feature = "failpoints")] +pub struct ScopedFailPoint { + name: String, +} + +#[cfg(feature = "failpoints")] +impl ScopedFailPoint { + pub fn new(name: &str, action: &str) -> Self { + fail::cfg(name, action).expect("configure failpoint"); + Self { + name: name.to_string(), + } + } +} + +#[cfg(feature = "failpoints")] +impl Drop for ScopedFailPoint { + fn drop(&mut self) { + fail::remove(&self.name); + } +} diff --git a/crates/omnigraph-cluster/src/lib.rs b/crates/omnigraph-cluster/src/lib.rs index 84968a7..660f34c 100644 --- a/crates/omnigraph-cluster/src/lib.rs +++ b/crates/omnigraph-cluster/src/lib.rs @@ -16,6 +18,8 @@ use time::OffsetDateTime; use time::format_description::well_known::Rfc3339; use ulid::Ulid; +pub mod failpoints; + pub const CLUSTER_CONFIG_FILE: &str = "cluster.yaml"; pub const CLUSTER_GRAPHS_DIR: &str = "graphs"; pub const CLUSTER_STATE_DIR: &str = "__cluster"; @@ -770,6 +772,21 @@ pub fn apply_config_dir(config_dir: impl AsRef<Path>) -> ApplyOutput { ); } + // Crash point: payloads are on disk, state has not moved. A failure here + // must leave state.json byte-identical and acknowledge nothing; re-running + // apply repairs via the skip-if-exists blob reuse. + if let Err(diagnostic) = failpoints::maybe_fail("cluster_apply.after_payload_phase") { + diagnostics.push(diagnostic); + return early_return( + display_path(&desired.config_dir), + Some(desired.config_digest), + observations, + changes, + state.resource_statuses, + diagnostics, + ); + } + // State mutation. Apply owns query/policy statuses only; graph/schema // statuses belong to refresh/import observation and must not be clobbered. 
let before_value = @@ -824,7 +841,13 @@ pub fn apply_config_dir(config_dir: impl AsRef<Path>) -> ApplyOutput { let mut state_write_failed = false; if after_value != before_value { new_state.state_revision = new_state.state_revision.saturating_add(1); - match backend.write_state(&new_state, expected_cas.as_deref(), &mut observations) { + // The failpoint error routes through state_write_failed so the + // persisted-statuses revert contract below is exercised; a cfg_callback + // on this point can mutate state.json to simulate a concurrent writer, + // making write_state's CAS check fail organically. + let write_result = failpoints::maybe_fail("cluster_apply.before_state_write") + .and_then(|()| backend.write_state(&new_state, expected_cas.as_deref(), &mut observations)); + match write_result { Ok(()) => state_written = true, Err(diagnostic) => { diagnostics.push(diagnostic); diff --git a/crates/omnigraph-cluster/tests/failpoints.rs b/crates/omnigraph-cluster/tests/failpoints.rs new file mode 100644 index 0000000..3ede30c --- /dev/null +++ b/crates/omnigraph-cluster/tests/failpoints.rs @@ -0,0 +1,119 @@ +//! Fault-injection tests for the cluster apply protocol. +//! +//! These live in an integration binary (not in-source) deliberately: the fail +//! crate's registry is process-global, so a configured `cluster_apply.*` +//! action would fire inside any concurrently running normal apply test in the +//! lib-test process. A separate binary isolates the registry by construction — +//! same reason the engine keeps its failpoint suite in `tests/failpoints.rs`. + +#![cfg(feature = "failpoints")] + +use std::collections::BTreeMap; +use std::fs; +use std::path::{Path, PathBuf}; + +use fail::FailScenario; +use omnigraph_cluster::failpoints::ScopedFailPoint; +use omnigraph_cluster::{apply_config_dir, validate_config_dir}; +use tempfile::tempdir; + +const SCHEMA: &str = r#" +node Person { + name: String @key + age: I32? 
+} +"#; + +const QUERY: &str = r#" +query find_person($name: String) { + match { $p: Person { name: $name } } + return { $p.name, $p.age } +} +"#; + +fn fixture() -> tempfile::TempDir { + let dir = tempdir().unwrap(); + fs::write(dir.path().join("people.pg"), SCHEMA).unwrap(); + fs::write(dir.path().join("people.gq"), QUERY).unwrap(); + fs::write(dir.path().join("base.policy.yaml"), "rules: []\n").unwrap(); + fs::write( + dir.path().join("cluster.yaml"), + r#" +version: 1 +state: + backend: cluster + lock: true +graphs: + knowledge: + schema: ./people.pg + queries: + find_person: + file: ./people.gq +policies: + base: + file: ./base.policy.yaml + applies_to: [knowledge] +"#, + ) + .unwrap(); + dir +} + +/// Seed a state.json where the graph/schema digests match desired, so query +/// and policy changes are applicable. Digests are borrowed from the public +/// validate output; the graph composite is a placeholder that apply converges +/// as a Derived update. +fn seed_applyable_state(config_dir: &Path) -> BTreeMap<String, String> { + let validate = validate_config_dir(config_dir); + assert!(validate.ok, "{:?}", validate.diagnostics); + let schema_digest = validate.resource_digests["schema.knowledge"].clone(); + let state_dir = config_dir.join("__cluster"); + fs::create_dir_all(&state_dir).unwrap(); + fs::write( + state_dir.join("state.json"), + format!( + r#"{{ + "version": 1, + "state_revision": 1, + "applied_revision": {{ + "resources": {{ + "graph.knowledge": {{ "digest": "seed" }}, + "schema.knowledge": {{ "digest": "{schema_digest}" }} + }} + }} +}} +"# + ), + ) + .unwrap(); + validate.resource_digests +} + +fn state_path(config_dir: &Path) -> PathBuf { + config_dir.join("__cluster/state.json") +} + +fn query_blob(config_dir: &Path, digests: &BTreeMap<String, String>) -> PathBuf { + config_dir + .join("__cluster/resources/query/knowledge/find_person") + .join(format!("{}.gq", digests["query.knowledge.find_person"])) +} + +#[test] +fn failpoint_wiring_returns_injected_diagnostic() { + let 
scenario = FailScenario::setup(); + let dir = fixture(); + seed_applyable_state(dir.path()); + + let failpoint = ScopedFailPoint::new("cluster_apply.after_payload_phase", "return"); + let out = apply_config_dir(dir.path()); + assert!(!out.ok); + assert!(out.diagnostics.iter().any(|diagnostic| { + diagnostic.code == "injected_failpoint" + && diagnostic + .message + .contains("cluster_apply.after_payload_phase") + })); + drop(failpoint); + scenario.teardown(); +}