fix(cluster): tighten state lock observations

This commit is contained in:
aaltshuler 2026-06-09 18:30:33 +03:00
parent b046515e1c
commit 2f19656c0e
3 changed files with 81 additions and 38 deletions

View file

@ -350,8 +350,9 @@ fn cluster_plan_json_includes_state_cas_revision_and_lock_observation() {
.unwrap()
.starts_with("sha256:")
);
assert_eq!(json["state_observations"]["locked"], true);
assert!(json["state_observations"]["lock_id"].is_string());
assert_eq!(json["state_observations"]["locked"], false);
assert_eq!(json["state_observations"]["lock_acquired"], true);
assert!(json["state_observations"]["acquired_lock_id"].is_string());
assert!(!state_dir.join("lock.json").exists());
}
@ -386,6 +387,8 @@ fn cluster_plan_locked_state_exits_nonzero() {
let json = parse_stdout_json(&output);
assert_eq!(json["ok"], false);
assert_eq!(json["state_observations"]["locked"], true);
assert_eq!(json["state_observations"]["lock_acquired"], false);
assert_eq!(json["state_observations"]["lock_id"], "held-lock");
assert!(
json["diagnostics"]
.as_array()

View file

@ -104,6 +104,9 @@ pub struct StateObservations {
pub locked: bool,
#[serde(skip_serializing_if = "Option::is_none")]
pub lock_id: Option<String>,
pub lock_acquired: bool,
#[serde(skip_serializing_if = "Option::is_none")]
pub acquired_lock_id: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
@ -213,7 +216,7 @@ struct LoadOutcome {
config_file: PathBuf,
}
#[derive(Debug, Deserialize)]
#[derive(Debug, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
struct RawClusterConfig {
version: u32,
@ -227,20 +230,20 @@ struct RawClusterConfig {
policies: BTreeMap<String, PolicyConfig>,
}
#[derive(Debug, Default, Deserialize)]
#[derive(Debug, Default, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
struct Metadata {
name: Option<String>,
}
#[derive(Debug, Default, Deserialize)]
#[derive(Debug, Default, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
struct StateConfig {
backend: Option<String>,
lock: Option<bool>,
}
#[derive(Debug, Deserialize)]
#[derive(Debug, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
struct GraphConfig {
schema: PathBuf,
@ -248,13 +251,13 @@ struct GraphConfig {
queries: BTreeMap<String, QueryConfig>,
}
#[derive(Debug, Deserialize)]
#[derive(Debug, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
struct QueryConfig {
file: PathBuf,
}
#[derive(Debug, Deserialize)]
#[derive(Debug, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
struct PolicyConfig {
file: PathBuf,
@ -605,6 +608,8 @@ impl LocalStateBackend {
resource_count: 0,
locked: false,
lock_id: None,
lock_acquired: false,
acquired_lock_id: None,
}
}
@ -692,15 +697,19 @@ impl LocalStateBackend {
.open(&self.lock_path)
{
Ok(mut file) => {
file.write_all(payload.as_bytes()).map_err(|err| {
Diagnostic::error(
if let Err(err) = file.write_all(payload.as_bytes()) {
// No guard exists yet, so clean up the create-new file here
// instead of leaving a stale partial lock for the next run.
drop(file);
let _ = fs::remove_file(&self.lock_path);
return Err(Diagnostic::error(
"state_lock_error",
CLUSTER_LOCK_FILE,
format!("could not write state lock: {err}"),
)
})?;
observations.locked = true;
observations.lock_id = Some(lock_id.clone());
));
}
observations.lock_acquired = true;
observations.acquired_lock_id = Some(lock_id.clone());
Ok(StateLockGuard {
path: self.lock_path.clone(),
})
@ -794,22 +803,6 @@ fn load_desired(config_dir: &Path) -> LoadOutcome {
};
};
let settings = validate_cluster_header(&raw, &mut diagnostics);
let config_text = match fs::read_to_string(&config_file) {
Ok(text) => text,
Err(err) => {
diagnostics.push(Diagnostic::error(
"cluster_config_read_error",
CLUSTER_CONFIG_FILE,
format!("could not re-read cluster.yaml: {err}"),
));
return LoadOutcome {
desired: None,
diagnostics,
config_dir,
config_file,
};
}
};
let mut resources = BTreeMap::new();
let mut dependencies = BTreeSet::new();
@ -1026,7 +1019,7 @@ fn load_desired(config_dir: &Path) -> LoadOutcome {
resource_list.push(resource);
}
let dependencies: Vec<_> = dependencies.into_iter().collect();
let config_digest = desired_config_digest(&config_text, &resource_digests);
let config_digest = desired_config_digest(&raw, &resource_digests);
LoadOutcome {
desired: Some(DesiredCluster {
@ -1351,11 +1344,15 @@ fn graph_digest(
}
fn desired_config_digest(
config_source: &str,
raw: &RawClusterConfig,
resource_digests: &BTreeMap<String, String>,
) -> String {
let mut input = String::from("cluster-config\0");
input.push_str(config_source);
// Hash parsed semantics, not raw YAML bytes, so comments and formatting do
// not create a new desired revision and the digest cannot drift from parse.
let config_semantics =
serde_json::to_string(raw).expect("raw cluster config must serialize deterministically");
input.push_str(&config_semantics);
input.push('\0');
for (address, digest) in resource_digests {
input.push_str(address);
@ -1593,6 +1590,8 @@ graphs:
let out = plan_config_dir(dir.path());
assert!(out.ok, "{:?}", out.diagnostics);
assert!(!out.state_observations.state_found);
assert!(!out.state_observations.locked);
assert!(out.state_observations.lock_acquired);
assert!(
out.changes
.iter()
@ -1602,6 +1601,40 @@ graphs:
assert!(!dir.path().join(CLUSTER_LOCK_FILE).exists());
}
#[test]
fn config_digest_ignores_yaml_comments_and_formatting() {
let dir = fixture();
let first = plan_config_dir(dir.path());
assert!(first.ok, "{:?}", first.diagnostics);
fs::write(
dir.path().join(CLUSTER_CONFIG_FILE),
r#"
# Same semantic config as the fixture, intentionally rendered differently.
version: 1
metadata: { name: test }
state: { backend: cluster, lock: true }
graphs:
knowledge:
schema: ./people.pg
queries: { find_person: { file: ./people.gq } }
policies:
base:
file: ./base.policy.yaml
applies_to:
- knowledge
"#,
)
.unwrap();
let second = plan_config_dir(dir.path());
assert!(second.ok, "{:?}", second.diagnostics);
assert_eq!(
first.desired_revision.config_digest,
second.desired_revision.config_digest
);
}
#[test]
fn existing_state_plans_update_and_delete_deterministically() {
let dir = fixture();
@ -1775,8 +1808,10 @@ graphs:
out.state_observations.state_cas.as_deref(),
Some(format!("sha256:{}", sha256_hex(state.as_bytes())).as_str())
);
assert!(out.state_observations.locked);
assert!(out.state_observations.lock_id.is_some());
assert!(!out.state_observations.locked);
assert!(out.state_observations.lock_id.is_none());
assert!(out.state_observations.lock_acquired);
assert!(out.state_observations.acquired_lock_id.is_some());
assert!(
!dir.path().join(CLUSTER_LOCK_FILE).exists(),
"plan must release lock before returning"
@ -1804,6 +1839,8 @@ graphs:
assert!(!out.ok);
assert!(out.state_observations.locked);
assert_eq!(out.state_observations.lock_id.as_deref(), Some("held-lock"));
assert!(!out.state_observations.lock_acquired);
assert!(out.state_observations.acquired_lock_id.is_none());
assert!(
out.diagnostics
.iter()
@ -1831,6 +1868,7 @@ graphs:
let out = plan_config_dir(dir.path());
assert!(out.ok, "{:?}", out.diagnostics);
assert!(!out.state_observations.locked);
assert!(!out.state_observations.lock_acquired);
assert!(
out.diagnostics
.iter()

View file

@ -112,9 +112,11 @@ Missing `state_revision` is treated as `0`. Resource status values are
Plan output compares desired resource digests against state resource digests
and reports `create`, `update`, and `delete` changes. It also reports the state
CAS (`sha256:<digest>`), state revision, and lock id used for the read. The
command never writes `state.json`; apply, refresh, import, and live drift scans
are later-stage work.
CAS (`sha256:<digest>`) and state revision. `state_observations.locked` means an
existing lock file was observed; a successful `plan` instead reports
`lock_acquired: true` and an `acquired_lock_id`, then releases the lock before
returning. The command never writes `state.json`; apply, refresh, import, and
live drift scans are later-stage work.
## Status