Merge pull request #167 from ModernRelay/feat/cluster-stage3b

feat(cluster): Stage 3B — catalog payload verification + failpoint coverage
This commit is contained in:
Andrew Altshuler 2026-06-10 03:17:11 +03:00 committed by GitHub
commit effb9cc068
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
10 changed files with 697 additions and 14 deletions

View file

@ -173,15 +173,18 @@ jobs:
OMNIGRAPH_UPDATE_OPENAPI: ${{ (github.event_name == 'pull_request' && github.event.pull_request.head.repo.full_name == github.repository) && '1' || '' }}
run: cargo test --workspace --locked
- name: Run failpoints feature test
- name: Run failpoints feature tests
if: needs.classify_changes.outputs.run_full_ci == 'true'
# Run after the workspace test so the build cache is warm —
# enabling --features failpoints is just an incremental rebuild
# of omnigraph-engine + the small `fail` crate, not the full
# of the target crate + the small `fail` crate, not the full
# dep tree (lance, datafusion). A separate job with its own
# cache key would be a fresh ~20min build on first run; this
# is ~30s on a warm cache.
run: cargo test --locked -p omnigraph-engine --features failpoints --test failpoints
# is ~30s on a warm cache. The cluster feature does not enable
# omnigraph/failpoints, so each line rebuilds only its crate.
run: |
cargo test --locked -p omnigraph-engine --features failpoints --test failpoints
cargo test --locked -p omnigraph-cluster --features failpoints --test failpoints
- name: Commit regenerated openapi.json to PR branch
if: |

1
Cargo.lock generated
View file

@ -4568,6 +4568,7 @@ dependencies = [
name = "omnigraph-cluster"
version = "0.6.2"
dependencies = [
"fail",
"omnigraph-compiler",
"omnigraph-engine",
"serde",

View file

@ -1435,6 +1435,67 @@ fn cluster_e2e_multi_graph_mixed_dispositions_then_converge() {
);
}
/// Catalog payload drift self-heals across the command surface: status warns
/// read-only, refresh persists the drift and drops the digest, apply
/// republishes the blob, status comes back clean.
#[test]
fn cluster_e2e_payload_drift_self_heals() {
let temp = tempdir().unwrap();
write_cluster_config_fixture(temp.path());
init_cluster_derived_graph(temp.path());
let import = cluster_json(temp.path(), "import");
assert_eq!(import["ok"], true, "{import}");
let apply = cluster_json(temp.path(), "apply");
assert_eq!(apply["converged"], true, "{apply}");
let query_digest = change_for(&apply, "query.knowledge.find_person")["after_digest"]
.as_str()
.unwrap()
.to_string();
let blob = temp
.path()
.join("__cluster/resources/query/knowledge/find_person")
.join(format!("{query_digest}.gq"));
fs::remove_file(&blob).unwrap();
let status = cluster_json(temp.path(), "status");
assert_eq!(status["ok"], true, "{status}");
assert!(
status["diagnostics"]
.as_array()
.unwrap()
.iter()
.any(|diagnostic| diagnostic["code"] == "catalog_payload_missing"),
"{status}"
);
let refresh = cluster_json(temp.path(), "refresh");
assert_eq!(refresh["ok"], true, "{refresh}");
assert_eq!(
refresh["resource_statuses"]["query.knowledge.find_person"]["status"],
"drifted"
);
let heal = cluster_json(temp.path(), "apply");
assert_eq!(heal["ok"], true, "{heal}");
assert_eq!(heal["converged"], true, "{heal}");
assert!(blob.exists(), "blob republished");
let clean = cluster_json(temp.path(), "status");
assert!(
!clean["diagnostics"]
.as_array()
.unwrap()
.iter()
.any(|diagnostic| {
diagnostic["code"]
.as_str()
.is_some_and(|code| code.starts_with("catalog_payload"))
}),
"{clean}"
);
}
#[test]
fn short_version_flag_prints_current_cli_version() {
let output = output_success(cli().arg("-v"));

View file

@ -2,15 +2,21 @@
name = "omnigraph-cluster"
version = "0.6.2"
edition = "2024"
description = "Read-only cluster configuration validation and planning for Omnigraph."
description = "Cluster configuration validation, planning, and config-only apply for Omnigraph."
license = "MIT"
repository = "https://github.com/ModernRelay/omnigraph"
homepage = "https://github.com/ModernRelay/omnigraph"
documentation = "https://docs.rs/omnigraph-cluster"
[features]
# Fault-injection hooks for the apply protocol (crash-mid-apply, CAS-race
# tests). Deliberately does NOT enable omnigraph/failpoints.
failpoints = ["dep:fail", "fail/failpoints"]
[dependencies]
omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.6.2" }
omnigraph = { package = "omnigraph-engine", path = "../omnigraph", version = "0.6.2" }
fail = { workspace = true, optional = true }
serde = { workspace = true }
serde_json = { workspace = true }
serde_yaml = { workspace = true }

View file

@ -0,0 +1,56 @@
//! Fault-injection hooks for the cluster apply protocol, mirroring the
//! engine's `omnigraph::failpoints` pattern. With the `failpoints` feature
//! off, every call site compiles to `Ok(())`.
use crate::Diagnostic;
pub(crate) fn maybe_fail(_name: &str) -> Result<(), Diagnostic> {
#[cfg(feature = "failpoints")]
{
let name = _name;
fail::fail_point!(name, |_| {
return Err(Diagnostic::error(
"injected_failpoint",
name,
format!("injected failpoint triggered: {name}"),
));
});
}
Ok(())
}
#[cfg(feature = "failpoints")]
pub struct ScopedFailPoint {
name: String,
}
#[cfg(feature = "failpoints")]
impl ScopedFailPoint {
pub fn new(name: &str, action: &str) -> Self {
fail::cfg(name, action).expect("configure failpoint");
Self {
name: name.to_string(),
}
}
/// Register a callback failpoint with the same Drop-based cleanup as
/// `new`. Without the guard, a panic while the point is active would
/// leak the callback into the process-global registry and fire it under
/// later tests in the same binary.
pub fn with_callback<F>(name: &str, callback: F) -> Self
where
F: Fn() + Send + Sync + 'static,
{
fail::cfg_callback(name, callback).expect("configure callback failpoint");
Self {
name: name.to_string(),
}
}
}
#[cfg(feature = "failpoints")]
impl Drop for ScopedFailPoint {
fn drop(&mut self) {
fail::remove(&self.name);
}
}

View file

@ -16,6 +16,8 @@ use time::OffsetDateTime;
use time::format_description::well_known::Rfc3339;
use ulid::Ulid;
pub mod failpoints;
pub const CLUSTER_CONFIG_FILE: &str = "cluster.yaml";
pub const CLUSTER_GRAPHS_DIR: &str = "graphs";
pub const CLUSTER_STATE_DIR: &str = "__cluster";
@ -770,6 +772,21 @@ pub fn apply_config_dir(config_dir: impl AsRef<Path>) -> ApplyOutput {
);
}
// Crash point: payloads are on disk, state has not moved. A failure here
// must leave state.json byte-identical and acknowledge nothing; re-running
// apply repairs via the skip-if-exists blob reuse.
if let Err(diagnostic) = failpoints::maybe_fail("cluster_apply.after_payload_phase") {
diagnostics.push(diagnostic);
return early_return(
display_path(&desired.config_dir),
Some(desired.config_digest),
observations,
changes,
state.resource_statuses,
diagnostics,
);
}
// State mutation. Apply owns query/policy statuses only; graph/schema
// statuses belong to refresh/import observation and must not be clobbered.
let before_value =
@ -824,7 +841,13 @@ pub fn apply_config_dir(config_dir: impl AsRef<Path>) -> ApplyOutput {
let mut state_write_failed = false;
if after_value != before_value {
new_state.state_revision = new_state.state_revision.saturating_add(1);
match backend.write_state(&new_state, expected_cas.as_deref(), &mut observations) {
// The failpoint error routes through state_write_failed so the
// persisted-statuses revert contract below is exercised; a cfg_callback
// on this point can mutate state.json to simulate a concurrent writer,
// making write_state's CAS check fail organically.
let write_result = failpoints::maybe_fail("cluster_apply.before_state_write")
.and_then(|()| backend.write_state(&new_state, expected_cas.as_deref(), &mut observations));
match write_result {
Ok(()) => state_written = true,
Err(diagnostic) => {
diagnostics.push(diagnostic);
@ -890,6 +913,14 @@ pub fn status_config_dir(config_dir: impl AsRef<Path>) -> StatusOutput {
match backend.read_state(&mut observations) {
Ok(snapshot) => {
if let Some(state) = snapshot.state {
// Read-only point-in-time catalog check: report the
// findings as diagnostics; persisting Drifted statuses
// is refresh's job. Status never writes state.
for (address, finding) in
verify_catalog_payloads(&parsed.config_dir, &state)
{
diagnostics.push(payload_finding_diagnostic(&address, &finding));
}
resource_digests = state_resource_digests(&state);
resource_statuses = state.resource_statuses;
state_observation_records = state.observations;
@ -1076,6 +1107,47 @@ async fn sync_config_dir(config_dir: &Path, operation: StateSyncOperation) -> St
(StateSyncOperation::Import, None) => initial_import_state(&desired),
};
// Catalog payload verification must run BEFORE graph observation: removing
// a drifted query digest first means the live-graph composite recompute
// below already excludes it, so the persisted graph.<id> composite stays
// consistent and the next plan shows exactly the create + derived update.
for (address, finding) in verify_catalog_payloads(&desired.config_dir, &state) {
diagnostics.push(payload_finding_diagnostic(&address, &finding));
match finding {
PayloadFinding::Missing => {
state.applied_revision.resources.remove(&address);
set_resource_status(
&mut state,
&address,
ResourceLifecycleStatus::Drifted,
"payload_missing",
"catalog payload blob is missing; re-run `cluster apply` to republish",
);
}
PayloadFinding::Mismatch { .. } => {
state.applied_revision.resources.remove(&address);
set_resource_status(
&mut state,
&address,
ResourceLifecycleStatus::Drifted,
"payload_mismatch",
"catalog payload blob does not match the recorded digest; re-run `cluster apply` to republish",
);
}
// Transient IO must not trigger a spurious republish: keep the
// digest, surface the error, let a later clean refresh converge.
PayloadFinding::ReadError(error) => {
set_resource_status(
&mut state,
&address,
ResourceLifecycleStatus::Error,
"payload_read_error",
&error,
);
}
}
}
let graph_error_count = observe_declared_graphs(&desired, &mut state).await;
if graph_error_count > 0 {
diagnostics.push(Diagnostic::error(
@ -2371,6 +2443,73 @@ fn payload_path(config_dir: &Path, kind: &ResourceKind, digest: &str) -> Option<
}
}
#[derive(Debug, PartialEq, Eq)]
enum PayloadFinding {
Missing,
Mismatch { actual_digest: String },
ReadError(String),
}
/// Verify every catalog-backed resource digest in state against its
/// content-addressed blob under `__cluster/resources/`. Graph, schema, and
/// unknown addresses have no payloads and are skipped. Read-only; findings
/// are deterministic (BTreeMap order). Payloads are small (queries, policy
/// bundles), so a full digest re-hash is cheap.
fn verify_catalog_payloads(
config_dir: &Path,
state: &ClusterState,
) -> Vec<(String, PayloadFinding)> {
let mut findings = Vec::new();
for (address, resource) in &state.applied_revision.resources {
let kind = resource_kind(address);
let Some(path) = payload_path(config_dir, &kind, &resource.digest) else {
continue;
};
match fs::read(&path) {
Ok(bytes) => {
let actual_digest = sha256_hex(&bytes);
if actual_digest != resource.digest {
findings.push((address.clone(), PayloadFinding::Mismatch { actual_digest }));
}
}
Err(err) if err.kind() == ErrorKind::NotFound => {
findings.push((address.clone(), PayloadFinding::Missing));
}
Err(err) => {
findings.push((
address.clone(),
PayloadFinding::ReadError(format!(
"could not read catalog payload '{}': {err}",
path.display()
)),
));
}
}
}
findings
}
fn payload_finding_diagnostic(address: &str, finding: &PayloadFinding) -> Diagnostic {
match finding {
PayloadFinding::Missing => Diagnostic::warning(
"catalog_payload_missing",
address,
"catalog payload blob is missing; re-run `cluster apply` to republish",
),
PayloadFinding::Mismatch { actual_digest } => Diagnostic::warning(
"catalog_payload_mismatch",
address,
format!(
"catalog payload blob does not match the recorded digest (actual sha256:{actual_digest}); re-run `cluster apply` to republish"
),
),
// An unverifiable blob must not report healthy.
PayloadFinding::ReadError(error) => {
Diagnostic::error("catalog_payload_read_error", address, error.clone())
}
}
}
/// Write one content-addressed payload blob. Idempotent: an existing
/// digest-named file is trusted as-is. The digest re-check is the apply-side
/// TOCTOU detector — the source file changing between `load_desired` and the
@ -4317,6 +4456,171 @@ graphs:
);
}
// ---- catalog payload verification (Stage 3B) ----
/// Converge a fixture dir and return the query blob path.
fn converge_fixture(config_dir: &Path) -> std::path::PathBuf {
write_applyable_state(config_dir);
let out = apply_config_dir(config_dir);
assert!(out.ok && out.converged, "{:?}", out.diagnostics);
let desired = validate_config_dir(config_dir);
query_payload_path(
config_dir,
desired
.resource_digests
.get("query.knowledge.find_person")
.unwrap(),
)
}
#[test]
fn status_reports_missing_payload_read_only() {
let dir = fixture();
let blob = converge_fixture(dir.path());
let state_before = fs::read_to_string(dir.path().join(CLUSTER_STATE_FILE)).unwrap();
fs::remove_file(&blob).unwrap();
let out = status_config_dir(dir.path());
assert!(out.ok, "{:?}", out.diagnostics);
assert!(out.diagnostics.iter().any(|diagnostic| {
diagnostic.code == "catalog_payload_missing"
&& diagnostic.path == "query.knowledge.find_person"
}));
// Read-only: persisted statuses and state bytes untouched.
assert_eq!(
out.resource_statuses["query.knowledge.find_person"].status,
ResourceLifecycleStatus::Applied
);
assert_eq!(
fs::read_to_string(dir.path().join(CLUSTER_STATE_FILE)).unwrap(),
state_before
);
}
#[tokio::test]
async fn refresh_removes_digest_and_drifts_on_missing_payload() {
let dir = fixture();
init_derived_graph(dir.path()).await;
let blob = converge_fixture(dir.path());
fs::remove_file(&blob).unwrap();
let out = refresh_config_dir(dir.path()).await;
assert!(out.ok, "{:?}", out.diagnostics);
assert!(
out.diagnostics
.iter()
.any(|diagnostic| diagnostic.code == "catalog_payload_missing")
);
let status = &out.resource_statuses["query.knowledge.find_person"];
assert_eq!(status.status, ResourceLifecycleStatus::Drifted);
assert!(status.conditions.contains(&"payload_missing".to_string()));
let state = read_state_json(dir.path());
assert!(
state["applied_revision"]["resources"]
.get("query.knowledge.find_person")
.is_none(),
"{state}"
);
}
#[tokio::test]
async fn refresh_drifts_on_corrupted_payload() {
let dir = fixture();
init_derived_graph(dir.path()).await;
let blob = converge_fixture(dir.path());
fs::write(&blob, "corrupted content").unwrap();
let out = refresh_config_dir(dir.path()).await;
assert!(out.ok, "{:?}", out.diagnostics);
let status = &out.resource_statuses["query.knowledge.find_person"];
assert_eq!(status.status, ResourceLifecycleStatus::Drifted);
assert!(status.conditions.contains(&"payload_mismatch".to_string()));
let state = read_state_json(dir.path());
assert!(
state["applied_revision"]["resources"]
.get("query.knowledge.find_person")
.is_none()
);
}
#[tokio::test]
async fn refresh_flags_unreadable_payload_as_error() {
let dir = fixture();
init_derived_graph(dir.path()).await;
let blob = converge_fixture(dir.path());
// A same-named directory yields a non-NotFound IO error portably.
fs::remove_file(&blob).unwrap();
fs::create_dir(&blob).unwrap();
let out = refresh_config_dir(dir.path()).await;
assert!(!out.ok);
assert!(
out.diagnostics
.iter()
.any(|diagnostic| diagnostic.code == "catalog_payload_read_error")
);
let status = &out.resource_statuses["query.knowledge.find_person"];
assert_eq!(status.status, ResourceLifecycleStatus::Error);
assert!(status.conditions.contains(&"payload_read_error".to_string()));
// Transient IO keeps the digest: no spurious republish.
let state = read_state_json(dir.path());
assert!(
state["applied_revision"]["resources"]
.get("query.knowledge.find_person")
.is_some()
);
}
#[tokio::test]
async fn payload_drift_self_heals_through_refresh_plan_apply() {
let dir = fixture();
init_derived_graph(dir.path()).await;
let blob = converge_fixture(dir.path());
let original = fs::read_to_string(&blob).unwrap();
fs::remove_file(&blob).unwrap();
let refresh = refresh_config_dir(dir.path()).await;
assert!(refresh.ok, "{:?}", refresh.diagnostics);
let plan = plan_config_dir(dir.path());
let query_change = plan
.changes
.iter()
.find(|change| change.resource == "query.knowledge.find_person")
.expect("plan must propose recreating the query");
assert_eq!(query_change.operation, PlanOperation::Create);
assert_eq!(query_change.disposition, Some(ApplyDisposition::Applied));
let apply = apply_config_dir(dir.path());
assert!(apply.ok && apply.converged, "{:?}", apply.diagnostics);
assert_eq!(fs::read_to_string(&blob).unwrap(), original);
let status = status_config_dir(dir.path());
assert!(
!status
.diagnostics
.iter()
.any(|diagnostic| diagnostic.code.starts_with("catalog_payload")),
"{:?}",
status.diagnostics
);
}
#[test]
fn verification_skips_graph_and_schema_resources() {
let dir = fixture();
write_applyable_state(dir.path()); // graph + schema digests only, no blobs
let out = status_config_dir(dir.path());
assert!(
!out.diagnostics
.iter()
.any(|diagnostic| diagnostic.code.starts_with("catalog_payload")),
"{:?}",
out.diagnostics
);
}
#[test]
fn plan_annotates_apply_dispositions() {
let dir = fixture();

View file

@ -0,0 +1,219 @@
//! Fault-injection tests for the cluster apply protocol.
//!
//! These live in an integration binary (not in-source) deliberately: the fail
//! crate's registry is process-global, so a configured `cluster_apply.*`
//! action would fire inside any concurrently running normal apply test in the
//! lib-test process. A separate binary isolates the registry by construction —
//! same reason the engine keeps its failpoint suite in `tests/failpoints.rs`.
#![cfg(feature = "failpoints")]
use std::collections::BTreeMap;
use std::fs;
use std::path::{Path, PathBuf};
use fail::FailScenario;
use omnigraph_cluster::failpoints::ScopedFailPoint;
use omnigraph_cluster::{apply_config_dir, validate_config_dir};
use tempfile::tempdir;
const SCHEMA: &str = r#"
node Person {
name: String @key
age: I32?
}
"#;
const QUERY: &str = r#"
query find_person($name: String) {
match { $p: Person { name: $name } }
return { $p.name, $p.age }
}
"#;
fn fixture() -> tempfile::TempDir {
let dir = tempdir().unwrap();
fs::write(dir.path().join("people.pg"), SCHEMA).unwrap();
fs::write(dir.path().join("people.gq"), QUERY).unwrap();
fs::write(dir.path().join("base.policy.yaml"), "rules: []\n").unwrap();
fs::write(
dir.path().join("cluster.yaml"),
r#"
version: 1
state:
backend: cluster
lock: true
graphs:
knowledge:
schema: ./people.pg
queries:
find_person:
file: ./people.gq
policies:
base:
file: ./base.policy.yaml
applies_to: [knowledge]
"#,
)
.unwrap();
dir
}
/// Seed a state.json where the graph/schema digests match desired, so query
/// and policy changes are applicable. Digests are borrowed from the public
/// validate output; the graph composite is a placeholder that apply converges
/// as a Derived update.
fn seed_applyable_state(config_dir: &Path) -> BTreeMap<String, String> {
let validate = validate_config_dir(config_dir);
assert!(validate.ok, "{:?}", validate.diagnostics);
let schema_digest = validate.resource_digests["schema.knowledge"].clone();
let state_dir = config_dir.join("__cluster");
fs::create_dir_all(&state_dir).unwrap();
fs::write(
state_dir.join("state.json"),
format!(
r#"{{
"version": 1,
"state_revision": 1,
"applied_revision": {{
"resources": {{
"graph.knowledge": {{ "digest": "seed" }},
"schema.knowledge": {{ "digest": "{schema_digest}" }}
}}
}}
}}
"#
),
)
.unwrap();
validate.resource_digests
}
fn state_path(config_dir: &Path) -> PathBuf {
config_dir.join("__cluster/state.json")
}
fn query_blob(config_dir: &Path, digests: &BTreeMap<String, String>) -> PathBuf {
config_dir
.join("__cluster/resources/query/knowledge/find_person")
.join(format!("{}.gq", digests["query.knowledge.find_person"]))
}
#[test]
fn failpoint_wiring_returns_injected_diagnostic() {
let scenario = FailScenario::setup();
let dir = fixture();
seed_applyable_state(dir.path());
let _failpoint = ScopedFailPoint::new("cluster_apply.after_payload_phase", "return");
let out = apply_config_dir(dir.path());
assert!(!out.ok);
assert!(out.diagnostics.iter().any(|diagnostic| {
diagnostic.code == "injected_failpoint"
&& diagnostic
.message
.contains("cluster_apply.after_payload_phase")
}));
drop(_failpoint);
scenario.teardown();
}
/// Crash between the payload phase and the state write: blobs are on disk,
/// state.json is byte-identical, nothing is acknowledged — and a plain re-run
/// repairs by trusting the existing content-addressed blobs.
#[test]
fn apply_crash_after_payload_phase_leaves_state_unmoved_then_recovers() {
let scenario = FailScenario::setup();
let dir = fixture();
let digests = seed_applyable_state(dir.path());
let state_before = fs::read(state_path(dir.path())).unwrap();
{
let _failpoint = ScopedFailPoint::new("cluster_apply.after_payload_phase", "return");
let out = apply_config_dir(dir.path());
assert!(!out.ok);
assert!(!out.state_written);
assert!(!out.converged);
assert_eq!(out.applied_count, 0);
// Persisted pre-apply snapshot: no phantom Applied statuses.
assert!(
!out.resource_statuses
.contains_key("query.knowledge.find_person"),
"{:?}",
out.resource_statuses
);
// State has not moved; payloads are inert on disk; the lock released.
assert_eq!(fs::read(state_path(dir.path())).unwrap(), state_before);
assert!(query_blob(dir.path(), &digests).exists());
assert!(!dir.path().join("__cluster/lock.json").exists());
}
// The repair is a plain re-run: existing blobs are trusted by digest.
let recovered = apply_config_dir(dir.path());
assert!(recovered.ok, "{:?}", recovered.diagnostics);
assert!(recovered.converged);
assert!(recovered.state_written);
assert_eq!(
recovered.resource_statuses["query.knowledge.find_person"].status,
omnigraph_cluster::ResourceLifecycleStatus::Applied
);
scenario.teardown();
}
/// A concurrent writer mutating state.json between apply's read and its write
/// (possible under `state.lock: false`) must surface `state_cas_mismatch`,
/// acknowledge nothing, and leave the concurrent writer's state on disk.
#[test]
fn apply_cas_race_surfaces_state_cas_mismatch() {
let scenario = FailScenario::setup();
let dir = fixture();
let digests = seed_applyable_state(dir.path());
// Simulate the concurrent writer at the exact race window: rewrite
// state.json (valid JSON, graph/schema digests preserved, revision 99)
// after apply read it but before apply writes. RAII-guarded so a panic
// inside apply cannot leak the callback into the global registry.
let race_path = state_path(dir.path());
let failpoint =
ScopedFailPoint::with_callback("cluster_apply.before_state_write", move || {
let mut state: serde_json::Value =
serde_json::from_str(&fs::read_to_string(&race_path).unwrap()).unwrap();
state["state_revision"] = serde_json::json!(99);
fs::write(&race_path, serde_json::to_string_pretty(&state).unwrap()).unwrap();
});
let out = apply_config_dir(dir.path());
drop(failpoint);
assert!(!out.ok);
assert!(!out.state_written);
assert!(
out.diagnostics
.iter()
.any(|diagnostic| diagnostic.code == "state_cas_mismatch"),
"{:?}",
out.diagnostics
);
// Persisted snapshot, not the unwritten in-memory mutations.
assert!(
!out.resource_statuses
.contains_key("query.knowledge.find_person")
);
// The concurrent writer's state is what's on disk; apply's mutation never landed.
let state: serde_json::Value =
serde_json::from_str(&fs::read_to_string(state_path(dir.path())).unwrap()).unwrap();
assert_eq!(state["state_revision"], 99);
assert!(
state["applied_revision"]["resources"]
.get("query.knowledge.find_person")
.is_none()
);
// Blobs written before the race are inert.
assert!(query_blob(dir.path(), &digests).exists());
// Recovery is a plain re-run against the rewritten state.
let recovered = apply_config_dir(dir.path());
assert!(recovered.ok, "{:?}", recovered.diagnostics);
assert!(recovered.converged);
scenario.teardown();
}

View file

@ -663,7 +663,11 @@ Hard gates:
- Do not ship `cluster apply` until `cluster validate` and read-only
`cluster plan` have hermetic tests.
- Do not ship graph/schema-moving apply until failpoint recovery tests prove the
Phase B -> state publish gap is covered.
Phase B -> state publish gap is covered. (Stage 3B delivered the apply-side
half: `omnigraph-cluster` has failpoint infrastructure and tests for the
crash-after-payload and state-CAS-race windows of config-only apply, plus
catalog payload verification in status/refresh. Graph-moving sidecar
coverage remains Phase 4 work.)
For docs-only changes, `scripts/check-agents-md.sh` is enough. For
implementation phases, run the boundary tests above before widening to

View file

@ -8,7 +8,7 @@ This file is the always-on map of the test surface. **Consult it before every ta
|---|---|---|
| `omnigraph` (engine) | `crates/omnigraph/tests/` | Integration tests (21 files), fixture-driven, share `tests/helpers/mod.rs` |
| `omnigraph-cli` | `crates/omnigraph-cli/tests/` | `cli.rs` (unit-ish; includes the `cluster_e2e_*` lifecycle compositions over the spawned binary — lost-state re-import recovery, out-of-band drift, graph-root destruction, multi-graph mixed-disposition convergence), `system_local.rs`, `system_remote.rs`, share `tests/support/mod.rs` |
| `omnigraph-cluster` | mostly in-source `#[cfg(test)] mod tests` | Cluster config parser, local JSON state diff, state CAS/lock handling/recovery, read-only validate/plan/status plus explicit refresh/import graph observations, and config-only apply (content-addressed payload publish, disposition gating, composite-digest convergence, idempotent re-apply) |
| `omnigraph-cluster` | mostly in-source `#[cfg(test)] mod tests`; `tests/failpoints.rs` (feature-gated) | Cluster config parser, local JSON state diff, state CAS/lock handling/recovery, read-only validate/plan/status plus explicit refresh/import graph observations, config-only apply (content-addressed payload publish, disposition gating, composite-digest convergence, idempotent re-apply), catalog payload verification (status read-only, refresh drift + self-heal), and failpoint crash-mid-apply / CAS-race coverage |
| `omnigraph-server` | `crates/omnigraph-server/tests/` | `server.rs` (HTTP-level), `openapi.rs` (OpenAPI drift / regeneration) |
| `omnigraph-compiler` | mostly in-source `#[cfg(test)] mod tests` | Parser, type-checker, IR lowering, lint |
@ -54,10 +54,10 @@ The engine's `tests/` is the principal coverage surface; most graph-shaped behav
## Failpoints (fault injection)
- Cargo feature: `failpoints = ["dep:fail", "fail/failpoints"]` (in `crates/omnigraph/Cargo.toml`).
- Wrapper: `crates/omnigraph/src/failpoints.rs` exposes `maybe_fail("name")` and `ScopedFailPoint` for tests.
- Call sites are inserted at sensitive transaction boundaries (branch create, graph publish commit, etc.).
- Activated tests: `crates/omnigraph/tests/failpoints.rs`. Run with `cargo test -p omnigraph-engine --features failpoints --test failpoints`.
- Cargo feature: `failpoints = ["dep:fail", "fail/failpoints"]` (in `crates/omnigraph/Cargo.toml` **and** `crates/omnigraph-cluster/Cargo.toml`; the cluster feature does not enable the engine's).
- Wrappers: `crates/omnigraph/src/failpoints.rs` and `crates/omnigraph-cluster/src/failpoints.rs` expose `maybe_fail("name")` and `ScopedFailPoint` for tests.
- Call sites are inserted at sensitive transaction boundaries (branch create, graph publish commit, cluster apply's payload→state-write window, etc.).
- Activated tests: `crates/omnigraph/tests/failpoints.rs` and `crates/omnigraph-cluster/tests/failpoints.rs` (crash-mid-apply + state CAS race via `fail::cfg_callback`; integration binaries, never in-source — the fail registry is process-global). Run with `cargo test -p omnigraph-engine --features failpoints --test failpoints` / `cargo test -p omnigraph-cluster --features failpoints --test failpoints`.
## RustFS / S3 integration

View file

@ -198,6 +198,16 @@ files and does not inspect live graphs. Missing `state.json` succeeds with a
warning; invalid state JSON or an unsupported state version fails. If a lock is
present, status reports its id, operation, creation time, pid, and age.
Status also verifies the catalog payloads read-only: every query/policy digest
recorded in state is checked against its content-addressed blob under
`__cluster/resources/` (existence and full digest re-hash). A missing or
mismatched blob is reported as a warning (`catalog_payload_missing` /
`catalog_payload_mismatch`); an unreadable blob is an error
(`catalog_payload_read_error`) because an unverifiable catalog must not report
healthy. Status never writes state — persisting the `drifted` condition is
refresh's job. The check runs without the state lock, so it is a point-in-time
report.
## Refresh And Import
`cluster refresh` updates an existing `state.json` from actual observations.
@ -216,8 +226,27 @@ Invalid graph roots are recorded as errors; `refresh` persists the error
observation and exits non-zero, while `import` exits non-zero without creating
initial state.
Refresh/import do not observe query or policy resources yet. Existing query and
policy state digests are preserved on refresh and are not invented on import.
Refresh also verifies the catalog payloads of every query/policy digest
recorded in state (the same check `cluster status` reports read-only), and
closes the loop:
- a **missing** or **digest-mismatched** blob marks the resource `drifted`
(condition `payload_missing` / `payload_mismatch`) and removes its digest
from state — so the next `cluster plan` proposes a create and the next
`cluster apply` republishes the blob (the self-heal loop, mirroring how a
missing graph root is handled);
- an **unreadable** blob (IO error other than not-found) keeps the digest,
marks the resource `error` (condition `payload_read_error`), and exits
non-zero — transient IO must not trigger a spurious republish.
Upgrade note: a state ledger written before catalog publish existed records
query/policy digests with no blobs on disk; the first refresh after upgrading
flags them all `payload_missing`, and a single `cluster apply` republishes
everything and converges.
Refresh/import do not observe query or policy resources beyond their catalog
payloads yet. Existing query and policy state digests are preserved on refresh
(unless their payload drifted, above) and are not invented on import.
## Force Unlock