feat(cluster): verify catalog payload blobs in status and refresh

Closes the Stage 3A product gap where a deleted or corrupted blob under
__cluster/resources/ went unnoticed forever (status reported converged and
apply could not repair it because the digests matched).

verify_catalog_payloads checks every query/policy digest in state against its
content-addressed blob (existence + full sha256 re-hash; graph/schema/unknown
addresses have no payloads and are skipped). status reports findings read-only
(warnings catalog_payload_missing/_mismatch; error catalog_payload_read_error
— an unverifiable catalog must not report healthy). refresh closes the
self-heal loop: missing/mismatched blobs mark the resource drifted and remove
its digest from state so the next plan proposes a create and the next apply
republishes; unreadable blobs keep the digest (no spurious republish), mark
error, and exit non-zero. Verification runs before graph observation so the
recomputed graph composite already excludes removed query digests.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
aaltshuler 2026-06-10 02:07:08 +03:00
parent b6d228ff54
commit 15868972ff
2 changed files with 312 additions and 2 deletions

View file

@ -890,6 +890,14 @@ pub fn status_config_dir(config_dir: impl AsRef<Path>) -> StatusOutput {
match backend.read_state(&mut observations) {
Ok(snapshot) => {
if let Some(state) = snapshot.state {
// Read-only point-in-time catalog check: report the
// findings as diagnostics; persisting Drifted statuses
// is refresh's job. Status never writes state.
for (address, finding) in
verify_catalog_payloads(&parsed.config_dir, &state)
{
diagnostics.push(payload_finding_diagnostic(&address, &finding));
}
resource_digests = state_resource_digests(&state);
resource_statuses = state.resource_statuses;
state_observation_records = state.observations;
@ -1076,6 +1084,47 @@ async fn sync_config_dir(config_dir: &Path, operation: StateSyncOperation) -> St
(StateSyncOperation::Import, None) => initial_import_state(&desired),
};
// Catalog payload verification must run BEFORE graph observation: removing
// a drifted query digest first means the live-graph composite recompute
// below already excludes it, so the persisted graph.<id> composite stays
// consistent and the next plan shows exactly the create + derived update.
for (address, finding) in verify_catalog_payloads(&desired.config_dir, &state) {
diagnostics.push(payload_finding_diagnostic(&address, &finding));
match finding {
PayloadFinding::Missing => {
state.applied_revision.resources.remove(&address);
set_resource_status(
&mut state,
&address,
ResourceLifecycleStatus::Drifted,
"payload_missing",
"catalog payload blob is missing; re-run `cluster apply` to republish",
);
}
PayloadFinding::Mismatch { .. } => {
state.applied_revision.resources.remove(&address);
set_resource_status(
&mut state,
&address,
ResourceLifecycleStatus::Drifted,
"payload_mismatch",
"catalog payload blob does not match the recorded digest; re-run `cluster apply` to republish",
);
}
// Transient IO must not trigger a spurious republish: keep the
// digest, surface the error, let a later clean refresh converge.
PayloadFinding::ReadError(error) => {
set_resource_status(
&mut state,
&address,
ResourceLifecycleStatus::Error,
"payload_read_error",
&error,
);
}
}
}
let graph_error_count = observe_declared_graphs(&desired, &mut state).await;
if graph_error_count > 0 {
diagnostics.push(Diagnostic::error(
@ -2371,6 +2420,73 @@ fn payload_path(config_dir: &Path, kind: &ResourceKind, digest: &str) -> Option<
}
}
#[derive(Debug, PartialEq, Eq)]
enum PayloadFinding {
Missing,
Mismatch { actual_digest: String },
ReadError(String),
}
/// Verify every catalog-backed resource digest in state against its
/// content-addressed blob under `__cluster/resources/`. Graph, schema, and
/// unknown addresses have no payloads and are skipped. Read-only; findings
/// are deterministic (BTreeMap order). Payloads are small (queries, policy
/// bundles), so a full digest re-hash is cheap.
fn verify_catalog_payloads(
config_dir: &Path,
state: &ClusterState,
) -> Vec<(String, PayloadFinding)> {
let mut findings = Vec::new();
for (address, resource) in &state.applied_revision.resources {
let kind = resource_kind(address);
let Some(path) = payload_path(config_dir, &kind, &resource.digest) else {
continue;
};
match fs::read(&path) {
Ok(bytes) => {
let actual_digest = sha256_hex(&bytes);
if actual_digest != resource.digest {
findings.push((address.clone(), PayloadFinding::Mismatch { actual_digest }));
}
}
Err(err) if err.kind() == ErrorKind::NotFound => {
findings.push((address.clone(), PayloadFinding::Missing));
}
Err(err) => {
findings.push((
address.clone(),
PayloadFinding::ReadError(format!(
"could not read catalog payload '{}': {err}",
path.display()
)),
));
}
}
}
findings
}
fn payload_finding_diagnostic(address: &str, finding: &PayloadFinding) -> Diagnostic {
match finding {
PayloadFinding::Missing => Diagnostic::warning(
"catalog_payload_missing",
address,
"catalog payload blob is missing; re-run `cluster apply` to republish",
),
PayloadFinding::Mismatch { actual_digest } => Diagnostic::warning(
"catalog_payload_mismatch",
address,
format!(
"catalog payload blob does not match the recorded digest (actual sha256:{actual_digest}); re-run `cluster apply` to republish"
),
),
// An unverifiable blob must not report healthy.
PayloadFinding::ReadError(error) => {
Diagnostic::error("catalog_payload_read_error", address, error.clone())
}
}
}
/// Write one content-addressed payload blob. Idempotent: an existing
/// digest-named file is trusted as-is. The digest re-check is the apply-side
/// TOCTOU detector — the source file changing between `load_desired` and the
@ -4317,6 +4433,171 @@ graphs:
);
}
// ---- catalog payload verification (Stage 3B) ----
/// Converge a fixture dir and return the query blob path.
fn converge_fixture(config_dir: &Path) -> std::path::PathBuf {
write_applyable_state(config_dir);
let out = apply_config_dir(config_dir);
assert!(out.ok && out.converged, "{:?}", out.diagnostics);
let desired = validate_config_dir(config_dir);
query_payload_path(
config_dir,
desired
.resource_digests
.get("query.knowledge.find_person")
.unwrap(),
)
}
#[test]
fn status_reports_missing_payload_read_only() {
let dir = fixture();
let blob = converge_fixture(dir.path());
let state_before = fs::read_to_string(dir.path().join(CLUSTER_STATE_FILE)).unwrap();
fs::remove_file(&blob).unwrap();
let out = status_config_dir(dir.path());
assert!(out.ok, "{:?}", out.diagnostics);
assert!(out.diagnostics.iter().any(|diagnostic| {
diagnostic.code == "catalog_payload_missing"
&& diagnostic.path == "query.knowledge.find_person"
}));
// Read-only: persisted statuses and state bytes untouched.
assert_eq!(
out.resource_statuses["query.knowledge.find_person"].status,
ResourceLifecycleStatus::Applied
);
assert_eq!(
fs::read_to_string(dir.path().join(CLUSTER_STATE_FILE)).unwrap(),
state_before
);
}
#[tokio::test]
async fn refresh_removes_digest_and_drifts_on_missing_payload() {
let dir = fixture();
init_derived_graph(dir.path()).await;
let blob = converge_fixture(dir.path());
fs::remove_file(&blob).unwrap();
let out = refresh_config_dir(dir.path()).await;
assert!(out.ok, "{:?}", out.diagnostics);
assert!(
out.diagnostics
.iter()
.any(|diagnostic| diagnostic.code == "catalog_payload_missing")
);
let status = &out.resource_statuses["query.knowledge.find_person"];
assert_eq!(status.status, ResourceLifecycleStatus::Drifted);
assert!(status.conditions.contains(&"payload_missing".to_string()));
let state = read_state_json(dir.path());
assert!(
state["applied_revision"]["resources"]
.get("query.knowledge.find_person")
.is_none(),
"{state}"
);
}
#[tokio::test]
async fn refresh_drifts_on_corrupted_payload() {
let dir = fixture();
init_derived_graph(dir.path()).await;
let blob = converge_fixture(dir.path());
fs::write(&blob, "corrupted content").unwrap();
let out = refresh_config_dir(dir.path()).await;
assert!(out.ok, "{:?}", out.diagnostics);
let status = &out.resource_statuses["query.knowledge.find_person"];
assert_eq!(status.status, ResourceLifecycleStatus::Drifted);
assert!(status.conditions.contains(&"payload_mismatch".to_string()));
let state = read_state_json(dir.path());
assert!(
state["applied_revision"]["resources"]
.get("query.knowledge.find_person")
.is_none()
);
}
#[tokio::test]
async fn refresh_flags_unreadable_payload_as_error() {
let dir = fixture();
init_derived_graph(dir.path()).await;
let blob = converge_fixture(dir.path());
// A same-named directory yields a non-NotFound IO error portably.
fs::remove_file(&blob).unwrap();
fs::create_dir(&blob).unwrap();
let out = refresh_config_dir(dir.path()).await;
assert!(!out.ok);
assert!(
out.diagnostics
.iter()
.any(|diagnostic| diagnostic.code == "catalog_payload_read_error")
);
let status = &out.resource_statuses["query.knowledge.find_person"];
assert_eq!(status.status, ResourceLifecycleStatus::Error);
assert!(status.conditions.contains(&"payload_read_error".to_string()));
// Transient IO keeps the digest: no spurious republish.
let state = read_state_json(dir.path());
assert!(
state["applied_revision"]["resources"]
.get("query.knowledge.find_person")
.is_some()
);
}
#[tokio::test]
async fn payload_drift_self_heals_through_refresh_plan_apply() {
let dir = fixture();
init_derived_graph(dir.path()).await;
let blob = converge_fixture(dir.path());
let original = fs::read_to_string(&blob).unwrap();
fs::remove_file(&blob).unwrap();
let refresh = refresh_config_dir(dir.path()).await;
assert!(refresh.ok, "{:?}", refresh.diagnostics);
let plan = plan_config_dir(dir.path());
let query_change = plan
.changes
.iter()
.find(|change| change.resource == "query.knowledge.find_person")
.expect("plan must propose recreating the query");
assert_eq!(query_change.operation, PlanOperation::Create);
assert_eq!(query_change.disposition, Some(ApplyDisposition::Applied));
let apply = apply_config_dir(dir.path());
assert!(apply.ok && apply.converged, "{:?}", apply.diagnostics);
assert_eq!(fs::read_to_string(&blob).unwrap(), original);
let status = status_config_dir(dir.path());
assert!(
!status
.diagnostics
.iter()
.any(|diagnostic| diagnostic.code.starts_with("catalog_payload")),
"{:?}",
status.diagnostics
);
}
#[test]
fn verification_skips_graph_and_schema_resources() {
let dir = fixture();
write_applyable_state(dir.path()); // graph + schema digests only, no blobs
let out = status_config_dir(dir.path());
assert!(
!out.diagnostics
.iter()
.any(|diagnostic| diagnostic.code.starts_with("catalog_payload")),
"{:?}",
out.diagnostics
);
}
#[test]
fn plan_annotates_apply_dispositions() {
let dir = fixture();