feat(cluster): failpoint infrastructure mirroring the engine

Optional failpoints feature (dep:fail + fail/failpoints, deliberately NOT
enabling omnigraph/failpoints), a maybe_fail/ScopedFailPoint module returning
Diagnostic-typed injected errors, and two call sites in apply_config_dir:
cluster_apply.after_payload_phase (the crash point: blobs on disk, state
untouched) and cluster_apply.before_state_write (routes through the
persisted-statuses revert contract; a cfg_callback here can mutate state.json
to make the CAS check fail organically). Feature off compiles to Ok(()) —
zero behavior change. Tests live in a separate integration binary because the
fail registry is process-global. Also refresh the crate description (stale
'read-only' since Stage 3A).

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
aaltshuler 2026-06-10 02:12:59 +03:00
parent acb3f1cc14
commit 21b531605f
4 changed files with 192 additions and 2 deletions

View file

@ -2,15 +2,21 @@
name = "omnigraph-cluster"
version = "0.6.2"
edition = "2024"
description = "Read-only cluster configuration validation and planning for Omnigraph."
description = "Cluster configuration validation, planning, and config-only apply for Omnigraph."
license = "MIT"
repository = "https://github.com/ModernRelay/omnigraph"
homepage = "https://github.com/ModernRelay/omnigraph"
documentation = "https://docs.rs/omnigraph-cluster"
[features]
# Fault-injection hooks for the apply protocol (crash-mid-apply, CAS-race
# tests). Deliberately does NOT enable omnigraph/failpoints.
failpoints = ["dep:fail", "fail/failpoints"]
[dependencies]
omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.6.2" }
omnigraph = { package = "omnigraph-engine", path = "../omnigraph", version = "0.6.2" }
fail = { workspace = true, optional = true }
serde = { workspace = true }
serde_json = { workspace = true }
serde_yaml = { workspace = true }

View file

@ -0,0 +1,42 @@
//! Fault-injection hooks for the cluster apply protocol, mirroring the
//! engine's `omnigraph::failpoints` pattern. With the `failpoints` feature
//! off, every call site compiles to `Ok(())`.
use crate::Diagnostic;
/// Evaluates the failpoint `_name`, turning an injected early return into a
/// [`Diagnostic`].
///
/// With the `failpoints` feature disabled the body is just `Ok(())`. With it
/// enabled, the point is handed to `fail::fail_point!`; when the return
/// closure fires, the caller receives a diagnostic built by
/// `Diagnostic::error` with code `injected_failpoint` and a message naming the
/// point.
pub(crate) fn maybe_fail(_name: &str) -> Result<(), Diagnostic> {
    #[cfg(feature = "failpoints")]
    {
        // Rebind under a plain name: the parameter keeps its underscore so the
        // feature-off build does not warn about an unused argument.
        let name = _name;
        fail::fail_point!(name, |_| Err(Diagnostic::error(
            "injected_failpoint",
            name,
            format!("injected failpoint triggered: {name}"),
        )));
    }
    Ok(())
}
/// Guard that keeps a failpoint configured for as long as it is alive.
///
/// [`ScopedFailPoint::new`] configures the named failpoint and the `Drop`
/// impl removes it again. Bind the guard to a named variable for the duration
/// of the scenario: a guard that is not bound (or is bound with `let _ = ...`)
/// is dropped immediately, which removes the failpoint before the code under
/// test runs.
#[cfg(feature = "failpoints")]
#[derive(Debug)]
#[must_use = "the failpoint is removed as soon as the guard is dropped"]
pub struct ScopedFailPoint {
    /// Name of the failpoint to remove on drop.
    name: String,
}
#[cfg(feature = "failpoints")]
impl ScopedFailPoint {
    /// Configures failpoint `name` with `action` and returns a guard that
    /// remembers the name.
    ///
    /// # Panics
    ///
    /// Panics with `configure failpoint` if `fail::cfg` rejects the request.
    pub fn new(name: &str, action: &str) -> Self {
        // Configure first: if this panics, no guard exists yet and nothing is
        // removed from the registry.
        fail::cfg(name, action).expect("configure failpoint");
        let name = name.to_owned();
        Self { name }
    }
}
#[cfg(feature = "failpoints")]
impl Drop for ScopedFailPoint {
    /// Removes the failpoint this guard was created for.
    fn drop(&mut self) {
        let point = self.name.as_str();
        fail::remove(point);
    }
}

View file

@ -16,6 +16,8 @@ use time::OffsetDateTime;
use time::format_description::well_known::Rfc3339;
use ulid::Ulid;
pub mod failpoints;
pub const CLUSTER_CONFIG_FILE: &str = "cluster.yaml";
pub const CLUSTER_GRAPHS_DIR: &str = "graphs";
pub const CLUSTER_STATE_DIR: &str = "__cluster";
@ -770,6 +772,21 @@ pub fn apply_config_dir(config_dir: impl AsRef<Path>) -> ApplyOutput {
);
}
// Crash point: payloads are on disk, state has not moved. A failure here
// must leave state.json byte-identical and acknowledge nothing; re-running
// apply repairs via the skip-if-exists blob reuse.
if let Err(diagnostic) = failpoints::maybe_fail("cluster_apply.after_payload_phase") {
diagnostics.push(diagnostic);
return early_return(
display_path(&desired.config_dir),
Some(desired.config_digest),
observations,
changes,
state.resource_statuses,
diagnostics,
);
}
// State mutation. Apply owns query/policy statuses only; graph/schema
// statuses belong to refresh/import observation and must not be clobbered.
let before_value =
@ -824,7 +841,13 @@ pub fn apply_config_dir(config_dir: impl AsRef<Path>) -> ApplyOutput {
let mut state_write_failed = false;
if after_value != before_value {
new_state.state_revision = new_state.state_revision.saturating_add(1);
match backend.write_state(&new_state, expected_cas.as_deref(), &mut observations) {
// The failpoint error routes through state_write_failed so the
// persisted-statuses revert contract below is exercised; a cfg_callback
// on this point can mutate state.json to simulate a concurrent writer,
// making write_state's CAS check fail organically.
let write_result = failpoints::maybe_fail("cluster_apply.before_state_write")
.and_then(|()| backend.write_state(&new_state, expected_cas.as_deref(), &mut observations));
match write_result {
Ok(()) => state_written = true,
Err(diagnostic) => {
diagnostics.push(diagnostic);

View file

@ -0,0 +1,119 @@
//! Fault-injection tests for the cluster apply protocol.
//!
//! These live in an integration binary (not in-source) deliberately: the fail
//! crate's registry is process-global, so a configured `cluster_apply.*`
//! action would fire inside any concurrently running normal apply test in the
//! lib-test process. A separate binary isolates the registry by construction —
//! same reason the engine keeps its failpoint suite in `tests/failpoints.rs`.
#![cfg(feature = "failpoints")]
use std::collections::BTreeMap;
use std::fs;
use std::path::{Path, PathBuf};
use fail::FailScenario;
use omnigraph_cluster::failpoints::ScopedFailPoint;
use omnigraph_cluster::{apply_config_dir, validate_config_dir};
use tempfile::tempdir;
const SCHEMA: &str = r#"
node Person {
name: String @key
age: I32?
}
"#;
const QUERY: &str = r#"
query find_person($name: String) {
match { $p: Person { name: $name } }
return { $p.name, $p.age }
}
"#;
/// Builds a throwaway cluster config directory for the failpoint tests.
///
/// Writes four files into a fresh temporary directory: `people.pg` (from
/// `SCHEMA`), `people.gq` (from `QUERY`), `base.policy.yaml` (an empty rule
/// list), and a `cluster.yaml` that references all three. Returns the
/// `TempDir` handle; callers use `.path()` as the config directory.
///
/// Panics if the directory cannot be created or any file cannot be written.
// NOTE(review): the YAML literal below is whitespace-sensitive; keep its
// layout exactly as authored when editing.
fn fixture() -> tempfile::TempDir {
let dir = tempdir().unwrap();
fs::write(dir.path().join("people.pg"), SCHEMA).unwrap();
fs::write(dir.path().join("people.gq"), QUERY).unwrap();
fs::write(dir.path().join("base.policy.yaml"), "rules: []\n").unwrap();
fs::write(
dir.path().join("cluster.yaml"),
r#"
version: 1
state:
backend: cluster
lock: true
graphs:
knowledge:
schema: ./people.pg
queries:
find_person:
file: ./people.gq
policies:
base:
file: ./base.policy.yaml
applies_to: [knowledge]
"#,
)
.unwrap();
dir
}
/// Writes a starting `__cluster/state.json` under `config_dir` and returns
/// the resource digests reported by `validate_config_dir`.
///
/// The seeded `schema.knowledge` digest is taken from that validate output,
/// so it matches the desired schema and query/policy changes are applicable.
/// The `graph.knowledge` digest is the placeholder `"seed"`, which apply
/// converges as a Derived update.
///
/// Panics if validation is not ok (printing its diagnostics) or if the state
/// directory or file cannot be written.
fn seed_applyable_state(config_dir: &Path) -> BTreeMap<String, String> {
    let report = validate_config_dir(config_dir);
    assert!(report.ok, "{:?}", report.diagnostics);

    // Borrow the digest for formatting; the whole map is handed back below.
    let schema_digest = &report.resource_digests["schema.knowledge"];
    let seeded_state = format!(
        r#"{{
"version": 1,
"state_revision": 1,
"applied_revision": {{
"resources": {{
"graph.knowledge": {{ "digest": "seed" }},
"schema.knowledge": {{ "digest": "{schema_digest}" }}
}}
}}
}}
"#
    );

    let cluster_dir = config_dir.join("__cluster");
    fs::create_dir_all(&cluster_dir).unwrap();
    fs::write(cluster_dir.join("state.json"), seeded_state).unwrap();

    report.resource_digests
}
/// Returns the location of the cluster state file under `config_dir`.
fn state_path(config_dir: &Path) -> PathBuf {
    let mut path = config_dir.to_path_buf();
    path.push("__cluster/state.json");
    path
}
/// Returns the path of the stored `find_person` query blob under
/// `config_dir`, named after its digest in `digests`.
///
/// Panics if `digests` has no `query.knowledge.find_person` entry.
fn query_blob(config_dir: &Path, digests: &BTreeMap<String, String>) -> PathBuf {
    let digest = &digests["query.knowledge.find_person"];
    let mut blob = config_dir.join("__cluster/resources/query/knowledge/find_person");
    blob.push(format!("{}.gq", digest));
    blob
}
/// Smoke test for the wiring: with `cluster_apply.after_payload_phase` set to
/// `return`, apply must fail and report an `injected_failpoint` diagnostic
/// that names the point.
#[test]
fn failpoint_wiring_returns_injected_diagnostic() {
    const POINT: &str = "cluster_apply.after_payload_phase";

    let scenario = FailScenario::setup();
    let config = fixture();
    seed_applyable_state(config.path());
    let guard = ScopedFailPoint::new(POINT, "return");

    let out = apply_config_dir(config.path());

    assert!(!out.ok);
    assert!(
        out.diagnostics
            .iter()
            .any(|d| d.code == "injected_failpoint" && d.message.contains(POINT))
    );

    // Remove the failpoint explicitly before tearing the scenario down.
    drop(guard);
    scenario.teardown();
}