diff --git a/crates/omnigraph-cli/tests/cli.rs b/crates/omnigraph-cli/tests/cli.rs index bfa538d..336f19e 100644 --- a/crates/omnigraph-cli/tests/cli.rs +++ b/crates/omnigraph-cli/tests/cli.rs @@ -1358,7 +1358,7 @@ fn cluster_e2e_graph_root_destruction_drifts_then_apply_recreates_empty_graph() /// (applied), its composite (derived) — shows all four dispositions at once /// before the graph-plane schema apply closes the loop. #[test] -fn cluster_e2e_multi_graph_mixed_dispositions_then_converge() { +fn cluster_e2e_multi_graph_mixed_dispositions_then_approve_and_converge() { let temp = tempdir().unwrap(); write_multi_graph_cluster_fixture(temp.path()); // No manual init: Stage 4A creates both graphs. @@ -1468,7 +1468,55 @@ policies: let mut sorted = order.clone(); sorted.sort_unstable(); assert_eq!(order, sorted, "{mixed}"); - // Conclusion (approve + converge) extends below once the delete executor lands. + // The conclusion: an apply without approval stays blocked; the approved + // delete converges the cluster, tombstoning the removed graph. + let still_blocked = cluster_json(temp.path(), "apply"); + assert_eq!(still_blocked["converged"], false, "{still_blocked}"); + + let approve = parse_stdout_json(&output_success( + cli() + .arg("--as") + .arg("andrew") + .arg("cluster") + .arg("approve") + .arg("graph.engineering") + .arg("--config") + .arg(temp.path()) + .arg("--json"), + )); + assert_eq!(approve["ok"], true, "{approve}"); + assert_eq!(approve["approved_by"], "andrew"); + + let converge = cluster_json(temp.path(), "apply"); + assert_eq!(converge["ok"], true, "{converge}"); + assert_eq!(converge["converged"], true, "{converge}"); + assert!(!temp.path().join("graphs/engineering.omni").exists()); + + let status = cluster_json(temp.path(), "status"); + assert_eq!(status["observations"]["graph.engineering"]["kind"], "tombstone"); + let final_plan = cluster_json(temp.path(), "plan"); + assert!( + final_plan["changes"].as_array().unwrap().is_empty(), + "{final_plan}" + ); +} + +/// An approval without an approver is meaningless: approve requires --as. +#[test] +fn cluster_e2e_approve_requires_actor() { + let temp = tempdir().unwrap(); + write_cluster_config_fixture(temp.path()); + + let output = output_failure( + cli() + .arg("cluster") + .arg("approve") + .arg("graph.knowledge") + .arg("--config") + .arg(temp.path()), + ); + let stderr = String::from_utf8_lossy(&output.stderr); + assert!(stderr.contains("--as"), "{stderr}"); } /// Stage 4A headline: a declared graph is created by `cluster apply` itself — diff --git a/crates/omnigraph-cluster/src/lib.rs b/crates/omnigraph-cluster/src/lib.rs index 2fa0eab..f67d8f7 100644 --- a/crates/omnigraph-cluster/src/lib.rs +++ b/crates/omnigraph-cluster/src/lib.rs @@ -491,6 +491,10 @@ struct RecoverySidecar { desired_schema_digest: String, #[serde(default)] state_cas_base: Option, + /// For graph_delete: the approval this operation consumes; lets a sweep + /// roll-forward consume it too. + #[serde(default)] + approval_id: Option, } #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] @@ -498,7 +502,7 @@ struct RecoverySidecar { enum RecoverySidecarKind { GraphCreate, SchemaApply, - // GraphDelete arrives with stage 4C. + GraphDelete, } #[derive(Debug, Default)] @@ -509,6 +513,9 @@ struct SweepOutcome { /// Sidecars whose outcome is recorded (rows 2/4): deleted only after the /// command's state write lands, so a CAS failure re-sweeps them. completed_sidecars: Vec, + /// Approval artifacts consumed by a roll-forward (delete row 7b): their + /// files are rewritten with consumed_at only after the state write lands. + consumed_approvals: Vec, } #[derive(Debug)] @@ -942,6 +949,7 @@ pub async fn apply_config_dir_with_options( expected_manifest_version: None, desired_schema_digest: desired_graph.schema_digest.clone(), state_cas_base: expected_cas.clone(), + approval_id: None, }; let sidecar_path = match backend.write_recovery_sidecar(&sidecar) { Ok(path) => path, @@ -1105,6 +1113,7 @@ pub async fn apply_config_dir_with_options( expected_manifest_version: None, desired_schema_digest: desired_graph.schema_digest.clone(), state_cas_base: expected_cas.clone(), + approval_id: None, }; let sidecar_path = match backend.write_recovery_sidecar(&sidecar) { Ok(path) => path, @@ -1290,6 +1299,121 @@ pub async fn apply_config_dir_with_options( ); } + // Approved graph deletes execute LAST (RFC-004 §D5): catalog writes for + // surviving resources land first, then the irreversible work. + let graph_deletes_to_run: Vec = changes + .iter() + .filter(|change| { + change.disposition == Some(ApplyDisposition::Applied) + && change.operation == PlanOperation::Delete + && matches!(resource_kind(&change.resource), ResourceKind::Graph(_)) + }) + .filter_map(|change| change.resource.strip_prefix("graph.").map(str::to_string)) + .collect(); + let mut executed_deletes: Vec<(String, Option)> = Vec::new(); // (graph_id, approval_id) + let mut consumed_approval_ids: Vec = Vec::new(); + for graph_id in &graph_deletes_to_run { + if graph_moving_aborted { + diagnostics.push(Diagnostic::warning( + "graph_delete_skipped", + graph_address(graph_id), + "skipped after an earlier graph-moving operation failed in this run", + )); + failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::GraphDelete); + continue; + } + let graph_addr = graph_address(graph_id); + // Re-locate the consumable approval (classification verified one exists). + let approval_id = approval_artifacts + .iter() + .map(|(_, artifact)| artifact) + .find(|artifact| { + artifact.consumed_at.is_none() + && artifact.resource == graph_addr + && artifact.bound_config_digest == desired.config_digest + }) + .map(|artifact| artifact.approval_id.clone()); + let graph_uri = display_path( + &desired + .config_dir + .join(CLUSTER_GRAPHS_DIR) + .join(format!("{graph_id}.omni")), + ); + let observed_manifest_version = match Omnigraph::open_read_only(&graph_uri).await { + Ok(db) => match db.snapshot_of(ReadTarget::branch("main")).await { + Ok(snapshot) => Some(snapshot.version()), + Err(_) => None, + }, + Err(_) => None, // partial/unopenable roots still get deleted + }; + let sidecar = RecoverySidecar { + schema_version: 1, + operation_id: Ulid::new().to_string(), + started_at: now_rfc3339(), + actor: options.actor.clone(), + kind: RecoverySidecarKind::GraphDelete, + graph_id: graph_id.clone(), + graph_uri: graph_uri.clone(), + observed_manifest_version, + expected_manifest_version: None, // no post-op manifest exists + desired_schema_digest: String::new(), + state_cas_base: expected_cas.clone(), + approval_id: approval_id.clone(), + }; + let sidecar_path = match backend.write_recovery_sidecar(&sidecar) { + Ok(path) => path, + Err(diagnostic) => { + diagnostics.push(diagnostic); + failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::GraphDelete); + graph_moving_aborted = true; + continue; + } + }; + if let Err(diagnostic) = failpoints::maybe_fail("cluster_apply.before_graph_delete") { + // Simulated crash before removal: row 8 retires the intent and + // the still-valid approval lets a later run retry. + diagnostics.push(diagnostic); + failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::GraphDelete); + graph_moving_aborted = true; + continue; + } + match fs::remove_dir_all(PathBuf::from(&graph_uri)) { + Ok(()) => {} + Err(err) if err.kind() == ErrorKind::NotFound => {} // already gone + Err(err) => { + diagnostics.push(Diagnostic::error( + "graph_delete_failed", + graph_addr.clone(), + format!("could not remove graph root '{graph_uri}': {err}"), + )); + failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::GraphDelete); + graph_moving_aborted = true; + continue; + } + } + // Crash point: the root is gone, the ledger does not record it yet. + // The sweep rolls forward (row 7b) and consumes the approval. + if let Err(diagnostic) = failpoints::maybe_fail("cluster_apply.after_graph_delete") { + diagnostics.push(diagnostic); + return early_return( + display_path(&desired.config_dir), + Some(desired.config_digest), + observations, + changes, + state.resource_statuses, + diagnostics, + ); + } + executed_deletes.push((graph_id.clone(), approval_id.clone())); + if let Some(approval_id) = approval_id { + consumed_approval_ids.push(approval_id); + } + completed_op_sidecars.push(sidecar_path); + } + if !failed_graphs.is_empty() { + demote_dependents_of_failed_graphs(&mut changes, &failed_graphs, &desired.dependencies); + } + // State mutation. Apply owns query/policy statuses only; graph/schema // statuses belong to refresh/import observation and must not be clobbered // (the sweep above is the one exception: it owns recovery statuses). @@ -1330,6 +1454,17 @@ pub async fn apply_config_dir_with_options( _ => {} } } + for (graph_id, approval_id) in &executed_deletes { + tombstone_graph_subtree( + &mut new_state, + graph_id, + approval_id.as_deref(), + options.actor.as_deref(), + ); + if let Some(approval_id) = approval_id { + record_approval_consumed(&mut new_state, approval_id, "apply"); + } + } recompute_state_graph_digests(&mut new_state, &desired); let residual = diff_resources( @@ -1371,6 +1506,9 @@ pub async fn apply_config_dir_with_options( { let _ = fs::remove_file(sidecar_path); } + let mut all_consumed = sweep.consumed_approvals.clone(); + all_consumed.extend(consumed_approval_ids.iter().cloned()); + mark_approvals_consumed(&backend, &all_consumed); } // On a failed state write, report the statuses that are actually on disk // (the pre-apply snapshot), not the in-memory mutations that were never @@ -1827,6 +1965,7 @@ async fn sync_config_dir(config_dir: &Path, operation: StateSyncOperation) -> St for sidecar_path in &sweep.completed_sidecars { let _ = fs::remove_file(sidecar_path); } + mark_approvals_consumed(&backend, &sweep.consumed_approvals); } Err(diagnostic) => diagnostics.push(diagnostic), } @@ -2558,6 +2697,9 @@ async fn sweep_recovery_sidecars( RecoverySidecarKind::SchemaApply => { sweep_schema_apply_sidecar(path, sidecar, state, diagnostics, &mut outcome).await; } + RecoverySidecarKind::GraphDelete => { + sweep_graph_delete_sidecar(path, sidecar, state, diagnostics, &mut outcome); + } } } outcome @@ -2778,6 +2920,121 @@ async fn sweep_schema_apply_sidecar( } } +fn sweep_graph_delete_sidecar( + path: PathBuf, + sidecar: RecoverySidecar, + state: &mut ClusterState, + diagnostics: &mut Vec, + outcome: &mut SweepOutcome, +) { + let graph_address = graph_address(&sidecar.graph_id); + let root = PathBuf::from(&sidecar.graph_uri); + + if root.exists() { + // Row 8: the delete never completed. Prefix removal is idempotent and + // works on partial roots, so the repair is simply the re-proposed, + // still-approved delete on a later run — retire the stale intent. + diagnostics.push(Diagnostic::warning( + "graph_delete_incomplete", + graph_address, + "a previous graph delete did not complete; it will be re-proposed by plan and can be retried under its approval", + )); + outcome.completed_sidecars.push(path); + return; + } + + if !state.applied_revision.resources.contains_key(&graph_address) { + // Row 7: already tombstoned (or never recorded); crash fell between + // the state CAS and sidecar delete. + outcome.completed_sidecars.push(path); + return; + } + + // Row 7b: the root is gone, the ledger is stale — roll forward the + // tombstone, consume the approval the sidecar carries, audit. + tombstone_graph_subtree(state, &sidecar.graph_id, sidecar.approval_id.as_deref(), sidecar.actor.as_deref()); + state.recovery_records.insert( + sidecar.operation_id.clone(), + json!({ + "kind": "graph_delete", + "graph_id": sidecar.graph_id, + "outcome": "rolled_forward", + "recovered_at": now_rfc3339(), + "actor": sidecar.actor, + }), + ); + if let Some(approval_id) = &sidecar.approval_id { + record_approval_consumed(state, approval_id, &sidecar.operation_id); + outcome.consumed_approvals.push(approval_id.clone()); + } + diagnostics.push(Diagnostic::warning( + "cluster_recovery_rolled_forward", + graph_address, + "an interrupted graph delete had completed on disk; cluster state was rolled forward to match", + )); + outcome.completed_sidecars.push(path); +} + +/// Remove a graph's subtree (graph, schema, queries) from the ledger and +/// leave a tombstone observation. Idempotent. +fn tombstone_graph_subtree( + state: &mut ClusterState, + graph_id: &str, + approval_id: Option<&str>, + actor: Option<&str>, +) { + let graph_addr = graph_address(graph_id); + let schema_addr = schema_address(graph_id); + let query_prefix = format!("query.{graph_id}."); + state.applied_revision.resources.remove(&graph_addr); + state.applied_revision.resources.remove(&schema_addr); + state + .applied_revision + .resources + .retain(|address, _| !address.starts_with(&query_prefix)); + state.resource_statuses.remove(&graph_addr); + state.resource_statuses.remove(&schema_addr); + state + .resource_statuses + .retain(|address, _| !address.starts_with(&query_prefix)); + state.observations.insert( + graph_addr, + json!({ + "kind": "tombstone", + "deleted_at": now_rfc3339(), + "approval_id": approval_id, + "actor": actor, + }), + ); +} + +/// Record approval consumption in the state ledger. The artifact FILE is +/// rewritten with consumed_at only after the state write lands, so a failed +/// CAS leaves the approval valid for the retry. +fn record_approval_consumed(state: &mut ClusterState, approval_id: &str, operation_id: &str) { + state.approval_records.insert( + approval_id.to_string(), + json!({ + "consumed_at": now_rfc3339(), + "consumed_by_operation": operation_id, + }), + ); +} + +/// Mark approval artifact files consumed on disk (post-CAS). +fn mark_approvals_consumed(backend: &LocalStateBackend, approval_ids: &[String]) { + if approval_ids.is_empty() { + return; + } + let mut sink = Vec::new(); + for (_, mut artifact) in backend.list_approval_artifacts(&mut sink) { + if approval_ids.contains(&artifact.approval_id) && artifact.consumed_at.is_none() { + artifact.consumed_at = Some(now_rfc3339()); + let _ = backend.write_approval_artifact(&artifact); + } + } +} + /// Read-only commands report pending sidecars without acting on them. fn warn_pending_recovery_sidecars(config_dir: &Path, diagnostics: &mut Vec) { let recoveries_dir = config_dir.join(CLUSTER_RECOVERIES_DIR); @@ -3547,11 +3804,12 @@ fn classify_changes( schema_pending.insert(graph.clone()); } } - // Subtree deletes ride the approved graph delete. NOTE: execution lands - // with the delete executor; until then approved deletes stay blocked so a - // gate-only build can never strip state without removing the root. - let rides_approved_delete = |_graph: &str| false; - let _ = approved; + // Subtree deletes ride the approved graph delete. + let rides_approved_delete = |graph: &str| { + graph_deletes.contains(graph) + && approved.contains(&graph_address(graph)) + && !pending_recovery.contains(graph) + }; for change in changes.iter_mut() { let (disposition, reason) = match resource_kind(&change.resource) { @@ -3662,6 +3920,7 @@ fn classify_changes( enum FailedGraphOrigin { GraphCreate, SchemaApply, + GraphDelete, } /// After a graph-moving operation fails mid-run, every change that depended @@ -3681,12 +3940,15 @@ fn demote_dependents_of_failed_graphs( let demote_reason = match resource_kind(&change.resource) { ResourceKind::Graph(graph) => match failed.get(&graph) { Some(FailedGraphOrigin::GraphCreate) => Some("graph_create_failed"), + Some(FailedGraphOrigin::GraphDelete) => Some("graph_delete_failed"), Some(FailedGraphOrigin::SchemaApply) => Some("dependency_not_applied"), None => None, }, ResourceKind::Schema(graph) => match failed.get(&graph) { Some(FailedGraphOrigin::SchemaApply) => Some("schema_apply_failed"), - Some(FailedGraphOrigin::GraphCreate) => Some("dependency_not_applied"), + Some(FailedGraphOrigin::GraphCreate) | Some(FailedGraphOrigin::GraphDelete) => { + Some("dependency_not_applied") + } None => None, }, ResourceKind::Query { graph, .. } if failed.contains_key(&graph) => { @@ -6606,6 +6868,200 @@ graphs: assert!(sidecar.exists()); } + /// Seed: converged knowledge subtree + a stale `old` graph subtree with a + /// real directory on disk. + fn seed_deletable_state(config_dir: &Path) { + write_applyable_state(config_dir); + let state = read_state_json(config_dir); + let g = state["applied_revision"]["resources"]["graph.knowledge"]["digest"] + .as_str() + .unwrap() + .to_string(); + let sc = state["applied_revision"]["resources"]["schema.knowledge"]["digest"] + .as_str() + .unwrap() + .to_string(); + write_state_resources( + config_dir, + &[ + ("graph.knowledge", g.as_str()), + ("schema.knowledge", sc.as_str()), + ("graph.old", "3333"), + ("schema.old", "4444"), + ("query.old.q", "5555"), + ], + ); + let root = config_dir.join(CLUSTER_GRAPHS_DIR).join("old.omni"); + fs::create_dir_all(&root).unwrap(); + fs::write(root.join("_schema.pg"), "stale").unwrap(); + } + + #[tokio::test] + async fn apply_executes_approved_graph_delete() { + let dir = fixture(); + seed_deletable_state(dir.path()); + let approved = approve_config_dir(dir.path(), "graph.old", "andrew").await; + assert!(approved.ok, "{:?}", approved.diagnostics); + let approval_id = approved.approval_id.clone().unwrap(); + + let out = apply_config_dir(dir.path()).await; + assert!(out.ok, "{:?}", out.diagnostics); + assert!(out.converged, "{out:?}"); + let by_resource: BTreeMap<&str, &PlanChange> = out + .changes + .iter() + .map(|change| (change.resource.as_str(), change)) + .collect(); + assert_eq!(by_resource["graph.old"].disposition, Some(ApplyDisposition::Applied)); + assert_eq!(by_resource["schema.old"].disposition, Some(ApplyDisposition::Applied)); + assert_eq!(by_resource["query.old.q"].disposition, Some(ApplyDisposition::Applied)); + // The root is gone; the subtree is tombstoned out of the ledger. + assert!(!dir.path().join(CLUSTER_GRAPHS_DIR).join("old.omni").exists()); + let state = read_state_json(dir.path()); + let resources = state["applied_revision"]["resources"].as_object().unwrap(); + assert!(!resources.contains_key("graph.old")); + assert!(!resources.contains_key("schema.old")); + assert!(!resources.contains_key("query.old.q")); + assert_eq!(state["observations"]["graph.old"]["kind"], "tombstone"); + assert_eq!(state["observations"]["graph.old"]["approval_id"], approval_id); + // Approval consumed in BOTH stores: ledger summary + artifact file. + assert!(state["approval_records"][&approval_id]["consumed_at"].is_string()); + let artifact: serde_json::Value = serde_json::from_str( + &fs::read_to_string( + dir.path() + .join(CLUSTER_APPROVALS_DIR) + .join(format!("{approval_id}.json")), + ) + .unwrap(), + ) + .unwrap(); + assert!(artifact["consumed_at"].is_string(), "{artifact}"); + // Sidecar retired. + assert!( + fs::read_dir(dir.path().join(CLUSTER_RECOVERIES_DIR)) + .map(|mut entries| entries.next().is_none()) + .unwrap_or(true) + ); + // A consumed approval authorizes nothing further (idempotent re-apply). + let again = apply_config_dir(dir.path()).await; + assert!(again.ok && again.converged && !again.state_written, "{again:?}"); + } + + fn write_delete_sidecar( + config_dir: &Path, + graph_id: &str, + approval_id: Option<&str>, + operation_id: &str, + ) -> PathBuf { + let dir = config_dir.join(CLUSTER_RECOVERIES_DIR); + fs::create_dir_all(&dir).unwrap(); + let path = dir.join(format!("{operation_id}.json")); + fs::write( + &path, + serde_json::to_string_pretty(&json!({ + "schema_version": 1, + "operation_id": operation_id, + "started_at": "1970-01-01T00:00:00Z", + "kind": "graph_delete", + "graph_id": graph_id, + "graph_uri": derived_graph_uri(config_dir, graph_id), + "desired_schema_digest": "", + "approval_id": approval_id, + })) + .unwrap(), + ) + .unwrap(); + path + } + + #[tokio::test] + async fn sweep_retires_delete_sidecar_when_tombstoned() { + let dir = fixture(); + write_applyable_state(dir.path()); // no graph.old in state, no root + let sidecar = write_delete_sidecar(dir.path(), "old", None, "01DROW7"); + + let out = apply_config_dir(dir.path()).await; + assert!(out.ok, "{:?}", out.diagnostics); + assert!(!sidecar.exists()); + let state = read_state_json(dir.path()); + assert!( + state["recovery_records"] + .as_object() + .is_none_or(|records| records.is_empty()) + ); + } + + #[tokio::test] + async fn sweep_rolls_forward_completed_delete() { + let dir = fixture(); + seed_deletable_state(dir.path()); + // Approve, then simulate: root removed, state stale, sidecar present. + let approved = approve_config_dir(dir.path(), "graph.old", "andrew").await; + let approval_id = approved.approval_id.unwrap(); + fs::remove_dir_all(dir.path().join(CLUSTER_GRAPHS_DIR).join("old.omni")).unwrap(); + let sidecar = write_delete_sidecar(dir.path(), "old", Some(&approval_id), "01DROW7B"); + + let out = apply_config_dir(dir.path()).await; + assert!(out.ok, "{:?}", out.diagnostics); + assert!( + out.diagnostics + .iter() + .any(|diagnostic| diagnostic.code == "cluster_recovery_rolled_forward") + ); + assert!(!sidecar.exists()); + let state = read_state_json(dir.path()); + assert!( + !state["applied_revision"]["resources"] + .as_object() + .unwrap() + .contains_key("graph.old") + ); + assert_eq!(state["observations"]["graph.old"]["kind"], "tombstone"); + assert!(state["approval_records"][&approval_id]["consumed_at"].is_string()); + assert!( + state["recovery_records"] + .as_object() + .unwrap() + .values() + .any(|record| record["kind"] == "graph_delete" + && record["outcome"] == "rolled_forward") + ); + // The artifact file is marked consumed post-CAS. + let artifact: serde_json::Value = serde_json::from_str( + &fs::read_to_string( + dir.path() + .join(CLUSTER_APPROVALS_DIR) + .join(format!("{approval_id}.json")), + ) + .unwrap(), + ) + .unwrap(); + assert!(artifact["consumed_at"].is_string()); + assert!(out.converged, "{out:?}"); + } + + #[tokio::test] + async fn sweep_reproposes_incomplete_delete() { + let dir = fixture(); + seed_deletable_state(dir.path()); // root present + let approved = approve_config_dir(dir.path(), "graph.old", "andrew").await; + assert!(approved.ok); + let sidecar = write_delete_sidecar(dir.path(), "old", approved.approval_id.as_deref(), "01DROW8"); + + // Row 8: the stale intent is retired with a warning, and the same run + // re-executes the still-approved delete to completion. + let out = apply_config_dir(dir.path()).await; + assert!(out.ok, "{:?}", out.diagnostics); + assert!( + out.diagnostics + .iter() + .any(|diagnostic| diagnostic.code == "graph_delete_incomplete") + ); + assert!(!sidecar.exists()); + assert!(!dir.path().join(CLUSTER_GRAPHS_DIR).join("old.omni").exists()); + assert!(out.converged, "{out:?}"); + } + #[test] fn status_warns_on_pending_recovery_sidecar() { let dir = fixture();