diff --git a/crates/omnigraph-cluster/src/lib.rs b/crates/omnigraph-cluster/src/lib.rs index 7ae824c..01ad171 100644 --- a/crates/omnigraph-cluster/src/lib.rs +++ b/crates/omnigraph-cluster/src/lib.rs @@ -21,6 +21,7 @@ pub const CLUSTER_GRAPHS_DIR: &str = "graphs"; pub const CLUSTER_STATE_DIR: &str = "__cluster"; pub const CLUSTER_STATE_FILE: &str = "__cluster/state.json"; pub const CLUSTER_LOCK_FILE: &str = "__cluster/lock.json"; +pub const CLUSTER_RESOURCES_DIR: &str = "__cluster/resources"; #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] #[serde(rename_all = "snake_case")] @@ -161,6 +162,23 @@ pub enum PlanOperation { Delete, } +/// How `cluster apply` treats a planned change in the current stage. +/// +/// `Applied` changes execute (config-only query/policy catalog writes). +/// `Derived` marks a `graph.` composite-digest update that converges +/// automatically once its applied query digests land in state. `Deferred` +/// changes need a later phase (graph/schema lifecycle or schema content). +/// `Blocked` query/policy changes are gated by an unapplied or missing +/// dependency. +#[derive(Debug, Clone, Copy, Serialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum ApplyDisposition { + Applied, + Derived, + Deferred, + Blocked, +} + #[derive(Debug, Clone, Serialize, PartialEq, Eq)] pub struct PlanChange { pub resource: String, @@ -169,6 +187,10 @@ pub struct PlanChange { pub before_digest: Option, #[serde(skip_serializing_if = "Option::is_none")] pub after_digest: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub disposition: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub reason: Option, } #[derive(Debug, Clone, Serialize, PartialEq, Eq)] @@ -236,6 +258,28 @@ pub struct ForceUnlockOutput { pub diagnostics: Vec, } +/// Output of config-only `cluster apply`. "Applied" means recorded in the +/// local cluster catalog (`__cluster/`); nothing applied here serves traffic — +/// the server still boots from `omnigraph.yaml` until the server-boot stage. +#[derive(Debug, Clone, Serialize)] +pub struct ApplyOutput { + pub ok: bool, + pub config_dir: String, + pub desired_revision: DesiredRevision, + pub state_observations: StateObservations, + /// Every planned change, with `disposition`/`reason` always populated. + pub changes: Vec, + pub applied_count: usize, + /// Deferred + Blocked changes (Derived composite updates count as neither). + pub deferred_count: usize, + /// True when state matches the desired revision after this apply. + pub converged: bool, + /// False for a no-op re-apply: state bytes (and revision) were left untouched. + pub state_written: bool, + pub resource_statuses: BTreeMap, + pub diagnostics: Vec, +} + #[derive(Debug, Clone)] struct DesiredCluster { config_dir: PathBuf, @@ -477,11 +521,12 @@ pub fn plan_config_dir(config_dir: impl AsRef) -> PlanOutput { } } - let changes = if has_errors(&diagnostics) { + let mut changes = if has_errors(&diagnostics) { Vec::new() } else { diff_resources(&prior_resources, &desired.resource_digests) }; + classify_changes(&mut changes, &desired.dependencies); let blast_radius = compute_blast_radius(&changes, &desired.dependencies); let approvals_required = compute_approvals(&changes); let ok = !has_errors(&diagnostics); @@ -502,6 +547,317 @@ pub fn plan_config_dir(config_dir: impl AsRef) -> PlanOutput { } } +/// Config-only `cluster apply` (Stage 3A): execute the query/policy subset of +/// the plan against the local cluster catalog. The plan is recomputed under +/// the state lock, so freshness is structural; the state CAS inside +/// `write_state` is the second fence. Graph/schema changes are never executed +/// here — they are deferred to the graph-lifecycle phase and reported loudly. +/// +/// Payloads are content-addressed and written BEFORE the state CAS because +/// state is the publish point: a failure after payload writes leaves inert +/// digest-named blobs and no success acknowledgement; re-running apply is the +/// repair. +pub fn apply_config_dir(config_dir: impl AsRef) -> ApplyOutput { + let outcome = load_desired(config_dir.as_ref()); + let mut diagnostics = outcome.diagnostics; + let backend = LocalStateBackend::new(&outcome.config_dir); + let mut observations = backend.observations(); + + let early_return = |config_dir: String, + config_digest: Option, + observations: StateObservations, + changes: Vec, + resource_statuses: BTreeMap, + diagnostics: Vec| { + ApplyOutput { + ok: !has_errors(&diagnostics), + config_dir, + desired_revision: DesiredRevision { + config_digest, + }, + state_observations: observations, + changes, + applied_count: 0, + deferred_count: 0, + converged: false, + state_written: false, + resource_statuses, + diagnostics, + } + }; + + let Some(desired) = outcome.desired else { + return early_return( + display_path(&outcome.config_dir), + None, + observations, + Vec::new(), + BTreeMap::new(), + diagnostics, + ); + }; + + if has_errors(&diagnostics) { + return early_return( + display_path(&desired.config_dir), + Some(desired.config_digest), + observations, + Vec::new(), + BTreeMap::new(), + diagnostics, + ); + } + + // Named guard: the lock must be held until the state outcome is recorded. + let _lock_guard = if desired.state_lock { + match backend.acquire_lock("apply", &mut observations) { + Ok(guard) => Some(guard), + Err(diagnostic) => { + diagnostics.push(diagnostic); + None + } + } + } else { + diagnostics.push(Diagnostic::warning( + "state_lock_disabled", + "state.lock", + "state.lock is false; apply wrote state without acquiring the cluster state lock", + )); + None + }; + + if has_errors(&diagnostics) { + return early_return( + display_path(&desired.config_dir), + Some(desired.config_digest), + observations, + Vec::new(), + BTreeMap::new(), + diagnostics, + ); + } + + let snapshot = match backend.read_state(&mut observations) { + Ok(snapshot) => snapshot, + Err(diagnostic) => { + diagnostics.push(diagnostic); + return early_return( + display_path(&desired.config_dir), + Some(desired.config_digest), + observations, + Vec::new(), + BTreeMap::new(), + diagnostics, + ); + } + }; + let expected_cas = snapshot.state_cas; + let Some(state) = snapshot.state else { + diagnostics.push(Diagnostic::error( + "state_missing", + CLUSTER_STATE_FILE, + "apply requires an existing state.json; run `cluster import` to bootstrap state", + )); + return early_return( + display_path(&desired.config_dir), + Some(desired.config_digest), + observations, + Vec::new(), + BTreeMap::new(), + diagnostics, + ); + }; + + let prior_resources = state_resource_digests(&state); + let mut changes = diff_resources(&prior_resources, &desired.resource_digests); + classify_changes(&mut changes, &desired.dependencies); + + // Defensive invariant: nothing the approval gate covers may be executable. + // Today approvals only cover graph/schema deletes (always deferred); this + // keeps a future widening of the executable set from silently bypassing it. + let approvals = compute_approvals(&changes); + let approval_violation = changes.iter().any(|change| { + change.disposition == Some(ApplyDisposition::Applied) + && approvals + .iter() + .any(|approval| approval.resource == change.resource) + }); + if approval_violation { + diagnostics.push(Diagnostic::error( + "apply_approval_invariant_violation", + "changes", + "an executable change requires approval; refusing to apply", + )); + return early_return( + display_path(&desired.config_dir), + Some(desired.config_digest), + observations, + changes, + state.resource_statuses, + diagnostics, + ); + } + + for change in &changes { + match change.disposition { + Some(ApplyDisposition::Deferred) => diagnostics.push(Diagnostic::warning( + "apply_unsupported_change", + change.resource.clone(), + "graph/schema changes are not applied in this stage; they are deferred to the graph-lifecycle phase", + )), + Some(ApplyDisposition::Blocked) => diagnostics.push(Diagnostic::warning( + "apply_dependency_blocked", + change.resource.clone(), + format!( + "blocked by an unapplied or missing dependency ({})", + change.reason.as_deref().unwrap_or("dependency") + ), + )), + _ => {} + } + } + + // Payload phase: content-addressed writes before the state CAS. Any + // failure aborts before state moves; blobs already written are inert. + let source_paths: BTreeMap<&str, &str> = desired + .resources + .iter() + .filter_map(|resource| { + resource + .path + .as_deref() + .map(|path| (resource.address.as_str(), path)) + }) + .collect(); + for change in &changes { + if change.disposition != Some(ApplyDisposition::Applied) + || change.operation == PlanOperation::Delete + { + continue; + } + let kind = resource_kind(&change.resource); + let digest = change + .after_digest + .as_deref() + .expect("create/update always carries an after digest"); + let Some(target) = payload_path(&desired.config_dir, &kind, digest) else { + continue; + }; + let Some(source) = source_paths.get(change.resource.as_str()) else { + diagnostics.push(Diagnostic::error( + "resource_payload_write_error", + change.resource.clone(), + "no source file recorded for resource", + )); + continue; + }; + if let Err(diagnostic) = + write_resource_payload(&target, Path::new(source), digest, &change.resource) + { + diagnostics.push(diagnostic); + } + } + if has_errors(&diagnostics) { + return early_return( + display_path(&desired.config_dir), + Some(desired.config_digest), + observations, + changes, + state.resource_statuses, + diagnostics, + ); + } + + // State mutation. Apply owns query/policy statuses only; graph/schema + // statuses belong to refresh/import observation and must not be clobbered. + let before_value = + serde_json::to_value(&state).expect("cluster state must serialize deterministically"); + let mut new_state = state.clone(); + for change in &changes { + match change.disposition { + Some(ApplyDisposition::Applied) => match change.operation { + PlanOperation::Create | PlanOperation::Update => { + new_state.applied_revision.resources.insert( + change.resource.clone(), + StateResource { + digest: change + .after_digest + .clone() + .expect("create/update always carries an after digest"), + }, + ); + set_resource_status_applied(&mut new_state, &change.resource); + } + PlanOperation::Delete => { + new_state.applied_revision.resources.remove(&change.resource); + new_state.resource_statuses.remove(&change.resource); + } + }, + Some(ApplyDisposition::Blocked) => { + set_resource_status( + &mut new_state, + &change.resource, + ResourceLifecycleStatus::Blocked, + change.reason.as_deref().unwrap_or("dependency_not_applied"), + "waiting on an unapplied or missing dependency", + ); + } + _ => {} + } + } + recompute_state_graph_digests(&mut new_state, &desired); + + let residual = diff_resources( + &state_resource_digests(&new_state), + &desired.resource_digests, + ); + let converged = residual.is_empty(); + if converged { + new_state.applied_revision.config_digest = Some(desired.config_digest.clone()); + } + + let after_value = + serde_json::to_value(&new_state).expect("cluster state must serialize deterministically"); + let mut state_written = false; + if after_value != before_value { + new_state.state_revision = new_state.state_revision.saturating_add(1); + match backend.write_state(&new_state, expected_cas.as_deref(), &mut observations) { + Ok(()) => state_written = true, + Err(diagnostic) => diagnostics.push(diagnostic), + } + } + + let applied_count = changes + .iter() + .filter(|change| change.disposition == Some(ApplyDisposition::Applied)) + .count(); + let deferred_count = changes + .iter() + .filter(|change| { + matches!( + change.disposition, + Some(ApplyDisposition::Deferred) | Some(ApplyDisposition::Blocked) + ) + }) + .count(); + + ApplyOutput { + ok: !has_errors(&diagnostics), + config_dir: display_path(&desired.config_dir), + desired_revision: DesiredRevision { + config_digest: Some(desired.config_digest), + }, + state_observations: observations, + changes, + applied_count, + deferred_count, + converged, + state_written, + resource_statuses: new_state.resource_statuses, + diagnostics, + } +} + pub fn status_config_dir(config_dir: impl AsRef) -> StatusOutput { let parsed = parse_cluster_config(config_dir.as_ref()); let mut diagnostics = parsed.diagnostics; @@ -1797,12 +2153,16 @@ fn diff_resources( operation: PlanOperation::Create, before_digest: None, after_digest: Some(after.clone()), + disposition: None, + reason: None, }), Some(before) if before != after => changes.push(PlanChange { resource: address.clone(), operation: PlanOperation::Update, before_digest: Some(before.clone()), after_digest: Some(after.clone()), + disposition: None, + reason: None, }), Some(_) => {} } @@ -1814,6 +2174,8 @@ fn diff_resources( operation: PlanOperation::Delete, before_digest: Some(before.clone()), after_digest: None, + disposition: None, + reason: None, }); } } @@ -1855,6 +2217,249 @@ fn compute_approvals(changes: &[PlanChange]) -> Vec { .collect() } +#[derive(Debug, PartialEq, Eq)] +enum ResourceKind { + Graph(String), + Schema(String), + Query { graph: String, name: String }, + Policy(String), + Unknown, +} + +fn resource_kind(address: &str) -> ResourceKind { + if let Some(graph) = address.strip_prefix("graph.") { + ResourceKind::Graph(graph.to_string()) + } else if let Some(graph) = address.strip_prefix("schema.") { + ResourceKind::Schema(graph.to_string()) + } else if let Some(rest) = address.strip_prefix("query.") { + match rest.split_once('.') { + Some((graph, name)) => ResourceKind::Query { + graph: graph.to_string(), + name: name.to_string(), + }, + None => ResourceKind::Unknown, + } + } else if let Some(name) = address.strip_prefix("policy.") { + ResourceKind::Policy(name.to_string()) + } else { + ResourceKind::Unknown + } +} + +/// Classify every planned change with the disposition config-only apply gives +/// it. Stage 3A executes only query/policy catalog writes; graph/schema +/// movement is a later phase, and `graph.` composite updates whose schema +/// component is unchanged converge automatically once query digests land. +fn classify_changes(changes: &mut [PlanChange], dependencies: &[Dependency]) { + let mut schema_changed = BTreeSet::new(); + let mut graph_creates = BTreeSet::new(); + let mut graph_deletes = BTreeSet::new(); + for change in changes.iter() { + match resource_kind(&change.resource) { + ResourceKind::Schema(graph) => { + schema_changed.insert(graph); + } + ResourceKind::Graph(graph) => match change.operation { + PlanOperation::Create => { + graph_creates.insert(graph); + } + PlanOperation::Delete => { + graph_deletes.insert(graph); + } + PlanOperation::Update => {} + }, + _ => {} + } + } + + for change in changes.iter_mut() { + let (disposition, reason) = match resource_kind(&change.resource) { + ResourceKind::Schema(_) => (ApplyDisposition::Deferred, Some("apply_unsupported_kind")), + ResourceKind::Graph(graph) => match change.operation { + PlanOperation::Update if !schema_changed.contains(&graph) => { + (ApplyDisposition::Derived, None) + } + _ => (ApplyDisposition::Deferred, Some("apply_unsupported_kind")), + }, + ResourceKind::Query { graph, .. } => match change.operation { + PlanOperation::Delete => { + if graph_deletes.contains(&graph) { + ( + ApplyDisposition::Blocked, + Some("dependency_not_applied"), + ) + } else { + (ApplyDisposition::Applied, None) + } + } + PlanOperation::Create | PlanOperation::Update => { + // A missing graph is the more fundamental blocker than a + // pending schema change, so check it first. + if graph_creates.contains(&graph) { + (ApplyDisposition::Blocked, Some("dependency_missing")) + } else if schema_changed.contains(&graph) { + ( + ApplyDisposition::Blocked, + Some("dependency_not_applied"), + ) + } else { + (ApplyDisposition::Applied, None) + } + } + }, + ResourceKind::Policy(_) => match change.operation { + PlanOperation::Delete => (ApplyDisposition::Applied, None), + PlanOperation::Create | PlanOperation::Update => { + let blocked_dep = dependencies.iter().any(|dep| { + dep.from == change.resource + && dep + .to + .strip_prefix("graph.") + .is_some_and(|graph| graph_creates.contains(graph)) + }); + if blocked_dep { + (ApplyDisposition::Blocked, Some("dependency_missing")) + } else { + (ApplyDisposition::Applied, None) + } + } + }, + ResourceKind::Unknown => { + (ApplyDisposition::Deferred, Some("apply_unsupported_kind")) + } + }; + change.disposition = Some(disposition); + change.reason = reason.map(str::to_string); + } +} + +/// Content-addressed catalog path for an applied resource payload. Extensions +/// are fixed per kind (`.gq` / `.yaml`) regardless of the source file's name, +/// so the catalog layout cannot drift with operator file conventions. +fn payload_path(config_dir: &Path, kind: &ResourceKind, digest: &str) -> Option { + let resources_dir = config_dir.join(CLUSTER_RESOURCES_DIR); + match kind { + ResourceKind::Query { graph, name } => Some( + resources_dir + .join("query") + .join(graph) + .join(name) + .join(format!("{digest}.gq")), + ), + ResourceKind::Policy(name) => Some( + resources_dir + .join("policy") + .join(name) + .join(format!("{digest}.yaml")), + ), + _ => None, + } +} + +/// Write one content-addressed payload blob. Idempotent: an existing +/// digest-named file is trusted as-is. The digest re-check is the apply-side +/// TOCTOU detector — the source file changing between `load_desired` and the +/// payload write must fail loudly, never publish mismatched content. +fn write_resource_payload( + target: &Path, + source: &Path, + expected_digest: &str, + resource: &str, +) -> Result<(), Diagnostic> { + if target.exists() { + return Ok(()); + } + let bytes = fs::read(source).map_err(|err| { + Diagnostic::error( + "resource_payload_write_error", + resource, + format!("could not read resource source '{}': {err}", source.display()), + ) + })?; + if sha256_hex(&bytes) != expected_digest { + return Err(Diagnostic::error( + "resource_content_changed", + resource, + format!( + "resource source '{}' changed while apply was running; re-run `cluster apply`", + source.display() + ), + )); + } + let parent = target.parent().expect("payload path always has a parent"); + fs::create_dir_all(parent).map_err(|err| { + Diagnostic::error( + "resource_payload_write_error", + resource, + format!("could not create payload directory: {err}"), + ) + })?; + let file_name = target + .file_name() + .expect("payload path always has a file name") + .to_string_lossy(); + let tmp_path = parent.join(format!("{file_name}.tmp.{}", Ulid::new())); + let mut file = OpenOptions::new() + .write(true) + .create_new(true) + .open(&tmp_path) + .map_err(|err| { + Diagnostic::error( + "resource_payload_write_error", + resource, + format!("could not create temporary payload file: {err}"), + ) + })?; + let write_result = file + .write_all(&bytes) + .and_then(|()| file.sync_all()) + .map_err(|err| { + Diagnostic::error( + "resource_payload_write_error", + resource, + format!("could not write payload file: {err}"), + ) + }); + drop(file); + if let Err(diagnostic) = write_result { + let _ = fs::remove_file(&tmp_path); + return Err(diagnostic); + } + if let Err(err) = fs::rename(&tmp_path, target) { + let _ = fs::remove_file(&tmp_path); + return Err(Diagnostic::error( + "resource_payload_write_error", + resource, + format!("could not move payload file into place: {err}"), + )); + } + Ok(()) +} + +/// Recompute the composite `graph.` digests for state-resident graphs from +/// state's own schema/query components. Without this, an applied query change +/// would leave the prior composite digest in state and `graph.` would show +/// a phantom update in every later plan — apply could never converge. +fn recompute_state_graph_digests(state: &mut ClusterState, desired: &DesiredCluster) { + for graph in &desired.graphs { + let graph_address = graph_address(&graph.id); + if !state.applied_revision.resources.contains_key(&graph_address) { + continue; + } + let schema_digest = state + .applied_revision + .resources + .get(&schema_address(&graph.id)) + .map(|resource| resource.digest.clone()); + let query_digests = state_query_digests_for_graph(state, &graph.id); + let digest = graph_digest(&graph.id, schema_digest.as_ref(), Some(&query_digests)); + state + .applied_revision + .resources + .insert(graph_address, StateResource { digest }); + } +} + fn duplicate_key_diagnostics(text: &str) -> Vec { #[derive(Debug)] struct Frame { @@ -3115,4 +3720,545 @@ graphs: ); assert!(!dir.path().join(CLUSTER_STATE_FILE).exists()); } + + // ---- config-only apply (Stage 3A) ---- + + /// Seed a state.json that simulates "graph exists with the desired schema, + /// queries/policies not yet applied" by borrowing the desired digests. + fn write_applyable_state(config_dir: &Path) { + let out = validate_config_dir(config_dir); + assert!(out.ok, "{:?}", out.diagnostics); + let schema_digest = out.resource_digests.get("schema.knowledge").unwrap().clone(); + let graph_composite = + graph_digest("knowledge", Some(&schema_digest), Some(&BTreeMap::new())); + write_state_resources( + config_dir, + &[ + ("graph.knowledge", graph_composite.as_str()), + ("schema.knowledge", schema_digest.as_str()), + ], + ); + } + + fn write_state_resources(config_dir: &Path, resources: &[(&str, &str)]) { + let resource_map: serde_json::Map = resources + .iter() + .map(|(address, digest)| ((*address).to_string(), json!({ "digest": digest }))) + .collect(); + let state_dir = config_dir.join(CLUSTER_STATE_DIR); + fs::create_dir_all(&state_dir).unwrap(); + fs::write( + state_dir.join("state.json"), + serde_json::to_string_pretty(&json!({ + "version": 1, + "state_revision": 1, + "applied_revision": { "resources": resource_map } + })) + .unwrap(), + ) + .unwrap(); + } + + fn read_state_json(config_dir: &Path) -> serde_json::Value { + serde_json::from_str(&fs::read_to_string(config_dir.join(CLUSTER_STATE_FILE)).unwrap()) + .unwrap() + } + + fn query_payload_path(config_dir: &Path, digest: &str) -> std::path::PathBuf { + config_dir + .join(CLUSTER_RESOURCES_DIR) + .join("query/knowledge/find_person") + .join(format!("{digest}.gq")) + } + + fn policy_payload_path(config_dir: &Path, digest: &str) -> std::path::PathBuf { + config_dir + .join(CLUSTER_RESOURCES_DIR) + .join("policy/base") + .join(format!("{digest}.yaml")) + } + + #[test] + fn apply_without_state_fails_with_state_missing() { + let dir = fixture(); + let out = apply_config_dir(dir.path()); + assert!(!out.ok); + assert!( + out.diagnostics + .iter() + .any(|diagnostic| diagnostic.code == "state_missing" + && diagnostic.message.contains("cluster import")) + ); + assert!(!dir.path().join(CLUSTER_STATE_FILE).exists()); + assert!(!dir.path().join(CLUSTER_RESOURCES_DIR).exists()); + assert!(!dir.path().join(CLUSTER_LOCK_FILE).exists()); + } + + #[test] + fn apply_writes_payloads_state_and_statuses() { + let dir = fixture(); + write_applyable_state(dir.path()); + let desired = validate_config_dir(dir.path()); + let query_digest = desired + .resource_digests + .get("query.knowledge.find_person") + .unwrap() + .clone(); + let policy_digest = desired.resource_digests.get("policy.base").unwrap().clone(); + let schema_digest = desired + .resource_digests + .get("schema.knowledge") + .unwrap() + .clone(); + + let out = apply_config_dir(dir.path()); + assert!(out.ok, "{:?}", out.diagnostics); + assert_eq!(out.applied_count, 2); + assert_eq!(out.deferred_count, 0); + assert!(out.converged); + assert!(out.state_written); + + let query_blob = query_payload_path(dir.path(), &query_digest); + assert_eq!(fs::read_to_string(&query_blob).unwrap(), QUERY); + let policy_blob = policy_payload_path(dir.path(), &policy_digest); + assert_eq!(fs::read_to_string(&policy_blob).unwrap(), "rules: []\n"); + + let state = read_state_json(dir.path()); + assert_eq!(state["state_revision"], 2); + let resources = &state["applied_revision"]["resources"]; + assert_eq!( + resources["query.knowledge.find_person"]["digest"], + query_digest + ); + assert_eq!(resources["policy.base"]["digest"], policy_digest); + let expected_composite = graph_digest( + "knowledge", + Some(&schema_digest), + Some( + &[("find_person".to_string(), query_digest.clone())] + .into_iter() + .collect(), + ), + ); + assert_eq!(resources["graph.knowledge"]["digest"], expected_composite); + assert_eq!( + state["applied_revision"]["config_digest"], + desired_revision_digest(&out) + ); + assert_eq!( + state["resource_statuses"]["query.knowledge.find_person"]["status"], + "applied" + ); + assert_eq!(state["resource_statuses"]["policy.base"]["status"], "applied"); + assert!(!dir.path().join(CLUSTER_LOCK_FILE).exists()); + } + + fn desired_revision_digest(out: &ApplyOutput) -> String { + out.desired_revision.config_digest.clone().unwrap() + } + + #[test] + fn apply_update_changes_query_digest_and_keeps_old_blob() { + let dir = fixture(); + let desired = validate_config_dir(dir.path()); + let schema_digest = desired + .resource_digests + .get("schema.knowledge") + .unwrap() + .clone(); + let old_digest = "0".repeat(64); + let graph_composite = + graph_digest("knowledge", Some(&schema_digest), Some(&BTreeMap::new())); + write_state_resources( + dir.path(), + &[ + ("graph.knowledge", graph_composite.as_str()), + ("schema.knowledge", schema_digest.as_str()), + ("query.knowledge.find_person", old_digest.as_str()), + ], + ); + let old_blob = query_payload_path(dir.path(), &old_digest); + fs::create_dir_all(old_blob.parent().unwrap()).unwrap(); + fs::write(&old_blob, "old query source").unwrap(); + + let out = apply_config_dir(dir.path()); + assert!(out.ok, "{:?}", out.diagnostics); + let new_digest = desired + .resource_digests + .get("query.knowledge.find_person") + .unwrap(); + let state = read_state_json(dir.path()); + assert_eq!( + state["applied_revision"]["resources"]["query.knowledge.find_person"]["digest"], + *new_digest + ); + assert_eq!(fs::read_to_string(&old_blob).unwrap(), "old query source"); + assert!(query_payload_path(dir.path(), new_digest).exists()); + } + + #[test] + fn apply_deletes_removed_resources_but_keeps_blobs() { + let dir = fixture(); + let desired = validate_config_dir(dir.path()); + let schema_digest = desired + .resource_digests + .get("schema.knowledge") + .unwrap() + .clone(); + let stale_query_digest = "1".repeat(64); + let stale_policy_digest = "2".repeat(64); + let graph_composite = + graph_digest("knowledge", Some(&schema_digest), Some(&BTreeMap::new())); + write_state_resources( + dir.path(), + &[ + ("graph.knowledge", graph_composite.as_str()), + ("schema.knowledge", schema_digest.as_str()), + ("query.knowledge.orphan", stale_query_digest.as_str()), + ("policy.old", stale_policy_digest.as_str()), + ], + ); + let stale_blob = dir + .path() + .join(CLUSTER_RESOURCES_DIR) + .join("policy/old") + .join(format!("{stale_policy_digest}.yaml")); + fs::create_dir_all(stale_blob.parent().unwrap()).unwrap(); + fs::write(&stale_blob, "old policy").unwrap(); + + let out = apply_config_dir(dir.path()); + assert!(out.ok, "{:?}", out.diagnostics); + assert!(out.converged); + let state = read_state_json(dir.path()); + let resources = &state["applied_revision"]["resources"]; + assert!(resources.get("query.knowledge.orphan").is_none()); + assert!(resources.get("policy.old").is_none()); + assert!( + state["resource_statuses"] + .get("query.knowledge.orphan") + .is_none() + ); + // Deleted resources leave their content-addressed blobs in place; GC is + // a later stage. + assert_eq!(fs::read_to_string(&stale_blob).unwrap(), "old policy"); + // The composite no longer includes the orphan query. + let query_digest = desired + .resource_digests + .get("query.knowledge.find_person") + .unwrap() + .clone(); + let expected_composite = graph_digest( + "knowledge", + Some(&schema_digest), + Some(&[("find_person".to_string(), query_digest)].into_iter().collect()), + ); + assert_eq!(resources["graph.knowledge"]["digest"], expected_composite); + } + + #[test] + fn apply_defers_schema_change_and_blocks_dependent_query() { + let dir = fixture(); + write_applyable_state(dir.path()); + // Change the schema after seeding state: schema.knowledge now differs. + fs::write( + dir.path().join("people.pg"), + "\nnode Person {\n name: String @key\n age: I32?\n bio: String?\n}\n", + ) + .unwrap(); + + let out = apply_config_dir(dir.path()); + assert!(out.ok, "{:?}", out.diagnostics); + assert!(!out.converged); + let by_resource: BTreeMap<&str, &PlanChange> = out + .changes + .iter() + .map(|change| (change.resource.as_str(), change)) + .collect(); + assert_eq!( + by_resource["schema.knowledge"].disposition, + Some(ApplyDisposition::Deferred) + ); + assert_eq!( + by_resource["graph.knowledge"].disposition, + Some(ApplyDisposition::Deferred) + ); + assert_eq!( + by_resource["query.knowledge.find_person"].disposition, + Some(ApplyDisposition::Blocked) + ); + assert_eq!( + by_resource["query.knowledge.find_person"].reason.as_deref(), + Some("dependency_not_applied") + ); + // Policy is independent of the schema and still applies. + assert_eq!( + by_resource["policy.base"].disposition, + Some(ApplyDisposition::Applied) + ); + assert!( + out.diagnostics + .iter() + .any(|diagnostic| diagnostic.code == "apply_unsupported_change") + ); + assert!( + out.diagnostics + .iter() + .any(|diagnostic| diagnostic.code == "apply_dependency_blocked") + ); + + let state = read_state_json(dir.path()); + assert_eq!( + state["resource_statuses"]["query.knowledge.find_person"]["status"], + "blocked" + ); + // The blocked query wrote no payload and no state digest. + assert!( + state["applied_revision"]["resources"] + .get("query.knowledge.find_person") + .is_none() + ); + assert!( + !dir.path() + .join(CLUSTER_RESOURCES_DIR) + .join("query") + .exists() + ); + // Not converged: the applied config digest must not be claimed. + assert!( + state["applied_revision"] + .get("config_digest") + .is_none_or(serde_json::Value::is_null) + ); + } + + #[test] + fn apply_blocks_resources_of_uncreated_graph() { + let dir = fixture(); + write_state_resources(dir.path(), &[]); + + let out = apply_config_dir(dir.path()); + assert!(out.ok, "{:?}", out.diagnostics); + assert_eq!(out.applied_count, 0); + assert!(!out.converged); + let by_resource: BTreeMap<&str, &PlanChange> = out + .changes + .iter() + .map(|change| (change.resource.as_str(), change)) + .collect(); + assert_eq!( + by_resource["graph.knowledge"].disposition, + Some(ApplyDisposition::Deferred) + ); + assert_eq!( + by_resource["query.knowledge.find_person"].reason.as_deref(), + Some("dependency_missing") + ); + assert_eq!( + by_resource["policy.base"].reason.as_deref(), + Some("dependency_missing") + ); + // Statuses for blocked resources are recorded (state changed), but no + // resource digests moved. + let state = read_state_json(dir.path()); + assert_eq!(state["state_revision"], 2); + assert!( + state["applied_revision"]["resources"] + .as_object() + .unwrap() + .is_empty() + ); + assert_eq!( + state["resource_statuses"]["policy.base"]["status"], + "blocked" + ); + } + + #[test] + fn apply_does_not_delete_subtree_of_deleted_graph() { + let dir = fixture(); + let desired = validate_config_dir(dir.path()); + let schema_digest = desired + .resource_digests + .get("schema.knowledge") + .unwrap() + .clone(); + let graph_composite = + graph_digest("knowledge", Some(&schema_digest), Some(&BTreeMap::new())); + write_state_resources( + dir.path(), + &[ + ("graph.knowledge", graph_composite.as_str()), + ("schema.knowledge", schema_digest.as_str()), + ("graph.old", "3333"), + ("schema.old", "4444"), + ("query.old.q", "5555"), + ], + ); + + let out = apply_config_dir(dir.path()); + assert!(out.ok, "{:?}", out.diagnostics); + assert!(!out.converged); + let by_resource: BTreeMap<&str, &PlanChange> = out + .changes + .iter() + .map(|change| (change.resource.as_str(), change)) + .collect(); + assert_eq!( + by_resource["graph.old"].disposition, + Some(ApplyDisposition::Deferred) + ); + assert_eq!( + by_resource["schema.old"].disposition, + Some(ApplyDisposition::Deferred) + ); + assert_eq!( + by_resource["query.old.q"].disposition, + Some(ApplyDisposition::Blocked) + ); + let state = read_state_json(dir.path()); + let resources = &state["applied_revision"]["resources"]; + assert_eq!(resources["graph.old"]["digest"], "3333"); + assert_eq!(resources["schema.old"]["digest"], "4444"); + assert_eq!(resources["query.old.q"]["digest"], "5555"); + } + + #[test] + fn apply_is_idempotent() { + let dir = fixture(); + write_applyable_state(dir.path()); + + let first = apply_config_dir(dir.path()); + assert!(first.ok, "{:?}", first.diagnostics); + assert!(first.state_written); + let state_after_first = fs::read_to_string(dir.path().join(CLUSTER_STATE_FILE)).unwrap(); + + let second = apply_config_dir(dir.path()); + assert!(second.ok, "{:?}", second.diagnostics); + assert!(second.changes.is_empty()); + assert_eq!(second.applied_count, 0); + assert!(second.converged); + assert!(!second.state_written); + let state_after_second = fs::read_to_string(dir.path().join(CLUSTER_STATE_FILE)).unwrap(); + assert_eq!(state_after_first, state_after_second); + assert_eq!(second.state_observations.state_revision, 2); + } + + #[test] + fn apply_respects_held_lock() { + let dir = fixture(); + write_applyable_state(dir.path()); + write_lock_file(dir.path(), "held-lock", "plan"); + + let out = apply_config_dir(dir.path()); + assert!(!out.ok); + assert!( + out.diagnostics + .iter() + .any(|diagnostic| diagnostic.code == "state_lock_held") + ); + // The held lock survives a refused apply, and nothing was written. + assert!(dir.path().join(CLUSTER_LOCK_FILE).exists()); + assert!(!dir.path().join(CLUSTER_RESOURCES_DIR).exists()); + let state = read_state_json(dir.path()); + assert_eq!(state["state_revision"], 1); + } + + #[test] + fn apply_state_lock_false_bypasses_with_warning() { + let dir = fixture(); + fs::write( + dir.path().join(CLUSTER_CONFIG_FILE), + r#" +version: 1 +state: + backend: cluster + lock: false +graphs: + knowledge: + schema: ./people.pg + queries: + find_person: + file: ./people.gq +"#, + ) + .unwrap(); + write_applyable_state(dir.path()); + + let out = apply_config_dir(dir.path()); + assert!(out.ok, "{:?}", out.diagnostics); + assert!(out.state_written); + assert!(!out.state_observations.lock_acquired); + assert!( + out.diagnostics + .iter() + .any(|diagnostic| diagnostic.code == "state_lock_disabled") + ); + assert!(!dir.path().join(CLUSTER_LOCK_FILE).exists()); + } + + #[test] + fn apply_skips_existing_payload_blob() { + let dir = fixture(); + write_applyable_state(dir.path()); + let desired = validate_config_dir(dir.path()); + let query_digest = desired + .resource_digests + .get("query.knowledge.find_person") + .unwrap() + .clone(); + // Content-addressed blobs are trusted by name: an existing file is + // never rewritten. + let blob = query_payload_path(dir.path(), &query_digest); + fs::create_dir_all(blob.parent().unwrap()).unwrap(); + fs::write(&blob, "pre-existing").unwrap(); + + let out = apply_config_dir(dir.path()); + assert!(out.ok, "{:?}", out.diagnostics); + assert_eq!(fs::read_to_string(&blob).unwrap(), "pre-existing"); + } + + #[test] + fn apply_invalid_config_fails_before_lock() { + let dir = fixture(); + fs::write( + dir.path().join(CLUSTER_CONFIG_FILE), + "version: 1\nnot_a_field: true\n", + ) + .unwrap(); + + let out = apply_config_dir(dir.path()); + assert!(!out.ok); + // Config errors bail before the lock or any state directory exists. + assert!(!dir.path().join(CLUSTER_STATE_DIR).exists()); + } + + #[test] + fn plan_annotates_apply_dispositions() { + let dir = fixture(); + let out = plan_config_dir(dir.path()); + assert!(out.ok, "{:?}", out.diagnostics); + let by_resource: BTreeMap<&str, &PlanChange> = out + .changes + .iter() + .map(|change| (change.resource.as_str(), change)) + .collect(); + // Empty state: graph/schema creates are deferred, query/policy blocked + // on the uncreated graph — and plan says so before apply runs. + assert_eq!( + by_resource["graph.knowledge"].disposition, + Some(ApplyDisposition::Deferred) + ); + assert_eq!( + by_resource["schema.knowledge"].disposition, + Some(ApplyDisposition::Deferred) + ); + assert_eq!( + by_resource["query.knowledge.find_person"].disposition, + Some(ApplyDisposition::Blocked) + ); + assert_eq!( + by_resource["policy.base"].reason.as_deref(), + Some("dependency_missing") + ); + } }