From 15868972ff5ad492ababba36bade9a52fe3cd5fb Mon Sep 17 00:00:00 2001 From: aaltshuler Date: Wed, 10 Jun 2026 02:07:08 +0300 Subject: [PATCH 1/7] feat(cluster): verify catalog payload blobs in status and refresh MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes the Stage 3A product gap where a deleted or corrupted blob under __cluster/resources/ went unnoticed forever (status reported converged and apply could not repair it because the digests matched). verify_catalog_payloads checks every query/policy digest in state against its content-addressed blob (existence + full sha256 re-hash; graph/schema/unknown addresses have no payloads and are skipped). status reports findings read-only (warnings catalog_payload_missing/_mismatch; error catalog_payload_read_error — an unverifiable catalog must not report healthy). refresh closes the self-heal loop: missing/mismatched blobs mark the resource drifted and remove its digest from state so the next plan proposes a create and the next apply republishes; unreadable blobs keep the digest (no spurious republish), mark error, and exit non-zero. Verification runs before graph observation so the recomputed graph composite already excludes removed query digests. Co-Authored-By: Claude Fable 5 --- crates/omnigraph-cluster/src/lib.rs | 281 ++++++++++++++++++++++++++++ docs/user/cluster-config.md | 33 +++- 2 files changed, 312 insertions(+), 2 deletions(-) diff --git a/crates/omnigraph-cluster/src/lib.rs b/crates/omnigraph-cluster/src/lib.rs index 3673194..84968a7 100644 --- a/crates/omnigraph-cluster/src/lib.rs +++ b/crates/omnigraph-cluster/src/lib.rs @@ -890,6 +890,14 @@ pub fn status_config_dir(config_dir: impl AsRef) -> StatusOutput { match backend.read_state(&mut observations) { Ok(snapshot) => { if let Some(state) = snapshot.state { + // Read-only point-in-time catalog check: report the + // findings as diagnostics; persisting Drifted statuses + // is refresh's job. Status never writes state. + for (address, finding) in + verify_catalog_payloads(&parsed.config_dir, &state) + { + diagnostics.push(payload_finding_diagnostic(&address, &finding)); + } resource_digests = state_resource_digests(&state); resource_statuses = state.resource_statuses; state_observation_records = state.observations; @@ -1076,6 +1084,47 @@ async fn sync_config_dir(config_dir: &Path, operation: StateSyncOperation) -> St (StateSyncOperation::Import, None) => initial_import_state(&desired), }; + // Catalog payload verification must run BEFORE graph observation: removing + // a drifted query digest first means the live-graph composite recompute + // below already excludes it, so the persisted graph. composite stays + // consistent and the next plan shows exactly the create + derived update. + for (address, finding) in verify_catalog_payloads(&desired.config_dir, &state) { + diagnostics.push(payload_finding_diagnostic(&address, &finding)); + match finding { + PayloadFinding::Missing => { + state.applied_revision.resources.remove(&address); + set_resource_status( + &mut state, + &address, + ResourceLifecycleStatus::Drifted, + "payload_missing", + "catalog payload blob is missing; re-run `cluster apply` to republish", + ); + } + PayloadFinding::Mismatch { .. } => { + state.applied_revision.resources.remove(&address); + set_resource_status( + &mut state, + &address, + ResourceLifecycleStatus::Drifted, + "payload_mismatch", + "catalog payload blob does not match the recorded digest; re-run `cluster apply` to republish", + ); + } + // Transient IO must not trigger a spurious republish: keep the + // digest, surface the error, let a later clean refresh converge. + PayloadFinding::ReadError(error) => { + set_resource_status( + &mut state, + &address, + ResourceLifecycleStatus::Error, + "payload_read_error", + &error, + ); + } + } + } + let graph_error_count = observe_declared_graphs(&desired, &mut state).await; if graph_error_count > 0 { diagnostics.push(Diagnostic::error( @@ -2371,6 +2420,73 @@ fn payload_path(config_dir: &Path, kind: &ResourceKind, digest: &str) -> Option< } } +#[derive(Debug, PartialEq, Eq)] +enum PayloadFinding { + Missing, + Mismatch { actual_digest: String }, + ReadError(String), +} + +/// Verify every catalog-backed resource digest in state against its +/// content-addressed blob under `__cluster/resources/`. Graph, schema, and +/// unknown addresses have no payloads and are skipped. Read-only; findings +/// are deterministic (BTreeMap order). Payloads are small (queries, policy +/// bundles), so a full digest re-hash is cheap. +fn verify_catalog_payloads( + config_dir: &Path, + state: &ClusterState, +) -> Vec<(String, PayloadFinding)> { + let mut findings = Vec::new(); + for (address, resource) in &state.applied_revision.resources { + let kind = resource_kind(address); + let Some(path) = payload_path(config_dir, &kind, &resource.digest) else { + continue; + }; + match fs::read(&path) { + Ok(bytes) => { + let actual_digest = sha256_hex(&bytes); + if actual_digest != resource.digest { + findings.push((address.clone(), PayloadFinding::Mismatch { actual_digest })); + } + } + Err(err) if err.kind() == ErrorKind::NotFound => { + findings.push((address.clone(), PayloadFinding::Missing)); + } + Err(err) => { + findings.push(( + address.clone(), + PayloadFinding::ReadError(format!( + "could not read catalog payload '{}': {err}", + path.display() + )), + )); + } + } + } + findings +} + +fn payload_finding_diagnostic(address: &str, finding: &PayloadFinding) -> Diagnostic { + match finding { + PayloadFinding::Missing => Diagnostic::warning( + "catalog_payload_missing", + address, + "catalog payload blob is missing; re-run `cluster apply` to republish", + ), + PayloadFinding::Mismatch { actual_digest } => Diagnostic::warning( + "catalog_payload_mismatch", + address, + format!( + "catalog payload blob does not match the recorded digest (actual sha256:{actual_digest}); re-run `cluster apply` to republish" + ), + ), + // An unverifiable blob must not report healthy. + PayloadFinding::ReadError(error) => { + Diagnostic::error("catalog_payload_read_error", address, error.clone()) + } + } +} + /// Write one content-addressed payload blob. Idempotent: an existing /// digest-named file is trusted as-is. The digest re-check is the apply-side /// TOCTOU detector — the source file changing between `load_desired` and the @@ -4317,6 +4433,171 @@ graphs: ); } + // ---- catalog payload verification (Stage 3B) ---- + + /// Converge a fixture dir and return the query blob path. + fn converge_fixture(config_dir: &Path) -> std::path::PathBuf { + write_applyable_state(config_dir); + let out = apply_config_dir(config_dir); + assert!(out.ok && out.converged, "{:?}", out.diagnostics); + let desired = validate_config_dir(config_dir); + query_payload_path( + config_dir, + desired + .resource_digests + .get("query.knowledge.find_person") + .unwrap(), + ) + } + + #[test] + fn status_reports_missing_payload_read_only() { + let dir = fixture(); + let blob = converge_fixture(dir.path()); + let state_before = fs::read_to_string(dir.path().join(CLUSTER_STATE_FILE)).unwrap(); + fs::remove_file(&blob).unwrap(); + + let out = status_config_dir(dir.path()); + assert!(out.ok, "{:?}", out.diagnostics); + assert!(out.diagnostics.iter().any(|diagnostic| { + diagnostic.code == "catalog_payload_missing" + && diagnostic.path == "query.knowledge.find_person" + })); + // Read-only: persisted statuses and state bytes untouched. + assert_eq!( + out.resource_statuses["query.knowledge.find_person"].status, + ResourceLifecycleStatus::Applied + ); + assert_eq!( + fs::read_to_string(dir.path().join(CLUSTER_STATE_FILE)).unwrap(), + state_before + ); + } + + #[tokio::test] + async fn refresh_removes_digest_and_drifts_on_missing_payload() { + let dir = fixture(); + init_derived_graph(dir.path()).await; + let blob = converge_fixture(dir.path()); + fs::remove_file(&blob).unwrap(); + + let out = refresh_config_dir(dir.path()).await; + assert!(out.ok, "{:?}", out.diagnostics); + assert!( + out.diagnostics + .iter() + .any(|diagnostic| diagnostic.code == "catalog_payload_missing") + ); + let status = &out.resource_statuses["query.knowledge.find_person"]; + assert_eq!(status.status, ResourceLifecycleStatus::Drifted); + assert!(status.conditions.contains(&"payload_missing".to_string())); + let state = read_state_json(dir.path()); + assert!( + state["applied_revision"]["resources"] + .get("query.knowledge.find_person") + .is_none(), + "{state}" + ); + } + + #[tokio::test] + async fn refresh_drifts_on_corrupted_payload() { + let dir = fixture(); + init_derived_graph(dir.path()).await; + let blob = converge_fixture(dir.path()); + fs::write(&blob, "corrupted content").unwrap(); + + let out = refresh_config_dir(dir.path()).await; + assert!(out.ok, "{:?}", out.diagnostics); + let status = &out.resource_statuses["query.knowledge.find_person"]; + assert_eq!(status.status, ResourceLifecycleStatus::Drifted); + assert!(status.conditions.contains(&"payload_mismatch".to_string())); + let state = read_state_json(dir.path()); + assert!( + state["applied_revision"]["resources"] + .get("query.knowledge.find_person") + .is_none() + ); + } + + #[tokio::test] + async fn refresh_flags_unreadable_payload_as_error() { + let dir = fixture(); + init_derived_graph(dir.path()).await; + let blob = converge_fixture(dir.path()); + // A same-named directory yields a non-NotFound IO error portably. + fs::remove_file(&blob).unwrap(); + fs::create_dir(&blob).unwrap(); + + let out = refresh_config_dir(dir.path()).await; + assert!(!out.ok); + assert!( + out.diagnostics + .iter() + .any(|diagnostic| diagnostic.code == "catalog_payload_read_error") + ); + let status = &out.resource_statuses["query.knowledge.find_person"]; + assert_eq!(status.status, ResourceLifecycleStatus::Error); + assert!(status.conditions.contains(&"payload_read_error".to_string())); + // Transient IO keeps the digest: no spurious republish. + let state = read_state_json(dir.path()); + assert!( + state["applied_revision"]["resources"] + .get("query.knowledge.find_person") + .is_some() + ); + } + + #[tokio::test] + async fn payload_drift_self_heals_through_refresh_plan_apply() { + let dir = fixture(); + init_derived_graph(dir.path()).await; + let blob = converge_fixture(dir.path()); + let original = fs::read_to_string(&blob).unwrap(); + fs::remove_file(&blob).unwrap(); + + let refresh = refresh_config_dir(dir.path()).await; + assert!(refresh.ok, "{:?}", refresh.diagnostics); + + let plan = plan_config_dir(dir.path()); + let query_change = plan + .changes + .iter() + .find(|change| change.resource == "query.knowledge.find_person") + .expect("plan must propose recreating the query"); + assert_eq!(query_change.operation, PlanOperation::Create); + assert_eq!(query_change.disposition, Some(ApplyDisposition::Applied)); + + let apply = apply_config_dir(dir.path()); + assert!(apply.ok && apply.converged, "{:?}", apply.diagnostics); + assert_eq!(fs::read_to_string(&blob).unwrap(), original); + + let status = status_config_dir(dir.path()); + assert!( + !status + .diagnostics + .iter() + .any(|diagnostic| diagnostic.code.starts_with("catalog_payload")), + "{:?}", + status.diagnostics + ); + } + + #[test] + fn verification_skips_graph_and_schema_resources() { + let dir = fixture(); + write_applyable_state(dir.path()); // graph + schema digests only, no blobs + + let out = status_config_dir(dir.path()); + assert!( + !out.diagnostics + .iter() + .any(|diagnostic| diagnostic.code.starts_with("catalog_payload")), + "{:?}", + out.diagnostics + ); + } + #[test] fn plan_annotates_apply_dispositions() { let dir = fixture(); diff --git a/docs/user/cluster-config.md b/docs/user/cluster-config.md index 912f307..9a2597b 100644 --- a/docs/user/cluster-config.md +++ b/docs/user/cluster-config.md @@ -198,6 +198,16 @@ files and does not inspect live graphs. Missing `state.json` succeeds with a warning; invalid state JSON or an unsupported state version fails. If a lock is present, status reports its id, operation, creation time, pid, and age. +Status also verifies the catalog payloads read-only: every query/policy digest +recorded in state is checked against its content-addressed blob under +`__cluster/resources/` (existence and full digest re-hash). A missing or +mismatched blob is reported as a warning (`catalog_payload_missing` / +`catalog_payload_mismatch`); an unreadable blob is an error +(`catalog_payload_read_error`) because an unverifiable catalog must not report +healthy. Status never writes state — persisting the `drifted` condition is +refresh's job. The check runs without the state lock, so it is a point-in-time +report. + ## Refresh And Import `cluster refresh` updates an existing `state.json` from actual observations. @@ -216,8 +226,27 @@ Invalid graph roots are recorded as errors; `refresh` persists the error observation and exits non-zero, while `import` exits non-zero without creating initial state. -Refresh/import do not observe query or policy resources yet. Existing query and -policy state digests are preserved on refresh and are not invented on import. +Refresh also verifies the catalog payloads of every query/policy digest +recorded in state (the same check `cluster status` reports read-only), and +closes the loop: + +- a **missing** or **digest-mismatched** blob marks the resource `drifted` + (condition `payload_missing` / `payload_mismatch`) and removes its digest + from state — so the next `cluster plan` proposes a create and the next + `cluster apply` republishes the blob (the self-heal loop, mirroring how a + missing graph root is handled); +- an **unreadable** blob (IO error other than not-found) keeps the digest, + marks the resource `error` (condition `payload_read_error`), and exits + non-zero — transient IO must not trigger a spurious republish. + +Upgrade note: a state ledger written before catalog publish existed records +query/policy digests with no blobs on disk; the first refresh after upgrading +flags them all `payload_missing`, and a single `cluster apply` republishes +everything and converges. + +Refresh/import do not observe query or policy resources beyond their catalog +payloads yet. Existing query and policy state digests are preserved on refresh +(unless their payload drifted, above) and are not invented on import. ## Force Unlock From acb3f1cc14552d4b55a713de9c773d100fd3a25d Mon Sep 17 00:00:00 2001 From: aaltshuler Date: Wed, 10 Jun 2026 02:08:14 +0300 Subject: [PATCH 2/7] test(cli): e2e for catalog payload drift self-heal loop status warns read-only -> refresh persists drift and drops the digest -> apply republishes the blob -> status clean. Co-Authored-By: Claude Fable 5 --- crates/omnigraph-cli/tests/cli.rs | 61 +++++++++++++++++++++++++++++++ 1 file changed, 61 insertions(+) diff --git a/crates/omnigraph-cli/tests/cli.rs b/crates/omnigraph-cli/tests/cli.rs index f60ffbe..d47e13c 100644 --- a/crates/omnigraph-cli/tests/cli.rs +++ b/crates/omnigraph-cli/tests/cli.rs @@ -1435,6 +1435,67 @@ fn cluster_e2e_multi_graph_mixed_dispositions_then_converge() { ); } +/// Catalog payload drift self-heals across the command surface: status warns +/// read-only, refresh persists the drift and drops the digest, apply +/// republishes the blob, status comes back clean. +#[test] +fn cluster_e2e_payload_drift_self_heals() { + let temp = tempdir().unwrap(); + write_cluster_config_fixture(temp.path()); + init_cluster_derived_graph(temp.path()); + let import = cluster_json(temp.path(), "import"); + assert_eq!(import["ok"], true, "{import}"); + let apply = cluster_json(temp.path(), "apply"); + assert_eq!(apply["converged"], true, "{apply}"); + + let query_digest = change_for(&apply, "query.knowledge.find_person")["after_digest"] + .as_str() + .unwrap() + .to_string(); + let blob = temp + .path() + .join("__cluster/resources/query/knowledge/find_person") + .join(format!("{query_digest}.gq")); + fs::remove_file(&blob).unwrap(); + + let status = cluster_json(temp.path(), "status"); + assert_eq!(status["ok"], true, "{status}"); + assert!( + status["diagnostics"] + .as_array() + .unwrap() + .iter() + .any(|diagnostic| diagnostic["code"] == "catalog_payload_missing"), + "{status}" + ); + + let refresh = cluster_json(temp.path(), "refresh"); + assert_eq!(refresh["ok"], true, "{refresh}"); + assert_eq!( + refresh["resource_statuses"]["query.knowledge.find_person"]["status"], + "drifted" + ); + + let heal = cluster_json(temp.path(), "apply"); + assert_eq!(heal["ok"], true, "{heal}"); + assert_eq!(heal["converged"], true, "{heal}"); + assert!(blob.exists(), "blob republished"); + + let clean = cluster_json(temp.path(), "status"); + assert!( + !clean["diagnostics"] + .as_array() + .unwrap() + .iter() + .any(|diagnostic| { + diagnostic["code"] + .as_str() + .is_some_and(|code| code.starts_with("catalog_payload")) + }), + "{clean}" + ); +} + #[test] fn short_version_flag_prints_current_cli_version() { let output = output_success(cli().arg("-v")); From 21b531605fed16b486280ba3e2944e7f15a1604e Mon Sep 17 00:00:00 2001 From: aaltshuler Date: Wed, 10 Jun 2026 02:12:59 +0300 Subject: [PATCH 3/7] feat(cluster): failpoint infrastructure mirroring the engine MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Optional failpoints feature (dep:fail + fail/failpoints, deliberately NOT enabling omnigraph/failpoints), a maybe_fail/ScopedFailPoint module returning Diagnostic-typed injected errors, and two call sites in apply_config_dir: cluster_apply.after_payload_phase (the crash point: blobs on disk, state untouched) and cluster_apply.before_state_write (routes through the persisted-statuses revert contract; a cfg_callback here can mutate state.json to make the CAS check fail organically). Feature off compiles to Ok(()) — zero behavior change. Tests live in a separate integration binary because the fail registry is process-global. Also refresh the crate description (stale 'read-only' since Stage 3A). Co-Authored-By: Claude Fable 5 --- crates/omnigraph-cluster/Cargo.toml | 8 +- crates/omnigraph-cluster/src/failpoints.rs | 42 +++++++ crates/omnigraph-cluster/src/lib.rs | 25 +++- crates/omnigraph-cluster/tests/failpoints.rs | 119 +++++++++++++++++++ 4 files changed, 192 insertions(+), 2 deletions(-) create mode 100644 crates/omnigraph-cluster/src/failpoints.rs create mode 100644 crates/omnigraph-cluster/tests/failpoints.rs diff --git a/crates/omnigraph-cluster/Cargo.toml b/crates/omnigraph-cluster/Cargo.toml index 9280c42..b5f99c9 100644 --- a/crates/omnigraph-cluster/Cargo.toml +++ b/crates/omnigraph-cluster/Cargo.toml @@ -2,15 +2,21 @@ name = "omnigraph-cluster" version = "0.6.2" edition = "2024" -description = "Read-only cluster configuration validation and planning for Omnigraph." +description = "Cluster configuration validation, planning, and config-only apply for Omnigraph." license = "MIT" repository = "https://github.com/ModernRelay/omnigraph" homepage = "https://github.com/ModernRelay/omnigraph" documentation = "https://docs.rs/omnigraph-cluster" +[features] +# Fault-injection hooks for the apply protocol (crash-mid-apply, CAS-race +# tests). Deliberately does NOT enable omnigraph/failpoints. +failpoints = ["dep:fail", "fail/failpoints"] + [dependencies] omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.6.2" } omnigraph = { package = "omnigraph-engine", path = "../omnigraph", version = "0.6.2" } +fail = { workspace = true, optional = true } serde = { workspace = true } serde_json = { workspace = true } serde_yaml = { workspace = true } diff --git a/crates/omnigraph-cluster/src/failpoints.rs b/crates/omnigraph-cluster/src/failpoints.rs new file mode 100644 index 0000000..c6d445b --- /dev/null +++ b/crates/omnigraph-cluster/src/failpoints.rs @@ -0,0 +1,42 @@ +//! Fault-injection hooks for the cluster apply protocol, mirroring the +//! engine's `omnigraph::failpoints` pattern. With the `failpoints` feature +//! off, every call site compiles to `Ok(())`. + +use crate::Diagnostic; + +pub(crate) fn maybe_fail(_name: &str) -> Result<(), Diagnostic> { + #[cfg(feature = "failpoints")] + { + let name = _name; + fail::fail_point!(name, |_| { + return Err(Diagnostic::error( + "injected_failpoint", + name, + format!("injected failpoint triggered: {name}"), + )); + }); + } + Ok(()) +} + +#[cfg(feature = "failpoints")] +pub struct ScopedFailPoint { + name: String, +} + +#[cfg(feature = "failpoints")] +impl ScopedFailPoint { + pub fn new(name: &str, action: &str) -> Self { + fail::cfg(name, action).expect("configure failpoint"); + Self { + name: name.to_string(), + } + } +} + +#[cfg(feature = "failpoints")] +impl Drop for ScopedFailPoint { + fn drop(&mut self) { + fail::remove(&self.name); + } +} diff --git a/crates/omnigraph-cluster/src/lib.rs b/crates/omnigraph-cluster/src/lib.rs index 84968a7..660f34c 100644 --- a/crates/omnigraph-cluster/src/lib.rs +++ b/crates/omnigraph-cluster/src/lib.rs @@ -16,6 +16,8 @@ use time::OffsetDateTime; use time::format_description::well_known::Rfc3339; use ulid::Ulid; +pub mod failpoints; + pub const CLUSTER_CONFIG_FILE: &str = "cluster.yaml"; pub const CLUSTER_GRAPHS_DIR: &str = "graphs"; pub const CLUSTER_STATE_DIR: &str = "__cluster"; @@ -770,6 +772,21 @@ pub fn apply_config_dir(config_dir: impl AsRef) -> ApplyOutput { ); } + // Crash point: payloads are on disk, state has not moved. A failure here + // must leave state.json byte-identical and acknowledge nothing; re-running + // apply repairs via the skip-if-exists blob reuse. + if let Err(diagnostic) = failpoints::maybe_fail("cluster_apply.after_payload_phase") { + diagnostics.push(diagnostic); + return early_return( + display_path(&desired.config_dir), + Some(desired.config_digest), + observations, + changes, + state.resource_statuses, + diagnostics, + ); + } + // State mutation. Apply owns query/policy statuses only; graph/schema // statuses belong to refresh/import observation and must not be clobbered. let before_value = @@ -824,7 +841,13 @@ pub fn apply_config_dir(config_dir: impl AsRef) -> ApplyOutput { let mut state_write_failed = false; if after_value != before_value { new_state.state_revision = new_state.state_revision.saturating_add(1); - match backend.write_state(&new_state, expected_cas.as_deref(), &mut observations) { + // The failpoint error routes through state_write_failed so the + // persisted-statuses revert contract below is exercised; a cfg_callback + // on this point can mutate state.json to simulate a concurrent writer, + // making write_state's CAS check fail organically. + let write_result = failpoints::maybe_fail("cluster_apply.before_state_write") + .and_then(|()| backend.write_state(&new_state, expected_cas.as_deref(), &mut observations)); + match write_result { Ok(()) => state_written = true, Err(diagnostic) => { diagnostics.push(diagnostic); diff --git a/crates/omnigraph-cluster/tests/failpoints.rs b/crates/omnigraph-cluster/tests/failpoints.rs new file mode 100644 index 0000000..3ede30c --- /dev/null +++ b/crates/omnigraph-cluster/tests/failpoints.rs @@ -0,0 +1,119 @@ +//! Fault-injection tests for the cluster apply protocol. +//! +//! These live in an integration binary (not in-source) deliberately: the fail +//! crate's registry is process-global, so a configured `cluster_apply.*` +//! action would fire inside any concurrently running normal apply test in the +//! lib-test process. A separate binary isolates the registry by construction — +//! same reason the engine keeps its failpoint suite in `tests/failpoints.rs`. + +#![cfg(feature = "failpoints")] + +use std::collections::BTreeMap; +use std::fs; +use std::path::{Path, PathBuf}; + +use fail::FailScenario; +use omnigraph_cluster::failpoints::ScopedFailPoint; +use omnigraph_cluster::{apply_config_dir, validate_config_dir}; +use tempfile::tempdir; + +const SCHEMA: &str = r#" +node Person { + name: String @key + age: I32? +} +"#; + +const QUERY: &str = r#" +query find_person($name: String) { + match { $p: Person { name: $name } } + return { $p.name, $p.age } +} +"#; + +fn fixture() -> tempfile::TempDir { + let dir = tempdir().unwrap(); + fs::write(dir.path().join("people.pg"), SCHEMA).unwrap(); + fs::write(dir.path().join("people.gq"), QUERY).unwrap(); + fs::write(dir.path().join("base.policy.yaml"), "rules: []\n").unwrap(); + fs::write( + dir.path().join("cluster.yaml"), + r#" +version: 1 +state: + backend: cluster + lock: true +graphs: + knowledge: + schema: ./people.pg + queries: + find_person: + file: ./people.gq +policies: + base: + file: ./base.policy.yaml + applies_to: [knowledge] +"#, + ) + .unwrap(); + dir +} + +/// Seed a state.json where the graph/schema digests match desired, so query +/// and policy changes are applicable. Digests are borrowed from the public +/// validate output; the graph composite is a placeholder that apply converges +/// as a Derived update. +fn seed_applyable_state(config_dir: &Path) -> BTreeMap { + let validate = validate_config_dir(config_dir); + assert!(validate.ok, "{:?}", validate.diagnostics); + let schema_digest = validate.resource_digests["schema.knowledge"].clone(); + let state_dir = config_dir.join("__cluster"); + fs::create_dir_all(&state_dir).unwrap(); + fs::write( + state_dir.join("state.json"), + format!( + r#"{{ + "version": 1, + "state_revision": 1, + "applied_revision": {{ + "resources": {{ + "graph.knowledge": {{ "digest": "seed" }}, + "schema.knowledge": {{ "digest": "{schema_digest}" }} + }} + }} +}} +"# + ), + ) + .unwrap(); + validate.resource_digests +} + +fn state_path(config_dir: &Path) -> PathBuf { + config_dir.join("__cluster/state.json") +} + +fn query_blob(config_dir: &Path, digests: &BTreeMap) -> PathBuf { + config_dir + .join("__cluster/resources/query/knowledge/find_person") + .join(format!("{}.gq", digests["query.knowledge.find_person"])) +} + +#[test] +fn failpoint_wiring_returns_injected_diagnostic() { + let scenario = FailScenario::setup(); + let dir = fixture(); + seed_applyable_state(dir.path()); + + let _failpoint = ScopedFailPoint::new("cluster_apply.after_payload_phase", "return"); + let out = apply_config_dir(dir.path()); + assert!(!out.ok); + assert!(out.diagnostics.iter().any(|diagnostic| { + diagnostic.code == "injected_failpoint" + && diagnostic + .message + .contains("cluster_apply.after_payload_phase") + })); + drop(_failpoint); + scenario.teardown(); +} From 211b37e6de8977b5264a179c10e6383f66d6b711 Mon Sep 17 00:00:00 2001 From: aaltshuler Date: Wed, 10 Jun 2026 02:14:06 +0300 Subject: [PATCH 4/7] test(cluster): failpoint tests for crash-mid-apply and state CAS race The apply-side coverage the implementation spec's hard gate requires before Phase 4 graph-moving apply: - crash after the payload phase: state.json byte-identical, blobs inert on disk, lock released, no phantom statuses, nothing acknowledged; a plain re-run repairs via skip-if-exists blob reuse. - CAS race: a cfg_callback rewrites state.json at the exact read->write window (the state.lock:false concurrent-writer scenario); apply surfaces state_cas_mismatch, acknowledges nothing, reports the persisted status snapshot, leaves the concurrent writer's state on disk; a re-run converges. CI's failpoints step now runs both the engine and cluster suites. Co-Authored-By: Claude Fable 5 --- .github/workflows/ci.yml | 11 ++- crates/omnigraph-cluster/tests/failpoints.rs | 99 ++++++++++++++++++++ 2 files changed, 106 insertions(+), 4 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index bbe5893..1ea6c37 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -173,15 +173,18 @@ jobs: OMNIGRAPH_UPDATE_OPENAPI: ${{ (github.event_name == 'pull_request' && github.event.pull_request.head.repo.full_name == github.repository) && '1' || '' }} run: cargo test --workspace --locked - - name: Run failpoints feature test + - name: Run failpoints feature tests if: needs.classify_changes.outputs.run_full_ci == 'true' # Run after the workspace test so the build cache is warm — # enabling --features failpoints is just an incremental rebuild - # of omnigraph-engine + the small `fail` crate, not the full + # of the target crate + the small `fail` crate, not the full # dep tree (lance, datafusion). A separate job with its own # cache key would be a fresh ~20min build on first run; this - # is ~30s on a warm cache. - run: cargo test --locked -p omnigraph-engine --features failpoints --test failpoints + # is ~30s on a warm cache. The cluster feature does not enable + # omnigraph/failpoints, so each line rebuilds only its crate. + run: | + cargo test --locked -p omnigraph-engine --features failpoints --test failpoints + cargo test --locked -p omnigraph-cluster --features failpoints --test failpoints - name: Commit regenerated openapi.json to PR branch if: | diff --git a/crates/omnigraph-cluster/tests/failpoints.rs b/crates/omnigraph-cluster/tests/failpoints.rs index 3ede30c..05d2913 100644 --- a/crates/omnigraph-cluster/tests/failpoints.rs +++ b/crates/omnigraph-cluster/tests/failpoints.rs @@ -117,3 +117,102 @@ fn failpoint_wiring_returns_injected_diagnostic() { drop(_failpoint); scenario.teardown(); } + +/// Crash between the payload phase and the state write: blobs are on disk, +/// state.json is byte-identical, nothing is acknowledged — and a plain re-run +/// repairs by trusting the existing content-addressed blobs. +#[test] +fn apply_crash_after_payload_phase_leaves_state_unmoved_then_recovers() { + let scenario = FailScenario::setup(); + let dir = fixture(); + let digests = seed_applyable_state(dir.path()); + let state_before = fs::read(state_path(dir.path())).unwrap(); + + { + let _failpoint = ScopedFailPoint::new("cluster_apply.after_payload_phase", "return"); + let out = apply_config_dir(dir.path()); + assert!(!out.ok); + assert!(!out.state_written); + assert!(!out.converged); + assert_eq!(out.applied_count, 0); + // Persisted pre-apply snapshot: no phantom Applied statuses. + assert!( + !out.resource_statuses + .contains_key("query.knowledge.find_person"), + "{:?}", + out.resource_statuses + ); + // State has not moved; payloads are inert on disk; the lock released. + assert_eq!(fs::read(state_path(dir.path())).unwrap(), state_before); + assert!(query_blob(dir.path(), &digests).exists()); + assert!(!dir.path().join("__cluster/lock.json").exists()); + } + + // The repair is a plain re-run: existing blobs are trusted by digest. + let recovered = apply_config_dir(dir.path()); + assert!(recovered.ok, "{:?}", recovered.diagnostics); + assert!(recovered.converged); + assert!(recovered.state_written); + assert_eq!( + recovered.resource_statuses["query.knowledge.find_person"].status, + omnigraph_cluster::ResourceLifecycleStatus::Applied + ); + scenario.teardown(); +} + +/// A concurrent writer mutating state.json between apply's read and its write +/// (possible under `state.lock: false`) must surface `state_cas_mismatch`, +/// acknowledge nothing, and leave the concurrent writer's state on disk. +#[test] +fn apply_cas_race_surfaces_state_cas_mismatch() { + let scenario = FailScenario::setup(); + let dir = fixture(); + let digests = seed_applyable_state(dir.path()); + + // Simulate the concurrent writer at the exact race window: rewrite + // state.json (valid JSON, graph/schema digests preserved, revision 99) + // after apply read it but before apply writes. + let race_path = state_path(dir.path()); + fail::cfg_callback("cluster_apply.before_state_write", move || { + let mut state: serde_json::Value = + serde_json::from_str(&fs::read_to_string(&race_path).unwrap()).unwrap(); + state["state_revision"] = serde_json::json!(99); + fs::write(&race_path, serde_json::to_string_pretty(&state).unwrap()).unwrap(); + }) + .expect("configure callback failpoint"); + + let out = apply_config_dir(dir.path()); + fail::remove("cluster_apply.before_state_write"); + + assert!(!out.ok); + assert!(!out.state_written); + assert!( + out.diagnostics + .iter() + .any(|diagnostic| diagnostic.code == "state_cas_mismatch"), + "{:?}", + out.diagnostics + ); + // Persisted snapshot, not the unwritten in-memory mutations. + assert!( + !out.resource_statuses + .contains_key("query.knowledge.find_person") + ); + // The concurrent writer's state is what's on disk; apply's mutation never landed. + let state: serde_json::Value = + serde_json::from_str(&fs::read_to_string(state_path(dir.path())).unwrap()).unwrap(); + assert_eq!(state["state_revision"], 99); + assert!( + state["applied_revision"]["resources"] + .get("query.knowledge.find_person") + .is_none() + ); + // Blobs written before the race are inert. + assert!(query_blob(dir.path(), &digests).exists()); + + // Recovery is a plain re-run against the rewritten state. + let recovered = apply_config_dir(dir.path()); + assert!(recovered.ok, "{:?}", recovered.diagnostics); + assert!(recovered.converged); + scenario.teardown(); +} From 50543a8ce073444f7e68886a65a14e08b4dafbe9 Mon Sep 17 00:00:00 2001 From: aaltshuler Date: Wed, 10 Jun 2026 02:15:13 +0300 Subject: [PATCH 5/7] docs(cluster): record Stage 3B failpoint + verification coverage Co-Authored-By: Claude Fable 5 --- docs/dev/cluster-config-implementation-spec.md | 6 +++++- docs/dev/testing.md | 10 +++++----- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/docs/dev/cluster-config-implementation-spec.md b/docs/dev/cluster-config-implementation-spec.md index ff3dd7e..f3c5b68 100644 --- a/docs/dev/cluster-config-implementation-spec.md +++ b/docs/dev/cluster-config-implementation-spec.md @@ -663,7 +663,11 @@ Hard gates: - Do not ship `cluster apply` until `cluster validate` and read-only `cluster plan` have hermetic tests. - Do not ship graph/schema-moving apply until failpoint recovery tests prove the - Phase B -> state publish gap is covered. + Phase B -> state publish gap is covered. (Stage 3B delivered the apply-side + half: `omnigraph-cluster` has failpoint infrastructure and tests for the + crash-after-payload and state-CAS-race windows of config-only apply, plus + catalog payload verification in status/refresh. Graph-moving sidecar + coverage remains Phase 4 work.) For docs-only changes, `scripts/check-agents-md.sh` is enough. For implementation phases, run the boundary tests above before widening to diff --git a/docs/dev/testing.md b/docs/dev/testing.md index 1eebeb2..5c88a37 100644 --- a/docs/dev/testing.md +++ b/docs/dev/testing.md @@ -8,7 +8,7 @@ This file is the always-on map of the test surface. **Consult it before every ta |---|---|---| | `omnigraph` (engine) | `crates/omnigraph/tests/` | Integration tests (21 files), fixture-driven, share `tests/helpers/mod.rs` | | `omnigraph-cli` | `crates/omnigraph-cli/tests/` | `cli.rs` (unit-ish; includes the `cluster_e2e_*` lifecycle compositions over the spawned binary — lost-state re-import recovery, out-of-band drift, graph-root destruction, multi-graph mixed-disposition convergence), `system_local.rs`, `system_remote.rs`, share `tests/support/mod.rs` | -| `omnigraph-cluster` | mostly in-source `#[cfg(test)] mod tests` | Cluster config parser, local JSON state diff, state CAS/lock handling/recovery, read-only validate/plan/status plus explicit refresh/import graph observations, and config-only apply (content-addressed payload publish, disposition gating, composite-digest convergence, idempotent re-apply) | +| `omnigraph-cluster` | mostly in-source `#[cfg(test)] mod tests`; `tests/failpoints.rs` (feature-gated) | Cluster config parser, local JSON state diff, state CAS/lock handling/recovery, read-only validate/plan/status plus explicit refresh/import graph observations, config-only apply (content-addressed payload publish, disposition gating, composite-digest convergence, idempotent re-apply), catalog payload verification (status read-only, refresh drift + self-heal), and failpoint crash-mid-apply / CAS-race coverage | | `omnigraph-server` | `crates/omnigraph-server/tests/` | `server.rs` (HTTP-level), `openapi.rs` (OpenAPI drift / regeneration) | | `omnigraph-compiler` | mostly in-source `#[cfg(test)] mod tests` | Parser, type-checker, IR lowering, lint | @@ -54,10 +54,10 @@ The engine's `tests/` is the principal coverage surface; most graph-shaped behav ## Failpoints (fault injection) -- Cargo feature: `failpoints = ["dep:fail", "fail/failpoints"]` (in `crates/omnigraph/Cargo.toml`). -- Wrapper: `crates/omnigraph/src/failpoints.rs` exposes `maybe_fail("name")` and `ScopedFailPoint` for tests. -- Call sites are inserted at sensitive transaction boundaries (branch create, graph publish commit, etc.). -- Activated tests: `crates/omnigraph/tests/failpoints.rs`. Run with `cargo test -p omnigraph-engine --features failpoints --test failpoints`. +- Cargo feature: `failpoints = ["dep:fail", "fail/failpoints"]` (in `crates/omnigraph/Cargo.toml` **and** `crates/omnigraph-cluster/Cargo.toml`; the cluster feature does not enable the engine's). +- Wrappers: `crates/omnigraph/src/failpoints.rs` and `crates/omnigraph-cluster/src/failpoints.rs` expose `maybe_fail("name")` and `ScopedFailPoint` for tests. +- Call sites are inserted at sensitive transaction boundaries (branch create, graph publish commit, cluster apply's payload→state-write window, etc.). +- Activated tests: `crates/omnigraph/tests/failpoints.rs` and `crates/omnigraph-cluster/tests/failpoints.rs` (crash-mid-apply + state CAS race via `fail::cfg_callback`; integration binaries, never in-source — the fail registry is process-global). Run with `cargo test -p omnigraph-engine --features failpoints --test failpoints` / `cargo test -p omnigraph-cluster --features failpoints --test failpoints`. ## RustFS / S3 integration From 08ea659c9bcc39902233f0f771eb7883f9b97876 Mon Sep 17 00:00:00 2001 From: aaltshuler Date: Wed, 10 Jun 2026 02:21:10 +0300 Subject: [PATCH 6/7] build: commit Cargo.lock for omnigraph-cluster's optional fail dependency The failpoints feature added fail = { workspace = true, optional = true } to the crate manifest; the lockfile edge belongs with it (--locked CI gate). Co-Authored-By: Claude Fable 5 --- Cargo.lock | 1 + 1 file changed, 1 insertion(+) diff --git a/Cargo.lock b/Cargo.lock index 79760b0..f6a1b8a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4568,6 +4568,7 @@ dependencies = [ name = "omnigraph-cluster" version = "0.6.2" dependencies = [ + "fail", "omnigraph-compiler", "omnigraph-engine", "serde", From 16759b28b9c13b445e49db9363756d24d6a26aaa Mon Sep 17 00:00:00 2001 From: aaltshuler Date: Wed, 10 Jun 2026 02:36:24 +0300 Subject: [PATCH 7/7] fix(cluster): RAII-guard the callback failpoint ScopedFailPoint::with_callback gives cfg_callback the same Drop-based cleanup as cfg actions; a panic while the point is active no longer leaks the callback into the process-global registry where it would fire under later tests (greptile review, PR #167). Co-Authored-By: Claude Fable 5 --- crates/omnigraph-cluster/src/failpoints.rs | 14 ++++++++++++++ crates/omnigraph-cluster/tests/failpoints.rs | 19 ++++++++++--------- 2 files changed, 24 insertions(+), 9 deletions(-) diff --git a/crates/omnigraph-cluster/src/failpoints.rs b/crates/omnigraph-cluster/src/failpoints.rs index c6d445b..f1799d7 100644 --- a/crates/omnigraph-cluster/src/failpoints.rs +++ b/crates/omnigraph-cluster/src/failpoints.rs @@ -32,6 +32,20 @@ impl ScopedFailPoint { name: name.to_string(), } } + + /// Register a callback failpoint with the same Drop-based cleanup as + /// `new`. Without the guard, a panic while the point is active would + /// leak the callback into the process-global registry and fire it under + /// later tests in the same binary. + pub fn with_callback(name: &str, callback: F) -> Self + where + F: Fn() + Send + Sync + 'static, + { + fail::cfg_callback(name, callback).expect("configure callback failpoint"); + Self { + name: name.to_string(), + } + } } #[cfg(feature = "failpoints")] diff --git a/crates/omnigraph-cluster/tests/failpoints.rs b/crates/omnigraph-cluster/tests/failpoints.rs index 05d2913..db7b82d 100644 --- a/crates/omnigraph-cluster/tests/failpoints.rs +++ b/crates/omnigraph-cluster/tests/failpoints.rs @@ -171,18 +171,19 @@ fn apply_cas_race_surfaces_state_cas_mismatch() { // Simulate the concurrent writer at the exact race window: rewrite // state.json (valid JSON, graph/schema digests preserved, revision 99) - // after apply read it but before apply writes. + // after apply read it but before apply writes. RAII-guarded so a panic + // inside apply cannot leak the callback into the global registry. let race_path = state_path(dir.path()); - fail::cfg_callback("cluster_apply.before_state_write", move || { - let mut state: serde_json::Value = - serde_json::from_str(&fs::read_to_string(&race_path).unwrap()).unwrap(); - state["state_revision"] = serde_json::json!(99); - fs::write(&race_path, serde_json::to_string_pretty(&state).unwrap()).unwrap(); - }) - .expect("configure callback failpoint"); + let failpoint = + ScopedFailPoint::with_callback("cluster_apply.before_state_write", move || { + let mut state: serde_json::Value = + serde_json::from_str(&fs::read_to_string(&race_path).unwrap()).unwrap(); + state["state_revision"] = serde_json::json!(99); + fs::write(&race_path, serde_json::to_string_pretty(&state).unwrap()).unwrap(); + }); let out = apply_config_dir(dir.path()); - fail::remove("cluster_apply.before_state_write"); + drop(failpoint); assert!(!out.ok); assert!(!out.state_written);