diff --git a/crates/omnigraph-cluster/src/lib.rs b/crates/omnigraph-cluster/src/lib.rs index 3673194..84968a7 100644 --- a/crates/omnigraph-cluster/src/lib.rs +++ b/crates/omnigraph-cluster/src/lib.rs @@ -890,6 +890,14 @@ pub fn status_config_dir(config_dir: impl AsRef) -> StatusOutput { match backend.read_state(&mut observations) { Ok(snapshot) => { if let Some(state) = snapshot.state { + // Read-only point-in-time catalog check: report the + // findings as diagnostics; persisting Drifted statuses + // is refresh's job. Status never writes state. + for (address, finding) in + verify_catalog_payloads(&parsed.config_dir, &state) + { + diagnostics.push(payload_finding_diagnostic(&address, &finding)); + } resource_digests = state_resource_digests(&state); resource_statuses = state.resource_statuses; state_observation_records = state.observations; @@ -1076,6 +1084,47 @@ async fn sync_config_dir(config_dir: &Path, operation: StateSyncOperation) -> St (StateSyncOperation::Import, None) => initial_import_state(&desired), }; + // Catalog payload verification must run BEFORE graph observation: removing + // a drifted query digest first means the live-graph composite recompute + // below already excludes it, so the persisted graph. composite stays + // consistent and the next plan shows exactly the create + derived update. + for (address, finding) in verify_catalog_payloads(&desired.config_dir, &state) { + diagnostics.push(payload_finding_diagnostic(&address, &finding)); + match finding { + PayloadFinding::Missing => { + state.applied_revision.resources.remove(&address); + set_resource_status( + &mut state, + &address, + ResourceLifecycleStatus::Drifted, + "payload_missing", + "catalog payload blob is missing; re-run `cluster apply` to republish", + ); + } + PayloadFinding::Mismatch { .. 
} => { + state.applied_revision.resources.remove(&address); + set_resource_status( + &mut state, + &address, + ResourceLifecycleStatus::Drifted, + "payload_mismatch", + "catalog payload blob does not match the recorded digest; re-run `cluster apply` to republish", + ); + } + // Transient IO must not trigger a spurious republish: keep the + // digest, surface the error, let a later clean refresh converge. + PayloadFinding::ReadError(error) => { + set_resource_status( + &mut state, + &address, + ResourceLifecycleStatus::Error, + "payload_read_error", + &error, + ); + } + } + } + let graph_error_count = observe_declared_graphs(&desired, &mut state).await; if graph_error_count > 0 { diagnostics.push(Diagnostic::error( @@ -2371,6 +2420,73 @@ fn payload_path(config_dir: &Path, kind: &ResourceKind, digest: &str) -> Option< } } +#[derive(Debug, PartialEq, Eq)] +enum PayloadFinding { + Missing, + Mismatch { actual_digest: String }, + ReadError(String), +} + +/// Verify every catalog-backed resource digest in state against its +/// content-addressed blob under `__cluster/resources/`. Graph, schema, and +/// unknown addresses have no payloads and are skipped. Read-only; findings +/// are deterministic (BTreeMap order). Payloads are small (queries, policy +/// bundles), so a full digest re-hash is cheap. 
+fn verify_catalog_payloads( + config_dir: &Path, + state: &ClusterState, +) -> Vec<(String, PayloadFinding)> { + let mut findings = Vec::new(); + for (address, resource) in &state.applied_revision.resources { + let kind = resource_kind(address); + let Some(path) = payload_path(config_dir, &kind, &resource.digest) else { + continue; + }; + match fs::read(&path) { + Ok(bytes) => { + let actual_digest = sha256_hex(&bytes); + if actual_digest != resource.digest { + findings.push((address.clone(), PayloadFinding::Mismatch { actual_digest })); + } + } + Err(err) if err.kind() == ErrorKind::NotFound => { + findings.push((address.clone(), PayloadFinding::Missing)); + } + Err(err) => { + findings.push(( + address.clone(), + PayloadFinding::ReadError(format!( + "could not read catalog payload '{}': {err}", + path.display() + )), + )); + } + } + } + findings +} + +fn payload_finding_diagnostic(address: &str, finding: &PayloadFinding) -> Diagnostic { + match finding { + PayloadFinding::Missing => Diagnostic::warning( + "catalog_payload_missing", + address, + "catalog payload blob is missing; re-run `cluster apply` to republish", + ), + PayloadFinding::Mismatch { actual_digest } => Diagnostic::warning( + "catalog_payload_mismatch", + address, + format!( + "catalog payload blob does not match the recorded digest (actual sha256:{actual_digest}); re-run `cluster apply` to republish" + ), + ), + // An unverifiable blob must not report healthy. + PayloadFinding::ReadError(error) => { + Diagnostic::error("catalog_payload_read_error", address, error.clone()) + } + } +} + /// Write one content-addressed payload blob. Idempotent: an existing /// digest-named file is trusted as-is. The digest re-check is the apply-side /// TOCTOU detector — the source file changing between `load_desired` and the @@ -4317,6 +4433,171 @@ graphs: ); } + // ---- catalog payload verification (Stage 3B) ---- + + /// Converge a fixture dir and return the query blob path. 
+ fn converge_fixture(config_dir: &Path) -> std::path::PathBuf { + write_applyable_state(config_dir); + let out = apply_config_dir(config_dir); + assert!(out.ok && out.converged, "{:?}", out.diagnostics); + let desired = validate_config_dir(config_dir); + query_payload_path( + config_dir, + desired + .resource_digests + .get("query.knowledge.find_person") + .unwrap(), + ) + } + + #[test] + fn status_reports_missing_payload_read_only() { + let dir = fixture(); + let blob = converge_fixture(dir.path()); + let state_before = fs::read_to_string(dir.path().join(CLUSTER_STATE_FILE)).unwrap(); + fs::remove_file(&blob).unwrap(); + + let out = status_config_dir(dir.path()); + assert!(out.ok, "{:?}", out.diagnostics); + assert!(out.diagnostics.iter().any(|diagnostic| { + diagnostic.code == "catalog_payload_missing" + && diagnostic.path == "query.knowledge.find_person" + })); + // Read-only: persisted statuses and state bytes untouched. + assert_eq!( + out.resource_statuses["query.knowledge.find_person"].status, + ResourceLifecycleStatus::Applied + ); + assert_eq!( + fs::read_to_string(dir.path().join(CLUSTER_STATE_FILE)).unwrap(), + state_before + ); + } + + #[tokio::test] + async fn refresh_removes_digest_and_drifts_on_missing_payload() { + let dir = fixture(); + init_derived_graph(dir.path()).await; + let blob = converge_fixture(dir.path()); + fs::remove_file(&blob).unwrap(); + + let out = refresh_config_dir(dir.path()).await; + assert!(out.ok, "{:?}", out.diagnostics); + assert!( + out.diagnostics + .iter() + .any(|diagnostic| diagnostic.code == "catalog_payload_missing") + ); + let status = &out.resource_statuses["query.knowledge.find_person"]; + assert_eq!(status.status, ResourceLifecycleStatus::Drifted); + assert!(status.conditions.contains(&"payload_missing".to_string())); + let state = read_state_json(dir.path()); + assert!( + state["applied_revision"]["resources"] + .get("query.knowledge.find_person") + .is_none(), + "{state}" + ); + } + + #[tokio::test] + async fn 
refresh_drifts_on_corrupted_payload() { + let dir = fixture(); + init_derived_graph(dir.path()).await; + let blob = converge_fixture(dir.path()); + fs::write(&blob, "corrupted content").unwrap(); + + let out = refresh_config_dir(dir.path()).await; + assert!(out.ok, "{:?}", out.diagnostics); + let status = &out.resource_statuses["query.knowledge.find_person"]; + assert_eq!(status.status, ResourceLifecycleStatus::Drifted); + assert!(status.conditions.contains(&"payload_mismatch".to_string())); + let state = read_state_json(dir.path()); + assert!( + state["applied_revision"]["resources"] + .get("query.knowledge.find_person") + .is_none() + ); + } + + #[tokio::test] + async fn refresh_flags_unreadable_payload_as_error() { + let dir = fixture(); + init_derived_graph(dir.path()).await; + let blob = converge_fixture(dir.path()); + // A same-named directory yields a non-NotFound IO error portably. + fs::remove_file(&blob).unwrap(); + fs::create_dir(&blob).unwrap(); + + let out = refresh_config_dir(dir.path()).await; + assert!(!out.ok); + assert!( + out.diagnostics + .iter() + .any(|diagnostic| diagnostic.code == "catalog_payload_read_error") + ); + let status = &out.resource_statuses["query.knowledge.find_person"]; + assert_eq!(status.status, ResourceLifecycleStatus::Error); + assert!(status.conditions.contains(&"payload_read_error".to_string())); + // Transient IO keeps the digest: no spurious republish. 
+ let state = read_state_json(dir.path()); + assert!( + state["applied_revision"]["resources"] + .get("query.knowledge.find_person") + .is_some() + ); + } + + #[tokio::test] + async fn payload_drift_self_heals_through_refresh_plan_apply() { + let dir = fixture(); + init_derived_graph(dir.path()).await; + let blob = converge_fixture(dir.path()); + let original = fs::read_to_string(&blob).unwrap(); + fs::remove_file(&blob).unwrap(); + + let refresh = refresh_config_dir(dir.path()).await; + assert!(refresh.ok, "{:?}", refresh.diagnostics); + + let plan = plan_config_dir(dir.path()); + let query_change = plan + .changes + .iter() + .find(|change| change.resource == "query.knowledge.find_person") + .expect("plan must propose recreating the query"); + assert_eq!(query_change.operation, PlanOperation::Create); + assert_eq!(query_change.disposition, Some(ApplyDisposition::Applied)); + + let apply = apply_config_dir(dir.path()); + assert!(apply.ok && apply.converged, "{:?}", apply.diagnostics); + assert_eq!(fs::read_to_string(&blob).unwrap(), original); + + let status = status_config_dir(dir.path()); + assert!( + !status + .diagnostics + .iter() + .any(|diagnostic| diagnostic.code.starts_with("catalog_payload")), + "{:?}", + status.diagnostics + ); + } + + #[test] + fn verification_skips_graph_and_schema_resources() { + let dir = fixture(); + write_applyable_state(dir.path()); // graph + schema digests only, no blobs + + let out = status_config_dir(dir.path()); + assert!( + !out.diagnostics + .iter() + .any(|diagnostic| diagnostic.code.starts_with("catalog_payload")), + "{:?}", + out.diagnostics + ); + } + #[test] fn plan_annotates_apply_dispositions() { let dir = fixture(); diff --git a/docs/user/cluster-config.md b/docs/user/cluster-config.md index 912f307..9a2597b 100644 --- a/docs/user/cluster-config.md +++ b/docs/user/cluster-config.md @@ -198,6 +198,16 @@ files and does not inspect live graphs. 
Missing `state.json` succeeds with a warning; invalid state JSON or an unsupported state version fails. If a lock is present, status reports its id, operation, creation time, pid, and age. +Status also verifies the catalog payloads read-only: every query/policy digest +recorded in state is checked against its content-addressed blob under +`__cluster/resources/` (existence and full digest re-hash). A missing or +mismatched blob is reported as a warning (`catalog_payload_missing` / +`catalog_payload_mismatch`); an unreadable blob is an error +(`catalog_payload_read_error`) because an unverifiable catalog must not report +healthy. Status never writes state — persisting the `drifted` status is +refresh's job. The check runs without the state lock, so it is a point-in-time +report. + ## Refresh And Import `cluster refresh` updates an existing `state.json` from actual observations. @@ -216,8 +226,27 @@ Invalid graph roots are recorded as errors; `refresh` persists the error observation and exits non-zero, while `import` exits non-zero without creating initial state. -Refresh/import do not observe query or policy resources yet. Existing query and -policy state digests are preserved on refresh and are not invented on import. +Refresh also verifies the catalog payloads of every query/policy digest +recorded in state (the same check `cluster status` reports read-only), and +closes the loop: + +- a **missing** or **digest-mismatched** blob marks the resource `drifted` + (condition `payload_missing` / `payload_mismatch`) and removes its digest + from state — so the next `cluster plan` proposes a create and the next + `cluster apply` republishes the blob (the self-heal loop, mirroring how a + missing graph root is handled); +- an **unreadable** blob (IO error other than not-found) keeps the digest, + marks the resource `error` (condition `payload_read_error`), and exits + non-zero — transient IO must not trigger a spurious republish. 
+ +Upgrade note: a state ledger that predates catalog publishing records +query/policy digests with no blobs on disk; the first refresh after upgrading +flags them all `payload_missing`, and a single `cluster apply` republishes +everything and converges. + +Refresh/import do not observe query or policy resources beyond their catalog +payloads yet. Existing query and policy state digests are preserved on refresh +(unless their payload drifted, above) and are not invented on import. ## Force Unlock