diff --git a/crates/omnigraph-cluster/src/lib.rs b/crates/omnigraph-cluster/src/lib.rs index f26110e..3faacaa 100644 --- a/crates/omnigraph-cluster/src/lib.rs +++ b/crates/omnigraph-cluster/src/lib.rs @@ -20,10 +20,12 @@ use ulid::Ulid; pub mod failpoints; mod serve; +mod sweep; mod store; use store::{LocalStateBackend, StateLockGuard, StateSnapshot}; pub use serve::{ServingGraph, ServingPolicy, ServingQuery, ServingSnapshot, read_serving_snapshot}; use serve::read_verified_payload; +use sweep::{mark_approvals_consumed, record_approval_consumed, sweep_recovery_sidecars, tombstone_graph_subtree, warn_pending_recovery_sidecars}; pub const CLUSTER_CONFIG_FILE: &str = "cluster.yaml"; pub const CLUSTER_GRAPHS_DIR: &str = "graphs"; @@ -2291,387 +2293,6 @@ fn initial_import_state(desired: &DesiredCluster) -> ClusterState { } } -/// Recovery sweep (RFC-004 §D3): runs at the start of every state-mutating -/// cluster command, under the state lock, before the command's own work. -/// Roll-forward-only — the engine's own sidecars make each graph-level -/// operation atomic within the graph, so the cluster never rolls a graph -/// back; it converges the ledger to observable reality or refuses loudly. -/// Mutations ride the calling command's CAS-checked state write; completed -/// sidecars are deleted only after that write lands. -async fn sweep_recovery_sidecars( - backend: &LocalStateBackend, - state: &mut ClusterState, - diagnostics: &mut Vec, -) -> SweepOutcome { - let mut outcome = SweepOutcome::default(); - for (path, sidecar) in backend.list_recovery_sidecars(diagnostics) { - match sidecar.kind { - RecoverySidecarKind::GraphCreate => { - sweep_graph_create_sidecar(path, sidecar, state, diagnostics, &mut outcome).await; - } - RecoverySidecarKind::SchemaApply => { - sweep_schema_apply_sidecar(path, sidecar, state, diagnostics, &mut outcome).await; - } - RecoverySidecarKind::GraphDelete => { - sweep_graph_delete_sidecar(path, sidecar, state, diagnostics, &mut outcome); - } - } - } - outcome -} - -async fn sweep_graph_create_sidecar( - path: PathBuf, - sidecar: RecoverySidecar, - state: &mut ClusterState, - diagnostics: &mut Vec, - outcome: &mut SweepOutcome, -) { - let graph_address = graph_address(&sidecar.graph_id); - let schema_addr = schema_address(&sidecar.graph_id); - let graph_path = PathBuf::from(&sidecar.graph_uri); - - // Row 1: nothing moved — the init never landed. The sidecar is pure - // intent; remove it and let the command's own plan re-propose the create. - if !graph_path.exists() { - let _ = fs::remove_file(&path); - return; - } - - match Omnigraph::open_read_only(&sidecar.graph_uri).await { - Ok(db) => { - let live_digest = sha256_hex(db.schema_source().as_bytes()); - let recorded = state - .applied_revision - .resources - .get(&schema_addr) - .map(|resource| resource.digest.clone()); - if recorded.as_deref() == Some(live_digest.as_str()) { - // Row 2: crash fell between the state CAS and sidecar delete. - outcome.completed_sidecars.push(path); - } else if live_digest == sidecar.desired_schema_digest { - // Row 4: the create completed on the graph; roll the cluster - // state forward to observable reality. - state.applied_revision.resources.insert( - schema_addr.clone(), - StateResource { - digest: live_digest.clone(), - applies_to: None, - }, - ); - let query_digests = state_query_digests_for_graph(state, &sidecar.graph_id); - let composite = - graph_digest(&sidecar.graph_id, Some(&live_digest), Some(&query_digests)); - state - .applied_revision - .resources - .insert(graph_address.clone(), StateResource { digest: composite, applies_to: None }); - set_resource_status_applied(state, &graph_address); - set_resource_status_applied(state, &schema_addr); - state.recovery_records.insert( - sidecar.operation_id.clone(), - json!({ - "kind": "graph_create", - "graph_id": sidecar.graph_id, - "outcome": "rolled_forward", - "recovered_at": now_rfc3339(), - "actor": sidecar.actor, - }), - ); - diagnostics.push(Diagnostic::warning( - "cluster_recovery_rolled_forward", - graph_address.clone(), - "an interrupted graph create had completed on the graph; cluster state was rolled forward to match", - )); - outcome.completed_sidecars.push(path); - } else { - // Row 6: the graph moved to something the sidecar did not - // intend. Refuse to guess; require refresh + operator re-plan. - set_resource_status( - state, - &graph_address, - ResourceLifecycleStatus::Drifted, - "actual_applied_state_pending", - "graph state does not match the interrupted operation; run `cluster refresh` and re-plan", - ); - set_resource_status( - state, - &schema_addr, - ResourceLifecycleStatus::Drifted, - "actual_applied_state_pending", - "graph state does not match the interrupted operation; run `cluster refresh` and re-plan", - ); - diagnostics.push(Diagnostic::warning( - "cluster_recovery_pending", - graph_address.clone(), - "an interrupted graph create left unexpected graph state; graph-moving work is blocked until repaired", - )); - outcome.pending_graphs.insert(sidecar.graph_id.clone()); - } - } - Err(err) => { - // Row 5: partial root (the engine's documented init gap). Never - // auto-delete — reconciler deletes are the same data-loss class - // as human deletes; the operator removes the root explicitly. - set_resource_status( - state, - &graph_address, - ResourceLifecycleStatus::Error, - "graph_create_incomplete", - "graph root exists but cannot be opened; remove the graph root and re-run `cluster apply`", - ); - set_resource_status( - state, - &schema_addr, - ResourceLifecycleStatus::Error, - "graph_create_incomplete", - "graph root exists but cannot be opened; remove the graph root and re-run `cluster apply`", - ); - diagnostics.push(Diagnostic::error( - "graph_create_incomplete", - graph_address.clone(), - format!( - "graph root '{}' exists but cannot be opened ({err}); remove the graph root and re-run `cluster apply`", - sidecar.graph_uri - ), - )); - outcome.pending_graphs.insert(sidecar.graph_id.clone()); - } - } -} - -async fn sweep_schema_apply_sidecar( - path: PathBuf, - sidecar: RecoverySidecar, - state: &mut ClusterState, - diagnostics: &mut Vec, - outcome: &mut SweepOutcome, -) { - let graph_address = graph_address(&sidecar.graph_id); - let schema_addr = schema_address(&sidecar.graph_id); - - // Digest-based classification: robust to unrelated manifest movement; - // the sidecar's version pins stay forensic. - let live_digest = match Omnigraph::open_read_only(&sidecar.graph_uri).await { - Ok(db) => sha256_hex(db.schema_source().as_bytes()), - Err(err) => { - // Cannot verify the interrupted operation — refuse to guess. - diagnostics.push(Diagnostic::warning( - "cluster_recovery_pending", - graph_address.clone(), - format!( - "an interrupted schema apply cannot be verified (graph '{}' did not open: {err}); graph-moving work is blocked until repaired", - sidecar.graph_uri - ), - )); - outcome.pending_graphs.insert(sidecar.graph_id.clone()); - return; - } - }; - - let recorded = state - .applied_revision - .resources - .get(&schema_addr) - .map(|resource| resource.digest.clone()); - if recorded.as_deref() == Some(live_digest.as_str()) { - // Ledger consistent with the live graph (the apply never landed, or - // landed and was recorded): the sidecar is stale intent — retire it. - outcome.completed_sidecars.push(path); - } else if live_digest == sidecar.desired_schema_digest { - // RFC-004 §D3 row 3: the schema apply completed on the graph; roll - // the cluster state forward to observable reality. - state.applied_revision.resources.insert( - schema_addr.clone(), - StateResource { - digest: live_digest.clone(), - applies_to: None, - }, - ); - let query_digests = state_query_digests_for_graph(state, &sidecar.graph_id); - let composite = graph_digest(&sidecar.graph_id, Some(&live_digest), Some(&query_digests)); - state - .applied_revision - .resources - .insert(graph_address.clone(), StateResource { digest: composite, applies_to: None }); - set_resource_status_applied(state, &graph_address); - set_resource_status_applied(state, &schema_addr); - state.recovery_records.insert( - sidecar.operation_id.clone(), - json!({ - "kind": "schema_apply", - "graph_id": sidecar.graph_id, - "outcome": "rolled_forward", - "recovered_at": now_rfc3339(), - "actor": sidecar.actor, - }), - ); - diagnostics.push(Diagnostic::warning( - "cluster_recovery_rolled_forward", - graph_address.clone(), - "an interrupted schema apply had completed on the graph; cluster state was rolled forward to match", - )); - outcome.completed_sidecars.push(path); - } else { - // Row 6: live schema is neither the recorded nor the desired digest. - set_resource_status( - state, - &graph_address, - ResourceLifecycleStatus::Drifted, - "actual_applied_state_pending", - "graph state does not match the interrupted operation; run `cluster refresh` and re-plan", - ); - set_resource_status( - state, - &schema_addr, - ResourceLifecycleStatus::Drifted, - "actual_applied_state_pending", - "graph state does not match the interrupted operation; run `cluster refresh` and re-plan", - ); - diagnostics.push(Diagnostic::warning( - "cluster_recovery_pending", - graph_address.clone(), - "an interrupted schema apply left unexpected graph state; graph-moving work is blocked until repaired", - )); - outcome.pending_graphs.insert(sidecar.graph_id.clone()); - } -} - -fn sweep_graph_delete_sidecar( - path: PathBuf, - sidecar: RecoverySidecar, - state: &mut ClusterState, - diagnostics: &mut Vec, - outcome: &mut SweepOutcome, -) { - let graph_address = graph_address(&sidecar.graph_id); - let root = PathBuf::from(&sidecar.graph_uri); - - if root.exists() { - // Row 8: the delete never completed. Prefix removal is idempotent and - // works on partial roots, so the repair is simply the re-proposed, - // still-approved delete on a later run — retire the stale intent. - diagnostics.push(Diagnostic::warning( - "graph_delete_incomplete", - graph_address, - "a previous graph delete did not complete; it will be re-proposed by plan and can be retried under its approval", - )); - outcome.completed_sidecars.push(path); - return; - } - - if !state.applied_revision.resources.contains_key(&graph_address) { - // Row 7: already tombstoned (or never recorded); crash fell between - // the state CAS and sidecar delete. - outcome.completed_sidecars.push(path); - return; - } - - // Row 7b: the root is gone, the ledger is stale — roll forward the - // tombstone, consume the approval the sidecar carries, audit. - tombstone_graph_subtree(state, &sidecar.graph_id, sidecar.approval_id.as_deref(), sidecar.actor.as_deref()); - state.recovery_records.insert( - sidecar.operation_id.clone(), - json!({ - "kind": "graph_delete", - "graph_id": sidecar.graph_id, - "outcome": "rolled_forward", - "recovered_at": now_rfc3339(), - "actor": sidecar.actor, - }), - ); - if let Some(approval_id) = &sidecar.approval_id { - record_approval_consumed(state, approval_id, &sidecar.operation_id); - outcome.consumed_approvals.push(approval_id.clone()); - } - diagnostics.push(Diagnostic::warning( - "cluster_recovery_rolled_forward", - graph_address, - "an interrupted graph delete had completed on disk; cluster state was rolled forward to match", - )); - outcome.completed_sidecars.push(path); -} - -/// Remove a graph's subtree (graph, schema, queries) from the ledger and -/// leave a tombstone observation. Idempotent. -fn tombstone_graph_subtree( - state: &mut ClusterState, - graph_id: &str, - approval_id: Option<&str>, - actor: Option<&str>, -) { - let graph_addr = graph_address(graph_id); - let schema_addr = schema_address(graph_id); - let query_prefix = format!("query.{graph_id}."); - state.applied_revision.resources.remove(&graph_addr); - state.applied_revision.resources.remove(&schema_addr); - state - .applied_revision - .resources - .retain(|address, _| !address.starts_with(&query_prefix)); - state.resource_statuses.remove(&graph_addr); - state.resource_statuses.remove(&schema_addr); - state - .resource_statuses - .retain(|address, _| !address.starts_with(&query_prefix)); - state.observations.insert( - graph_addr, - json!({ - "kind": "tombstone", - "deleted_at": now_rfc3339(), - "approval_id": approval_id, - "actor": actor, - }), - ); -} - -/// Record approval consumption in the state ledger. The artifact FILE is -/// rewritten with consumed_at only after the state write lands, so a failed -/// CAS leaves the approval valid for the retry. -fn record_approval_consumed(state: &mut ClusterState, approval_id: &str, operation_id: &str) { - state.approval_records.insert( - approval_id.to_string(), - json!({ - "consumed_at": now_rfc3339(), - "consumed_by_operation": operation_id, - }), - ); -} - -/// Mark approval artifact files consumed on disk (post-CAS). -fn mark_approvals_consumed(backend: &LocalStateBackend, approval_ids: &[String]) { - if approval_ids.is_empty() { - return; - } - let mut sink = Vec::new(); - for (_, mut artifact) in backend.list_approval_artifacts(&mut sink) { - if approval_ids.contains(&artifact.approval_id) && artifact.consumed_at.is_none() { - artifact.consumed_at = Some(now_rfc3339()); - let _ = backend.write_approval_artifact(&artifact); - } - } -} - -/// Read-only commands report pending sidecars without acting on them. -fn warn_pending_recovery_sidecars(config_dir: &Path, diagnostics: &mut Vec) { - let recoveries_dir = config_dir.join(CLUSTER_RECOVERIES_DIR); - let Ok(entries) = fs::read_dir(&recoveries_dir) else { - return; - }; - let mut names: Vec = entries - .flatten() - .filter(|entry| entry.path().extension().is_some_and(|ext| ext == "json")) - .map(|entry| entry.file_name().to_string_lossy().into_owned()) - .collect(); - names.sort(); - for name in names { - diagnostics.push(Diagnostic::warning( - "cluster_recovery_pending", - format!("{CLUSTER_RECOVERIES_DIR}/{name}"), - "a recovery sidecar from an interrupted apply is pending; the next state-mutating command will classify it", - )); - } -} async fn observe_declared_graphs(desired: &DesiredCluster, state: &mut ClusterState) -> usize { let mut graph_error_count = 0; diff --git a/crates/omnigraph-cluster/src/sweep.rs b/crates/omnigraph-cluster/src/sweep.rs new file mode 100644 index 0000000..77ad8c5 --- /dev/null +++ b/crates/omnigraph-cluster/src/sweep.rs @@ -0,0 +1,386 @@ +//! The recovery sweep: RFC-004's roll-forward-only sidecar +//! classification (moved verbatim from lib.rs in the modularization). + +use super::*; + +/// Recovery sweep (RFC-004 §D3): runs at the start of every state-mutating +/// cluster command, under the state lock, before the command's own work. +/// Roll-forward-only — the engine's own sidecars make each graph-level +/// operation atomic within the graph, so the cluster never rolls a graph +/// back; it converges the ledger to observable reality or refuses loudly. +/// Mutations ride the calling command's CAS-checked state write; completed +/// sidecars are deleted only after that write lands. +pub(crate) async fn sweep_recovery_sidecars( + backend: &LocalStateBackend, + state: &mut ClusterState, + diagnostics: &mut Vec, +) -> SweepOutcome { + let mut outcome = SweepOutcome::default(); + for (path, sidecar) in backend.list_recovery_sidecars(diagnostics) { + match sidecar.kind { + RecoverySidecarKind::GraphCreate => { + sweep_graph_create_sidecar(path, sidecar, state, diagnostics, &mut outcome).await; + } + RecoverySidecarKind::SchemaApply => { + sweep_schema_apply_sidecar(path, sidecar, state, diagnostics, &mut outcome).await; + } + RecoverySidecarKind::GraphDelete => { + sweep_graph_delete_sidecar(path, sidecar, state, diagnostics, &mut outcome); + } + } + } + outcome +} + +pub(crate) async fn sweep_graph_create_sidecar( + path: PathBuf, + sidecar: RecoverySidecar, + state: &mut ClusterState, + diagnostics: &mut Vec, + outcome: &mut SweepOutcome, +) { + let graph_address = graph_address(&sidecar.graph_id); + let schema_addr = schema_address(&sidecar.graph_id); + let graph_path = PathBuf::from(&sidecar.graph_uri); + + // Row 1: nothing moved — the init never landed. The sidecar is pure + // intent; remove it and let the command's own plan re-propose the create. + if !graph_path.exists() { + let _ = fs::remove_file(&path); + return; + } + + match Omnigraph::open_read_only(&sidecar.graph_uri).await { + Ok(db) => { + let live_digest = sha256_hex(db.schema_source().as_bytes()); + let recorded = state + .applied_revision + .resources + .get(&schema_addr) + .map(|resource| resource.digest.clone()); + if recorded.as_deref() == Some(live_digest.as_str()) { + // Row 2: crash fell between the state CAS and sidecar delete. + outcome.completed_sidecars.push(path); + } else if live_digest == sidecar.desired_schema_digest { + // Row 4: the create completed on the graph; roll the cluster + // state forward to observable reality. + state.applied_revision.resources.insert( + schema_addr.clone(), + StateResource { + digest: live_digest.clone(), + applies_to: None, + }, + ); + let query_digests = state_query_digests_for_graph(state, &sidecar.graph_id); + let composite = + graph_digest(&sidecar.graph_id, Some(&live_digest), Some(&query_digests)); + state + .applied_revision + .resources + .insert(graph_address.clone(), StateResource { digest: composite, applies_to: None }); + set_resource_status_applied(state, &graph_address); + set_resource_status_applied(state, &schema_addr); + state.recovery_records.insert( + sidecar.operation_id.clone(), + json!({ + "kind": "graph_create", + "graph_id": sidecar.graph_id, + "outcome": "rolled_forward", + "recovered_at": now_rfc3339(), + "actor": sidecar.actor, + }), + ); + diagnostics.push(Diagnostic::warning( + "cluster_recovery_rolled_forward", + graph_address.clone(), + "an interrupted graph create had completed on the graph; cluster state was rolled forward to match", + )); + outcome.completed_sidecars.push(path); + } else { + // Row 6: the graph moved to something the sidecar did not + // intend. Refuse to guess; require refresh + operator re-plan. + set_resource_status( + state, + &graph_address, + ResourceLifecycleStatus::Drifted, + "actual_applied_state_pending", + "graph state does not match the interrupted operation; run `cluster refresh` and re-plan", + ); + set_resource_status( + state, + &schema_addr, + ResourceLifecycleStatus::Drifted, + "actual_applied_state_pending", + "graph state does not match the interrupted operation; run `cluster refresh` and re-plan", + ); + diagnostics.push(Diagnostic::warning( + "cluster_recovery_pending", + graph_address.clone(), + "an interrupted graph create left unexpected graph state; graph-moving work is blocked until repaired", + )); + outcome.pending_graphs.insert(sidecar.graph_id.clone()); + } + } + Err(err) => { + // Row 5: partial root (the engine's documented init gap). Never + // auto-delete — reconciler deletes are the same data-loss class + // as human deletes; the operator removes the root explicitly. + set_resource_status( + state, + &graph_address, + ResourceLifecycleStatus::Error, + "graph_create_incomplete", + "graph root exists but cannot be opened; remove the graph root and re-run `cluster apply`", + ); + set_resource_status( + state, + &schema_addr, + ResourceLifecycleStatus::Error, + "graph_create_incomplete", + "graph root exists but cannot be opened; remove the graph root and re-run `cluster apply`", + ); + diagnostics.push(Diagnostic::error( + "graph_create_incomplete", + graph_address.clone(), + format!( + "graph root '{}' exists but cannot be opened ({err}); remove the graph root and re-run `cluster apply`", + sidecar.graph_uri + ), + )); + outcome.pending_graphs.insert(sidecar.graph_id.clone()); + } + } +} + +pub(crate) async fn sweep_schema_apply_sidecar( + path: PathBuf, + sidecar: RecoverySidecar, + state: &mut ClusterState, + diagnostics: &mut Vec, + outcome: &mut SweepOutcome, +) { + let graph_address = graph_address(&sidecar.graph_id); + let schema_addr = schema_address(&sidecar.graph_id); + + // Digest-based classification: robust to unrelated manifest movement; + // the sidecar's version pins stay forensic. + let live_digest = match Omnigraph::open_read_only(&sidecar.graph_uri).await { + Ok(db) => sha256_hex(db.schema_source().as_bytes()), + Err(err) => { + // Cannot verify the interrupted operation — refuse to guess. + diagnostics.push(Diagnostic::warning( + "cluster_recovery_pending", + graph_address.clone(), + format!( + "an interrupted schema apply cannot be verified (graph '{}' did not open: {err}); graph-moving work is blocked until repaired", + sidecar.graph_uri + ), + )); + outcome.pending_graphs.insert(sidecar.graph_id.clone()); + return; + } + }; + + let recorded = state + .applied_revision + .resources + .get(&schema_addr) + .map(|resource| resource.digest.clone()); + if recorded.as_deref() == Some(live_digest.as_str()) { + // Ledger consistent with the live graph (the apply never landed, or + // landed and was recorded): the sidecar is stale intent — retire it. + outcome.completed_sidecars.push(path); + } else if live_digest == sidecar.desired_schema_digest { + // RFC-004 §D3 row 3: the schema apply completed on the graph; roll + // the cluster state forward to observable reality. + state.applied_revision.resources.insert( + schema_addr.clone(), + StateResource { + digest: live_digest.clone(), + applies_to: None, + }, + ); + let query_digests = state_query_digests_for_graph(state, &sidecar.graph_id); + let composite = graph_digest(&sidecar.graph_id, Some(&live_digest), Some(&query_digests)); + state + .applied_revision + .resources + .insert(graph_address.clone(), StateResource { digest: composite, applies_to: None }); + set_resource_status_applied(state, &graph_address); + set_resource_status_applied(state, &schema_addr); + state.recovery_records.insert( + sidecar.operation_id.clone(), + json!({ + "kind": "schema_apply", + "graph_id": sidecar.graph_id, + "outcome": "rolled_forward", + "recovered_at": now_rfc3339(), + "actor": sidecar.actor, + }), + ); + diagnostics.push(Diagnostic::warning( + "cluster_recovery_rolled_forward", + graph_address.clone(), + "an interrupted schema apply had completed on the graph; cluster state was rolled forward to match", + )); + outcome.completed_sidecars.push(path); + } else { + // Row 6: live schema is neither the recorded nor the desired digest. + set_resource_status( + state, + &graph_address, + ResourceLifecycleStatus::Drifted, + "actual_applied_state_pending", + "graph state does not match the interrupted operation; run `cluster refresh` and re-plan", + ); + set_resource_status( + state, + &schema_addr, + ResourceLifecycleStatus::Drifted, + "actual_applied_state_pending", + "graph state does not match the interrupted operation; run `cluster refresh` and re-plan", + ); + diagnostics.push(Diagnostic::warning( + "cluster_recovery_pending", + graph_address.clone(), + "an interrupted schema apply left unexpected graph state; graph-moving work is blocked until repaired", + )); + outcome.pending_graphs.insert(sidecar.graph_id.clone()); + } +} + +pub(crate) fn sweep_graph_delete_sidecar( + path: PathBuf, + sidecar: RecoverySidecar, + state: &mut ClusterState, + diagnostics: &mut Vec, + outcome: &mut SweepOutcome, +) { + let graph_address = graph_address(&sidecar.graph_id); + let root = PathBuf::from(&sidecar.graph_uri); + + if root.exists() { + // Row 8: the delete never completed. Prefix removal is idempotent and + // works on partial roots, so the repair is simply the re-proposed, + // still-approved delete on a later run — retire the stale intent. + diagnostics.push(Diagnostic::warning( + "graph_delete_incomplete", + graph_address, + "a previous graph delete did not complete; it will be re-proposed by plan and can be retried under its approval", + )); + outcome.completed_sidecars.push(path); + return; + } + + if !state.applied_revision.resources.contains_key(&graph_address) { + // Row 7: already tombstoned (or never recorded); crash fell between + // the state CAS and sidecar delete. + outcome.completed_sidecars.push(path); + return; + } + + // Row 7b: the root is gone, the ledger is stale — roll forward the + // tombstone, consume the approval the sidecar carries, audit. + tombstone_graph_subtree(state, &sidecar.graph_id, sidecar.approval_id.as_deref(), sidecar.actor.as_deref()); + state.recovery_records.insert( + sidecar.operation_id.clone(), + json!({ + "kind": "graph_delete", + "graph_id": sidecar.graph_id, + "outcome": "rolled_forward", + "recovered_at": now_rfc3339(), + "actor": sidecar.actor, + }), + ); + if let Some(approval_id) = &sidecar.approval_id { + record_approval_consumed(state, approval_id, &sidecar.operation_id); + outcome.consumed_approvals.push(approval_id.clone()); + } + diagnostics.push(Diagnostic::warning( + "cluster_recovery_rolled_forward", + graph_address, + "an interrupted graph delete had completed on disk; cluster state was rolled forward to match", + )); + outcome.completed_sidecars.push(path); +} + +/// Remove a graph's subtree (graph, schema, queries) from the ledger and +/// leave a tombstone observation. Idempotent. +pub(crate) fn tombstone_graph_subtree( + state: &mut ClusterState, + graph_id: &str, + approval_id: Option<&str>, + actor: Option<&str>, +) { + let graph_addr = graph_address(graph_id); + let schema_addr = schema_address(graph_id); + let query_prefix = format!("query.{graph_id}."); + state.applied_revision.resources.remove(&graph_addr); + state.applied_revision.resources.remove(&schema_addr); + state + .applied_revision + .resources + .retain(|address, _| !address.starts_with(&query_prefix)); + state.resource_statuses.remove(&graph_addr); + state.resource_statuses.remove(&schema_addr); + state + .resource_statuses + .retain(|address, _| !address.starts_with(&query_prefix)); + state.observations.insert( + graph_addr, + json!({ + "kind": "tombstone", + "deleted_at": now_rfc3339(), + "approval_id": approval_id, + "actor": actor, + }), + ); +} + +/// Record approval consumption in the state ledger. The artifact FILE is +/// rewritten with consumed_at only after the state write lands, so a failed +/// CAS leaves the approval valid for the retry. +pub(crate) fn record_approval_consumed(state: &mut ClusterState, approval_id: &str, operation_id: &str) { + state.approval_records.insert( + approval_id.to_string(), + json!({ + "consumed_at": now_rfc3339(), + "consumed_by_operation": operation_id, + }), + ); +} + +/// Mark approval artifact files consumed on disk (post-CAS). +pub(crate) fn mark_approvals_consumed(backend: &LocalStateBackend, approval_ids: &[String]) { + if approval_ids.is_empty() { + return; + } + let mut sink = Vec::new(); + for (_, mut artifact) in backend.list_approval_artifacts(&mut sink) { + if approval_ids.contains(&artifact.approval_id) && artifact.consumed_at.is_none() { + artifact.consumed_at = Some(now_rfc3339()); + let _ = backend.write_approval_artifact(&artifact); + } + } +} + +/// Read-only commands report pending sidecars without acting on them. +pub(crate) fn warn_pending_recovery_sidecars(config_dir: &Path, diagnostics: &mut Vec) { + let recoveries_dir = config_dir.join(CLUSTER_RECOVERIES_DIR); + let Ok(entries) = fs::read_dir(&recoveries_dir) else { + return; + }; + let mut names: Vec = entries + .flatten() + .filter(|entry| entry.path().extension().is_some_and(|ext| ext == "json")) + .map(|entry| entry.file_name().to_string_lossy().into_owned()) + .collect(); + names.sort(); + for name in names { + diagnostics.push(Diagnostic::warning( + "cluster_recovery_pending", + format!("{CLUSTER_RECOVERIES_DIR}/{name}"), + "a recovery sidecar from an interrupted apply is pending; the next state-mutating command will classify it", + )); + } +}