diff --git a/crates/omnigraph-cli/src/main.rs b/crates/omnigraph-cli/src/main.rs index 942bb27..8593ef3 100644 --- a/crates/omnigraph-cli/src/main.rs +++ b/crates/omnigraph-cli/src/main.rs @@ -11,8 +11,8 @@ use omnigraph::db::{Omnigraph, ReadTarget, SnapshotId}; use omnigraph::loader::LoadMode; use omnigraph::storage::normalize_root_uri; use omnigraph_cluster::{ - ApplyOptions, ApplyOutput, DiagnosticSeverity, ForceUnlockOutput, PlanOutput, StateSyncOutput, StatusOutput, - ValidateOutput, apply_config_dir_with_options, force_unlock_config_dir, import_config_dir, plan_config_dir, + ApplyOptions, ApplyOutput, ApproveOutput, DiagnosticSeverity, ForceUnlockOutput, PlanOutput, StateSyncOutput, StatusOutput, + ValidateOutput, apply_config_dir_with_options, approve_config_dir, force_unlock_config_dir, import_config_dir, plan_config_dir, refresh_config_dir, status_config_dir, validate_config_dir, }; use omnigraph_compiler::query::parser::parse_query; @@ -371,6 +371,18 @@ enum ClusterCommand { #[arg(long)] json: bool, }, + /// Record a digest-bound approval for a gated (irreversible) change, + /// e.g. a graph delete. Requires the global --as actor. + Approve { + /// Typed resource address of the gated change (e.g. graph.scratch). + resource: String, + /// Cluster config directory containing cluster.yaml. + #[arg(long, default_value = ".")] + config: PathBuf, + /// Emit JSON instead of human text. + #[arg(long)] + json: bool, + }, /// Read the local JSON state ledger without scanning live graph resources. Status { /// Cluster config directory containing cluster.yaml. @@ -1011,6 +1023,33 @@ fn finish_cluster_apply(output: &ApplyOutput, json: bool) -> Result<()> { Ok(()) } +fn finish_cluster_approve(output: &ApproveOutput, json: bool) -> Result<()> { + if json { + print_json(output)?; + } else if output.ok { + println!( + "cluster approve: {} {} approved by {} (approval {})", + output + .operation + .as_ref() + .map(|operation| format!("{operation:?}").to_lowercase()) + .unwrap_or_default(), + output.resource.as_deref().unwrap_or("?"), + output.approved_by.as_deref().unwrap_or("?"), + output.approval_id.as_deref().unwrap_or("?"), + ); + print_cluster_diagnostics(&output.diagnostics); + } else { + println!("cluster approve failed"); + print_cluster_diagnostics(&output.diagnostics); + } + if !output.ok { + io::stdout().flush()?; + std::process::exit(1); + } + Ok(()) +} + fn finish_cluster_status(output: &StatusOutput, json: bool) -> Result<()> { if json { print_json(output)?; @@ -3581,6 +3620,19 @@ async fn main() -> Result<()> { .await; finish_cluster_apply(&output, json)?; } + ClusterCommand::Approve { + resource, + config, + json, + } => { + let Some(approver) = cli.as_actor.as_deref() else { + bail!( + "`cluster approve` requires the global --as flag: an approval without an approver is meaningless" + ); + }; + let output = approve_config_dir(config, &resource, approver).await; + finish_cluster_approve(&output, json)?; + } ClusterCommand::Status { config, json } => { let output = status_config_dir(config); finish_cluster_status(&output, json)?; diff --git a/crates/omnigraph-cli/tests/cli.rs b/crates/omnigraph-cli/tests/cli.rs index 1805e29..336f19e 100644 --- a/crates/omnigraph-cli/tests/cli.rs +++ b/crates/omnigraph-cli/tests/cli.rs @@ -1358,7 +1358,7 @@ fn cluster_e2e_graph_root_destruction_drifts_then_apply_recreates_empty_graph() /// (applied), its composite (derived) — shows all four dispositions at once /// before the graph-plane schema apply closes the loop. #[test] -fn cluster_e2e_multi_graph_mixed_dispositions_then_converge() { +fn cluster_e2e_multi_graph_mixed_dispositions_then_approve_and_converge() { let temp = tempdir().unwrap(); write_multi_graph_cluster_fixture(temp.path()); // No manual init: Stage 4A creates both graphs. @@ -1424,22 +1424,29 @@ policies: let mixed = cluster_json(temp.path(), "apply"); assert_eq!(mixed["ok"], true, "{mixed}"); assert_eq!(mixed["converged"], false, "{mixed}"); + // Stage 4C: deletes are gated on a digest-bound approval, one gate per + // subtree (the graph-level approval carries schema + queries). assert_eq!( change_for(&mixed, "graph.engineering")["disposition"], - "deferred" - ); - assert_eq!( - change_for(&mixed, "schema.engineering")["disposition"], - "deferred" - ); - assert_eq!( - change_for(&mixed, "query.engineering.find_service")["disposition"], "blocked" ); assert_eq!( - change_for(&mixed, "query.engineering.find_service")["reason"], - "dependency_not_applied" + change_for(&mixed, "graph.engineering")["reason"], + "approval_required" ); + assert_eq!( + change_for(&mixed, "schema.engineering")["reason"], + "approval_required" + ); + assert_eq!( + change_for(&mixed, "query.engineering.find_service")["reason"], + "approval_required" + ); + let gate_plan = cluster_json(temp.path(), "plan"); + let gates = gate_plan["approvals_required"].as_array().unwrap(); + assert_eq!(gates.len(), 1, "{gate_plan}"); + assert_eq!(gates[0]["resource"], "graph.engineering"); + assert_eq!(gates[0]["satisfied"], false); assert_eq!( change_for(&mixed, "query.knowledge.find_person")["disposition"], "applied" @@ -1461,7 +1468,55 @@ policies: let mut sorted = order.clone(); sorted.sort_unstable(); assert_eq!(order, sorted, "{mixed}"); - // Graph deletion cannot converge until stage 4C's approval artifacts. + // The conclusion: an apply without approval stays blocked; the approved + // delete converges the cluster, tombstoning the removed graph. + let still_blocked = cluster_json(temp.path(), "apply"); + assert_eq!(still_blocked["converged"], false, "{still_blocked}"); + + let approve = parse_stdout_json(&output_success( + cli() + .arg("--as") + .arg("andrew") + .arg("cluster") + .arg("approve") + .arg("graph.engineering") + .arg("--config") + .arg(temp.path()) + .arg("--json"), + )); + assert_eq!(approve["ok"], true, "{approve}"); + assert_eq!(approve["approved_by"], "andrew"); + + let converge = cluster_json(temp.path(), "apply"); + assert_eq!(converge["ok"], true, "{converge}"); + assert_eq!(converge["converged"], true, "{converge}"); + assert!(!temp.path().join("graphs/engineering.omni").exists()); + + let status = cluster_json(temp.path(), "status"); + assert_eq!(status["observations"]["graph.engineering"]["kind"], "tombstone"); + let final_plan = cluster_json(temp.path(), "plan"); + assert!( + final_plan["changes"].as_array().unwrap().is_empty(), + "{final_plan}" + ); +} + +/// An approval without an approver is meaningless: approve requires --as. +#[test] +fn cluster_e2e_approve_requires_actor() { + let temp = tempdir().unwrap(); + write_cluster_config_fixture(temp.path()); + + let output = output_failure( + cli() + .arg("cluster") + .arg("approve") + .arg("graph.knowledge") + .arg("--config") + .arg(temp.path()), + ); + let stderr = String::from_utf8_lossy(&output.stderr); + assert!(stderr.contains("--as"), "{stderr}"); } /// Stage 4A headline: a declared graph is created by `cluster apply` itself — diff --git a/crates/omnigraph-cluster/src/lib.rs b/crates/omnigraph-cluster/src/lib.rs index 11ebcd9..f67d8f7 100644 --- a/crates/omnigraph-cluster/src/lib.rs +++ b/crates/omnigraph-cluster/src/lib.rs @@ -26,6 +26,7 @@ pub const CLUSTER_STATE_FILE: &str = "__cluster/state.json"; pub const CLUSTER_LOCK_FILE: &str = "__cluster/lock.json"; pub const CLUSTER_RESOURCES_DIR: &str = "__cluster/resources"; pub const CLUSTER_RECOVERIES_DIR: &str = "__cluster/recoveries"; +pub const CLUSTER_APPROVALS_DIR: &str = "__cluster/approvals"; #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] #[serde(rename_all = "snake_case")] @@ -212,6 +213,9 @@ pub struct BlastRadius { pub struct ApprovalRequirement { pub resource: String, pub reason: String, + /// True when a valid (digest-matching, unconsumed) approval artifact is + /// pending for this change. + pub satisfied: bool, } #[derive(Debug, Clone, Serialize)] @@ -293,6 +297,47 @@ pub struct ApplyOutput { pub diagnostics: Vec, } +/// A digest-bound human approval for an irreversible operation (RFC-004 +/// §D4). Written by `cluster approve`, consumed by apply. The file is never +/// deleted on consumption — it is rewritten with `consumed_at` and also +/// summarized into the state ledger's `approval_records`, so the audit fact +/// survives the loss of either store (axiom 11). +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(deny_unknown_fields)] +struct ApprovalArtifact { + schema_version: u32, + approval_id: String, + resource: String, + operation: String, + reason: String, + bound_config_digest: String, + #[serde(default)] + bound_before_digest: Option, + #[serde(default)] + bound_after_digest: Option, + approved_by: String, + created_at: String, + #[serde(default)] + consumed_at: Option, + #[serde(default)] + consumed_by_operation: Option, +} + +#[derive(Debug, Clone, Serialize)] +pub struct ApproveOutput { + pub ok: bool, + pub config_dir: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub approval_id: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub resource: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub operation: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub approved_by: Option, + pub diagnostics: Vec, +} + #[derive(Debug, Clone)] struct DesiredCluster { config_dir: PathBuf, @@ -446,6 +491,10 @@ struct RecoverySidecar { desired_schema_digest: String, #[serde(default)] state_cas_base: Option, + /// For graph_delete: the approval this operation consumes; lets a sweep + /// roll-forward consume it too. + #[serde(default)] + approval_id: Option, } #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] @@ -453,7 +502,7 @@ struct RecoverySidecar { enum RecoverySidecarKind { GraphCreate, SchemaApply, - // GraphDelete arrives with stage 4C. + GraphDelete, } #[derive(Debug, Default)] @@ -464,6 +513,9 @@ struct SweepOutcome { /// Sidecars whose outcome is recorded (rows 2/4): deleted only after the /// command's state write lands, so a CAS failure re-sweeps them. completed_sidecars: Vec, + /// Approval artifacts consumed by a roll-forward (delete row 7b): their + /// files are rewritten with consumed_at only after the state write lands. + consumed_approvals: Vec, } #[derive(Debug)] @@ -472,6 +524,7 @@ struct LocalStateBackend { state_path: PathBuf, lock_path: PathBuf, recoveries_dir: PathBuf, + approvals_dir: PathBuf, } #[derive(Debug)] @@ -588,7 +641,14 @@ pub async fn plan_config_dir(config_dir: impl AsRef) -> PlanOutput { }; // Plan previews dispositions without sweeping; a pending recovery is // surfaced as the cluster_recovery_pending warning above instead. - classify_changes(&mut changes, &desired.dependencies, &BTreeSet::new()); + let artifacts = backend.list_approval_artifacts(&mut diagnostics); + let approved = approved_resources( + &artifacts, + &changes, + &desired.config_digest, + &mut diagnostics, + ); + classify_changes(&mut changes, &desired.dependencies, &BTreeSet::new(), &approved); // Embed real migration steps for schema updates so plan is a data-aware // preview; failures degrade to the digest diff with a warning. @@ -624,7 +684,7 @@ pub async fn plan_config_dir(config_dir: impl AsRef) -> PlanOutput { } } let blast_radius = compute_blast_radius(&changes, &desired.dependencies); - let approvals_required = compute_approvals(&changes); + let approvals_required = compute_approvals(&changes, &approved); let ok = !has_errors(&diagnostics); PlanOutput { @@ -790,17 +850,29 @@ pub async fn apply_config_dir_with_options( let prior_resources = state_resource_digests(&state); let mut changes = diff_resources(&prior_resources, &desired.resource_digests); - classify_changes(&mut changes, &desired.dependencies, &sweep.pending_graphs); + let approval_artifacts = backend.list_approval_artifacts(&mut diagnostics); + let approved = approved_resources( + &approval_artifacts, + &changes, + &desired.config_digest, + &mut diagnostics, + ); + classify_changes( + &mut changes, + &desired.dependencies, + &sweep.pending_graphs, + &approved, + ); - // Defensive invariant: nothing the approval gate covers may be executable. - // Today approvals only cover graph/schema deletes (always deferred); this - // keeps a future widening of the executable set from silently bypassing it. - let approvals = compute_approvals(&changes); + // Defensive invariant: nothing the approval gate covers may be executable + // WITHOUT a matching approval. Gated changes with a valid artifact are the + // sanctioned exception (stage 4C). + let approvals = compute_approvals(&changes, &approved); let approval_violation = changes.iter().any(|change| { change.disposition == Some(ApplyDisposition::Applied) && approvals .iter() - .any(|approval| approval.resource == change.resource) + .any(|approval| approval.resource == change.resource && !approval.satisfied) }); if approval_violation { diagnostics.push(Diagnostic::error( @@ -877,6 +949,7 @@ pub async fn apply_config_dir_with_options( expected_manifest_version: None, desired_schema_digest: desired_graph.schema_digest.clone(), state_cas_base: expected_cas.clone(), + approval_id: None, }; let sidecar_path = match backend.write_recovery_sidecar(&sidecar) { Ok(path) => path, @@ -1040,6 +1113,7 @@ pub async fn apply_config_dir_with_options( expected_manifest_version: None, desired_schema_digest: desired_graph.schema_digest.clone(), state_cas_base: expected_cas.clone(), + approval_id: None, }; let sidecar_path = match backend.write_recovery_sidecar(&sidecar) { Ok(path) => path, @@ -1225,6 +1299,121 @@ pub async fn apply_config_dir_with_options( ); } + // Approved graph deletes execute LAST (RFC-004 §D5): catalog writes for + // surviving resources land first, then the irreversible work. + let graph_deletes_to_run: Vec = changes + .iter() + .filter(|change| { + change.disposition == Some(ApplyDisposition::Applied) + && change.operation == PlanOperation::Delete + && matches!(resource_kind(&change.resource), ResourceKind::Graph(_)) + }) + .filter_map(|change| change.resource.strip_prefix("graph.").map(str::to_string)) + .collect(); + let mut executed_deletes: Vec<(String, Option)> = Vec::new(); // (graph_id, approval_id) + let mut consumed_approval_ids: Vec = Vec::new(); + for graph_id in &graph_deletes_to_run { + if graph_moving_aborted { + diagnostics.push(Diagnostic::warning( + "graph_delete_skipped", + graph_address(graph_id), + "skipped after an earlier graph-moving operation failed in this run", + )); + failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::GraphDelete); + continue; + } + let graph_addr = graph_address(graph_id); + // Re-locate the consumable approval (classification verified one exists). + let approval_id = approval_artifacts + .iter() + .map(|(_, artifact)| artifact) + .find(|artifact| { + artifact.consumed_at.is_none() + && artifact.resource == graph_addr + && artifact.bound_config_digest == desired.config_digest + }) + .map(|artifact| artifact.approval_id.clone()); + let graph_uri = display_path( + &desired + .config_dir + .join(CLUSTER_GRAPHS_DIR) + .join(format!("{graph_id}.omni")), + ); + let observed_manifest_version = match Omnigraph::open_read_only(&graph_uri).await { + Ok(db) => match db.snapshot_of(ReadTarget::branch("main")).await { + Ok(snapshot) => Some(snapshot.version()), + Err(_) => None, + }, + Err(_) => None, // partial/unopenable roots still get deleted + }; + let sidecar = RecoverySidecar { + schema_version: 1, + operation_id: Ulid::new().to_string(), + started_at: now_rfc3339(), + actor: options.actor.clone(), + kind: RecoverySidecarKind::GraphDelete, + graph_id: graph_id.clone(), + graph_uri: graph_uri.clone(), + observed_manifest_version, + expected_manifest_version: None, // no post-op manifest exists + desired_schema_digest: String::new(), + state_cas_base: expected_cas.clone(), + approval_id: approval_id.clone(), + }; + let sidecar_path = match backend.write_recovery_sidecar(&sidecar) { + Ok(path) => path, + Err(diagnostic) => { + diagnostics.push(diagnostic); + failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::GraphDelete); + graph_moving_aborted = true; + continue; + } + }; + if let Err(diagnostic) = failpoints::maybe_fail("cluster_apply.before_graph_delete") { + // Simulated crash before removal: row 8 retires the intent and + // the still-valid approval lets a later run retry. + diagnostics.push(diagnostic); + failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::GraphDelete); + graph_moving_aborted = true; + continue; + } + match fs::remove_dir_all(PathBuf::from(&graph_uri)) { + Ok(()) => {} + Err(err) if err.kind() == ErrorKind::NotFound => {} // already gone + Err(err) => { + diagnostics.push(Diagnostic::error( + "graph_delete_failed", + graph_addr.clone(), + format!("could not remove graph root '{graph_uri}': {err}"), + )); + failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::GraphDelete); + graph_moving_aborted = true; + continue; + } + } + // Crash point: the root is gone, the ledger does not record it yet. + // The sweep rolls forward (row 7b) and consumes the approval. + if let Err(diagnostic) = failpoints::maybe_fail("cluster_apply.after_graph_delete") { + diagnostics.push(diagnostic); + return early_return( + display_path(&desired.config_dir), + Some(desired.config_digest), + observations, + changes, + state.resource_statuses, + diagnostics, + ); + } + executed_deletes.push((graph_id.clone(), approval_id.clone())); + if let Some(approval_id) = approval_id { + consumed_approval_ids.push(approval_id); + } + completed_op_sidecars.push(sidecar_path); + } + if !failed_graphs.is_empty() { + demote_dependents_of_failed_graphs(&mut changes, &failed_graphs, &desired.dependencies); + } + // State mutation. Apply owns query/policy statuses only; graph/schema // statuses belong to refresh/import observation and must not be clobbered // (the sweep above is the one exception: it owns recovery statuses). @@ -1265,6 +1454,17 @@ pub async fn apply_config_dir_with_options( _ => {} } } + for (graph_id, approval_id) in &executed_deletes { + tombstone_graph_subtree( + &mut new_state, + graph_id, + approval_id.as_deref(), + options.actor.as_deref(), + ); + if let Some(approval_id) = approval_id { + record_approval_consumed(&mut new_state, approval_id, "apply"); + } + } recompute_state_graph_digests(&mut new_state, &desired); let residual = diff_resources( @@ -1306,6 +1506,9 @@ pub async fn apply_config_dir_with_options( { let _ = fs::remove_file(sidecar_path); } + let mut all_consumed = sweep.consumed_approvals.clone(); + all_consumed.extend(consumed_approval_ids.iter().cloned()); + mark_approvals_consumed(&backend, &all_consumed); } // On a failed state write, report the statuses that are actually on disk // (the pre-apply snapshot), not the in-memory mutations that were never @@ -1349,6 +1552,126 @@ pub async fn apply_config_dir_with_options( } } +/// Record a digest-bound human approval for a gated (irreversible) change — +/// today: graph deletes. The artifact binds to the exact desired config +/// digest and the change's before/after digests, so config or state drift +/// invalidates it automatically (a stale approval can never authorize a +/// different change). +pub async fn approve_config_dir( + config_dir: impl AsRef, + resource: &str, + approved_by: &str, +) -> ApproveOutput { + let outcome = load_desired(config_dir.as_ref()); + let mut diagnostics = outcome.diagnostics; + let backend = LocalStateBackend::new(&outcome.config_dir); + let mut observations = backend.observations(); + + let fail = |config_dir: String, diagnostics: Vec| ApproveOutput { + ok: false, + config_dir, + approval_id: None, + resource: None, + operation: None, + approved_by: None, + diagnostics, + }; + + let Some(desired) = outcome.desired else { + return fail(display_path(&outcome.config_dir), diagnostics); + }; + if has_errors(&diagnostics) { + return fail(display_path(&desired.config_dir), diagnostics); + } + + let _lock_guard = if desired.state_lock { + match backend.acquire_lock("approve", &mut observations) { + Ok(guard) => Some(guard), + Err(diagnostic) => { + diagnostics.push(diagnostic); + return fail(display_path(&desired.config_dir), diagnostics); + } + } + } else { + diagnostics.push(Diagnostic::warning( + "state_lock_disabled", + "state.lock", + "state.lock is false; approve ran without acquiring the cluster state lock", + )); + None + }; + + let state = match backend.read_state(&mut observations) { + Ok(snapshot) => match snapshot.state { + Some(state) => state, + None => { + diagnostics.push(Diagnostic::error( + "state_missing", + CLUSTER_STATE_FILE, + "approve requires an existing state.json; run `cluster import` first", + )); + return fail(display_path(&desired.config_dir), diagnostics); + } + }, + Err(diagnostic) => { + diagnostics.push(diagnostic); + return fail(display_path(&desired.config_dir), diagnostics); + } + }; + + let prior_resources = state_resource_digests(&state); + let changes = diff_resources(&prior_resources, &desired.resource_digests); + let gates = compute_approvals(&changes, &BTreeSet::new()); + let Some(change) = changes.iter().find(|change| { + change.resource == resource && gates.iter().any(|gate| gate.resource == resource) + }) else { + diagnostics.push(Diagnostic::error( + "approval_not_required", + resource, + "no pending change for this resource requires approval (check `cluster plan`)", + )); + return fail(display_path(&desired.config_dir), diagnostics); + }; + + let artifact = ApprovalArtifact { + schema_version: 1, + approval_id: Ulid::new().to_string(), + resource: change.resource.clone(), + operation: match change.operation { + PlanOperation::Create => "create", + PlanOperation::Update => "update", + PlanOperation::Delete => "delete", + } + .to_string(), + reason: gates + .iter() + .find(|gate| gate.resource == resource) + .map(|gate| gate.reason.clone()) + .unwrap_or_default(), + bound_config_digest: desired.config_digest.clone(), + bound_before_digest: change.before_digest.clone(), + bound_after_digest: change.after_digest.clone(), + approved_by: approved_by.to_string(), + created_at: now_rfc3339(), + consumed_at: None, + consumed_by_operation: None, + }; + if let Err(diagnostic) = backend.write_approval_artifact(&artifact) { + diagnostics.push(diagnostic); + return fail(display_path(&desired.config_dir), diagnostics); + } + + ApproveOutput { + ok: !has_errors(&diagnostics), + config_dir: display_path(&desired.config_dir), + approval_id: Some(artifact.approval_id), + resource: Some(artifact.resource), + operation: Some(change.operation.clone()), + approved_by: Some(artifact.approved_by), + diagnostics, + } +} + pub fn status_config_dir(config_dir: impl AsRef) -> StatusOutput { let parsed = parse_cluster_config(config_dir.as_ref()); let mut diagnostics = parsed.diagnostics; @@ -1642,6 +1965,7 @@ async fn sync_config_dir(config_dir: &Path, operation: StateSyncOperation) -> St for sidecar_path in &sweep.completed_sidecars { let _ = fs::remove_file(sidecar_path); } + mark_approvals_consumed(&backend, &sweep.consumed_approvals); } Err(diagnostic) => diagnostics.push(diagnostic), } @@ -1773,10 +2097,102 @@ impl LocalStateBackend { state_path: config_dir.join(CLUSTER_STATE_FILE), lock_path: config_dir.join(CLUSTER_LOCK_FILE), recoveries_dir: config_dir.join(CLUSTER_RECOVERIES_DIR), + approvals_dir: config_dir.join(CLUSTER_APPROVALS_DIR), state_dir, } } + /// List approval artifacts in ULID (filename) order; unparseable files + /// warn and stay on disk for the operator. + fn list_approval_artifacts( + &self, + diagnostics: &mut Vec, + ) -> Vec<(PathBuf, ApprovalArtifact)> { + let mut paths = Vec::new(); + match fs::read_dir(&self.approvals_dir) { + Ok(entries) => { + for entry in entries.flatten() { + let path = entry.path(); + if path.extension().is_some_and(|ext| ext == "json") { + paths.push(path); + } + } + } + Err(err) if err.kind() == ErrorKind::NotFound => {} + Err(err) => diagnostics.push(Diagnostic::warning( + "approval_read_error", + CLUSTER_APPROVALS_DIR, + format!("could not list approval artifacts: {err}"), + )), + } + paths.sort(); + let mut artifacts = Vec::new(); + for path in paths { + match fs::read_to_string(&path) + .map_err(|err| err.to_string()) + .and_then(|text| { + serde_json::from_str::(&text).map_err(|err| err.to_string()) + }) { + Ok(artifact) if artifact.schema_version == 1 => artifacts.push((path, artifact)), + Ok(artifact) => diagnostics.push(Diagnostic::warning( + "unsupported_approval_version", + display_path(&path), + format!( + "unsupported approval artifact version {}; leaving it in place", + artifact.schema_version + ), + )), + Err(err) => diagnostics.push(Diagnostic::warning( + "invalid_approval_artifact", + display_path(&path), + format!("could not parse approval artifact ({err}); leaving it in place"), + )), + } + } + artifacts + } + + /// Atomically write (or rewrite, e.g. on consumption) an approval artifact. + fn write_approval_artifact(&self, artifact: &ApprovalArtifact) -> Result { + fs::create_dir_all(&self.approvals_dir).map_err(|err| { + Diagnostic::error( + "approval_write_error", + CLUSTER_APPROVALS_DIR, + format!("could not create approvals directory: {err}"), + ) + })?; + let target = self + .approvals_dir + .join(format!("{}.json", artifact.approval_id)); + let mut payload = serde_json::to_string_pretty(artifact).map_err(|err| { + Diagnostic::error( + "approval_write_error", + display_path(&target), + format!("could not encode approval artifact: {err}"), + ) + })?; + payload.push('\n'); + let tmp_path = self + .approvals_dir + .join(format!("{}.json.tmp.{}", artifact.approval_id, Ulid::new())); + fs::write(&tmp_path, payload.as_bytes()).map_err(|err| { + Diagnostic::error( + "approval_write_error", + display_path(&tmp_path), + format!("could not write approval artifact: {err}"), + ) + })?; + if let Err(err) = fs::rename(&tmp_path, &target) { + let _ = fs::remove_file(&tmp_path); + return Err(Diagnostic::error( + "approval_write_error", + display_path(&target), + format!("could not move approval artifact into place: {err}"), + )); + } + Ok(target) + } + /// List recovery sidecars in ULID (filename) order. Unparseable files are /// reported as warnings and skipped — they stay on disk for the operator. fn list_recovery_sidecars( @@ -2281,6 +2697,9 @@ async fn sweep_recovery_sidecars( RecoverySidecarKind::SchemaApply => { sweep_schema_apply_sidecar(path, sidecar, state, diagnostics, &mut outcome).await; } + RecoverySidecarKind::GraphDelete => { + sweep_graph_delete_sidecar(path, sidecar, state, diagnostics, &mut outcome); + } } } outcome @@ -2501,6 +2920,121 @@ async fn sweep_schema_apply_sidecar( } } +fn sweep_graph_delete_sidecar( + path: PathBuf, + sidecar: RecoverySidecar, + state: &mut ClusterState, + diagnostics: &mut Vec, + outcome: &mut SweepOutcome, +) { + let graph_address = graph_address(&sidecar.graph_id); + let root = PathBuf::from(&sidecar.graph_uri); + + if root.exists() { + // Row 8: the delete never completed. Prefix removal is idempotent and + // works on partial roots, so the repair is simply the re-proposed, + // still-approved delete on a later run — retire the stale intent. + diagnostics.push(Diagnostic::warning( + "graph_delete_incomplete", + graph_address, + "a previous graph delete did not complete; it will be re-proposed by plan and can be retried under its approval", + )); + outcome.completed_sidecars.push(path); + return; + } + + if !state.applied_revision.resources.contains_key(&graph_address) { + // Row 7: already tombstoned (or never recorded); crash fell between + // the state CAS and sidecar delete. + outcome.completed_sidecars.push(path); + return; + } + + // Row 7b: the root is gone, the ledger is stale — roll forward the + // tombstone, consume the approval the sidecar carries, audit. + tombstone_graph_subtree(state, &sidecar.graph_id, sidecar.approval_id.as_deref(), sidecar.actor.as_deref()); + state.recovery_records.insert( + sidecar.operation_id.clone(), + json!({ + "kind": "graph_delete", + "graph_id": sidecar.graph_id, + "outcome": "rolled_forward", + "recovered_at": now_rfc3339(), + "actor": sidecar.actor, + }), + ); + if let Some(approval_id) = &sidecar.approval_id { + record_approval_consumed(state, approval_id, &sidecar.operation_id); + outcome.consumed_approvals.push(approval_id.clone()); + } + diagnostics.push(Diagnostic::warning( + "cluster_recovery_rolled_forward", + graph_address, + "an interrupted graph delete had completed on disk; cluster state was rolled forward to match", + )); + outcome.completed_sidecars.push(path); +} + +/// Remove a graph's subtree (graph, schema, queries) from the ledger and +/// leave a tombstone observation. Idempotent. +fn tombstone_graph_subtree( + state: &mut ClusterState, + graph_id: &str, + approval_id: Option<&str>, + actor: Option<&str>, +) { + let graph_addr = graph_address(graph_id); + let schema_addr = schema_address(graph_id); + let query_prefix = format!("query.{graph_id}."); + state.applied_revision.resources.remove(&graph_addr); + state.applied_revision.resources.remove(&schema_addr); + state + .applied_revision + .resources + .retain(|address, _| !address.starts_with(&query_prefix)); + state.resource_statuses.remove(&graph_addr); + state.resource_statuses.remove(&schema_addr); + state + .resource_statuses + .retain(|address, _| !address.starts_with(&query_prefix)); + state.observations.insert( + graph_addr, + json!({ + "kind": "tombstone", + "deleted_at": now_rfc3339(), + "approval_id": approval_id, + "actor": actor, + }), + ); +} + +/// Record approval consumption in the state ledger. The artifact FILE is +/// rewritten with consumed_at only after the state write lands, so a failed +/// CAS leaves the approval valid for the retry. +fn record_approval_consumed(state: &mut ClusterState, approval_id: &str, operation_id: &str) { + state.approval_records.insert( + approval_id.to_string(), + json!({ + "consumed_at": now_rfc3339(), + "consumed_by_operation": operation_id, + }), + ); +} + +/// Mark approval artifact files consumed on disk (post-CAS). +fn mark_approvals_consumed(backend: &LocalStateBackend, approval_ids: &[String]) { + if approval_ids.is_empty() { + return; + } + let mut sink = Vec::new(); + for (_, mut artifact) in backend.list_approval_artifacts(&mut sink) { + if approval_ids.contains(&artifact.approval_id) && artifact.consumed_at.is_none() { + artifact.consumed_at = Some(now_rfc3339()); + let _ = backend.write_approval_artifact(&artifact); + } + } +} + /// Read-only commands report pending sidecars without acting on them. fn warn_pending_recovery_sidecars(config_dir: &Path, diagnostics: &mut Vec) { let recoveries_dir = config_dir.join(CLUSTER_RECOVERIES_DIR); @@ -3127,24 +3661,74 @@ fn compute_blast_radius(changes: &[PlanChange], dependencies: &[Dependency]) -> .collect() } -fn compute_approvals(changes: &[PlanChange]) -> Vec { +fn compute_approvals( + changes: &[PlanChange], + approved: &BTreeSet, +) -> Vec { + // One gate per subtree: the graph. delete carries its schema and + // queries, so a schema delete whose graph is also deleted is not listed. + let graph_deletes: BTreeSet = changes + .iter() + .filter(|change| change.operation == PlanOperation::Delete) + .filter_map(|change| change.resource.strip_prefix("graph.").map(str::to_string)) + .collect(); changes .iter() .filter_map(|change| { - if change.operation == PlanOperation::Delete - && (change.resource.starts_with("graph.") || change.resource.starts_with("schema.")) - { - Some(ApprovalRequirement { - resource: change.resource.clone(), - reason: "delete may remove deployed graph or schema definition".to_string(), - }) - } else { - None + if change.operation != PlanOperation::Delete { + return None; } + let gated = match resource_kind(&change.resource) { + ResourceKind::Graph(_) => true, + ResourceKind::Schema(graph) => !graph_deletes.contains(&graph), + _ => false, + }; + gated.then(|| ApprovalRequirement { + resource: change.resource.clone(), + reason: "delete may remove deployed graph or schema definition".to_string(), + satisfied: approved.contains(&change.resource), + }) }) .collect() } +/// Resources with a valid (digest-matching, unconsumed) pending approval. +/// Near-misses — an artifact for the same resource whose bound digests no +/// longer match — warn as `approval_stale` and never authorize anything. +fn approved_resources( + artifacts: &[(PathBuf, ApprovalArtifact)], + changes: &[PlanChange], + config_digest: &str, + diagnostics: &mut Vec, +) -> BTreeSet { + let mut approved = BTreeSet::new(); + for change in changes { + let candidates: Vec<&ApprovalArtifact> = artifacts + .iter() + .map(|(_, artifact)| artifact) + .filter(|artifact| artifact.consumed_at.is_none() && artifact.resource == change.resource) + .collect(); + if candidates.is_empty() { + continue; + } + let matched = candidates.iter().any(|artifact| { + artifact.bound_config_digest == config_digest + && artifact.bound_before_digest == change.before_digest + && artifact.bound_after_digest == change.after_digest + }); + if matched { + approved.insert(change.resource.clone()); + } else { + diagnostics.push(Diagnostic::warning( + "approval_stale", + change.resource.clone(), + "an approval artifact exists but its bound digests no longer match the plan; re-run `cluster approve`", + )); + } + } + approved +} + #[derive(Debug, PartialEq, Eq)] enum ResourceKind { Graph(String), @@ -3182,6 +3766,7 @@ fn classify_changes( changes: &mut [PlanChange], dependencies: &[Dependency], pending_recovery: &BTreeSet, + approved: &BTreeSet, ) { let mut schema_creates = BTreeSet::new(); let mut schema_pending = BTreeSet::new(); @@ -3219,6 +3804,12 @@ fn classify_changes( schema_pending.insert(graph.clone()); } } + // Subtree deletes ride the approved graph delete. + let rides_approved_delete = |graph: &str| { + graph_deletes.contains(graph) + && approved.contains(&graph_address(graph)) + && !pending_recovery.contains(graph) + }; for change in changes.iter_mut() { let (disposition, reason) = match resource_kind(&change.resource) { @@ -3238,6 +3829,15 @@ fn classify_changes( PlanOperation::Create | PlanOperation::Update => { (ApplyDisposition::Blocked, Some("cluster_recovery_pending")) } + PlanOperation::Delete if graph_deletes.contains(&graph) => { + if rides_approved_delete(&graph) { + (ApplyDisposition::Applied, None) + } else if pending_recovery.contains(&graph) { + (ApplyDisposition::Blocked, Some("cluster_recovery_pending")) + } else { + (ApplyDisposition::Blocked, Some("approval_required")) + } + } _ => (ApplyDisposition::Deferred, Some("apply_unsupported_kind")), }, ResourceKind::Graph(graph) => match change.operation { @@ -3251,15 +3851,26 @@ fn classify_changes( PlanOperation::Update if !schema_pending.contains(&graph) => { (ApplyDisposition::Derived, None) } + // Stage 4C: an approved graph delete executes (the + // irreversible tier — gated by a digest-bound artifact). + PlanOperation::Delete => { + if pending_recovery.contains(&graph) { + (ApplyDisposition::Blocked, Some("cluster_recovery_pending")) + } else if rides_approved_delete(&graph) { + (ApplyDisposition::Applied, None) + } else { + (ApplyDisposition::Blocked, Some("approval_required")) + } + } _ => (ApplyDisposition::Deferred, Some("apply_unsupported_kind")), }, ResourceKind::Query { graph, .. } => match change.operation { PlanOperation::Delete => { - if graph_deletes.contains(&graph) { - ( - ApplyDisposition::Blocked, - Some("dependency_not_applied"), - ) + if rides_approved_delete(&graph) { + // Tombstoned with the approved graph delete. + (ApplyDisposition::Applied, None) + } else if graph_deletes.contains(&graph) { + (ApplyDisposition::Blocked, Some("approval_required")) } else { (ApplyDisposition::Applied, None) } @@ -3309,6 +3920,7 @@ fn classify_changes( enum FailedGraphOrigin { GraphCreate, SchemaApply, + GraphDelete, } /// After a graph-moving operation fails mid-run, every change that depended @@ -3328,12 +3940,15 @@ fn demote_dependents_of_failed_graphs( let demote_reason = match resource_kind(&change.resource) { ResourceKind::Graph(graph) => match failed.get(&graph) { Some(FailedGraphOrigin::GraphCreate) => Some("graph_create_failed"), + Some(FailedGraphOrigin::GraphDelete) => Some("graph_delete_failed"), Some(FailedGraphOrigin::SchemaApply) => Some("dependency_not_applied"), None => None, }, ResourceKind::Schema(graph) => match failed.get(&graph) { Some(FailedGraphOrigin::SchemaApply) => Some("schema_apply_failed"), - Some(FailedGraphOrigin::GraphCreate) => Some("dependency_not_applied"), + Some(FailedGraphOrigin::GraphCreate) | Some(FailedGraphOrigin::GraphDelete) => { + Some("dependency_not_applied") + } None => None, }, ResourceKind::Query { graph, .. } if failed.contains_key(&graph) => { @@ -5302,7 +5917,7 @@ graphs: } #[tokio::test] - async fn apply_does_not_delete_subtree_of_deleted_graph() { + async fn apply_blocks_graph_delete_without_approval() { let dir = fixture(); let desired = validate_config_dir(dir.path()); let schema_digest = desired @@ -5331,18 +5946,25 @@ graphs: .iter() .map(|change| (change.resource.as_str(), change)) .collect(); + // Stage 4C: deletes are gated, not deferred — every subtree change + // blocks on the single graph-level approval. assert_eq!( by_resource["graph.old"].disposition, - Some(ApplyDisposition::Deferred) - ); - assert_eq!( - by_resource["schema.old"].disposition, - Some(ApplyDisposition::Deferred) - ); - assert_eq!( - by_resource["query.old.q"].disposition, Some(ApplyDisposition::Blocked) ); + assert_eq!( + by_resource["graph.old"].reason.as_deref(), + Some("approval_required") + ); + assert_eq!( + by_resource["schema.old"].reason.as_deref(), + Some("approval_required") + ); + assert_eq!( + by_resource["query.old.q"].reason.as_deref(), + Some("approval_required") + ); + // State intact; nothing destroyed without the artifact. let state = read_state_json(dir.path()); let resources = &state["applied_revision"]["resources"]; assert_eq!(resources["graph.old"]["digest"], "3333"); @@ -5350,6 +5972,147 @@ graphs: assert_eq!(resources["query.old.q"]["digest"], "5555"); } + #[tokio::test] + async fn approve_writes_digest_bound_artifact() { + let dir = fixture(); + write_applyable_state(dir.path()); + // Seed a deletable subtree. + let state = read_state_json(dir.path()); + let graph_digest_str = state["applied_revision"]["resources"]["graph.knowledge"]["digest"] + .as_str() + .unwrap() + .to_string(); + let schema_digest_str = state["applied_revision"]["resources"]["schema.knowledge"] + ["digest"] + .as_str() + .unwrap() + .to_string(); + write_state_resources( + dir.path(), + &[ + ("graph.knowledge", graph_digest_str.as_str()), + ("schema.knowledge", schema_digest_str.as_str()), + ("graph.old", "3333"), + ("schema.old", "4444"), + ], + ); + + let out = approve_config_dir(dir.path(), "graph.old", "andrew").await; + assert!(out.ok, "{:?}", out.diagnostics); + let approval_id = out.approval_id.clone().unwrap(); + let artifact: serde_json::Value = serde_json::from_str( + &fs::read_to_string( + dir.path() + .join(CLUSTER_APPROVALS_DIR) + .join(format!("{approval_id}.json")), + ) + .unwrap(), + ) + .unwrap(); + assert_eq!(artifact["resource"], "graph.old"); + assert_eq!(artifact["operation"], "delete"); + assert_eq!(artifact["approved_by"], "andrew"); + assert_eq!(artifact["bound_before_digest"], "3333"); + assert!(artifact["bound_after_digest"].is_null()); + assert!(artifact["bound_config_digest"].is_string()); + assert!(artifact["consumed_at"].is_null()); + + // A non-gated address is refused. + let not_gated = approve_config_dir(dir.path(), "query.knowledge.find_person", "andrew").await; + assert!(!not_gated.ok); + assert!( + not_gated + .diagnostics + .iter() + .any(|diagnostic| diagnostic.code == "approval_not_required") + ); + } + + #[tokio::test] + async fn stale_approval_is_ignored() { + let dir = fixture(); + write_applyable_state(dir.path()); + let state = read_state_json(dir.path()); + let graph_digest_str = state["applied_revision"]["resources"]["graph.knowledge"]["digest"] + .as_str() + .unwrap() + .to_string(); + let schema_digest_str = state["applied_revision"]["resources"]["schema.knowledge"] + ["digest"] + .as_str() + .unwrap() + .to_string(); + write_state_resources( + dir.path(), + &[ + ("graph.knowledge", graph_digest_str.as_str()), + ("schema.knowledge", schema_digest_str.as_str()), + ("graph.old", "3333"), + ], + ); + let approved = approve_config_dir(dir.path(), "graph.old", "andrew").await; + assert!(approved.ok, "{:?}", approved.diagnostics); + // The config moves after approval: the bound config digest no longer + // matches and the artifact authorizes nothing. + fs::write(dir.path().join("base.policy.yaml"), "rules: [] # moved\n").unwrap(); + + let out = apply_config_dir(dir.path()).await; + assert!( + out.diagnostics + .iter() + .any(|diagnostic| diagnostic.code == "approval_stale"), + "{:?}", + out.diagnostics + ); + let by_resource: BTreeMap<&str, &PlanChange> = out + .changes + .iter() + .map(|change| (change.resource.as_str(), change)) + .collect(); + assert_eq!( + by_resource["graph.old"].reason.as_deref(), + Some("approval_required") + ); + let state = read_state_json(dir.path()); + assert_eq!( + state["applied_revision"]["resources"]["graph.old"]["digest"], + "3333" + ); + } + + #[tokio::test] + async fn compute_approvals_one_gate_per_subtree() { + let dir = fixture(); + write_applyable_state(dir.path()); + let state = read_state_json(dir.path()); + let g = state["applied_revision"]["resources"]["graph.knowledge"]["digest"] + .as_str() + .unwrap() + .to_string(); + let sc = state["applied_revision"]["resources"]["schema.knowledge"]["digest"] + .as_str() + .unwrap() + .to_string(); + write_state_resources( + dir.path(), + &[ + ("graph.knowledge", g.as_str()), + ("schema.knowledge", sc.as_str()), + ("graph.old", "3333"), + ("schema.old", "4444"), + ("query.old.q", "5555"), + ], + ); + let plan = plan_config_dir(dir.path()).await; + let gated: Vec<&str> = plan + .approvals_required + .iter() + .map(|gate| gate.resource.as_str()) + .collect(); + assert_eq!(gated, vec!["graph.old"], "{plan:?}"); + assert!(!plan.approvals_required[0].satisfied); + } + #[tokio::test] async fn apply_is_idempotent() { let dir = fixture(); @@ -6105,6 +6868,200 @@ graphs: assert!(sidecar.exists()); } + /// Seed: converged knowledge subtree + a stale `old` graph subtree with a + /// real directory on disk. + fn seed_deletable_state(config_dir: &Path) { + write_applyable_state(config_dir); + let state = read_state_json(config_dir); + let g = state["applied_revision"]["resources"]["graph.knowledge"]["digest"] + .as_str() + .unwrap() + .to_string(); + let sc = state["applied_revision"]["resources"]["schema.knowledge"]["digest"] + .as_str() + .unwrap() + .to_string(); + write_state_resources( + config_dir, + &[ + ("graph.knowledge", g.as_str()), + ("schema.knowledge", sc.as_str()), + ("graph.old", "3333"), + ("schema.old", "4444"), + ("query.old.q", "5555"), + ], + ); + let root = config_dir.join(CLUSTER_GRAPHS_DIR).join("old.omni"); + fs::create_dir_all(&root).unwrap(); + fs::write(root.join("_schema.pg"), "stale").unwrap(); + } + + #[tokio::test] + async fn apply_executes_approved_graph_delete() { + let dir = fixture(); + seed_deletable_state(dir.path()); + let approved = approve_config_dir(dir.path(), "graph.old", "andrew").await; + assert!(approved.ok, "{:?}", approved.diagnostics); + let approval_id = approved.approval_id.clone().unwrap(); + + let out = apply_config_dir(dir.path()).await; + assert!(out.ok, "{:?}", out.diagnostics); + assert!(out.converged, "{out:?}"); + let by_resource: BTreeMap<&str, &PlanChange> = out + .changes + .iter() + .map(|change| (change.resource.as_str(), change)) + .collect(); + assert_eq!(by_resource["graph.old"].disposition, Some(ApplyDisposition::Applied)); + assert_eq!(by_resource["schema.old"].disposition, Some(ApplyDisposition::Applied)); + assert_eq!(by_resource["query.old.q"].disposition, Some(ApplyDisposition::Applied)); + // The root is gone; the subtree is tombstoned out of the ledger. + assert!(!dir.path().join(CLUSTER_GRAPHS_DIR).join("old.omni").exists()); + let state = read_state_json(dir.path()); + let resources = state["applied_revision"]["resources"].as_object().unwrap(); + assert!(!resources.contains_key("graph.old")); + assert!(!resources.contains_key("schema.old")); + assert!(!resources.contains_key("query.old.q")); + assert_eq!(state["observations"]["graph.old"]["kind"], "tombstone"); + assert_eq!(state["observations"]["graph.old"]["approval_id"], approval_id); + // Approval consumed in BOTH stores: ledger summary + artifact file. + assert!(state["approval_records"][&approval_id]["consumed_at"].is_string()); + let artifact: serde_json::Value = serde_json::from_str( + &fs::read_to_string( + dir.path() + .join(CLUSTER_APPROVALS_DIR) + .join(format!("{approval_id}.json")), + ) + .unwrap(), + ) + .unwrap(); + assert!(artifact["consumed_at"].is_string(), "{artifact}"); + // Sidecar retired. + assert!( + fs::read_dir(dir.path().join(CLUSTER_RECOVERIES_DIR)) + .map(|mut entries| entries.next().is_none()) + .unwrap_or(true) + ); + // A consumed approval authorizes nothing further (idempotent re-apply). + let again = apply_config_dir(dir.path()).await; + assert!(again.ok && again.converged && !again.state_written, "{again:?}"); + } + + fn write_delete_sidecar( + config_dir: &Path, + graph_id: &str, + approval_id: Option<&str>, + operation_id: &str, + ) -> PathBuf { + let dir = config_dir.join(CLUSTER_RECOVERIES_DIR); + fs::create_dir_all(&dir).unwrap(); + let path = dir.join(format!("{operation_id}.json")); + fs::write( + &path, + serde_json::to_string_pretty(&json!({ + "schema_version": 1, + "operation_id": operation_id, + "started_at": "1970-01-01T00:00:00Z", + "kind": "graph_delete", + "graph_id": graph_id, + "graph_uri": derived_graph_uri(config_dir, graph_id), + "desired_schema_digest": "", + "approval_id": approval_id, + })) + .unwrap(), + ) + .unwrap(); + path + } + + #[tokio::test] + async fn sweep_retires_delete_sidecar_when_tombstoned() { + let dir = fixture(); + write_applyable_state(dir.path()); // no graph.old in state, no root + let sidecar = write_delete_sidecar(dir.path(), "old", None, "01DROW7"); + + let out = apply_config_dir(dir.path()).await; + assert!(out.ok, "{:?}", out.diagnostics); + assert!(!sidecar.exists()); + let state = read_state_json(dir.path()); + assert!( + state["recovery_records"] + .as_object() + .is_none_or(|records| records.is_empty()) + ); + } + + #[tokio::test] + async fn sweep_rolls_forward_completed_delete() { + let dir = fixture(); + seed_deletable_state(dir.path()); + // Approve, then simulate: root removed, state stale, sidecar present. + let approved = approve_config_dir(dir.path(), "graph.old", "andrew").await; + let approval_id = approved.approval_id.unwrap(); + fs::remove_dir_all(dir.path().join(CLUSTER_GRAPHS_DIR).join("old.omni")).unwrap(); + let sidecar = write_delete_sidecar(dir.path(), "old", Some(&approval_id), "01DROW7B"); + + let out = apply_config_dir(dir.path()).await; + assert!(out.ok, "{:?}", out.diagnostics); + assert!( + out.diagnostics + .iter() + .any(|diagnostic| diagnostic.code == "cluster_recovery_rolled_forward") + ); + assert!(!sidecar.exists()); + let state = read_state_json(dir.path()); + assert!( + !state["applied_revision"]["resources"] + .as_object() + .unwrap() + .contains_key("graph.old") + ); + assert_eq!(state["observations"]["graph.old"]["kind"], "tombstone"); + assert!(state["approval_records"][&approval_id]["consumed_at"].is_string()); + assert!( + state["recovery_records"] + .as_object() + .unwrap() + .values() + .any(|record| record["kind"] == "graph_delete" + && record["outcome"] == "rolled_forward") + ); + // The artifact file is marked consumed post-CAS. + let artifact: serde_json::Value = serde_json::from_str( + &fs::read_to_string( + dir.path() + .join(CLUSTER_APPROVALS_DIR) + .join(format!("{approval_id}.json")), + ) + .unwrap(), + ) + .unwrap(); + assert!(artifact["consumed_at"].is_string()); + assert!(out.converged, "{out:?}"); + } + + #[tokio::test] + async fn sweep_reproposes_incomplete_delete() { + let dir = fixture(); + seed_deletable_state(dir.path()); // root present + let approved = approve_config_dir(dir.path(), "graph.old", "andrew").await; + assert!(approved.ok); + let sidecar = write_delete_sidecar(dir.path(), "old", approved.approval_id.as_deref(), "01DROW8"); + + // Row 8: the stale intent is retired with a warning, and the same run + // re-executes the still-approved delete to completion. + let out = apply_config_dir(dir.path()).await; + assert!(out.ok, "{:?}", out.diagnostics); + assert!( + out.diagnostics + .iter() + .any(|diagnostic| diagnostic.code == "graph_delete_incomplete") + ); + assert!(!sidecar.exists()); + assert!(!dir.path().join(CLUSTER_GRAPHS_DIR).join("old.omni").exists()); + assert!(out.converged, "{out:?}"); + } + #[test] fn status_warns_on_pending_recovery_sidecar() { let dir = fixture(); diff --git a/crates/omnigraph-cluster/tests/failpoints.rs b/crates/omnigraph-cluster/tests/failpoints.rs index cc91b85..5cdf2d4 100644 --- a/crates/omnigraph-cluster/tests/failpoints.rs +++ b/crates/omnigraph-cluster/tests/failpoints.rs @@ -16,7 +16,8 @@ use fail::FailScenario; use omnigraph_cluster::failpoints::ScopedFailPoint; use omnigraph::db::Omnigraph; use omnigraph_cluster::{ - ApplyOptions, apply_config_dir, apply_config_dir_with_options, validate_config_dir, + ApplyOptions, apply_config_dir, apply_config_dir_with_options, approve_config_dir, + validate_config_dir, }; use tempfile::tempdir; @@ -467,3 +468,126 @@ async fn schema_crash_after_apply_rolls_state_forward() { ); scenario.teardown(); } + +/// Seed: converged state + a stale `old` graph subtree with a real root and +/// a valid approval for its delete. Returns the approval id. +async fn seed_approved_delete(dir: &Path) -> String { + let digests = seed_applyable_state(dir); + let graph_digest = digests["graph.knowledge"].clone(); + let schema_digest = digests["schema.knowledge"].clone(); + let state_dir = dir.join("__cluster"); + fs::write( + state_dir.join("state.json"), + format!( + r#"{{ + "version": 1, + "state_revision": 1, + "applied_revision": {{ + "resources": {{ + "graph.knowledge": {{ "digest": "{graph_digest}" }}, + "schema.knowledge": {{ "digest": "{schema_digest}" }}, + "graph.old": {{ "digest": "3333" }}, + "schema.old": {{ "digest": "4444" }} + }} + }} +}} +"# + ), + ) + .unwrap(); + let root = dir.join("graphs/old.omni"); + fs::create_dir_all(&root).unwrap(); + fs::write(root.join("_schema.pg"), "stale").unwrap(); + let approved = approve_config_dir(dir, "graph.old", "test-actor").await; + assert!(approved.ok, "{:?}", approved.diagnostics); + approved.approval_id.unwrap() +} + +/// Crash before the removal: root intact, approval unconsumed, no ack; the +/// next run retires the stale intent (row 8) and the still-approved delete +/// completes in the same run. +#[tokio::test] +async fn delete_crash_before_removal_reproposes() { + let scenario = FailScenario::setup(); + let dir = fixture(); + let approval_id = seed_approved_delete(dir.path()).await; + + { + let _failpoint = ScopedFailPoint::new("cluster_apply.before_graph_delete", "return"); + let out = apply_config_dir(dir.path()).await; + assert!(!out.ok); + assert!(dir.path().join("graphs/old.omni").exists()); + assert_eq!(recovery_sidecars(dir.path()).len(), 1); + // The approval is untouched (file unconsumed). + let artifact: serde_json::Value = serde_json::from_str( + &fs::read_to_string( + dir.path() + .join("__cluster/approvals") + .join(format!("{approval_id}.json")), + ) + .unwrap(), + ) + .unwrap(); + assert!(artifact["consumed_at"].is_null()); + } + + let recovered = apply_config_dir(dir.path()).await; + assert!(recovered.ok, "{:?}", recovered.diagnostics); + assert!( + recovered + .diagnostics + .iter() + .any(|diagnostic| diagnostic.code == "graph_delete_incomplete") + ); + assert!(recovered.converged); + assert!(!dir.path().join("graphs/old.omni").exists()); + assert!(recovery_sidecars(dir.path()).is_empty()); + scenario.teardown(); +} + +/// Crash after the removal, before the state CAS: root gone, ledger stale, +/// nothing acknowledged; the next run's sweep rolls the tombstone forward, +/// consumes the approval the sidecar carries, and audits the recovery. +#[tokio::test] +async fn delete_crash_after_removal_rolls_forward() { + let scenario = FailScenario::setup(); + let dir = fixture(); + let approval_id = seed_approved_delete(dir.path()).await; + let state_before = fs::read(state_path(dir.path())).unwrap(); + + { + let _failpoint = ScopedFailPoint::new("cluster_apply.after_graph_delete", "return"); + let out = apply_config_dir(dir.path()).await; + assert!(!out.ok); + assert!(!out.state_written); + assert!(!dir.path().join("graphs/old.omni").exists()); + assert_eq!(fs::read(state_path(dir.path())).unwrap(), state_before); + let sidecars = recovery_sidecars(dir.path()); + assert_eq!(sidecars.len(), 1); + let sidecar: serde_json::Value = + serde_json::from_str(&fs::read_to_string(&sidecars[0]).unwrap()).unwrap(); + assert_eq!(sidecar["approval_id"], approval_id.as_str()); + } + + let recovered = apply_config_dir(dir.path()).await; + assert!(recovered.ok, "{:?}", recovered.diagnostics); + assert!( + recovered + .diagnostics + .iter() + .any(|diagnostic| diagnostic.code == "cluster_recovery_rolled_forward") + ); + assert!(recovered.converged); + let state: serde_json::Value = + serde_json::from_str(&fs::read_to_string(state_path(dir.path())).unwrap()).unwrap(); + assert_eq!(state["observations"]["graph.old"]["kind"], "tombstone"); + assert!(state["approval_records"][&approval_id]["consumed_at"].is_string()); + assert!( + state["recovery_records"] + .as_object() + .unwrap() + .values() + .any(|record| record["kind"] == "graph_delete") + ); + scenario.teardown(); +} diff --git a/docs/dev/rfc-004-cluster-graph-schema-apply.md b/docs/dev/rfc-004-cluster-graph-schema-apply.md index ca72fdc..e9c0336 100644 --- a/docs/dev/rfc-004-cluster-graph-schema-apply.md +++ b/docs/dev/rfc-004-cluster-graph-schema-apply.md @@ -1,6 +1,7 @@ # RFC: Cluster Graph & Schema Apply — Phase 4 of the Cluster Control Plane -**Status:** Proposed +**Status:** Landed (4A #170, 4B #171, 4C — all shipped) +**Implementation deviations:** (1) D3 row 8 retires the stale delete sidecar and lets the still-approved delete re-propose and retry, instead of a pending-block — prefix removal is idempotent, so the retry is the repair. (2) The approver/actor flag is the CLI's existing global `--as`, not a dedicated `--actor`/`--by`. (3) Consumed approval artifacts are rewritten with `consumed_at` rather than moved into state — the file and the ledger record both survive independently (axiom 11). **Date:** 2026-06-10 **Builds on:** cluster Stages 1–3B (shipped: validate/plan/status/refresh/import/force-unlock, config-only `cluster apply` with content-addressed catalog publish, catalog payload verification, failpoint-proven crash/CAS recovery for the apply protocol). Normative context: [cluster-config-specs.md](cluster-config-specs.md), [cluster-axioms.md](cluster-axioms.md), [cluster-config-implementation-spec.md](cluster-config-implementation-spec.md). **Target release:** unversioned (phased — see Sequencing); no cluster functionality is in a tagged release yet. diff --git a/docs/dev/testing.md b/docs/dev/testing.md index 5402ccf..2302b13 100644 --- a/docs/dev/testing.md +++ b/docs/dev/testing.md @@ -8,7 +8,7 @@ This file is the always-on map of the test surface. **Consult it before every ta |---|---|---| | `omnigraph` (engine) | `crates/omnigraph/tests/` | Integration tests (21 files), fixture-driven, share `tests/helpers/mod.rs` | | `omnigraph-cli` | `crates/omnigraph-cli/tests/` | `cli.rs` (unit-ish; includes the `cluster_e2e_*` lifecycle compositions over the spawned binary — lost-state re-import recovery, out-of-band drift, graph-root destruction, multi-graph mixed-disposition convergence), `system_local.rs`, `system_remote.rs`, share `tests/support/mod.rs` | -| `omnigraph-cluster` | mostly in-source `#[cfg(test)] mod tests`; `tests/failpoints.rs` (feature-gated) | Cluster config parser, local JSON state diff, state CAS/lock handling/recovery, read-only validate/plan/status plus explicit refresh/import graph observations, config-only apply (content-addressed payload publish, disposition gating, composite-digest convergence, idempotent re-apply), catalog payload verification (status read-only, refresh drift + self-heal), failpoint crash-mid-apply / CAS-race coverage, Stage 4A graph creation (create executor, recovery sidecars + sweep rows, create crash windows), and Stage 4B schema apply (migration previews in plan, schema executor, schema-apply sweep classification, schema crash windows) | +| `omnigraph-cluster` | mostly in-source `#[cfg(test)] mod tests`; `tests/failpoints.rs` (feature-gated) | Cluster config parser, local JSON state diff, state CAS/lock handling/recovery, read-only validate/plan/status plus explicit refresh/import graph observations, config-only apply (content-addressed payload publish, disposition gating, composite-digest convergence, idempotent re-apply), catalog payload verification (status read-only, refresh drift + self-heal), failpoint crash-mid-apply / CAS-race coverage, Stage 4A graph creation (create executor, recovery sidecars + sweep rows, create crash windows), Stage 4B schema apply (migration previews in plan, schema executor, schema-apply sweep classification, schema crash windows), and Stage 4C gated deletes (digest-bound approvals, delete executor + tombstones, delete sweep rows, delete crash windows) | | `omnigraph-server` | `crates/omnigraph-server/tests/` | `server.rs` (HTTP-level), `openapi.rs` (OpenAPI drift / regeneration) | | `omnigraph-compiler` | mostly in-source `#[cfg(test)] mod tests` | Parser, type-checker, IR lowering, lint | diff --git a/docs/user/cli-reference.md b/docs/user/cli-reference.md index 774ea6b..9dc8a25 100644 --- a/docs/user/cli-reference.md +++ b/docs/user/cli-reference.md @@ -19,7 +19,7 @@ Top-level command families and subcommands. Graph-targeting commands accept eith | `commit list \| show` | inspect commit graph | | `schema plan \| apply \| show (alias: get)` | migrations | | `lint` (alias: `check`) | offline / graph-backed query validation. Replaces `query lint` / `query check`, which are kept as deprecated argv-level shims that print a one-line warning and rewrite to `omnigraph lint` | -| `cluster validate \| plan \| apply \| status \| refresh \| import \| force-unlock` | cluster-control preview. `validate` checks a local `cluster.yaml` folder and referenced schema/query/policy files; `plan` diffs it against local JSON state at `__cluster/state.json` and annotates each change with its apply disposition; `apply` executes the config-only (stored-query/policy) subset into the content-addressed local catalog under `__cluster/resources/` — graph/schema changes are deferred loudly, and nothing applied serves traffic (the server still boots from `omnigraph.yaml`); `status` reads the state ledger; `refresh`/`import` explicitly update local JSON state from read-only graph observations; `force-unlock ` manually removes a held local state lock by exact id. No graph-manifest movement, server change, automatic stale-lock breaking, or `plan --refresh` occurs in Stage 3A | +| `cluster validate \| plan \| apply \| approve \| status \| refresh \| import \| force-unlock` | cluster-control preview. `validate` checks a local `cluster.yaml` folder and referenced schema/query/policy files; `plan` diffs it against local JSON state at `__cluster/state.json` and annotates each change with its apply disposition; `apply` executes the config-only (stored-query/policy) subset into the content-addressed local catalog under `__cluster/resources/` — graph/schema changes are deferred loudly, and nothing applied serves traffic (the server still boots from `omnigraph.yaml`); `status` reads the state ledger; `refresh`/`import` explicitly update local JSON state from read-only graph observations; `force-unlock ` manually removes a held local state lock by exact id. No graph-manifest movement, server change, automatic stale-lock breaking, or `plan --refresh` occurs in Stage 3A | | `optimize` | non-destructive Lance compaction (skips tables with `Blob` columns or uncovered drift; `--json` reports `skipped`) | | `repair [--confirm] [--force]` | preview or explicitly publish uncovered manifest/head drift. `--confirm` heals verified maintenance drift and exits non-zero if suspicious/unverifiable drift is refused; `--force --confirm` publishes suspicious/unverifiable drift after operator review | | `cleanup --keep N --older-than 7d --confirm` | destructive version GC | @@ -79,6 +79,7 @@ policy: omnigraph cluster validate --config ./company-brain omnigraph cluster plan --config ./company-brain --json omnigraph cluster apply --config ./company-brain --json +omnigraph cluster approve graph. --config ./company-brain --as omnigraph cluster status --config ./company-brain --json omnigraph cluster refresh --config ./company-brain --json omnigraph cluster import --config ./company-brain --json diff --git a/docs/user/cluster-config.md b/docs/user/cluster-config.md index 9de305a..2df26be 100644 --- a/docs/user/cluster-config.md +++ b/docs/user/cluster-config.md @@ -1,6 +1,6 @@ # Cluster Config -**Status:** Stage 4B schema-apply preview. +**Status:** Stage 4C — Phase 4 complete (graph create, schema apply, gated graph delete). Cluster config is the future control-plane configuration surface for a whole OmniGraph deployment. In this stage, OmniGraph can validate a local @@ -9,11 +9,10 @@ local JSON state ledger, explicitly refresh/import graph observations into that ledger, manually remove a held local state lock by exact lock id, and **apply the executable subset of the plan** — stored-query and policy-bundle catalog writes, **graph creation** (a declared graph that does not exist yet -is initialized by apply at the derived root), and **schema updates**: a -changed schema is migrated on the live graph by apply itself, soft drops -only. It does not delete graphs (a later stage), perform data-loss -migrations, start servers, or serve anything it applies: the server still -boots from `omnigraph.yaml`. +is initialized by apply at the derived root), **schema updates** (soft drops +only), and — behind an explicit, digest-bound **approval** — **graph +deletion**. It does not perform data-loss schema migrations, start servers, +or serve anything it applies: the server still boots from `omnigraph.yaml`. ## Commands @@ -21,6 +20,7 @@ boots from `omnigraph.yaml`. omnigraph cluster validate --config ./company-brain omnigraph cluster plan --config ./company-brain --json omnigraph cluster apply --config ./company-brain --json +omnigraph cluster approve graph. --config ./company-brain --as omnigraph cluster status --config ./company-brain --json omnigraph cluster refresh --config ./company-brain --json omnigraph cluster import --config ./company-brain --json @@ -253,7 +253,38 @@ in recovery sidecars and audit entries and threads it to the engine's schema-apply (so commit attribution and Cedar enforcement — wherever a policy checker is installed — work unchanged). -Schema deletes (removing a graph) are never executed by this stage. They are +### Approvals and graph deletion + +Deleting a graph is the irreversible tier: it requires a recorded human +decision. `cluster plan` lists the gate under `approvals_required` (one gate +per graph — the graph-level approval carries its schema and queries); +`cluster approve graph. --as ` writes a digest-bound artifact to + +```text +/__cluster/approvals/.json +``` + +bound to the exact desired config digest and the change's state digest, so +**any config or state drift after approving invalidates the artifact** +automatically (`approval_stale` warning; it never authorizes a different +change). An unapproved delete blocks with `approval_required`. + +An approved delete executes **last** in the apply run: the graph root is +removed recursively, the subtree (graph, schema, its queries) is tombstoned +out of the state ledger with a tombstone observation, and the approval is +consumed — recorded in the state's `approval_records` in the same state +update, and the artifact file rewritten with `consumed_at` (the file is never +deleted: the audit fact survives the loss of either store). A failed run +consumes nothing; the approval stays valid for the retry. Catalog blobs of +the deleted graph's queries stay on disk (GC is a later stage). + +Crash recovery for deletes: a completed-but-unrecorded delete is rolled +forward by the sweep (tombstone + approval consumption + audit entry); an +incomplete delete (root still present) is retired with a +`graph_delete_incomplete` warning and simply **re-proposed** — prefix removal +is idempotent, so the still-approved retry is the repair. + +Standalone schema deletes are never executed by this stage. They are reported as `deferred` (warning `apply_unsupported_change`), and query/policy changes that depend on them are `blocked` (warning `apply_dependency_blocked`, status `blocked` in state). A partially-applicable plan still exits 0 with warnings;