fix(cluster,cli): apply failure output — persisted statuses only, changes list printed

Two review findings (greptile, PR #165):

- ApplyOutput.resource_statuses on a failed state write now carries the
  pre-apply on-disk snapshot instead of the in-memory mutations that were
  never persisted, so automation reading the field independently of `ok`
  cannot see phantom applied/blocked statuses. Regression test forces the
  state write to fail via a read-only __cluster dir (unix-only, skips when
  permissions are not enforced).
- Human-mode `cluster apply` prints the classified changes list on failure
  too, so an operator debugging a partial apply without --json sees what was
  attempted.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
aaltshuler 2026-06-10 00:35:03 +03:00
parent d870eaaf3f
commit 5e1dede08f
2 changed files with 113 additions and 20 deletions

View file

@ -820,34 +820,42 @@ fn print_cluster_apply_human(output: &ApplyOutput) {
"cluster apply: {} applied, {} deferred/blocked",
output.applied_count, output.deferred_count
);
for change in &output.changes {
match (&change.disposition, change.reason.as_deref()) {
(Some(disposition), Some(reason)) => println!(
" {:?} {} [{disposition:?}: {reason}]",
change.operation, change.resource
),
(Some(disposition), None) => println!(
" {:?} {} [{disposition:?}]",
change.operation, change.resource
),
_ => println!(" {:?} {}", change.operation, change.resource),
}
}
if output.changes.is_empty() {
println!(" no changes");
}
} else {
println!("cluster apply failed");
}
// The change list prints on failure too: an operator debugging a partial
// apply (payload or state-write error) needs to see what was attempted.
print_cluster_apply_changes(&output.changes);
if output.ok {
let state = &output.state_observations;
println!(
" state: revision {}, converged: {}, written: {}",
state.state_revision, output.converged, output.state_written
);
println!(" note: applied = recorded in the cluster catalog; the server still boots from omnigraph.yaml");
} else {
println!("cluster apply failed");
}
print_cluster_diagnostics(&output.diagnostics);
}
fn print_cluster_apply_changes(changes: &[omnigraph_cluster::PlanChange]) {
for change in changes {
match (&change.disposition, change.reason.as_deref()) {
(Some(disposition), Some(reason)) => println!(
" {:?} {} [{disposition:?}: {reason}]",
change.operation, change.resource
),
(Some(disposition), None) => println!(
" {:?} {} [{disposition:?}]",
change.operation, change.resource
),
_ => println!(" {:?} {}", change.operation, change.resource),
}
}
if changes.is_empty() {
println!(" no changes");
}
}
fn print_cluster_status_human(output: &StatusOutput) {
if output.ok {
let state = &output.state_observations;

View file

@ -276,6 +276,8 @@ pub struct ApplyOutput {
pub converged: bool,
/// False for a no-op re-apply: state bytes (and revision) were left untouched.
pub state_written: bool,
/// The statuses as persisted: post-apply on success, the pre-apply on-disk
/// snapshot when the state write fails (never unpersisted in-memory state).
pub resource_statuses: BTreeMap<String, ResourceStatusRecord>,
pub diagnostics: Vec<Diagnostic>,
}
@ -819,13 +821,26 @@ pub fn apply_config_dir(config_dir: impl AsRef<Path>) -> ApplyOutput {
let after_value =
serde_json::to_value(&new_state).expect("cluster state must serialize deterministically");
let mut state_written = false;
let mut state_write_failed = false;
if after_value != before_value {
new_state.state_revision = new_state.state_revision.saturating_add(1);
match backend.write_state(&new_state, expected_cas.as_deref(), &mut observations) {
Ok(()) => state_written = true,
Err(diagnostic) => diagnostics.push(diagnostic),
Err(diagnostic) => {
diagnostics.push(diagnostic);
state_write_failed = true;
}
}
}
// On a failed state write, report the statuses that are actually on disk
// (the pre-apply snapshot), not the in-memory mutations that were never
// persisted — automation reading `resource_statuses` independently of `ok`
// must not see phantom status updates.
let resource_statuses = if state_write_failed {
state.resource_statuses
} else {
new_state.resource_statuses
};
let applied_count = changes
.iter()
@ -853,7 +868,7 @@ pub fn apply_config_dir(config_dir: impl AsRef<Path>) -> ApplyOutput {
deferred_count,
converged,
state_written,
resource_statuses: new_state.resource_statuses,
resource_statuses,
diagnostics,
}
}
@ -4232,6 +4247,76 @@ graphs:
assert!(!dir.path().join(CLUSTER_STATE_DIR).exists());
}
/// When the state write fails after payloads landed, the output must
/// report the statuses actually on disk — not the unpersisted in-memory
/// mutations (phantom `applied` entries would mislead automation that
/// reads `resource_statuses` independently of `ok`).
#[cfg(unix)]
#[test]
fn apply_state_write_failure_reports_persisted_statuses() {
use std::os::unix::fs::PermissionsExt;
let dir = fixture();
// lock: false so the only write into __cluster/ is state.json itself.
fs::write(
dir.path().join(CLUSTER_CONFIG_FILE),
r#"
version: 1
state:
backend: cluster
lock: false
graphs:
knowledge:
schema: ./people.pg
queries:
find_person:
file: ./people.gq
"#,
)
.unwrap();
write_applyable_state(dir.path());
// Pre-create the payload blob so the payload phase is a no-op and the
// failure lands exactly at the state write.
let desired = validate_config_dir(dir.path());
let query_digest = desired
.resource_digests
.get("query.knowledge.find_person")
.unwrap();
let blob = query_payload_path(dir.path(), query_digest);
fs::create_dir_all(blob.parent().unwrap()).unwrap();
fs::write(&blob, QUERY).unwrap();
let state_dir = dir.path().join(CLUSTER_STATE_DIR);
fs::set_permissions(&state_dir, fs::Permissions::from_mode(0o555)).unwrap();
// Running as root ignores permission bits; skip rather than flake.
if fs::write(state_dir.join("probe"), b"x").is_ok() {
let _ = fs::remove_file(state_dir.join("probe"));
fs::set_permissions(&state_dir, fs::Permissions::from_mode(0o755)).unwrap();
eprintln!("skipping: permissions are not enforced (running as root)");
return;
}
let out = apply_config_dir(dir.path());
fs::set_permissions(&state_dir, fs::Permissions::from_mode(0o755)).unwrap();
assert!(!out.ok);
assert!(!out.state_written);
assert!(
out.diagnostics
.iter()
.any(|diagnostic| diagnostic.code == "state_write_error"),
"{:?}",
out.diagnostics
);
// The seeded state has no statuses; the failed apply must not invent
// the in-memory `applied` ones it failed to persist.
assert!(
out.resource_statuses.is_empty(),
"unpersisted statuses leaked into output: {:?}",
out.resource_statuses
);
}
#[test]
fn plan_annotates_apply_dispositions() {
let dir = fixture();