From 5e1dede08f20403eb6c87fd8f21a17267669d908 Mon Sep 17 00:00:00 2001 From: aaltshuler Date: Wed, 10 Jun 2026 00:35:03 +0300 Subject: [PATCH] =?UTF-8?q?fix(cluster,cli):=20apply=20failure=20output=20?= =?UTF-8?q?=E2=80=94=20persisted=20statuses=20only,=20changes=20list=20pri?= =?UTF-8?q?nted?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two review findings (greptile, PR #165): - ApplyOutput.resource_statuses on a failed state write now carries the pre-apply on-disk snapshot instead of the in-memory mutations that were never persisted, so automation reading the field independently of `ok` cannot see phantom applied/blocked statuses. Regression test forces the state write to fail via a read-only __cluster dir (unix-only, skips when permissions are not enforced). - Human-mode `cluster apply` prints the classified changes list on failure too, so an operator debugging a partial apply without --json sees what was attempted. Co-Authored-By: Claude Fable 5 --- crates/omnigraph-cli/src/main.rs | 44 ++++++++------ crates/omnigraph-cluster/src/lib.rs | 89 ++++++++++++++++++++++++++++- 2 files changed, 113 insertions(+), 20 deletions(-) diff --git a/crates/omnigraph-cli/src/main.rs b/crates/omnigraph-cli/src/main.rs index 42bbed8..37db77f 100644 --- a/crates/omnigraph-cli/src/main.rs +++ b/crates/omnigraph-cli/src/main.rs @@ -820,34 +820,42 @@ fn print_cluster_apply_human(output: &ApplyOutput) { "cluster apply: {} applied, {} deferred/blocked", output.applied_count, output.deferred_count ); - for change in &output.changes { - match (&change.disposition, change.reason.as_deref()) { - (Some(disposition), Some(reason)) => println!( - " {:?} {} [{disposition:?}: {reason}]", - change.operation, change.resource - ), - (Some(disposition), None) => println!( - " {:?} {} [{disposition:?}]", - change.operation, change.resource - ), - _ => println!(" {:?} {}", change.operation, change.resource), - } - } - if output.changes.is_empty() { - println!(" no changes"); - } + } else { + println!("cluster apply failed"); + } + // The change list prints on failure too: an operator debugging a partial + // apply (payload or state-write error) needs to see what was attempted. + print_cluster_apply_changes(&output.changes); + if output.ok { let state = &output.state_observations; println!( " state: revision {}, converged: {}, written: {}", state.state_revision, output.converged, output.state_written ); println!(" note: applied = recorded in the cluster catalog; the server still boots from omnigraph.yaml"); - } else { - println!("cluster apply failed"); } print_cluster_diagnostics(&output.diagnostics); } +fn print_cluster_apply_changes(changes: &[omnigraph_cluster::PlanChange]) { + for change in changes { + match (&change.disposition, change.reason.as_deref()) { + (Some(disposition), Some(reason)) => println!( + " {:?} {} [{disposition:?}: {reason}]", + change.operation, change.resource + ), + (Some(disposition), None) => println!( + " {:?} {} [{disposition:?}]", + change.operation, change.resource + ), + _ => println!(" {:?} {}", change.operation, change.resource), + } + } + if changes.is_empty() { + println!(" no changes"); + } +} + fn print_cluster_status_human(output: &StatusOutput) { if output.ok { let state = &output.state_observations; diff --git a/crates/omnigraph-cluster/src/lib.rs b/crates/omnigraph-cluster/src/lib.rs index 01ad171..3673194 100644 --- a/crates/omnigraph-cluster/src/lib.rs +++ b/crates/omnigraph-cluster/src/lib.rs @@ -276,6 +276,8 @@ pub struct ApplyOutput { pub converged: bool, /// False for a no-op re-apply: state bytes (and revision) were left untouched. pub state_written: bool, + /// The statuses as persisted: post-apply on success, the pre-apply on-disk + /// snapshot when the state write fails (never unpersisted in-memory state). pub resource_statuses: BTreeMap, pub diagnostics: Vec, } @@ -819,13 +821,26 @@ pub fn apply_config_dir(config_dir: impl AsRef) -> ApplyOutput { let after_value = serde_json::to_value(&new_state).expect("cluster state must serialize deterministically"); let mut state_written = false; + let mut state_write_failed = false; if after_value != before_value { new_state.state_revision = new_state.state_revision.saturating_add(1); match backend.write_state(&new_state, expected_cas.as_deref(), &mut observations) { Ok(()) => state_written = true, - Err(diagnostic) => diagnostics.push(diagnostic), + Err(diagnostic) => { + diagnostics.push(diagnostic); + state_write_failed = true; + } } } + // On a failed state write, report the statuses that are actually on disk + // (the pre-apply snapshot), not the in-memory mutations that were never + // persisted — automation reading `resource_statuses` independently of `ok` + // must not see phantom status updates. + let resource_statuses = if state_write_failed { + state.resource_statuses + } else { + new_state.resource_statuses + }; let applied_count = changes .iter() @@ -853,7 +868,7 @@ pub fn apply_config_dir(config_dir: impl AsRef) -> ApplyOutput { deferred_count, converged, state_written, - resource_statuses: new_state.resource_statuses, + resource_statuses, diagnostics, } } @@ -4232,6 +4247,76 @@ graphs: assert!(!dir.path().join(CLUSTER_STATE_DIR).exists()); } + /// When the state write fails after payloads landed, the output must + /// report the statuses actually on disk — not the unpersisted in-memory + /// mutations (phantom `applied` entries would mislead automation that + /// reads `resource_statuses` independently of `ok`). + #[cfg(unix)] + #[test] + fn apply_state_write_failure_reports_persisted_statuses() { + use std::os::unix::fs::PermissionsExt; + + let dir = fixture(); + // lock: false so the only write into __cluster/ is state.json itself. + fs::write( + dir.path().join(CLUSTER_CONFIG_FILE), + r#" +version: 1 +state: + backend: cluster + lock: false +graphs: + knowledge: + schema: ./people.pg + queries: + find_person: + file: ./people.gq +"#, + ) + .unwrap(); + write_applyable_state(dir.path()); + // Pre-create the payload blob so the payload phase is a no-op and the + // failure lands exactly at the state write. + let desired = validate_config_dir(dir.path()); + let query_digest = desired + .resource_digests + .get("query.knowledge.find_person") + .unwrap(); + let blob = query_payload_path(dir.path(), query_digest); + fs::create_dir_all(blob.parent().unwrap()).unwrap(); + fs::write(&blob, QUERY).unwrap(); + + let state_dir = dir.path().join(CLUSTER_STATE_DIR); + fs::set_permissions(&state_dir, fs::Permissions::from_mode(0o555)).unwrap(); + // Running as root ignores permission bits; skip rather than flake. + if fs::write(state_dir.join("probe"), b"x").is_ok() { + let _ = fs::remove_file(state_dir.join("probe")); + fs::set_permissions(&state_dir, fs::Permissions::from_mode(0o755)).unwrap(); + eprintln!("skipping: permissions are not enforced (running as root)"); + return; + } + + let out = apply_config_dir(dir.path()); + fs::set_permissions(&state_dir, fs::Permissions::from_mode(0o755)).unwrap(); + + assert!(!out.ok); + assert!(!out.state_written); + assert!( + out.diagnostics + .iter() + .any(|diagnostic| diagnostic.code == "state_write_error"), + "{:?}", + out.diagnostics + ); + // The seeded state has no statuses; the failed apply must not invent + // the in-memory `applied` ones it failed to persist. + assert!( + out.resource_statuses.is_empty(), + "unpersisted statuses leaked into output: {:?}", + out.resource_statuses + ); + } + #[test] fn plan_annotates_apply_dispositions() { let dir = fixture();