From 1f8e5945cfcaf99bca75992f51dc786122b237d7 Mon Sep 17 00:00:00 2001 From: aaltshuler Date: Tue, 9 Jun 2026 23:32:13 +0300 Subject: [PATCH 1/6] feat(cluster): config-only apply with content-addressed catalog publish apply_config_dir executes the query/policy subset of the plan: payloads are written content-addressed under __cluster/resources/{query,policy}/... before the state CAS (state is the publish point; orphaned blobs from a failed CAS are inert and re-apply is the repair), then state.json is CAS-updated with applied digests, Applied/Blocked statuses, and a revision bump. Graph/schema changes are never executed here: schema content and graph lifecycle defer to a later phase with loud warnings, while graph.<id> composite-digest updates whose schema component is unchanged converge automatically via recomputation from state's own components (without which apply could never converge). Idempotent re-apply leaves state bytes and revision untouched. PlanChange gains optional disposition/reason fields, populated by the same classifier in cluster plan, so plan is an honest preview of what apply will execute, derive, defer, or block. 
Co-Authored-By: Claude Fable 5 --- crates/omnigraph-cluster/src/lib.rs | 1148 ++++++++++++++++++++++++++- 1 file changed, 1147 insertions(+), 1 deletion(-) diff --git a/crates/omnigraph-cluster/src/lib.rs b/crates/omnigraph-cluster/src/lib.rs index 7ae824c..01ad171 100644 --- a/crates/omnigraph-cluster/src/lib.rs +++ b/crates/omnigraph-cluster/src/lib.rs @@ -21,6 +21,7 @@ pub const CLUSTER_GRAPHS_DIR: &str = "graphs"; pub const CLUSTER_STATE_DIR: &str = "__cluster"; pub const CLUSTER_STATE_FILE: &str = "__cluster/state.json"; pub const CLUSTER_LOCK_FILE: &str = "__cluster/lock.json"; +pub const CLUSTER_RESOURCES_DIR: &str = "__cluster/resources"; #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] #[serde(rename_all = "snake_case")] @@ -161,6 +162,23 @@ pub enum PlanOperation { Delete, } +/// How `cluster apply` treats a planned change in the current stage. +/// +/// `Applied` changes execute (config-only query/policy catalog writes). +/// `Derived` marks a `graph.` composite-digest update that converges +/// automatically once its applied query digests land in state. `Deferred` +/// changes need a later phase (graph/schema lifecycle or schema content). +/// `Blocked` query/policy changes are gated by an unapplied or missing +/// dependency. 
+#[derive(Debug, Clone, Copy, Serialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum ApplyDisposition { + Applied, + Derived, + Deferred, + Blocked, +} + #[derive(Debug, Clone, Serialize, PartialEq, Eq)] pub struct PlanChange { pub resource: String, @@ -169,6 +187,10 @@ pub struct PlanChange { pub before_digest: Option, #[serde(skip_serializing_if = "Option::is_none")] pub after_digest: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub disposition: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub reason: Option, } #[derive(Debug, Clone, Serialize, PartialEq, Eq)] @@ -236,6 +258,28 @@ pub struct ForceUnlockOutput { pub diagnostics: Vec, } +/// Output of config-only `cluster apply`. "Applied" means recorded in the +/// local cluster catalog (`__cluster/`); nothing applied here serves traffic — +/// the server still boots from `omnigraph.yaml` until the server-boot stage. +#[derive(Debug, Clone, Serialize)] +pub struct ApplyOutput { + pub ok: bool, + pub config_dir: String, + pub desired_revision: DesiredRevision, + pub state_observations: StateObservations, + /// Every planned change, with `disposition`/`reason` always populated. + pub changes: Vec, + pub applied_count: usize, + /// Deferred + Blocked changes (Derived composite updates count as neither). + pub deferred_count: usize, + /// True when state matches the desired revision after this apply. + pub converged: bool, + /// False for a no-op re-apply: state bytes (and revision) were left untouched. 
+ pub state_written: bool, + pub resource_statuses: BTreeMap, + pub diagnostics: Vec, +} + #[derive(Debug, Clone)] struct DesiredCluster { config_dir: PathBuf, @@ -477,11 +521,12 @@ pub fn plan_config_dir(config_dir: impl AsRef) -> PlanOutput { } } - let changes = if has_errors(&diagnostics) { + let mut changes = if has_errors(&diagnostics) { Vec::new() } else { diff_resources(&prior_resources, &desired.resource_digests) }; + classify_changes(&mut changes, &desired.dependencies); let blast_radius = compute_blast_radius(&changes, &desired.dependencies); let approvals_required = compute_approvals(&changes); let ok = !has_errors(&diagnostics); @@ -502,6 +547,317 @@ pub fn plan_config_dir(config_dir: impl AsRef) -> PlanOutput { } } +/// Config-only `cluster apply` (Stage 3A): execute the query/policy subset of +/// the plan against the local cluster catalog. The plan is recomputed under +/// the state lock, so freshness is structural; the state CAS inside +/// `write_state` is the second fence. Graph/schema changes are never executed +/// here — they are deferred to the graph-lifecycle phase and reported loudly. +/// +/// Payloads are content-addressed and written BEFORE the state CAS because +/// state is the publish point: a failure after payload writes leaves inert +/// digest-named blobs and no success acknowledgement; re-running apply is the +/// repair. 
+pub fn apply_config_dir(config_dir: impl AsRef) -> ApplyOutput { + let outcome = load_desired(config_dir.as_ref()); + let mut diagnostics = outcome.diagnostics; + let backend = LocalStateBackend::new(&outcome.config_dir); + let mut observations = backend.observations(); + + let early_return = |config_dir: String, + config_digest: Option, + observations: StateObservations, + changes: Vec, + resource_statuses: BTreeMap, + diagnostics: Vec| { + ApplyOutput { + ok: !has_errors(&diagnostics), + config_dir, + desired_revision: DesiredRevision { + config_digest, + }, + state_observations: observations, + changes, + applied_count: 0, + deferred_count: 0, + converged: false, + state_written: false, + resource_statuses, + diagnostics, + } + }; + + let Some(desired) = outcome.desired else { + return early_return( + display_path(&outcome.config_dir), + None, + observations, + Vec::new(), + BTreeMap::new(), + diagnostics, + ); + }; + + if has_errors(&diagnostics) { + return early_return( + display_path(&desired.config_dir), + Some(desired.config_digest), + observations, + Vec::new(), + BTreeMap::new(), + diagnostics, + ); + } + + // Named guard: the lock must be held until the state outcome is recorded. 
+ let _lock_guard = if desired.state_lock { + match backend.acquire_lock("apply", &mut observations) { + Ok(guard) => Some(guard), + Err(diagnostic) => { + diagnostics.push(diagnostic); + None + } + } + } else { + diagnostics.push(Diagnostic::warning( + "state_lock_disabled", + "state.lock", + "state.lock is false; apply wrote state without acquiring the cluster state lock", + )); + None + }; + + if has_errors(&diagnostics) { + return early_return( + display_path(&desired.config_dir), + Some(desired.config_digest), + observations, + Vec::new(), + BTreeMap::new(), + diagnostics, + ); + } + + let snapshot = match backend.read_state(&mut observations) { + Ok(snapshot) => snapshot, + Err(diagnostic) => { + diagnostics.push(diagnostic); + return early_return( + display_path(&desired.config_dir), + Some(desired.config_digest), + observations, + Vec::new(), + BTreeMap::new(), + diagnostics, + ); + } + }; + let expected_cas = snapshot.state_cas; + let Some(state) = snapshot.state else { + diagnostics.push(Diagnostic::error( + "state_missing", + CLUSTER_STATE_FILE, + "apply requires an existing state.json; run `cluster import` to bootstrap state", + )); + return early_return( + display_path(&desired.config_dir), + Some(desired.config_digest), + observations, + Vec::new(), + BTreeMap::new(), + diagnostics, + ); + }; + + let prior_resources = state_resource_digests(&state); + let mut changes = diff_resources(&prior_resources, &desired.resource_digests); + classify_changes(&mut changes, &desired.dependencies); + + // Defensive invariant: nothing the approval gate covers may be executable. + // Today approvals only cover graph/schema deletes (always deferred); this + // keeps a future widening of the executable set from silently bypassing it. 
+ let approvals = compute_approvals(&changes); + let approval_violation = changes.iter().any(|change| { + change.disposition == Some(ApplyDisposition::Applied) + && approvals + .iter() + .any(|approval| approval.resource == change.resource) + }); + if approval_violation { + diagnostics.push(Diagnostic::error( + "apply_approval_invariant_violation", + "changes", + "an executable change requires approval; refusing to apply", + )); + return early_return( + display_path(&desired.config_dir), + Some(desired.config_digest), + observations, + changes, + state.resource_statuses, + diagnostics, + ); + } + + for change in &changes { + match change.disposition { + Some(ApplyDisposition::Deferred) => diagnostics.push(Diagnostic::warning( + "apply_unsupported_change", + change.resource.clone(), + "graph/schema changes are not applied in this stage; they are deferred to the graph-lifecycle phase", + )), + Some(ApplyDisposition::Blocked) => diagnostics.push(Diagnostic::warning( + "apply_dependency_blocked", + change.resource.clone(), + format!( + "blocked by an unapplied or missing dependency ({})", + change.reason.as_deref().unwrap_or("dependency") + ), + )), + _ => {} + } + } + + // Payload phase: content-addressed writes before the state CAS. Any + // failure aborts before state moves; blobs already written are inert. 
+ let source_paths: BTreeMap<&str, &str> = desired + .resources + .iter() + .filter_map(|resource| { + resource + .path + .as_deref() + .map(|path| (resource.address.as_str(), path)) + }) + .collect(); + for change in &changes { + if change.disposition != Some(ApplyDisposition::Applied) + || change.operation == PlanOperation::Delete + { + continue; + } + let kind = resource_kind(&change.resource); + let digest = change + .after_digest + .as_deref() + .expect("create/update always carries an after digest"); + let Some(target) = payload_path(&desired.config_dir, &kind, digest) else { + continue; + }; + let Some(source) = source_paths.get(change.resource.as_str()) else { + diagnostics.push(Diagnostic::error( + "resource_payload_write_error", + change.resource.clone(), + "no source file recorded for resource", + )); + continue; + }; + if let Err(diagnostic) = + write_resource_payload(&target, Path::new(source), digest, &change.resource) + { + diagnostics.push(diagnostic); + } + } + if has_errors(&diagnostics) { + return early_return( + display_path(&desired.config_dir), + Some(desired.config_digest), + observations, + changes, + state.resource_statuses, + diagnostics, + ); + } + + // State mutation. Apply owns query/policy statuses only; graph/schema + // statuses belong to refresh/import observation and must not be clobbered. 
+ let before_value = + serde_json::to_value(&state).expect("cluster state must serialize deterministically"); + let mut new_state = state.clone(); + for change in &changes { + match change.disposition { + Some(ApplyDisposition::Applied) => match change.operation { + PlanOperation::Create | PlanOperation::Update => { + new_state.applied_revision.resources.insert( + change.resource.clone(), + StateResource { + digest: change + .after_digest + .clone() + .expect("create/update always carries an after digest"), + }, + ); + set_resource_status_applied(&mut new_state, &change.resource); + } + PlanOperation::Delete => { + new_state.applied_revision.resources.remove(&change.resource); + new_state.resource_statuses.remove(&change.resource); + } + }, + Some(ApplyDisposition::Blocked) => { + set_resource_status( + &mut new_state, + &change.resource, + ResourceLifecycleStatus::Blocked, + change.reason.as_deref().unwrap_or("dependency_not_applied"), + "waiting on an unapplied or missing dependency", + ); + } + _ => {} + } + } + recompute_state_graph_digests(&mut new_state, &desired); + + let residual = diff_resources( + &state_resource_digests(&new_state), + &desired.resource_digests, + ); + let converged = residual.is_empty(); + if converged { + new_state.applied_revision.config_digest = Some(desired.config_digest.clone()); + } + + let after_value = + serde_json::to_value(&new_state).expect("cluster state must serialize deterministically"); + let mut state_written = false; + if after_value != before_value { + new_state.state_revision = new_state.state_revision.saturating_add(1); + match backend.write_state(&new_state, expected_cas.as_deref(), &mut observations) { + Ok(()) => state_written = true, + Err(diagnostic) => diagnostics.push(diagnostic), + } + } + + let applied_count = changes + .iter() + .filter(|change| change.disposition == Some(ApplyDisposition::Applied)) + .count(); + let deferred_count = changes + .iter() + .filter(|change| { + matches!( + change.disposition, + 
Some(ApplyDisposition::Deferred) | Some(ApplyDisposition::Blocked) + ) + }) + .count(); + + ApplyOutput { + ok: !has_errors(&diagnostics), + config_dir: display_path(&desired.config_dir), + desired_revision: DesiredRevision { + config_digest: Some(desired.config_digest), + }, + state_observations: observations, + changes, + applied_count, + deferred_count, + converged, + state_written, + resource_statuses: new_state.resource_statuses, + diagnostics, + } +} + pub fn status_config_dir(config_dir: impl AsRef) -> StatusOutput { let parsed = parse_cluster_config(config_dir.as_ref()); let mut diagnostics = parsed.diagnostics; @@ -1797,12 +2153,16 @@ fn diff_resources( operation: PlanOperation::Create, before_digest: None, after_digest: Some(after.clone()), + disposition: None, + reason: None, }), Some(before) if before != after => changes.push(PlanChange { resource: address.clone(), operation: PlanOperation::Update, before_digest: Some(before.clone()), after_digest: Some(after.clone()), + disposition: None, + reason: None, }), Some(_) => {} } @@ -1814,6 +2174,8 @@ fn diff_resources( operation: PlanOperation::Delete, before_digest: Some(before.clone()), after_digest: None, + disposition: None, + reason: None, }); } } @@ -1855,6 +2217,249 @@ fn compute_approvals(changes: &[PlanChange]) -> Vec { .collect() } +#[derive(Debug, PartialEq, Eq)] +enum ResourceKind { + Graph(String), + Schema(String), + Query { graph: String, name: String }, + Policy(String), + Unknown, +} + +fn resource_kind(address: &str) -> ResourceKind { + if let Some(graph) = address.strip_prefix("graph.") { + ResourceKind::Graph(graph.to_string()) + } else if let Some(graph) = address.strip_prefix("schema.") { + ResourceKind::Schema(graph.to_string()) + } else if let Some(rest) = address.strip_prefix("query.") { + match rest.split_once('.') { + Some((graph, name)) => ResourceKind::Query { + graph: graph.to_string(), + name: name.to_string(), + }, + None => ResourceKind::Unknown, + } + } else if let 
Some(name) = address.strip_prefix("policy.") { + ResourceKind::Policy(name.to_string()) + } else { + ResourceKind::Unknown + } +} + +/// Classify every planned change with the disposition config-only apply gives +/// it. Stage 3A executes only query/policy catalog writes; graph/schema +/// movement is a later phase, and `graph.` composite updates whose schema +/// component is unchanged converge automatically once query digests land. +fn classify_changes(changes: &mut [PlanChange], dependencies: &[Dependency]) { + let mut schema_changed = BTreeSet::new(); + let mut graph_creates = BTreeSet::new(); + let mut graph_deletes = BTreeSet::new(); + for change in changes.iter() { + match resource_kind(&change.resource) { + ResourceKind::Schema(graph) => { + schema_changed.insert(graph); + } + ResourceKind::Graph(graph) => match change.operation { + PlanOperation::Create => { + graph_creates.insert(graph); + } + PlanOperation::Delete => { + graph_deletes.insert(graph); + } + PlanOperation::Update => {} + }, + _ => {} + } + } + + for change in changes.iter_mut() { + let (disposition, reason) = match resource_kind(&change.resource) { + ResourceKind::Schema(_) => (ApplyDisposition::Deferred, Some("apply_unsupported_kind")), + ResourceKind::Graph(graph) => match change.operation { + PlanOperation::Update if !schema_changed.contains(&graph) => { + (ApplyDisposition::Derived, None) + } + _ => (ApplyDisposition::Deferred, Some("apply_unsupported_kind")), + }, + ResourceKind::Query { graph, .. } => match change.operation { + PlanOperation::Delete => { + if graph_deletes.contains(&graph) { + ( + ApplyDisposition::Blocked, + Some("dependency_not_applied"), + ) + } else { + (ApplyDisposition::Applied, None) + } + } + PlanOperation::Create | PlanOperation::Update => { + // A missing graph is the more fundamental blocker than a + // pending schema change, so check it first. 
+ if graph_creates.contains(&graph) { + (ApplyDisposition::Blocked, Some("dependency_missing")) + } else if schema_changed.contains(&graph) { + ( + ApplyDisposition::Blocked, + Some("dependency_not_applied"), + ) + } else { + (ApplyDisposition::Applied, None) + } + } + }, + ResourceKind::Policy(_) => match change.operation { + PlanOperation::Delete => (ApplyDisposition::Applied, None), + PlanOperation::Create | PlanOperation::Update => { + let blocked_dep = dependencies.iter().any(|dep| { + dep.from == change.resource + && dep + .to + .strip_prefix("graph.") + .is_some_and(|graph| graph_creates.contains(graph)) + }); + if blocked_dep { + (ApplyDisposition::Blocked, Some("dependency_missing")) + } else { + (ApplyDisposition::Applied, None) + } + } + }, + ResourceKind::Unknown => { + (ApplyDisposition::Deferred, Some("apply_unsupported_kind")) + } + }; + change.disposition = Some(disposition); + change.reason = reason.map(str::to_string); + } +} + +/// Content-addressed catalog path for an applied resource payload. Extensions +/// are fixed per kind (`.gq` / `.yaml`) regardless of the source file's name, +/// so the catalog layout cannot drift with operator file conventions. +fn payload_path(config_dir: &Path, kind: &ResourceKind, digest: &str) -> Option { + let resources_dir = config_dir.join(CLUSTER_RESOURCES_DIR); + match kind { + ResourceKind::Query { graph, name } => Some( + resources_dir + .join("query") + .join(graph) + .join(name) + .join(format!("{digest}.gq")), + ), + ResourceKind::Policy(name) => Some( + resources_dir + .join("policy") + .join(name) + .join(format!("{digest}.yaml")), + ), + _ => None, + } +} + +/// Write one content-addressed payload blob. Idempotent: an existing +/// digest-named file is trusted as-is. The digest re-check is the apply-side +/// TOCTOU detector — the source file changing between `load_desired` and the +/// payload write must fail loudly, never publish mismatched content. 
+fn write_resource_payload( + target: &Path, + source: &Path, + expected_digest: &str, + resource: &str, +) -> Result<(), Diagnostic> { + if target.exists() { + return Ok(()); + } + let bytes = fs::read(source).map_err(|err| { + Diagnostic::error( + "resource_payload_write_error", + resource, + format!("could not read resource source '{}': {err}", source.display()), + ) + })?; + if sha256_hex(&bytes) != expected_digest { + return Err(Diagnostic::error( + "resource_content_changed", + resource, + format!( + "resource source '{}' changed while apply was running; re-run `cluster apply`", + source.display() + ), + )); + } + let parent = target.parent().expect("payload path always has a parent"); + fs::create_dir_all(parent).map_err(|err| { + Diagnostic::error( + "resource_payload_write_error", + resource, + format!("could not create payload directory: {err}"), + ) + })?; + let file_name = target + .file_name() + .expect("payload path always has a file name") + .to_string_lossy(); + let tmp_path = parent.join(format!("{file_name}.tmp.{}", Ulid::new())); + let mut file = OpenOptions::new() + .write(true) + .create_new(true) + .open(&tmp_path) + .map_err(|err| { + Diagnostic::error( + "resource_payload_write_error", + resource, + format!("could not create temporary payload file: {err}"), + ) + })?; + let write_result = file + .write_all(&bytes) + .and_then(|()| file.sync_all()) + .map_err(|err| { + Diagnostic::error( + "resource_payload_write_error", + resource, + format!("could not write payload file: {err}"), + ) + }); + drop(file); + if let Err(diagnostic) = write_result { + let _ = fs::remove_file(&tmp_path); + return Err(diagnostic); + } + if let Err(err) = fs::rename(&tmp_path, target) { + let _ = fs::remove_file(&tmp_path); + return Err(Diagnostic::error( + "resource_payload_write_error", + resource, + format!("could not move payload file into place: {err}"), + )); + } + Ok(()) +} + +/// Recompute the composite `graph.` digests for state-resident graphs from +/// 
state's own schema/query components. Without this, an applied query change +/// would leave the prior composite digest in state and `graph.` would show +/// a phantom update in every later plan — apply could never converge. +fn recompute_state_graph_digests(state: &mut ClusterState, desired: &DesiredCluster) { + for graph in &desired.graphs { + let graph_address = graph_address(&graph.id); + if !state.applied_revision.resources.contains_key(&graph_address) { + continue; + } + let schema_digest = state + .applied_revision + .resources + .get(&schema_address(&graph.id)) + .map(|resource| resource.digest.clone()); + let query_digests = state_query_digests_for_graph(state, &graph.id); + let digest = graph_digest(&graph.id, schema_digest.as_ref(), Some(&query_digests)); + state + .applied_revision + .resources + .insert(graph_address, StateResource { digest }); + } +} + fn duplicate_key_diagnostics(text: &str) -> Vec { #[derive(Debug)] struct Frame { @@ -3115,4 +3720,545 @@ graphs: ); assert!(!dir.path().join(CLUSTER_STATE_FILE).exists()); } + + // ---- config-only apply (Stage 3A) ---- + + /// Seed a state.json that simulates "graph exists with the desired schema, + /// queries/policies not yet applied" by borrowing the desired digests. 
+ fn write_applyable_state(config_dir: &Path) { + let out = validate_config_dir(config_dir); + assert!(out.ok, "{:?}", out.diagnostics); + let schema_digest = out.resource_digests.get("schema.knowledge").unwrap().clone(); + let graph_composite = + graph_digest("knowledge", Some(&schema_digest), Some(&BTreeMap::new())); + write_state_resources( + config_dir, + &[ + ("graph.knowledge", graph_composite.as_str()), + ("schema.knowledge", schema_digest.as_str()), + ], + ); + } + + fn write_state_resources(config_dir: &Path, resources: &[(&str, &str)]) { + let resource_map: serde_json::Map = resources + .iter() + .map(|(address, digest)| ((*address).to_string(), json!({ "digest": digest }))) + .collect(); + let state_dir = config_dir.join(CLUSTER_STATE_DIR); + fs::create_dir_all(&state_dir).unwrap(); + fs::write( + state_dir.join("state.json"), + serde_json::to_string_pretty(&json!({ + "version": 1, + "state_revision": 1, + "applied_revision": { "resources": resource_map } + })) + .unwrap(), + ) + .unwrap(); + } + + fn read_state_json(config_dir: &Path) -> serde_json::Value { + serde_json::from_str(&fs::read_to_string(config_dir.join(CLUSTER_STATE_FILE)).unwrap()) + .unwrap() + } + + fn query_payload_path(config_dir: &Path, digest: &str) -> std::path::PathBuf { + config_dir + .join(CLUSTER_RESOURCES_DIR) + .join("query/knowledge/find_person") + .join(format!("{digest}.gq")) + } + + fn policy_payload_path(config_dir: &Path, digest: &str) -> std::path::PathBuf { + config_dir + .join(CLUSTER_RESOURCES_DIR) + .join("policy/base") + .join(format!("{digest}.yaml")) + } + + #[test] + fn apply_without_state_fails_with_state_missing() { + let dir = fixture(); + let out = apply_config_dir(dir.path()); + assert!(!out.ok); + assert!( + out.diagnostics + .iter() + .any(|diagnostic| diagnostic.code == "state_missing" + && diagnostic.message.contains("cluster import")) + ); + assert!(!dir.path().join(CLUSTER_STATE_FILE).exists()); + 
assert!(!dir.path().join(CLUSTER_RESOURCES_DIR).exists()); + assert!(!dir.path().join(CLUSTER_LOCK_FILE).exists()); + } + + #[test] + fn apply_writes_payloads_state_and_statuses() { + let dir = fixture(); + write_applyable_state(dir.path()); + let desired = validate_config_dir(dir.path()); + let query_digest = desired + .resource_digests + .get("query.knowledge.find_person") + .unwrap() + .clone(); + let policy_digest = desired.resource_digests.get("policy.base").unwrap().clone(); + let schema_digest = desired + .resource_digests + .get("schema.knowledge") + .unwrap() + .clone(); + + let out = apply_config_dir(dir.path()); + assert!(out.ok, "{:?}", out.diagnostics); + assert_eq!(out.applied_count, 2); + assert_eq!(out.deferred_count, 0); + assert!(out.converged); + assert!(out.state_written); + + let query_blob = query_payload_path(dir.path(), &query_digest); + assert_eq!(fs::read_to_string(&query_blob).unwrap(), QUERY); + let policy_blob = policy_payload_path(dir.path(), &policy_digest); + assert_eq!(fs::read_to_string(&policy_blob).unwrap(), "rules: []\n"); + + let state = read_state_json(dir.path()); + assert_eq!(state["state_revision"], 2); + let resources = &state["applied_revision"]["resources"]; + assert_eq!( + resources["query.knowledge.find_person"]["digest"], + query_digest + ); + assert_eq!(resources["policy.base"]["digest"], policy_digest); + let expected_composite = graph_digest( + "knowledge", + Some(&schema_digest), + Some( + &[("find_person".to_string(), query_digest.clone())] + .into_iter() + .collect(), + ), + ); + assert_eq!(resources["graph.knowledge"]["digest"], expected_composite); + assert_eq!( + state["applied_revision"]["config_digest"], + desired_revision_digest(&out) + ); + assert_eq!( + state["resource_statuses"]["query.knowledge.find_person"]["status"], + "applied" + ); + assert_eq!(state["resource_statuses"]["policy.base"]["status"], "applied"); + assert!(!dir.path().join(CLUSTER_LOCK_FILE).exists()); + } + + fn 
desired_revision_digest(out: &ApplyOutput) -> String { + out.desired_revision.config_digest.clone().unwrap() + } + + #[test] + fn apply_update_changes_query_digest_and_keeps_old_blob() { + let dir = fixture(); + let desired = validate_config_dir(dir.path()); + let schema_digest = desired + .resource_digests + .get("schema.knowledge") + .unwrap() + .clone(); + let old_digest = "0".repeat(64); + let graph_composite = + graph_digest("knowledge", Some(&schema_digest), Some(&BTreeMap::new())); + write_state_resources( + dir.path(), + &[ + ("graph.knowledge", graph_composite.as_str()), + ("schema.knowledge", schema_digest.as_str()), + ("query.knowledge.find_person", old_digest.as_str()), + ], + ); + let old_blob = query_payload_path(dir.path(), &old_digest); + fs::create_dir_all(old_blob.parent().unwrap()).unwrap(); + fs::write(&old_blob, "old query source").unwrap(); + + let out = apply_config_dir(dir.path()); + assert!(out.ok, "{:?}", out.diagnostics); + let new_digest = desired + .resource_digests + .get("query.knowledge.find_person") + .unwrap(); + let state = read_state_json(dir.path()); + assert_eq!( + state["applied_revision"]["resources"]["query.knowledge.find_person"]["digest"], + *new_digest + ); + assert_eq!(fs::read_to_string(&old_blob).unwrap(), "old query source"); + assert!(query_payload_path(dir.path(), new_digest).exists()); + } + + #[test] + fn apply_deletes_removed_resources_but_keeps_blobs() { + let dir = fixture(); + let desired = validate_config_dir(dir.path()); + let schema_digest = desired + .resource_digests + .get("schema.knowledge") + .unwrap() + .clone(); + let stale_query_digest = "1".repeat(64); + let stale_policy_digest = "2".repeat(64); + let graph_composite = + graph_digest("knowledge", Some(&schema_digest), Some(&BTreeMap::new())); + write_state_resources( + dir.path(), + &[ + ("graph.knowledge", graph_composite.as_str()), + ("schema.knowledge", schema_digest.as_str()), + ("query.knowledge.orphan", stale_query_digest.as_str()), + 
("policy.old", stale_policy_digest.as_str()), + ], + ); + let stale_blob = dir + .path() + .join(CLUSTER_RESOURCES_DIR) + .join("policy/old") + .join(format!("{stale_policy_digest}.yaml")); + fs::create_dir_all(stale_blob.parent().unwrap()).unwrap(); + fs::write(&stale_blob, "old policy").unwrap(); + + let out = apply_config_dir(dir.path()); + assert!(out.ok, "{:?}", out.diagnostics); + assert!(out.converged); + let state = read_state_json(dir.path()); + let resources = &state["applied_revision"]["resources"]; + assert!(resources.get("query.knowledge.orphan").is_none()); + assert!(resources.get("policy.old").is_none()); + assert!( + state["resource_statuses"] + .get("query.knowledge.orphan") + .is_none() + ); + // Deleted resources leave their content-addressed blobs in place; GC is + // a later stage. + assert_eq!(fs::read_to_string(&stale_blob).unwrap(), "old policy"); + // The composite no longer includes the orphan query. + let query_digest = desired + .resource_digests + .get("query.knowledge.find_person") + .unwrap() + .clone(); + let expected_composite = graph_digest( + "knowledge", + Some(&schema_digest), + Some(&[("find_person".to_string(), query_digest)].into_iter().collect()), + ); + assert_eq!(resources["graph.knowledge"]["digest"], expected_composite); + } + + #[test] + fn apply_defers_schema_change_and_blocks_dependent_query() { + let dir = fixture(); + write_applyable_state(dir.path()); + // Change the schema after seeding state: schema.knowledge now differs. 
+ fs::write( + dir.path().join("people.pg"), + "\nnode Person {\n name: String @key\n age: I32?\n bio: String?\n}\n", + ) + .unwrap(); + + let out = apply_config_dir(dir.path()); + assert!(out.ok, "{:?}", out.diagnostics); + assert!(!out.converged); + let by_resource: BTreeMap<&str, &PlanChange> = out + .changes + .iter() + .map(|change| (change.resource.as_str(), change)) + .collect(); + assert_eq!( + by_resource["schema.knowledge"].disposition, + Some(ApplyDisposition::Deferred) + ); + assert_eq!( + by_resource["graph.knowledge"].disposition, + Some(ApplyDisposition::Deferred) + ); + assert_eq!( + by_resource["query.knowledge.find_person"].disposition, + Some(ApplyDisposition::Blocked) + ); + assert_eq!( + by_resource["query.knowledge.find_person"].reason.as_deref(), + Some("dependency_not_applied") + ); + // Policy is independent of the schema and still applies. + assert_eq!( + by_resource["policy.base"].disposition, + Some(ApplyDisposition::Applied) + ); + assert!( + out.diagnostics + .iter() + .any(|diagnostic| diagnostic.code == "apply_unsupported_change") + ); + assert!( + out.diagnostics + .iter() + .any(|diagnostic| diagnostic.code == "apply_dependency_blocked") + ); + + let state = read_state_json(dir.path()); + assert_eq!( + state["resource_statuses"]["query.knowledge.find_person"]["status"], + "blocked" + ); + // The blocked query wrote no payload and no state digest. + assert!( + state["applied_revision"]["resources"] + .get("query.knowledge.find_person") + .is_none() + ); + assert!( + !dir.path() + .join(CLUSTER_RESOURCES_DIR) + .join("query") + .exists() + ); + // Not converged: the applied config digest must not be claimed. 
+ assert!( + state["applied_revision"] + .get("config_digest") + .is_none_or(serde_json::Value::is_null) + ); + } + + #[test] + fn apply_blocks_resources_of_uncreated_graph() { + let dir = fixture(); + write_state_resources(dir.path(), &[]); + + let out = apply_config_dir(dir.path()); + assert!(out.ok, "{:?}", out.diagnostics); + assert_eq!(out.applied_count, 0); + assert!(!out.converged); + let by_resource: BTreeMap<&str, &PlanChange> = out + .changes + .iter() + .map(|change| (change.resource.as_str(), change)) + .collect(); + assert_eq!( + by_resource["graph.knowledge"].disposition, + Some(ApplyDisposition::Deferred) + ); + assert_eq!( + by_resource["query.knowledge.find_person"].reason.as_deref(), + Some("dependency_missing") + ); + assert_eq!( + by_resource["policy.base"].reason.as_deref(), + Some("dependency_missing") + ); + // Statuses for blocked resources are recorded (state changed), but no + // resource digests moved. + let state = read_state_json(dir.path()); + assert_eq!(state["state_revision"], 2); + assert!( + state["applied_revision"]["resources"] + .as_object() + .unwrap() + .is_empty() + ); + assert_eq!( + state["resource_statuses"]["policy.base"]["status"], + "blocked" + ); + } + + #[test] + fn apply_does_not_delete_subtree_of_deleted_graph() { + let dir = fixture(); + let desired = validate_config_dir(dir.path()); + let schema_digest = desired + .resource_digests + .get("schema.knowledge") + .unwrap() + .clone(); + let graph_composite = + graph_digest("knowledge", Some(&schema_digest), Some(&BTreeMap::new())); + write_state_resources( + dir.path(), + &[ + ("graph.knowledge", graph_composite.as_str()), + ("schema.knowledge", schema_digest.as_str()), + ("graph.old", "3333"), + ("schema.old", "4444"), + ("query.old.q", "5555"), + ], + ); + + let out = apply_config_dir(dir.path()); + assert!(out.ok, "{:?}", out.diagnostics); + assert!(!out.converged); + let by_resource: BTreeMap<&str, &PlanChange> = out + .changes + .iter() + .map(|change| 
(change.resource.as_str(), change)) + .collect(); + assert_eq!( + by_resource["graph.old"].disposition, + Some(ApplyDisposition::Deferred) + ); + assert_eq!( + by_resource["schema.old"].disposition, + Some(ApplyDisposition::Deferred) + ); + assert_eq!( + by_resource["query.old.q"].disposition, + Some(ApplyDisposition::Blocked) + ); + let state = read_state_json(dir.path()); + let resources = &state["applied_revision"]["resources"]; + assert_eq!(resources["graph.old"]["digest"], "3333"); + assert_eq!(resources["schema.old"]["digest"], "4444"); + assert_eq!(resources["query.old.q"]["digest"], "5555"); + } + + #[test] + fn apply_is_idempotent() { + let dir = fixture(); + write_applyable_state(dir.path()); + + let first = apply_config_dir(dir.path()); + assert!(first.ok, "{:?}", first.diagnostics); + assert!(first.state_written); + let state_after_first = fs::read_to_string(dir.path().join(CLUSTER_STATE_FILE)).unwrap(); + + let second = apply_config_dir(dir.path()); + assert!(second.ok, "{:?}", second.diagnostics); + assert!(second.changes.is_empty()); + assert_eq!(second.applied_count, 0); + assert!(second.converged); + assert!(!second.state_written); + let state_after_second = fs::read_to_string(dir.path().join(CLUSTER_STATE_FILE)).unwrap(); + assert_eq!(state_after_first, state_after_second); + assert_eq!(second.state_observations.state_revision, 2); + } + + #[test] + fn apply_respects_held_lock() { + let dir = fixture(); + write_applyable_state(dir.path()); + write_lock_file(dir.path(), "held-lock", "plan"); + + let out = apply_config_dir(dir.path()); + assert!(!out.ok); + assert!( + out.diagnostics + .iter() + .any(|diagnostic| diagnostic.code == "state_lock_held") + ); + // The held lock survives a refused apply, and nothing was written. 
+ assert!(dir.path().join(CLUSTER_LOCK_FILE).exists()); + assert!(!dir.path().join(CLUSTER_RESOURCES_DIR).exists()); + let state = read_state_json(dir.path()); + assert_eq!(state["state_revision"], 1); + } + + #[test] + fn apply_state_lock_false_bypasses_with_warning() { + let dir = fixture(); + fs::write( + dir.path().join(CLUSTER_CONFIG_FILE), + r#" +version: 1 +state: + backend: cluster + lock: false +graphs: + knowledge: + schema: ./people.pg + queries: + find_person: + file: ./people.gq +"#, + ) + .unwrap(); + write_applyable_state(dir.path()); + + let out = apply_config_dir(dir.path()); + assert!(out.ok, "{:?}", out.diagnostics); + assert!(out.state_written); + assert!(!out.state_observations.lock_acquired); + assert!( + out.diagnostics + .iter() + .any(|diagnostic| diagnostic.code == "state_lock_disabled") + ); + assert!(!dir.path().join(CLUSTER_LOCK_FILE).exists()); + } + + #[test] + fn apply_skips_existing_payload_blob() { + let dir = fixture(); + write_applyable_state(dir.path()); + let desired = validate_config_dir(dir.path()); + let query_digest = desired + .resource_digests + .get("query.knowledge.find_person") + .unwrap() + .clone(); + // Content-addressed blobs are trusted by name: an existing file is + // never rewritten. + let blob = query_payload_path(dir.path(), &query_digest); + fs::create_dir_all(blob.parent().unwrap()).unwrap(); + fs::write(&blob, "pre-existing").unwrap(); + + let out = apply_config_dir(dir.path()); + assert!(out.ok, "{:?}", out.diagnostics); + assert_eq!(fs::read_to_string(&blob).unwrap(), "pre-existing"); + } + + #[test] + fn apply_invalid_config_fails_before_lock() { + let dir = fixture(); + fs::write( + dir.path().join(CLUSTER_CONFIG_FILE), + "version: 1\nnot_a_field: true\n", + ) + .unwrap(); + + let out = apply_config_dir(dir.path()); + assert!(!out.ok); + // Config errors bail before the lock or any state directory exists. 
+ assert!(!dir.path().join(CLUSTER_STATE_DIR).exists()); + } + + #[test] + fn plan_annotates_apply_dispositions() { + let dir = fixture(); + let out = plan_config_dir(dir.path()); + assert!(out.ok, "{:?}", out.diagnostics); + let by_resource: BTreeMap<&str, &PlanChange> = out + .changes + .iter() + .map(|change| (change.resource.as_str(), change)) + .collect(); + // Empty state: graph/schema creates are deferred, query/policy blocked + // on the uncreated graph — and plan says so before apply runs. + assert_eq!( + by_resource["graph.knowledge"].disposition, + Some(ApplyDisposition::Deferred) + ); + assert_eq!( + by_resource["schema.knowledge"].disposition, + Some(ApplyDisposition::Deferred) + ); + assert_eq!( + by_resource["query.knowledge.find_person"].disposition, + Some(ApplyDisposition::Blocked) + ); + assert_eq!( + by_resource["policy.base"].reason.as_deref(), + Some("dependency_missing") + ); + } } From bcef8444dd59cfdecd29c7f7a6b845b8a705423f Mon Sep 17 00:00:00 2001 From: aaltshuler Date: Tue, 9 Jun 2026 23:34:48 +0300 Subject: [PATCH 2/6] feat(cli): omnigraph cluster apply MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Terraform-style: apply executes directly (cluster plan is the preview, now annotated with apply dispositions). Human output prints per-change dispositions, convergence, and the catalog-only caveat; --json emits the full ApplyOutput. Exit is non-zero only on errors — deferred/blocked changes are warnings with converged: false as the automation signal. 
Co-Authored-By: Claude Fable 5 --- crates/omnigraph-cli/src/main.rs | 65 +++++++++++++- crates/omnigraph-cli/tests/cli.rs | 136 ++++++++++++++++++++++++++++++ 2 files changed, 199 insertions(+), 2 deletions(-) diff --git a/crates/omnigraph-cli/src/main.rs b/crates/omnigraph-cli/src/main.rs index 971ffff..42bbed8 100644 --- a/crates/omnigraph-cli/src/main.rs +++ b/crates/omnigraph-cli/src/main.rs @@ -11,8 +11,8 @@ use omnigraph::db::{Omnigraph, ReadTarget, SnapshotId}; use omnigraph::loader::LoadMode; use omnigraph::storage::normalize_root_uri; use omnigraph_cluster::{ - DiagnosticSeverity, ForceUnlockOutput, PlanOutput, StateSyncOutput, StatusOutput, - ValidateOutput, force_unlock_config_dir, import_config_dir, plan_config_dir, + ApplyOutput, DiagnosticSeverity, ForceUnlockOutput, PlanOutput, StateSyncOutput, StatusOutput, + ValidateOutput, apply_config_dir, force_unlock_config_dir, import_config_dir, plan_config_dir, refresh_config_dir, status_config_dir, validate_config_dir, }; use omnigraph_compiler::query::parser::parse_query; @@ -361,6 +361,16 @@ enum ClusterCommand { #[arg(long)] json: bool, }, + /// Apply the config-only (query/policy) subset of the plan to the local + /// cluster catalog. Graph/schema changes are deferred to a later stage. + Apply { + /// Cluster config directory containing cluster.yaml. + #[arg(long, default_value = ".")] + config: PathBuf, + /// Emit JSON instead of human text. + #[arg(long)] + json: bool, + }, /// Read the local JSON state ledger without scanning live graph resources. Status { /// Cluster config directory containing cluster.yaml. 
@@ -804,6 +814,40 @@ fn print_cluster_plan_human(output: &PlanOutput) { print_cluster_diagnostics(&output.diagnostics); } +fn print_cluster_apply_human(output: &ApplyOutput) { + if output.ok { + println!( + "cluster apply: {} applied, {} deferred/blocked", + output.applied_count, output.deferred_count + ); + for change in &output.changes { + match (&change.disposition, change.reason.as_deref()) { + (Some(disposition), Some(reason)) => println!( + " {:?} {} [{disposition:?}: {reason}]", + change.operation, change.resource + ), + (Some(disposition), None) => println!( + " {:?} {} [{disposition:?}]", + change.operation, change.resource + ), + _ => println!(" {:?} {}", change.operation, change.resource), + } + } + if output.changes.is_empty() { + println!(" no changes"); + } + let state = &output.state_observations; + println!( + " state: revision {}, converged: {}, written: {}", + state.state_revision, output.converged, output.state_written + ); + println!(" note: applied = recorded in the cluster catalog; the server still boots from omnigraph.yaml"); + } else { + println!("cluster apply failed"); + } + print_cluster_diagnostics(&output.diagnostics); +} + fn print_cluster_status_human(output: &StatusOutput) { if output.ok { let state = &output.state_observations; @@ -935,6 +979,19 @@ fn finish_cluster_plan(output: &PlanOutput, json: bool) -> Result<()> { Ok(()) } +fn finish_cluster_apply(output: &ApplyOutput, json: bool) -> Result<()> { + if json { + print_json(output)?; + } else { + print_cluster_apply_human(output); + } + if !output.ok { + io::stdout().flush()?; + std::process::exit(1); + } + Ok(()) +} + fn finish_cluster_status(output: &StatusOutput, json: bool) -> Result<()> { if json { print_json(output)?; @@ -3492,6 +3549,10 @@ async fn main() -> Result<()> { let output = plan_config_dir(config); finish_cluster_plan(&output, json)?; } + ClusterCommand::Apply { config, json } => { + let output = apply_config_dir(config); + finish_cluster_apply(&output, json)?; + 
} ClusterCommand::Status { config, json } => { let output = status_config_dir(config); finish_cluster_status(&output, json)?; diff --git a/crates/omnigraph-cli/tests/cli.rs b/crates/omnigraph-cli/tests/cli.rs index 1dd26a7..9dbf250 100644 --- a/crates/omnigraph-cli/tests/cli.rs +++ b/crates/omnigraph-cli/tests/cli.rs @@ -754,6 +754,142 @@ fn cluster_validate_invalid_config_exits_nonzero() { assert!(stdout.contains("future_phase_field"), "{stdout}"); } +/// Seed an applyable state: schema digest borrowed from `cluster validate`, +/// graph entry present (composite recomputed by apply), queries/policies +/// pending. +fn write_cluster_applyable_state(root: &std::path::Path) -> serde_json::Value { + let validate = parse_stdout_json(&output_success( + cli() + .arg("cluster") + .arg("validate") + .arg("--config") + .arg(root) + .arg("--json"), + )); + let schema_digest = validate["resource_digests"]["schema.knowledge"] + .as_str() + .unwrap() + .to_string(); + let state_dir = root.join("__cluster"); + fs::create_dir_all(&state_dir).unwrap(); + fs::write( + state_dir.join("state.json"), + format!( + r#"{{ + "version": 1, + "state_revision": 1, + "applied_revision": {{ + "resources": {{ + "graph.knowledge": {{ "digest": "seed" }}, + "schema.knowledge": {{ "digest": "{schema_digest}" }} + }} + }} +}} +"# + ), + ) + .unwrap(); + validate +} + +#[test] +fn cluster_apply_json_applies_query_and_policy() { + let temp = tempdir().unwrap(); + write_cluster_config_fixture(temp.path()); + let validate = write_cluster_applyable_state(temp.path()); + + let json = parse_stdout_json(&output_success( + cli() + .arg("cluster") + .arg("apply") + .arg("--config") + .arg(temp.path()) + .arg("--json"), + )); + assert_eq!(json["ok"], true, "{json}"); + assert_eq!(json["applied_count"], 2, "{json}"); + assert_eq!(json["converged"], true, "{json}"); + assert_eq!(json["state_written"], true, "{json}"); + assert_eq!( + json["resource_statuses"]["query.knowledge.find_person"]["status"], + 
"applied" + ); + + let query_digest = validate["resource_digests"]["query.knowledge.find_person"] + .as_str() + .unwrap(); + let payload = temp + .path() + .join("__cluster/resources/query/knowledge/find_person") + .join(format!("{query_digest}.gq")); + assert!(payload.exists(), "missing payload {}", payload.display()); + + let state: serde_json::Value = serde_json::from_str( + &fs::read_to_string(temp.path().join("__cluster/state.json")).unwrap(), + ) + .unwrap(); + assert_eq!(state["state_revision"], 2); + assert_eq!( + state["applied_revision"]["resources"]["query.knowledge.find_person"]["digest"], + *query_digest + ); +} + +#[test] +fn cluster_apply_missing_state_exits_nonzero() { + let temp = tempdir().unwrap(); + write_cluster_config_fixture(temp.path()); + + let output = output_failure( + cli() + .arg("cluster") + .arg("apply") + .arg("--config") + .arg(temp.path()) + .arg("--json"), + ); + let json = parse_stdout_json(&output); + assert_eq!(json["ok"], false); + assert!( + json["diagnostics"] + .as_array() + .unwrap() + .iter() + .any(|diagnostic| diagnostic["code"] == "state_missing"), + "{json}" + ); + assert!(!temp.path().join("__cluster/resources").exists()); +} + +#[test] +fn cluster_apply_locked_exits_nonzero() { + let temp = tempdir().unwrap(); + write_cluster_config_fixture(temp.path()); + write_cluster_applyable_state(temp.path()); + write_cluster_lock(temp.path(), "held-lock", "plan"); + + let output = output_failure( + cli() + .arg("cluster") + .arg("apply") + .arg("--config") + .arg(temp.path()) + .arg("--json"), + ); + let json = parse_stdout_json(&output); + assert_eq!(json["ok"], false); + assert!( + json["diagnostics"] + .as_array() + .unwrap() + .iter() + .any(|diagnostic| diagnostic["code"] == "state_lock_held"), + "{json}" + ); + assert!(temp.path().join("__cluster/lock.json").exists()); + assert!(!temp.path().join("__cluster/resources").exists()); +} + #[test] fn short_version_flag_prints_current_cli_version() { let output = 
output_success(cli().arg("-v")); From 40a21e4e77e50192902818f2e5a93b1dd42a54fe Mon Sep 17 00:00:00 2001 From: aaltshuler Date: Tue, 9 Jun 2026 23:36:33 +0300 Subject: [PATCH 3/6] docs(cluster): document Stage 3A config-only cluster apply Co-Authored-By: Claude Fable 5 --- docs/dev/testing.md | 2 +- docs/user/cli-reference.md | 17 ++++++---- docs/user/cluster-config.md | 67 ++++++++++++++++++++++++++++++++----- 3 files changed, 70 insertions(+), 16 deletions(-) diff --git a/docs/dev/testing.md b/docs/dev/testing.md index 3c5ee32..1f818e9 100644 --- a/docs/dev/testing.md +++ b/docs/dev/testing.md @@ -8,7 +8,7 @@ This file is the always-on map of the test surface. **Consult it before every ta |---|---|---| | `omnigraph` (engine) | `crates/omnigraph/tests/` | Integration tests (21 files), fixture-driven, share `tests/helpers/mod.rs` | | `omnigraph-cli` | `crates/omnigraph-cli/tests/` | `cli.rs` (unit-ish), `system_local.rs`, `system_remote.rs`, share `tests/support/mod.rs` | -| `omnigraph-cluster` | mostly in-source `#[cfg(test)] mod tests` | Cluster config parser, local JSON state diff, state CAS/lock handling/recovery, read-only validate/plan/status plus explicit refresh/import graph observations | +| `omnigraph-cluster` | mostly in-source `#[cfg(test)] mod tests` | Cluster config parser, local JSON state diff, state CAS/lock handling/recovery, read-only validate/plan/status plus explicit refresh/import graph observations, and config-only apply (content-addressed payload publish, disposition gating, composite-digest convergence, idempotent re-apply) | | `omnigraph-server` | `crates/omnigraph-server/tests/` | `server.rs` (HTTP-level), `openapi.rs` (OpenAPI drift / regeneration) | | `omnigraph-compiler` | mostly in-source `#[cfg(test)] mod tests` | Parser, type-checker, IR lowering, lint | diff --git a/docs/user/cli-reference.md b/docs/user/cli-reference.md index 70ac6f4..774ea6b 100644 --- a/docs/user/cli-reference.md +++ b/docs/user/cli-reference.md @@ -19,7 +19,7 @@ 
Top-level command families and subcommands. Graph-targeting commands accept eith | `commit list \| show` | inspect commit graph | | `schema plan \| apply \| show (alias: get)` | migrations | | `lint` (alias: `check`) | offline / graph-backed query validation. Replaces `query lint` / `query check`, which are kept as deprecated argv-level shims that print a one-line warning and rewrite to `omnigraph lint` | -| `cluster validate \| plan \| status \| refresh \| import \| force-unlock` | cluster-control preview. `validate` checks a local `cluster.yaml` folder and referenced schema/query/policy files; `plan` diffs it against local JSON state at `__cluster/state.json`; `status` reads the state ledger; `refresh`/`import` explicitly update local JSON state from read-only graph observations; `force-unlock ` manually removes a held local state lock by exact id. No apply, graph-resource mutation, server change, automatic stale-lock breaking, or `plan --refresh` occurs in Stage 2C | +| `cluster validate \| plan \| apply \| status \| refresh \| import \| force-unlock` | cluster-control preview. `validate` checks a local `cluster.yaml` folder and referenced schema/query/policy files; `plan` diffs it against local JSON state at `__cluster/state.json` and annotates each change with its apply disposition; `apply` executes the config-only (stored-query/policy) subset into the content-addressed local catalog under `__cluster/resources/` — graph/schema changes are deferred loudly, and nothing applied serves traffic (the server still boots from `omnigraph.yaml`); `status` reads the state ledger; `refresh`/`import` explicitly update local JSON state from read-only graph observations; `force-unlock ` manually removes a held local state lock by exact id. 
No graph-manifest movement, server change, automatic stale-lock breaking, or `plan --refresh` occurs in Stage 3A | | `optimize` | non-destructive Lance compaction (skips tables with `Blob` columns or uncovered drift; `--json` reports `skipped`) | | `repair [--confirm] [--force]` | preview or explicitly publish uncovered manifest/head drift. `--confirm` heals verified maintenance drift and exits non-zero if suspicious/unverifiable drift is refused; `--force --confirm` publishes suspicious/unverifiable drift after operator review | | `cleanup --keep N --older-than 7d --confirm` | destructive version GC | @@ -78,6 +78,7 @@ policy: ```bash omnigraph cluster validate --config ./company-brain omnigraph cluster plan --config ./company-brain --json +omnigraph cluster apply --config ./company-brain --json omnigraph cluster status --config ./company-brain --json omnigraph cluster refresh --config ./company-brain --json omnigraph cluster import --config ./company-brain --json @@ -85,16 +86,20 @@ omnigraph cluster force-unlock --config ./company-brain --json ``` `--config` is a directory containing `cluster.yaml`; it defaults to `.`. -Stage 2C accepts graphs, schemas, stored queries, and policy bundle file +Stage 3A accepts graphs, schemas, stored queries, and policy bundle file references. `cluster plan` reads local JSON state from `/__cluster/state.json`; a missing file means empty state. Plan, -refresh, and import acquire `__cluster/lock.json` by default and release it -before returning. `cluster status` reads state only and reports any existing +apply, refresh, and import acquire `__cluster/lock.json` by default and release +it before returning. `cluster apply` executes only stored-query/policy catalog +writes (content-addressed under `__cluster/resources/`) and requires an +existing `state.json`; graph/schema changes are deferred with warnings, and +applied resources do not serve traffic — the server still boots from +`omnigraph.yaml`. 
`cluster status` reads state only and reports any existing lock metadata. `force-unlock` removes a lock only when the supplied id exactly matches the lock file. `refresh` requires an existing `state.json`; `import` creates one only when it is missing. Both observe declared graphs read-only at -`/graphs/.omni`. External state backends, apply, -automatic stale-lock breaking, `plan --refresh`, pipelines, UI specs, +`/graphs/.omni`. External state backends, graph/schema +apply, automatic stale-lock breaking, `plan --refresh`, pipelines, UI specs, embeddings, aliases, and bindings are reserved for later stages. See [cluster-config.md](cluster-config.md). diff --git a/docs/user/cluster-config.md b/docs/user/cluster-config.md index 24718b1..b285cf3 100644 --- a/docs/user/cluster-config.md +++ b/docs/user/cluster-config.md @@ -1,19 +1,23 @@ # Cluster Config -**Status:** Stage 2C state-lock recovery preview. +**Status:** Stage 3A config-only apply preview. Cluster config is the future control-plane configuration surface for a whole OmniGraph deployment. In this stage, OmniGraph can validate a local `cluster.yaml` folder, produce a deterministic read-only plan, inspect the -local JSON state ledger, and explicitly refresh/import graph observations into -that ledger. It can also manually remove a held local state lock by exact lock -id. It does not apply desired changes, start servers, or write graph resources. +local JSON state ledger, explicitly refresh/import graph observations into +that ledger, manually remove a held local state lock by exact lock id, and +**apply the config-only subset of the plan** — stored-query and policy-bundle +catalog writes. It does not move graph manifests, change schemas, start +servers, or serve anything it applies: the server still boots from +`omnigraph.yaml`. 
## Commands ```bash omnigraph cluster validate --config ./company-brain omnigraph cluster plan --config ./company-brain --json +omnigraph cluster apply --config ./company-brain --json omnigraph cluster status --config ./company-brain --json omnigraph cluster refresh --config ./company-brain --json omnigraph cluster import --config ./company-brain --json @@ -51,9 +55,9 @@ policies: `metadata.name` is a display label. `state.backend` may be omitted or set to `cluster`; external state backends are reserved for a later stage. `state.lock` -defaults to `true`. When enabled, `cluster plan`, `cluster refresh`, and -`cluster import` briefly acquire `/__cluster/lock.json`, then remove -it before returning. `cluster status` never acquires the lock; it only reports +defaults to `true`. When enabled, `cluster plan`, `cluster apply`, +`cluster refresh`, and `cluster import` briefly acquire +`/__cluster/lock.json`, then remove it before returning. `cluster status` never acquires the lock; it only reports whether one is present. `cluster force-unlock` is the only lock-removal command; it requires the exact lock id and should be run only after confirming no cluster operation is active. @@ -125,8 +129,53 @@ successful `plan` instead reports `lock_acquired: true` and an `acquired_lock_id`, then releases the lock before returning. The command never writes `state.json` and does not scan live graphs. Use explicit `cluster refresh` / `cluster import` when the state ledger should be updated -from live observations. Apply and live drift scans during plan are later-stage -work. +from live observations. Live drift scans during plan are later-stage work. 
+ +Each plan change carries a `disposition` field — an honest preview of what +`cluster apply` will do with it in this stage: `applied` (executes), `derived` +(a `graph.<id>` composite-digest update that converges automatically once its +query digests land), `deferred` (graph/schema change, later phase), or +`blocked` (query/policy gated by an unapplied or missing dependency, with the +condition in `reason`). + +## Apply + +`cluster apply` executes the config-only subset of the plan — stored-query and +policy-bundle changes. There is no confirm flag: `cluster plan` is the preview, +and apply recomputes the same diff under the state lock before executing, so a +stale preview can never be applied. Apply requires an existing `state.json` +(`state_missing` directs you to `cluster import` first). + +For each applied create/update, the resource payload is written +content-addressed into the local catalog: + +```text +<config>/__cluster/resources/query/<graph>/<name>/<digest>.gq +<config>/__cluster/resources/policy/<name>/<digest>.yaml +``` + +Extensions are fixed per kind regardless of the source file's name. Payloads +are written before the state update because `state.json` is the publish point: +if the final CAS-checked state write fails, no success is reported and the +digest-named blobs already written are inert — re-running apply is the repair. +Deletes remove the resource from state; their old payload blobs stay on disk +(garbage collection is a later stage). Re-running a converged apply is a no-op: +no state write, no revision change (`state_written: false`). + +**Applied means recorded in the cluster catalog — nothing more.** The server +still boots from `omnigraph.yaml`; no query or policy applied here serves +traffic until the server-boot stage ships, as an explicit per-deployment mode +switch. + +Graph and schema changes are never executed by this stage.
They are reported +as `deferred` (warning `apply_unsupported_change`), and query/policy changes +that depend on them are `blocked` (warning `apply_dependency_blocked`, status +`blocked` in state). A partially-applicable plan still exits 0 with warnings; +the JSON `converged` field is the automation signal for "state now matches the +desired revision". The applied `config_digest` is only recorded when apply +fully converges. The `graph.<id>` composite digest is recomputed from state's +own schema/query digests after each apply, so applied query changes converge +without graph movement. ## Status From d870eaaf3ff3d083061a799f4cf3ec1023b02cbb Mon Sep 17 00:00:00 2001 From: aaltshuler Date: Tue, 9 Jun 2026 23:44:49 +0300 Subject: [PATCH 4/6] =?UTF-8?q?test(cli):=20cluster=20lifecycle=20e2e=20?= =?UTF-8?q?=E2=80=94=20real-graph=20import/apply/refresh,=20schema-change?= =?UTF-8?q?=20loop,=20force-unlock=20retry?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three composition tests over the spawned binary against a real derived graph: - import -> plan (dispositions) -> apply -> status -> refresh -> plan-empty, then a query edit round-trip. Pins that refresh and apply recompute the graph composite digest identically — divergence would silently re-open the plan forever and no single-command test would catch it. - The Stage 3A operator workflow across the control/data-plane boundary: cluster apply defers a schema change, omnigraph schema apply executes it, cluster refresh observes it, the next cluster apply re-converges. - Held lock refuses apply, force-unlock clears it, retried apply converges.
Co-Authored-By: Claude Fable 5 --- crates/omnigraph-cli/tests/cli.rs | 183 ++++++++++++++++++++++++++++++ 1 file changed, 183 insertions(+) diff --git a/crates/omnigraph-cli/tests/cli.rs b/crates/omnigraph-cli/tests/cli.rs index 9dbf250..30fa796 100644 --- a/crates/omnigraph-cli/tests/cli.rs +++ b/crates/omnigraph-cli/tests/cli.rs @@ -890,6 +890,189 @@ fn cluster_apply_locked_exits_nonzero() { assert!(!temp.path().join("__cluster/resources").exists()); } +fn cluster_json(root: &std::path::Path, command: &str) -> serde_json::Value { + parse_stdout_json(&output_success( + cli() + .arg("cluster") + .arg(command) + .arg("--config") + .arg(root) + .arg("--json"), + )) +} + +/// End-to-end lifecycle against a REAL derived graph: import observes the live +/// graph, plan/apply converge the query+policy catalog, status reports it, +/// refresh re-observes without un-converging, and a query edit round-trips. +/// This is the composition test — every step passes individually elsewhere; +/// this catches the seams (e.g. refresh and apply recomputing the graph +/// composite digest differently would silently re-open the plan forever). 
+#[test] +fn cluster_e2e_lifecycle_import_apply_status_refresh_converges() { + let temp = tempdir().unwrap(); + write_cluster_config_fixture(temp.path()); + init_cluster_derived_graph(temp.path()); + + let import = cluster_json(temp.path(), "import"); + assert_eq!(import["ok"], true, "{import}"); + assert_eq!(import["state_observations"]["state_revision"], 1); + + let plan = cluster_json(temp.path(), "plan"); + let changes = plan["changes"].as_array().unwrap(); + assert_eq!(changes.len(), 3, "{plan}"); + let disposition_of = |resource: &str| { + changes + .iter() + .find(|change| change["resource"] == resource) + .unwrap_or_else(|| panic!("missing change for {resource}: {plan}"))["disposition"] + .clone() + }; + assert_eq!(disposition_of("graph.knowledge"), "derived"); + assert_eq!(disposition_of("query.knowledge.find_person"), "applied"); + assert_eq!(disposition_of("policy.base"), "applied"); + + let apply = cluster_json(temp.path(), "apply"); + assert_eq!(apply["ok"], true, "{apply}"); + assert_eq!(apply["applied_count"], 2, "{apply}"); + assert_eq!(apply["converged"], true, "{apply}"); + + let status = cluster_json(temp.path(), "status"); + assert_eq!( + status["resource_statuses"]["query.knowledge.find_person"]["status"], + "applied" + ); + assert_eq!(status["resource_statuses"]["policy.base"]["status"], "applied"); + assert!( + status["state_observations"]["applied_config_digest"].is_string(), + "converged apply must record the applied config digest: {status}" + ); + + // Refresh re-observes the live graph; it must not undo apply's work. + let refresh = cluster_json(temp.path(), "refresh"); + assert_eq!(refresh["ok"], true, "{refresh}"); + let replan = cluster_json(temp.path(), "plan"); + assert!( + replan["changes"].as_array().unwrap().is_empty(), + "refresh after a converged apply must not re-open the plan: {replan}" + ); + + // A query edit round-trips: plan update -> apply -> converged again. 
+ fs::write( + temp.path().join("people.gq"), + r#" +query find_person($name: String) { + match { $p: Person { name: $name } } + return { $p.name } +} +"#, + ) + .unwrap(); + let apply_edit = cluster_json(temp.path(), "apply"); + assert_eq!(apply_edit["applied_count"], 1, "{apply_edit}"); + assert_eq!(apply_edit["converged"], true, "{apply_edit}"); + + let final_apply = cluster_json(temp.path(), "apply"); + assert_eq!(final_apply["state_written"], false, "{final_apply}"); + assert!(final_apply["changes"].as_array().unwrap().is_empty()); +} + +/// The operator workflow across the Stage 3A boundary: a schema change is +/// deferred by cluster apply, executed by `omnigraph schema apply` against +/// the graph, picked up by `cluster refresh`, and the next apply re-converges. +#[test] +fn cluster_e2e_schema_change_defers_until_schema_apply_and_refresh() { + let temp = tempdir().unwrap(); + write_cluster_config_fixture(temp.path()); + init_cluster_derived_graph(temp.path()); + let import = cluster_json(temp.path(), "import"); + assert_eq!(import["ok"], true, "{import}"); + let apply = cluster_json(temp.path(), "apply"); + assert_eq!(apply["converged"], true, "{apply}"); + + // Additive schema change: cluster apply must defer it loudly, not act. + fs::write( + temp.path().join("people.pg"), + r#" +node Person { + name: String @key + age: I32? + bio: String? +} +"#, + ) + .unwrap(); + let deferred = cluster_json(temp.path(), "apply"); + assert_eq!(deferred["ok"], true, "{deferred}"); + assert_eq!(deferred["applied_count"], 0, "{deferred}"); + assert_eq!(deferred["converged"], false, "{deferred}"); + assert!( + deferred["diagnostics"] + .as_array() + .unwrap() + .iter() + .any(|diagnostic| diagnostic["code"] == "apply_unsupported_change"), + "{deferred}" + ); + + // The graph-plane tool applies the migration... 
+ output_success( + cli() + .arg("schema") + .arg("apply") + .arg(temp.path().join("graphs/knowledge.omni")) + .arg("--schema") + .arg(temp.path().join("people.pg")) + .arg("--json"), + ); + // ...refresh observes it... + let refresh = cluster_json(temp.path(), "refresh"); + assert_eq!(refresh["ok"], true, "{refresh}"); + // ...and the control plane re-converges. + let reconverge = cluster_json(temp.path(), "apply"); + assert_eq!(reconverge["ok"], true, "{reconverge}"); + assert_eq!(reconverge["converged"], true, "{reconverge}"); + let replan = cluster_json(temp.path(), "plan"); + assert!( + replan["changes"].as_array().unwrap().is_empty(), + "after schema apply + refresh + apply, the plan must be empty: {replan}" + ); +} + +/// Lock-recovery composition: a held lock refuses apply, force-unlock clears +/// it, and the retried apply converges. +#[test] +fn cluster_e2e_force_unlock_unblocks_apply() { + let temp = tempdir().unwrap(); + write_cluster_config_fixture(temp.path()); + write_cluster_applyable_state(temp.path()); + write_cluster_lock(temp.path(), "stuck-lock", "apply"); + + let refused = parse_stdout_json(&output_failure( + cli() + .arg("cluster") + .arg("apply") + .arg("--config") + .arg(temp.path()) + .arg("--json"), + )); + assert_eq!(refused["ok"], false); + + let unlocked = parse_stdout_json(&output_success( + cli() + .arg("cluster") + .arg("force-unlock") + .arg("stuck-lock") + .arg("--config") + .arg(temp.path()) + .arg("--json"), + )); + assert_eq!(unlocked["lock_removed"], true, "{unlocked}"); + + let retried = cluster_json(temp.path(), "apply"); + assert_eq!(retried["ok"], true, "{retried}"); + assert_eq!(retried["converged"], true, "{retried}"); +} + #[test] fn short_version_flag_prints_current_cli_version() { let output = output_success(cli().arg("-v")); From 5e1dede08f20403eb6c87fd8f21a17267669d908 Mon Sep 17 00:00:00 2001 From: aaltshuler Date: Wed, 10 Jun 2026 00:35:03 +0300 Subject: [PATCH 5/6] 
=?UTF-8?q?fix(cluster,cli):=20apply=20failure=20outpu?= =?UTF-8?q?t=20=E2=80=94=20persisted=20statuses=20only,=20changes=20list?= =?UTF-8?q?=20printed?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two review findings (greptile, PR #165): - ApplyOutput.resource_statuses on a failed state write now carries the pre-apply on-disk snapshot instead of the in-memory mutations that were never persisted, so automation reading the field independently of `ok` cannot see phantom applied/blocked statuses. Regression test forces the state write to fail via a read-only __cluster dir (unix-only, skips when permissions are not enforced). - Human-mode `cluster apply` prints the classified changes list on failure too, so an operator debugging a partial apply without --json sees what was attempted. Co-Authored-By: Claude Fable 5 --- crates/omnigraph-cli/src/main.rs | 44 ++++++++------ crates/omnigraph-cluster/src/lib.rs | 89 ++++++++++++++++++++++++++++- 2 files changed, 113 insertions(+), 20 deletions(-) diff --git a/crates/omnigraph-cli/src/main.rs b/crates/omnigraph-cli/src/main.rs index 42bbed8..37db77f 100644 --- a/crates/omnigraph-cli/src/main.rs +++ b/crates/omnigraph-cli/src/main.rs @@ -820,34 +820,42 @@ fn print_cluster_apply_human(output: &ApplyOutput) { "cluster apply: {} applied, {} deferred/blocked", output.applied_count, output.deferred_count ); - for change in &output.changes { - match (&change.disposition, change.reason.as_deref()) { - (Some(disposition), Some(reason)) => println!( - " {:?} {} [{disposition:?}: {reason}]", - change.operation, change.resource - ), - (Some(disposition), None) => println!( - " {:?} {} [{disposition:?}]", - change.operation, change.resource - ), - _ => println!(" {:?} {}", change.operation, change.resource), - } - } - if output.changes.is_empty() { - println!(" no changes"); - } + } else { + println!("cluster apply failed"); + } + // The change list prints on failure too: an operator debugging 
a partial + // apply (payload or state-write error) needs to see what was attempted. + print_cluster_apply_changes(&output.changes); + if output.ok { let state = &output.state_observations; println!( " state: revision {}, converged: {}, written: {}", state.state_revision, output.converged, output.state_written ); println!(" note: applied = recorded in the cluster catalog; the server still boots from omnigraph.yaml"); - } else { - println!("cluster apply failed"); } print_cluster_diagnostics(&output.diagnostics); } +fn print_cluster_apply_changes(changes: &[omnigraph_cluster::PlanChange]) { + for change in changes { + match (&change.disposition, change.reason.as_deref()) { + (Some(disposition), Some(reason)) => println!( + " {:?} {} [{disposition:?}: {reason}]", + change.operation, change.resource + ), + (Some(disposition), None) => println!( + " {:?} {} [{disposition:?}]", + change.operation, change.resource + ), + _ => println!(" {:?} {}", change.operation, change.resource), + } + } + if changes.is_empty() { + println!(" no changes"); + } +} + fn print_cluster_status_human(output: &StatusOutput) { if output.ok { let state = &output.state_observations; diff --git a/crates/omnigraph-cluster/src/lib.rs b/crates/omnigraph-cluster/src/lib.rs index 01ad171..3673194 100644 --- a/crates/omnigraph-cluster/src/lib.rs +++ b/crates/omnigraph-cluster/src/lib.rs @@ -276,6 +276,8 @@ pub struct ApplyOutput { pub converged: bool, /// False for a no-op re-apply: state bytes (and revision) were left untouched. pub state_written: bool, + /// The statuses as persisted: post-apply on success, the pre-apply on-disk + /// snapshot when the state write fails (never unpersisted in-memory state). 
pub resource_statuses: BTreeMap, pub diagnostics: Vec, } @@ -819,13 +821,26 @@ pub fn apply_config_dir(config_dir: impl AsRef) -> ApplyOutput { let after_value = serde_json::to_value(&new_state).expect("cluster state must serialize deterministically"); let mut state_written = false; + let mut state_write_failed = false; if after_value != before_value { new_state.state_revision = new_state.state_revision.saturating_add(1); match backend.write_state(&new_state, expected_cas.as_deref(), &mut observations) { Ok(()) => state_written = true, - Err(diagnostic) => diagnostics.push(diagnostic), + Err(diagnostic) => { + diagnostics.push(diagnostic); + state_write_failed = true; + } } } + // On a failed state write, report the statuses that are actually on disk + // (the pre-apply snapshot), not the in-memory mutations that were never + // persisted — automation reading `resource_statuses` independently of `ok` + // must not see phantom status updates. + let resource_statuses = if state_write_failed { + state.resource_statuses + } else { + new_state.resource_statuses + }; let applied_count = changes .iter() @@ -853,7 +868,7 @@ pub fn apply_config_dir(config_dir: impl AsRef) -> ApplyOutput { deferred_count, converged, state_written, - resource_statuses: new_state.resource_statuses, + resource_statuses, diagnostics, } } @@ -4232,6 +4247,76 @@ graphs: assert!(!dir.path().join(CLUSTER_STATE_DIR).exists()); } + /// When the state write fails after payloads landed, the output must + /// report the statuses actually on disk — not the unpersisted in-memory + /// mutations (phantom `applied` entries would mislead automation that + /// reads `resource_statuses` independently of `ok`). + #[cfg(unix)] + #[test] + fn apply_state_write_failure_reports_persisted_statuses() { + use std::os::unix::fs::PermissionsExt; + + let dir = fixture(); + // lock: false so the only write into __cluster/ is state.json itself. 
+ fs::write( + dir.path().join(CLUSTER_CONFIG_FILE), + r#" +version: 1 +state: + backend: cluster + lock: false +graphs: + knowledge: + schema: ./people.pg + queries: + find_person: + file: ./people.gq +"#, + ) + .unwrap(); + write_applyable_state(dir.path()); + // Pre-create the payload blob so the payload phase is a no-op and the + // failure lands exactly at the state write. + let desired = validate_config_dir(dir.path()); + let query_digest = desired + .resource_digests + .get("query.knowledge.find_person") + .unwrap(); + let blob = query_payload_path(dir.path(), query_digest); + fs::create_dir_all(blob.parent().unwrap()).unwrap(); + fs::write(&blob, QUERY).unwrap(); + + let state_dir = dir.path().join(CLUSTER_STATE_DIR); + fs::set_permissions(&state_dir, fs::Permissions::from_mode(0o555)).unwrap(); + // Running as root ignores permission bits; skip rather than flake. + if fs::write(state_dir.join("probe"), b"x").is_ok() { + let _ = fs::remove_file(state_dir.join("probe")); + fs::set_permissions(&state_dir, fs::Permissions::from_mode(0o755)).unwrap(); + eprintln!("skipping: permissions are not enforced (running as root)"); + return; + } + + let out = apply_config_dir(dir.path()); + fs::set_permissions(&state_dir, fs::Permissions::from_mode(0o755)).unwrap(); + + assert!(!out.ok); + assert!(!out.state_written); + assert!( + out.diagnostics + .iter() + .any(|diagnostic| diagnostic.code == "state_write_error"), + "{:?}", + out.diagnostics + ); + // The seeded state has no statuses; the failed apply must not invent + // the in-memory `applied` ones it failed to persist. 
+ assert!( + out.resource_statuses.is_empty(), + "unpersisted statuses leaked into output: {:?}", + out.resource_statuses + ); + } + #[test] fn plan_annotates_apply_dispositions() { let dir = fixture(); From 7f3ecf282a2e3f11f94477b49ab786a0b294c180 Mon Sep 17 00:00:00 2001 From: aaltshuler Date: Wed, 10 Jun 2026 00:45:44 +0300 Subject: [PATCH 6/6] Merge origin/main (#164 axiom-15 docs, #86 TableStorage migration) into feat/cluster-apply-stage3a Clean auto-merge; also fix the stale 'Stage 2C accepts' line in cluster-config.md to Stage 3A. Co-Authored-By: Claude Fable 5 --- docs/user/cluster-config.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/user/cluster-config.md b/docs/user/cluster-config.md index 8146646..912f307 100644 --- a/docs/user/cluster-config.md +++ b/docs/user/cluster-config.md @@ -42,7 +42,7 @@ of the two files. ## Supported `cluster.yaml` -Stage 2C accepts only the read-only resource subset: +Stage 3A accepts only this resource subset: ```yaml version: 1