diff --git a/crates/omnigraph-cli/src/main.rs b/crates/omnigraph-cli/src/main.rs index 942bb27..8593ef3 100644 --- a/crates/omnigraph-cli/src/main.rs +++ b/crates/omnigraph-cli/src/main.rs @@ -11,8 +11,8 @@ use omnigraph::db::{Omnigraph, ReadTarget, SnapshotId}; use omnigraph::loader::LoadMode; use omnigraph::storage::normalize_root_uri; use omnigraph_cluster::{ - ApplyOptions, ApplyOutput, DiagnosticSeverity, ForceUnlockOutput, PlanOutput, StateSyncOutput, StatusOutput, - ValidateOutput, apply_config_dir_with_options, force_unlock_config_dir, import_config_dir, plan_config_dir, + ApplyOptions, ApplyOutput, ApproveOutput, DiagnosticSeverity, ForceUnlockOutput, PlanOutput, StateSyncOutput, StatusOutput, + ValidateOutput, apply_config_dir_with_options, approve_config_dir, force_unlock_config_dir, import_config_dir, plan_config_dir, refresh_config_dir, status_config_dir, validate_config_dir, }; use omnigraph_compiler::query::parser::parse_query; @@ -371,6 +371,18 @@ enum ClusterCommand { #[arg(long)] json: bool, }, + /// Record a digest-bound approval for a gated (irreversible) change, + /// e.g. a graph delete. Requires the global --as actor. + Approve { + /// Typed resource address of the gated change (e.g. graph.scratch). + resource: String, + /// Cluster config directory containing cluster.yaml. + #[arg(long, default_value = ".")] + config: PathBuf, + /// Emit JSON instead of human text. + #[arg(long)] + json: bool, + }, /// Read the local JSON state ledger without scanning live graph resources. Status { /// Cluster config directory containing cluster.yaml. @@ -1011,6 +1023,33 @@ fn finish_cluster_apply(output: &ApplyOutput, json: bool) -> Result<()> { Ok(()) } +fn finish_cluster_approve(output: &ApproveOutput, json: bool) -> Result<()> { + if json { + print_json(output)?; + } else if output.ok { + println!( + "cluster approve: {} {} approved by {} (approval {})", + output + .operation + .as_ref() + .map(|operation| format!("{operation:?}").to_lowercase()) + .unwrap_or_default(), + output.resource.as_deref().unwrap_or("?"), + output.approved_by.as_deref().unwrap_or("?"), + output.approval_id.as_deref().unwrap_or("?"), + ); + print_cluster_diagnostics(&output.diagnostics); + } else { + println!("cluster approve failed"); + print_cluster_diagnostics(&output.diagnostics); + } + if !output.ok { + io::stdout().flush()?; + std::process::exit(1); + } + Ok(()) +} + fn finish_cluster_status(output: &StatusOutput, json: bool) -> Result<()> { if json { print_json(output)?; @@ -3581,6 +3620,19 @@ async fn main() -> Result<()> { .await; finish_cluster_apply(&output, json)?; } + ClusterCommand::Approve { + resource, + config, + json, + } => { + let Some(approver) = cli.as_actor.as_deref() else { + bail!( + "`cluster approve` requires the global --as flag: an approval without an approver is meaningless" + ); + }; + let output = approve_config_dir(config, &resource, approver).await; + finish_cluster_approve(&output, json)?; + } ClusterCommand::Status { config, json } => { let output = status_config_dir(config); finish_cluster_status(&output, json)?; diff --git a/crates/omnigraph-cli/tests/cli.rs b/crates/omnigraph-cli/tests/cli.rs index 1805e29..bfa538d 100644 --- a/crates/omnigraph-cli/tests/cli.rs +++ b/crates/omnigraph-cli/tests/cli.rs @@ -1424,22 +1424,29 @@ policies: let mixed = cluster_json(temp.path(), "apply"); assert_eq!(mixed["ok"], true, "{mixed}"); assert_eq!(mixed["converged"], false, "{mixed}"); + // Stage 4C: deletes are gated on a digest-bound approval, one gate per + // subtree (the graph-level approval carries schema + queries). assert_eq!( change_for(&mixed, "graph.engineering")["disposition"], - "deferred" - ); - assert_eq!( - change_for(&mixed, "schema.engineering")["disposition"], - "deferred" - ); - assert_eq!( - change_for(&mixed, "query.engineering.find_service")["disposition"], "blocked" ); assert_eq!( - change_for(&mixed, "query.engineering.find_service")["reason"], - "dependency_not_applied" + change_for(&mixed, "graph.engineering")["reason"], + "approval_required" ); + assert_eq!( + change_for(&mixed, "schema.engineering")["reason"], + "approval_required" + ); + assert_eq!( + change_for(&mixed, "query.engineering.find_service")["reason"], + "approval_required" + ); + let gate_plan = cluster_json(temp.path(), "plan"); + let gates = gate_plan["approvals_required"].as_array().unwrap(); + assert_eq!(gates.len(), 1, "{gate_plan}"); + assert_eq!(gates[0]["resource"], "graph.engineering"); + assert_eq!(gates[0]["satisfied"], false); assert_eq!( change_for(&mixed, "query.knowledge.find_person")["disposition"], "applied" @@ -1461,7 +1468,7 @@ policies: let mut sorted = order.clone(); sorted.sort_unstable(); assert_eq!(order, sorted, "{mixed}"); - // Graph deletion cannot converge until stage 4C's approval artifacts. + // Conclusion (approve + converge) extends below once the delete executor lands. } /// Stage 4A headline: a declared graph is created by `cluster apply` itself — diff --git a/crates/omnigraph-cluster/src/lib.rs b/crates/omnigraph-cluster/src/lib.rs index 11ebcd9..2fa0eab 100644 --- a/crates/omnigraph-cluster/src/lib.rs +++ b/crates/omnigraph-cluster/src/lib.rs @@ -26,6 +26,7 @@ pub const CLUSTER_STATE_FILE: &str = "__cluster/state.json"; pub const CLUSTER_LOCK_FILE: &str = "__cluster/lock.json"; pub const CLUSTER_RESOURCES_DIR: &str = "__cluster/resources"; pub const CLUSTER_RECOVERIES_DIR: &str = "__cluster/recoveries"; +pub const CLUSTER_APPROVALS_DIR: &str = "__cluster/approvals"; #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] #[serde(rename_all = "snake_case")] @@ -212,6 +213,9 @@ pub struct BlastRadius { pub struct ApprovalRequirement { pub resource: String, pub reason: String, + /// True when a valid (digest-matching, unconsumed) approval artifact is + /// pending for this change. + pub satisfied: bool, } #[derive(Debug, Clone, Serialize)] @@ -293,6 +297,47 @@ pub struct ApplyOutput { pub diagnostics: Vec, } +/// A digest-bound human approval for an irreversible operation (RFC-004 +/// §D4). Written by `cluster approve`, consumed by apply. The file is never +/// deleted on consumption — it is rewritten with `consumed_at` and also +/// summarized into the state ledger's `approval_records`, so the audit fact +/// survives the loss of either store (axiom 11). +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(deny_unknown_fields)] +struct ApprovalArtifact { + schema_version: u32, + approval_id: String, + resource: String, + operation: String, + reason: String, + bound_config_digest: String, + #[serde(default)] + bound_before_digest: Option, + #[serde(default)] + bound_after_digest: Option, + approved_by: String, + created_at: String, + #[serde(default)] + consumed_at: Option, + #[serde(default)] + consumed_by_operation: Option, +} + +#[derive(Debug, Clone, Serialize)] +pub struct ApproveOutput { + pub ok: bool, + pub config_dir: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub approval_id: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub resource: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub operation: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub approved_by: Option, + pub diagnostics: Vec, +} + #[derive(Debug, Clone)] struct DesiredCluster { config_dir: PathBuf, @@ -472,6 +517,7 @@ struct LocalStateBackend { state_path: PathBuf, lock_path: PathBuf, recoveries_dir: PathBuf, + approvals_dir: PathBuf, } #[derive(Debug)] @@ -588,7 +634,14 @@ pub async fn plan_config_dir(config_dir: impl AsRef) -> PlanOutput { }; // Plan previews dispositions without sweeping; a pending recovery is // surfaced as the cluster_recovery_pending warning above instead. - classify_changes(&mut changes, &desired.dependencies, &BTreeSet::new()); + let artifacts = backend.list_approval_artifacts(&mut diagnostics); + let approved = approved_resources( + &artifacts, + &changes, + &desired.config_digest, + &mut diagnostics, + ); + classify_changes(&mut changes, &desired.dependencies, &BTreeSet::new(), &approved); // Embed real migration steps for schema updates so plan is a data-aware // preview; failures degrade to the digest diff with a warning. @@ -624,7 +677,7 @@ pub async fn plan_config_dir(config_dir: impl AsRef) -> PlanOutput { } } let blast_radius = compute_blast_radius(&changes, &desired.dependencies); - let approvals_required = compute_approvals(&changes); + let approvals_required = compute_approvals(&changes, &approved); let ok = !has_errors(&diagnostics); PlanOutput { @@ -790,17 +843,29 @@ pub async fn apply_config_dir_with_options( let prior_resources = state_resource_digests(&state); let mut changes = diff_resources(&prior_resources, &desired.resource_digests); - classify_changes(&mut changes, &desired.dependencies, &sweep.pending_graphs); + let approval_artifacts = backend.list_approval_artifacts(&mut diagnostics); + let approved = approved_resources( + &approval_artifacts, + &changes, + &desired.config_digest, + &mut diagnostics, + ); + classify_changes( + &mut changes, + &desired.dependencies, + &sweep.pending_graphs, + &approved, + ); - // Defensive invariant: nothing the approval gate covers may be executable. - // Today approvals only cover graph/schema deletes (always deferred); this - // keeps a future widening of the executable set from silently bypassing it. - let approvals = compute_approvals(&changes); + // Defensive invariant: nothing the approval gate covers may be executable + // WITHOUT a matching approval. Gated changes with a valid artifact are the + // sanctioned exception (stage 4C). + let approvals = compute_approvals(&changes, &approved); let approval_violation = changes.iter().any(|change| { change.disposition == Some(ApplyDisposition::Applied) && approvals .iter() - .any(|approval| approval.resource == change.resource) + .any(|approval| approval.resource == change.resource && !approval.satisfied) }); if approval_violation { diagnostics.push(Diagnostic::error( @@ -1349,6 +1414,126 @@ pub async fn apply_config_dir_with_options( } } +/// Record a digest-bound human approval for a gated (irreversible) change — +/// today: graph deletes. The artifact binds to the exact desired config +/// digest and the change's before/after digests, so config or state drift +/// invalidates it automatically (a stale approval can never authorize a +/// different change). +pub async fn approve_config_dir( + config_dir: impl AsRef, + resource: &str, + approved_by: &str, +) -> ApproveOutput { + let outcome = load_desired(config_dir.as_ref()); + let mut diagnostics = outcome.diagnostics; + let backend = LocalStateBackend::new(&outcome.config_dir); + let mut observations = backend.observations(); + + let fail = |config_dir: String, diagnostics: Vec| ApproveOutput { + ok: false, + config_dir, + approval_id: None, + resource: None, + operation: None, + approved_by: None, + diagnostics, + }; + + let Some(desired) = outcome.desired else { + return fail(display_path(&outcome.config_dir), diagnostics); + }; + if has_errors(&diagnostics) { + return fail(display_path(&desired.config_dir), diagnostics); + } + + let _lock_guard = if desired.state_lock { + match backend.acquire_lock("approve", &mut observations) { + Ok(guard) => Some(guard), + Err(diagnostic) => { + diagnostics.push(diagnostic); + return fail(display_path(&desired.config_dir), diagnostics); + } + } + } else { + diagnostics.push(Diagnostic::warning( + "state_lock_disabled", + "state.lock", + "state.lock is false; approve ran without acquiring the cluster state lock", + )); + None + }; + + let state = match backend.read_state(&mut observations) { + Ok(snapshot) => match snapshot.state { + Some(state) => state, + None => { + diagnostics.push(Diagnostic::error( + "state_missing", + CLUSTER_STATE_FILE, + "approve requires an existing state.json; run `cluster import` first", + )); + return fail(display_path(&desired.config_dir), diagnostics); + } + }, + Err(diagnostic) => { + diagnostics.push(diagnostic); + return fail(display_path(&desired.config_dir), diagnostics); + } + }; + + let prior_resources = state_resource_digests(&state); + let changes = diff_resources(&prior_resources, &desired.resource_digests); + let gates = compute_approvals(&changes, &BTreeSet::new()); + let Some(change) = changes.iter().find(|change| { + change.resource == resource && gates.iter().any(|gate| gate.resource == resource) + }) else { + diagnostics.push(Diagnostic::error( + "approval_not_required", + resource, + "no pending change for this resource requires approval (check `cluster plan`)", + )); + return fail(display_path(&desired.config_dir), diagnostics); + }; + + let artifact = ApprovalArtifact { + schema_version: 1, + approval_id: Ulid::new().to_string(), + resource: change.resource.clone(), + operation: match change.operation { + PlanOperation::Create => "create", + PlanOperation::Update => "update", + PlanOperation::Delete => "delete", + } + .to_string(), + reason: gates + .iter() + .find(|gate| gate.resource == resource) + .map(|gate| gate.reason.clone()) + .unwrap_or_default(), + bound_config_digest: desired.config_digest.clone(), + bound_before_digest: change.before_digest.clone(), + bound_after_digest: change.after_digest.clone(), + approved_by: approved_by.to_string(), + created_at: now_rfc3339(), + consumed_at: None, + consumed_by_operation: None, + }; + if let Err(diagnostic) = backend.write_approval_artifact(&artifact) { + diagnostics.push(diagnostic); + return fail(display_path(&desired.config_dir), diagnostics); + } + + ApproveOutput { + ok: !has_errors(&diagnostics), + config_dir: display_path(&desired.config_dir), + approval_id: Some(artifact.approval_id), + resource: Some(artifact.resource), + operation: Some(change.operation.clone()), + approved_by: Some(artifact.approved_by), + diagnostics, + } +} + pub fn status_config_dir(config_dir: impl AsRef) -> StatusOutput { let parsed = parse_cluster_config(config_dir.as_ref()); let mut diagnostics = parsed.diagnostics; @@ -1773,10 +1958,102 @@ impl LocalStateBackend { state_path: config_dir.join(CLUSTER_STATE_FILE), lock_path: config_dir.join(CLUSTER_LOCK_FILE), recoveries_dir: config_dir.join(CLUSTER_RECOVERIES_DIR), + approvals_dir: config_dir.join(CLUSTER_APPROVALS_DIR), state_dir, } } + /// List approval artifacts in ULID (filename) order; unparseable files + /// warn and stay on disk for the operator. + fn list_approval_artifacts( + &self, + diagnostics: &mut Vec, + ) -> Vec<(PathBuf, ApprovalArtifact)> { + let mut paths = Vec::new(); + match fs::read_dir(&self.approvals_dir) { + Ok(entries) => { + for entry in entries.flatten() { + let path = entry.path(); + if path.extension().is_some_and(|ext| ext == "json") { + paths.push(path); + } + } + } + Err(err) if err.kind() == ErrorKind::NotFound => {} + Err(err) => diagnostics.push(Diagnostic::warning( + "approval_read_error", + CLUSTER_APPROVALS_DIR, + format!("could not list approval artifacts: {err}"), + )), + } + paths.sort(); + let mut artifacts = Vec::new(); + for path in paths { + match fs::read_to_string(&path) + .map_err(|err| err.to_string()) + .and_then(|text| { + serde_json::from_str::(&text).map_err(|err| err.to_string()) + }) { + Ok(artifact) if artifact.schema_version == 1 => artifacts.push((path, artifact)), + Ok(artifact) => diagnostics.push(Diagnostic::warning( + "unsupported_approval_version", + display_path(&path), + format!( + "unsupported approval artifact version {}; leaving it in place", + artifact.schema_version + ), + )), + Err(err) => diagnostics.push(Diagnostic::warning( + "invalid_approval_artifact", + display_path(&path), + format!("could not parse approval artifact ({err}); leaving it in place"), + )), + } + } + artifacts + } + + /// Atomically write (or rewrite, e.g. on consumption) an approval artifact. + fn write_approval_artifact(&self, artifact: &ApprovalArtifact) -> Result { + fs::create_dir_all(&self.approvals_dir).map_err(|err| { + Diagnostic::error( + "approval_write_error", + CLUSTER_APPROVALS_DIR, + format!("could not create approvals directory: {err}"), + ) + })?; + let target = self + .approvals_dir + .join(format!("{}.json", artifact.approval_id)); + let mut payload = serde_json::to_string_pretty(artifact).map_err(|err| { + Diagnostic::error( + "approval_write_error", + display_path(&target), + format!("could not encode approval artifact: {err}"), + ) + })?; + payload.push('\n'); + let tmp_path = self + .approvals_dir + .join(format!("{}.json.tmp.{}", artifact.approval_id, Ulid::new())); + fs::write(&tmp_path, payload.as_bytes()).map_err(|err| { + Diagnostic::error( + "approval_write_error", + display_path(&tmp_path), + format!("could not write approval artifact: {err}"), + ) + })?; + if let Err(err) = fs::rename(&tmp_path, &target) { + let _ = fs::remove_file(&tmp_path); + return Err(Diagnostic::error( + "approval_write_error", + display_path(&target), + format!("could not move approval artifact into place: {err}"), + )); + } + Ok(target) + } + /// List recovery sidecars in ULID (filename) order. Unparseable files are /// reported as warnings and skipped — they stay on disk for the operator. fn list_recovery_sidecars( @@ -3127,24 +3404,74 @@ fn compute_blast_radius(changes: &[PlanChange], dependencies: &[Dependency]) -> .collect() } -fn compute_approvals(changes: &[PlanChange]) -> Vec { +fn compute_approvals( + changes: &[PlanChange], + approved: &BTreeSet, +) -> Vec { + // One gate per subtree: the graph. delete carries its schema and + // queries, so a schema delete whose graph is also deleted is not listed. + let graph_deletes: BTreeSet = changes + .iter() + .filter(|change| change.operation == PlanOperation::Delete) + .filter_map(|change| change.resource.strip_prefix("graph.").map(str::to_string)) + .collect(); changes .iter() .filter_map(|change| { - if change.operation == PlanOperation::Delete - && (change.resource.starts_with("graph.") || change.resource.starts_with("schema.")) - { - Some(ApprovalRequirement { - resource: change.resource.clone(), - reason: "delete may remove deployed graph or schema definition".to_string(), - }) - } else { - None + if change.operation != PlanOperation::Delete { + return None; } + let gated = match resource_kind(&change.resource) { + ResourceKind::Graph(_) => true, + ResourceKind::Schema(graph) => !graph_deletes.contains(&graph), + _ => false, + }; + gated.then(|| ApprovalRequirement { + resource: change.resource.clone(), + reason: "delete may remove deployed graph or schema definition".to_string(), + satisfied: approved.contains(&change.resource), + }) }) .collect() } +/// Resources with a valid (digest-matching, unconsumed) pending approval. +/// Near-misses — an artifact for the same resource whose bound digests no +/// longer match — warn as `approval_stale` and never authorize anything. +fn approved_resources( + artifacts: &[(PathBuf, ApprovalArtifact)], + changes: &[PlanChange], + config_digest: &str, + diagnostics: &mut Vec, +) -> BTreeSet { + let mut approved = BTreeSet::new(); + for change in changes { + let candidates: Vec<&ApprovalArtifact> = artifacts + .iter() + .map(|(_, artifact)| artifact) + .filter(|artifact| artifact.consumed_at.is_none() && artifact.resource == change.resource) + .collect(); + if candidates.is_empty() { + continue; + } + let matched = candidates.iter().any(|artifact| { + artifact.bound_config_digest == config_digest + && artifact.bound_before_digest == change.before_digest + && artifact.bound_after_digest == change.after_digest + }); + if matched { + approved.insert(change.resource.clone()); + } else { + diagnostics.push(Diagnostic::warning( + "approval_stale", + change.resource.clone(), + "an approval artifact exists but its bound digests no longer match the plan; re-run `cluster approve`", + )); + } + } + approved +} + #[derive(Debug, PartialEq, Eq)] enum ResourceKind { Graph(String), @@ -3182,6 +3509,7 @@ fn classify_changes( changes: &mut [PlanChange], dependencies: &[Dependency], pending_recovery: &BTreeSet, + approved: &BTreeSet, ) { let mut schema_creates = BTreeSet::new(); let mut schema_pending = BTreeSet::new(); @@ -3219,6 +3547,11 @@ fn classify_changes( schema_pending.insert(graph.clone()); } } + // Subtree deletes ride the approved graph delete. NOTE: execution lands + // with the delete executor; until then approved deletes stay blocked so a + // gate-only build can never strip state without removing the root. + let rides_approved_delete = |_graph: &str| false; + let _ = approved; for change in changes.iter_mut() { let (disposition, reason) = match resource_kind(&change.resource) { @@ -3238,6 +3571,15 @@ fn classify_changes( PlanOperation::Create | PlanOperation::Update => { (ApplyDisposition::Blocked, Some("cluster_recovery_pending")) } + PlanOperation::Delete if graph_deletes.contains(&graph) => { + if rides_approved_delete(&graph) { + (ApplyDisposition::Applied, None) + } else if pending_recovery.contains(&graph) { + (ApplyDisposition::Blocked, Some("cluster_recovery_pending")) + } else { + (ApplyDisposition::Blocked, Some("approval_required")) + } + } _ => (ApplyDisposition::Deferred, Some("apply_unsupported_kind")), }, ResourceKind::Graph(graph) => match change.operation { @@ -3251,15 +3593,26 @@ fn classify_changes( PlanOperation::Update if !schema_pending.contains(&graph) => { (ApplyDisposition::Derived, None) } + // Stage 4C: an approved graph delete executes (the + // irreversible tier — gated by a digest-bound artifact). + PlanOperation::Delete => { + if pending_recovery.contains(&graph) { + (ApplyDisposition::Blocked, Some("cluster_recovery_pending")) + } else if rides_approved_delete(&graph) { + (ApplyDisposition::Applied, None) + } else { + (ApplyDisposition::Blocked, Some("approval_required")) + } + } _ => (ApplyDisposition::Deferred, Some("apply_unsupported_kind")), }, ResourceKind::Query { graph, .. } => match change.operation { PlanOperation::Delete => { - if graph_deletes.contains(&graph) { - ( - ApplyDisposition::Blocked, - Some("dependency_not_applied"), - ) + if rides_approved_delete(&graph) { + // Tombstoned with the approved graph delete. + (ApplyDisposition::Applied, None) + } else if graph_deletes.contains(&graph) { + (ApplyDisposition::Blocked, Some("approval_required")) } else { (ApplyDisposition::Applied, None) } @@ -5302,7 +5655,7 @@ graphs: } #[tokio::test] - async fn apply_does_not_delete_subtree_of_deleted_graph() { + async fn apply_blocks_graph_delete_without_approval() { let dir = fixture(); let desired = validate_config_dir(dir.path()); let schema_digest = desired @@ -5331,18 +5684,25 @@ graphs: .iter() .map(|change| (change.resource.as_str(), change)) .collect(); + // Stage 4C: deletes are gated, not deferred — every subtree change + // blocks on the single graph-level approval. assert_eq!( by_resource["graph.old"].disposition, - Some(ApplyDisposition::Deferred) - ); - assert_eq!( - by_resource["schema.old"].disposition, - Some(ApplyDisposition::Deferred) - ); - assert_eq!( - by_resource["query.old.q"].disposition, Some(ApplyDisposition::Blocked) ); + assert_eq!( + by_resource["graph.old"].reason.as_deref(), + Some("approval_required") + ); + assert_eq!( + by_resource["schema.old"].reason.as_deref(), + Some("approval_required") + ); + assert_eq!( + by_resource["query.old.q"].reason.as_deref(), + Some("approval_required") + ); + // State intact; nothing destroyed without the artifact. let state = read_state_json(dir.path()); let resources = &state["applied_revision"]["resources"]; assert_eq!(resources["graph.old"]["digest"], "3333"); @@ -5350,6 +5710,147 @@ graphs: assert_eq!(resources["query.old.q"]["digest"], "5555"); } + #[tokio::test] + async fn approve_writes_digest_bound_artifact() { + let dir = fixture(); + write_applyable_state(dir.path()); + // Seed a deletable subtree. + let state = read_state_json(dir.path()); + let graph_digest_str = state["applied_revision"]["resources"]["graph.knowledge"]["digest"] + .as_str() + .unwrap() + .to_string(); + let schema_digest_str = state["applied_revision"]["resources"]["schema.knowledge"] + ["digest"] + .as_str() + .unwrap() + .to_string(); + write_state_resources( + dir.path(), + &[ + ("graph.knowledge", graph_digest_str.as_str()), + ("schema.knowledge", schema_digest_str.as_str()), + ("graph.old", "3333"), + ("schema.old", "4444"), + ], + ); + + let out = approve_config_dir(dir.path(), "graph.old", "andrew").await; + assert!(out.ok, "{:?}", out.diagnostics); + let approval_id = out.approval_id.clone().unwrap(); + let artifact: serde_json::Value = serde_json::from_str( + &fs::read_to_string( + dir.path() + .join(CLUSTER_APPROVALS_DIR) + .join(format!("{approval_id}.json")), + ) + .unwrap(), + ) + .unwrap(); + assert_eq!(artifact["resource"], "graph.old"); + assert_eq!(artifact["operation"], "delete"); + assert_eq!(artifact["approved_by"], "andrew"); + assert_eq!(artifact["bound_before_digest"], "3333"); + assert!(artifact["bound_after_digest"].is_null()); + assert!(artifact["bound_config_digest"].is_string()); + assert!(artifact["consumed_at"].is_null()); + + // A non-gated address is refused. + let not_gated = approve_config_dir(dir.path(), "query.knowledge.find_person", "andrew").await; + assert!(!not_gated.ok); + assert!( + not_gated + .diagnostics + .iter() + .any(|diagnostic| diagnostic.code == "approval_not_required") + ); + } + + #[tokio::test] + async fn stale_approval_is_ignored() { + let dir = fixture(); + write_applyable_state(dir.path()); + let state = read_state_json(dir.path()); + let graph_digest_str = state["applied_revision"]["resources"]["graph.knowledge"]["digest"] + .as_str() + .unwrap() + .to_string(); + let schema_digest_str = state["applied_revision"]["resources"]["schema.knowledge"] + ["digest"] + .as_str() + .unwrap() + .to_string(); + write_state_resources( + dir.path(), + &[ + ("graph.knowledge", graph_digest_str.as_str()), + ("schema.knowledge", schema_digest_str.as_str()), + ("graph.old", "3333"), + ], + ); + let approved = approve_config_dir(dir.path(), "graph.old", "andrew").await; + assert!(approved.ok, "{:?}", approved.diagnostics); + // The config moves after approval: the bound config digest no longer + // matches and the artifact authorizes nothing. + fs::write(dir.path().join("base.policy.yaml"), "rules: [] # moved\n").unwrap(); + + let out = apply_config_dir(dir.path()).await; + assert!( + out.diagnostics + .iter() + .any(|diagnostic| diagnostic.code == "approval_stale"), + "{:?}", + out.diagnostics + ); + let by_resource: BTreeMap<&str, &PlanChange> = out + .changes + .iter() + .map(|change| (change.resource.as_str(), change)) + .collect(); + assert_eq!( + by_resource["graph.old"].reason.as_deref(), + Some("approval_required") + ); + let state = read_state_json(dir.path()); + assert_eq!( + state["applied_revision"]["resources"]["graph.old"]["digest"], + "3333" + ); + } + + #[tokio::test] + async fn compute_approvals_one_gate_per_subtree() { + let dir = fixture(); + write_applyable_state(dir.path()); + let state = read_state_json(dir.path()); + let g = state["applied_revision"]["resources"]["graph.knowledge"]["digest"] + .as_str() + .unwrap() + .to_string(); + let sc = state["applied_revision"]["resources"]["schema.knowledge"]["digest"] + .as_str() + .unwrap() + .to_string(); + write_state_resources( + dir.path(), + &[ + ("graph.knowledge", g.as_str()), + ("schema.knowledge", sc.as_str()), + ("graph.old", "3333"), + ("schema.old", "4444"), + ("query.old.q", "5555"), + ], + ); + let plan = plan_config_dir(dir.path()).await; + let gated: Vec<&str> = plan + .approvals_required + .iter() + .map(|gate| gate.resource.as_str()) + .collect(); + assert_eq!(gated, vec!["graph.old"], "{plan:?}"); + assert!(!plan.approvals_required[0].satisfied); + } + #[tokio::test] async fn apply_is_idempotent() { let dir = fixture();