From bf8cc7a753a73073373bc28a1c3947140e7456dc Mon Sep 17 00:00:00 2001 From: aaltshuler Date: Wed, 10 Jun 2026 04:50:42 +0300 Subject: [PATCH] feat(cluster): graph-create recovery sidecars and sweep MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit RFC-004 §D2/§D3 for the graph_create kind. RecoverySidecar records intent under __cluster/recoveries/{ulid}.json; the roll-forward-only sweep runs at the start of apply/refresh/import under the state lock and classifies each survivor by observation: root absent -> intent removed (row 1); outcome already recorded -> retired (row 2); create completed but state stale -> ledger rolled forward with a recovery_records audit entry (row 4); partial root -> Error/graph_create_incomplete, kept, never auto-deleted (row 5); unexpected schema -> Drifted/actual_applied_state_pending, kept (row 6). Sweep mutations ride the command's existing CAS write; completed sidecars are deleted only after that write lands. Read-only status/plan warn (cluster_recovery_pending) without acting. The apply payload gate now counts only payload-phase errors so kept-sidecar diagnostics don't abort the run before their statuses persist. Co-Authored-By: Claude Fable 5 --- crates/omnigraph-cluster/src/lib.rs | 554 +++++++++++++++++++++++++++- 1 file changed, 548 insertions(+), 6 deletions(-) diff --git a/crates/omnigraph-cluster/src/lib.rs b/crates/omnigraph-cluster/src/lib.rs index 56513ca..fc13bab 100644 --- a/crates/omnigraph-cluster/src/lib.rs +++ b/crates/omnigraph-cluster/src/lib.rs @@ -24,6 +24,7 @@ pub const CLUSTER_STATE_DIR: &str = "__cluster"; pub const CLUSTER_STATE_FILE: &str = "__cluster/state.json"; pub const CLUSTER_LOCK_FILE: &str = "__cluster/lock.json"; pub const CLUSTER_RESOURCES_DIR: &str = "__cluster/resources"; +pub const CLUSTER_RECOVERIES_DIR: &str = "__cluster/recoveries"; #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] #[serde(rename_all = "snake_case")] @@ -415,11 +416,53 @@ struct StateLockFile { pid: u32, } +/// Recovery-intent record for a graph-moving apply operation (RFC-004 §D2). +/// Written under the state lock before the engine call that can create or +/// move a graph manifest; deleted only after the cluster state CAS that +/// records the outcome lands. The sweep (§D3) classifies survivors. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(deny_unknown_fields)] +struct RecoverySidecar { + schema_version: u32, + operation_id: String, + started_at: String, + #[serde(default)] + actor: Option, + kind: RecoverySidecarKind, + graph_id: String, + graph_uri: String, + #[serde(default)] + observed_manifest_version: Option, + #[serde(default)] + expected_manifest_version: Option, + desired_schema_digest: String, + #[serde(default)] + state_cas_base: Option, +} + +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +enum RecoverySidecarKind { + GraphCreate, + // SchemaApply and GraphDelete arrive with stages 4B/4C. +} + +#[derive(Debug, Default)] +struct SweepOutcome { + /// Graphs whose sidecar was kept (rows 5/6): graph-moving work for them + /// is blocked until the operator repairs and re-observes. + pending_graphs: BTreeSet, + /// Sidecars whose outcome is recorded (rows 2/4): deleted only after the + /// command's state write lands, so a CAS failure re-sweeps them. + completed_sidecars: Vec, +} + #[derive(Debug)] struct LocalStateBackend { state_dir: PathBuf, state_path: PathBuf, lock_path: PathBuf, + recoveries_dir: PathBuf, } #[derive(Debug)] @@ -513,6 +556,10 @@ pub fn plan_config_dir(config_dir: impl AsRef) -> PlanOutput { None }; + // Plan is read-only: pending sidecars are reported, never acted on + // (RFC-004 open question 3 keeps read-only commands warn-only). + warn_pending_recovery_sidecars(&desired.config_dir, &mut diagnostics); + let mut prior_resources = BTreeMap::new(); if !has_errors(&diagnostics) { match backend.read_state(&mut observations) { @@ -656,7 +703,7 @@ pub async fn apply_config_dir(config_dir: impl AsRef) -> ApplyOutput { } }; let expected_cas = snapshot.state_cas; - let Some(state) = snapshot.state else { + let Some(mut state) = snapshot.state else { diagnostics.push(Diagnostic::error( "state_missing", CLUSTER_STATE_FILE, @@ -672,9 +719,16 @@ pub async fn apply_config_dir(config_dir: impl AsRef) -> ApplyOutput { ); }; + // Snapshot the as-read state BEFORE the sweep so sweep mutations count as + // changes for the final dirty check and get persisted by the state CAS. + let before_value = + serde_json::to_value(&state).expect("cluster state must serialize deterministically"); + let sweep = sweep_recovery_sidecars(&backend, &mut state, &mut diagnostics).await; + let prior_resources = state_resource_digests(&state); let mut changes = diff_resources(&prior_resources, &desired.resource_digests); classify_changes(&mut changes, &desired.dependencies); + let _ = &sweep.pending_graphs; // consumed by the graph-create executor (4A C4) // Defensive invariant: nothing the approval gate covers may be executable. // Today approvals only cover graph/schema deletes (always deferred); this @@ -723,6 +777,9 @@ pub async fn apply_config_dir(config_dir: impl AsRef) -> ApplyOutput { // Payload phase: content-addressed writes before the state CAS. Any // failure aborts before state moves; blobs already written are inert. + // Gate on payload-phase errors only — sweep errors (e.g. a kept row-5 + // sidecar) must not abort the run, or their statuses would never persist. + let errors_before_payloads = count_errors(&diagnostics); let source_paths: BTreeMap<&str, &str> = desired .resources .iter() @@ -761,7 +818,7 @@ pub async fn apply_config_dir(config_dir: impl AsRef) -> ApplyOutput { diagnostics.push(diagnostic); } } - if has_errors(&diagnostics) { + if count_errors(&diagnostics) > errors_before_payloads { return early_return( display_path(&desired.config_dir), Some(desired.config_digest), @@ -788,9 +845,8 @@ pub async fn apply_config_dir(config_dir: impl AsRef) -> ApplyOutput { } // State mutation. Apply owns query/policy statuses only; graph/schema - // statuses belong to refresh/import observation and must not be clobbered. - let before_value = - serde_json::to_value(&state).expect("cluster state must serialize deterministically"); + // statuses belong to refresh/import observation and must not be clobbered + // (the sweep above is the one exception: it owns recovery statuses). let mut new_state = state.clone(); for change in &changes { match change.disposition { @@ -855,6 +911,13 @@ pub async fn apply_config_dir(config_dir: impl AsRef) -> ApplyOutput { } } } + // Completed (rows 2/4) sweep sidecars are deleted only once their outcome + // is durably recorded; on a failed write they stay and re-sweep next run. + if !state_write_failed { + for sidecar_path in &sweep.completed_sidecars { + let _ = fs::remove_file(sidecar_path); + } + } // On a failed state write, report the statuses that are actually on disk // (the pre-apply snapshot), not the in-memory mutations that were never // persisted — automation reading `resource_statuses` independently of `ok` @@ -902,6 +965,7 @@ pub fn status_config_dir(config_dir: impl AsRef) -> StatusOutput { let backend = LocalStateBackend::new(&parsed.config_dir); let mut observations = backend.observations(); backend.observe_lock(&mut observations, &mut diagnostics); + warn_pending_recovery_sidecars(&parsed.config_dir, &mut diagnostics); let mut resource_digests = BTreeMap::new(); let mut resource_statuses = BTreeMap::new(); @@ -1107,6 +1171,11 @@ async fn sync_config_dir(config_dir: &Path, operation: StateSyncOperation) -> St (StateSyncOperation::Import, None) => initial_import_state(&desired), }; + // Recovery sweep first (RFC-004 §D3): classify any interrupted graph + // operation before observation/verification so a rolled-forward outcome + // is what those passes see. + let sweep = sweep_recovery_sidecars(&backend, &mut state, &mut diagnostics).await; + // Catalog payload verification must run BEFORE graph observation: removing // a drifted query digest first means the live-graph composite recompute // below already excludes it, so the persisted graph. composite stays @@ -1177,7 +1246,13 @@ async fn sync_config_dir(config_dir: &Path, operation: StateSyncOperation) -> St } match backend.write_state(&state, expected_cas.as_deref(), &mut observations) { - Ok(()) => {} + Ok(()) => { + // Completed sweep sidecars are deleted only after their outcome + // is durably recorded; on failure they stay and re-sweep. + for sidecar_path in &sweep.completed_sidecars { + let _ = fs::remove_file(sidecar_path); + } + } Err(diagnostic) => diagnostics.push(diagnostic), } @@ -1307,10 +1382,104 @@ impl LocalStateBackend { Self { state_path: config_dir.join(CLUSTER_STATE_FILE), lock_path: config_dir.join(CLUSTER_LOCK_FILE), + recoveries_dir: config_dir.join(CLUSTER_RECOVERIES_DIR), state_dir, } } + /// List recovery sidecars in ULID (filename) order. Unparseable files are + /// reported as warnings and skipped — they stay on disk for the operator. + fn list_recovery_sidecars( + &self, + diagnostics: &mut Vec, + ) -> Vec<(PathBuf, RecoverySidecar)> { + let mut paths = Vec::new(); + match fs::read_dir(&self.recoveries_dir) { + Ok(entries) => { + for entry in entries.flatten() { + let path = entry.path(); + if path.extension().is_some_and(|ext| ext == "json") { + paths.push(path); + } + } + } + Err(err) if err.kind() == ErrorKind::NotFound => {} + Err(err) => { + diagnostics.push(Diagnostic::warning( + "recovery_sidecar_read_error", + CLUSTER_RECOVERIES_DIR, + format!("could not list recovery sidecars: {err}"), + )); + } + } + paths.sort(); + let mut sidecars = Vec::new(); + for path in paths { + match fs::read_to_string(&path) + .map_err(|err| err.to_string()) + .and_then(|text| { + serde_json::from_str::(&text).map_err(|err| err.to_string()) + }) { + Ok(sidecar) if sidecar.schema_version == 1 => sidecars.push((path, sidecar)), + Ok(sidecar) => diagnostics.push(Diagnostic::warning( + "unsupported_recovery_sidecar_version", + display_path(&path), + format!( + "unsupported recovery sidecar version {}; leaving it in place", + sidecar.schema_version + ), + )), + Err(err) => diagnostics.push(Diagnostic::warning( + "invalid_recovery_sidecar", + display_path(&path), + format!("could not parse recovery sidecar ({err}); leaving it in place"), + )), + } + } + sidecars + } + + /// Atomically write (or rewrite) a recovery sidecar; returns its path. + fn write_recovery_sidecar(&self, sidecar: &RecoverySidecar) -> Result { + fs::create_dir_all(&self.recoveries_dir).map_err(|err| { + Diagnostic::error( + "recovery_sidecar_write_error", + CLUSTER_RECOVERIES_DIR, + format!("could not create recoveries directory: {err}"), + ) + })?; + let target = self + .recoveries_dir + .join(format!("{}.json", sidecar.operation_id)); + let mut payload = serde_json::to_string_pretty(sidecar).map_err(|err| { + Diagnostic::error( + "recovery_sidecar_write_error", + display_path(&target), + format!("could not encode recovery sidecar: {err}"), + ) + })?; + payload.push('\n'); + let tmp_path = self + .recoveries_dir + .join(format!("{}.json.tmp.{}", sidecar.operation_id, Ulid::new())); + fs::write(&tmp_path, payload.as_bytes()).map_err(|err| { + Diagnostic::error( + "recovery_sidecar_write_error", + display_path(&tmp_path), + format!("could not write recovery sidecar: {err}"), + ) + })?; + if let Err(err) = fs::rename(&tmp_path, &target) { + let _ = fs::remove_file(&tmp_path); + return Err(Diagnostic::error( + "recovery_sidecar_write_error", + display_path(&target), + format!("could not move recovery sidecar into place: {err}"), + )); + } + Ok(target) + } + fn observations(&self) -> StateObservations { StateObservations { state_path: display_path(&self.state_path), @@ -1701,6 +1870,169 @@ fn initial_import_state(desired: &DesiredCluster) -> ClusterState { } } +/// Recovery sweep (RFC-004 §D3): runs at the start of every state-mutating +/// cluster command, under the state lock, before the command's own work. +/// Roll-forward-only — the engine's own sidecars make each graph-level +/// operation atomic within the graph, so the cluster never rolls a graph +/// back; it converges the ledger to observable reality or refuses loudly. +/// Mutations ride the calling command's CAS-checked state write; completed +/// sidecars are deleted only after that write lands. +async fn sweep_recovery_sidecars( + backend: &LocalStateBackend, + state: &mut ClusterState, + diagnostics: &mut Vec, +) -> SweepOutcome { + let mut outcome = SweepOutcome::default(); + for (path, sidecar) in backend.list_recovery_sidecars(diagnostics) { + match sidecar.kind { + RecoverySidecarKind::GraphCreate => { + sweep_graph_create_sidecar(path, sidecar, state, diagnostics, &mut outcome).await; + } + } + } + outcome +} + +async fn sweep_graph_create_sidecar( + path: PathBuf, + sidecar: RecoverySidecar, + state: &mut ClusterState, + diagnostics: &mut Vec, + outcome: &mut SweepOutcome, +) { + let graph_address = graph_address(&sidecar.graph_id); + let schema_addr = schema_address(&sidecar.graph_id); + let graph_path = PathBuf::from(&sidecar.graph_uri); + + // Row 1: nothing moved — the init never landed. The sidecar is pure + // intent; remove it and let the command's own plan re-propose the create. + if !graph_path.exists() { + let _ = fs::remove_file(&path); + return; + } + + match Omnigraph::open_read_only(&sidecar.graph_uri).await { + Ok(db) => { + let live_digest = sha256_hex(db.schema_source().as_bytes()); + let recorded = state + .applied_revision + .resources + .get(&schema_addr) + .map(|resource| resource.digest.clone()); + if recorded.as_deref() == Some(live_digest.as_str()) { + // Row 2: crash fell between the state CAS and sidecar delete. + outcome.completed_sidecars.push(path); + } else if live_digest == sidecar.desired_schema_digest { + // Row 4: the create completed on the graph; roll the cluster + // state forward to observable reality. + state.applied_revision.resources.insert( + schema_addr.clone(), + StateResource { + digest: live_digest.clone(), + }, + ); + let query_digests = state_query_digests_for_graph(state, &sidecar.graph_id); + let composite = + graph_digest(&sidecar.graph_id, Some(&live_digest), Some(&query_digests)); + state + .applied_revision + .resources + .insert(graph_address.clone(), StateResource { digest: composite }); + set_resource_status_applied(state, &graph_address); + set_resource_status_applied(state, &schema_addr); + state.recovery_records.insert( + sidecar.operation_id.clone(), + json!({ + "kind": "graph_create", + "graph_id": sidecar.graph_id, + "outcome": "rolled_forward", + "recovered_at": now_rfc3339(), + "actor": sidecar.actor, + }), + ); + diagnostics.push(Diagnostic::warning( + "cluster_recovery_rolled_forward", + graph_address.clone(), + "an interrupted graph create had completed on the graph; cluster state was rolled forward to match", + )); + outcome.completed_sidecars.push(path); + } else { + // Row 6: the graph moved to something the sidecar did not + // intend. Refuse to guess; require refresh + operator re-plan. + set_resource_status( + state, + &graph_address, + ResourceLifecycleStatus::Drifted, + "actual_applied_state_pending", + "graph state does not match the interrupted operation; run `cluster refresh` and re-plan", + ); + set_resource_status( + state, + &schema_addr, + ResourceLifecycleStatus::Drifted, + "actual_applied_state_pending", + "graph state does not match the interrupted operation; run `cluster refresh` and re-plan", + ); + diagnostics.push(Diagnostic::warning( + "cluster_recovery_pending", + graph_address.clone(), + "an interrupted graph create left unexpected graph state; graph-moving work is blocked until repaired", + )); + outcome.pending_graphs.insert(sidecar.graph_id.clone()); + } + } + Err(err) => { + // Row 5: partial root (the engine's documented init gap). Never + // auto-delete — reconciler deletes are the same data-loss class + // as human deletes; the operator removes the root explicitly. + set_resource_status( + state, + &graph_address, + ResourceLifecycleStatus::Error, + "graph_create_incomplete", + "graph root exists but cannot be opened; remove the graph root and re-run `cluster apply`", + ); + set_resource_status( + state, + &schema_addr, + ResourceLifecycleStatus::Error, + "graph_create_incomplete", + "graph root exists but cannot be opened; remove the graph root and re-run `cluster apply`", + ); + diagnostics.push(Diagnostic::error( + "graph_create_incomplete", + graph_address.clone(), + format!( + "graph root '{}' exists but cannot be opened ({err}); remove the graph root and re-run `cluster apply`", + sidecar.graph_uri + ), + )); + outcome.pending_graphs.insert(sidecar.graph_id.clone()); + } + } +} + +/// Read-only commands report pending sidecars without acting on them. +fn warn_pending_recovery_sidecars(config_dir: &Path, diagnostics: &mut Vec) { + let recoveries_dir = config_dir.join(CLUSTER_RECOVERIES_DIR); + let Ok(entries) = fs::read_dir(&recoveries_dir) else { + return; + }; + let mut names: Vec = entries + .flatten() + .filter(|entry| entry.path().extension().is_some_and(|ext| ext == "json")) + .map(|entry| entry.file_name().to_string_lossy().into_owned()) + .collect(); + names.sort(); + for name in names { + diagnostics.push(Diagnostic::warning( + "cluster_recovery_pending", + format!("{CLUSTER_RECOVERIES_DIR}/{name}"), + "a recovery sidecar from an interrupted apply is pending; the next state-mutating command will classify it", + )); + } +} + async fn observe_declared_graphs(desired: &DesiredCluster, state: &mut ClusterState) -> usize { let mut graph_error_count = 0; for graph in &desired.graphs { @@ -2868,6 +3200,13 @@ fn has_errors(diagnostics: &[Diagnostic]) -> bool { .any(|diagnostic| diagnostic.severity == DiagnosticSeverity::Error) } +fn count_errors(diagnostics: &[Diagnostic]) -> usize { + diagnostics + .iter() + .filter(|diagnostic| diagnostic.severity == DiagnosticSeverity::Error) + .count() +} + fn display_path(path: &Path) -> String { path.display().to_string() } @@ -4621,6 +4960,209 @@ graphs: ); } + // ---- recovery sidecars + sweep (Stage 4A) ---- + + fn derived_graph_uri(config_dir: &Path, graph_id: &str) -> String { + display_path( + &config_dir + .join(CLUSTER_GRAPHS_DIR) + .join(format!("{graph_id}.omni")), + ) + } + + fn write_create_sidecar( + config_dir: &Path, + graph_id: &str, + desired_schema_digest: &str, + operation_id: &str, + ) -> PathBuf { + let dir = config_dir.join(CLUSTER_RECOVERIES_DIR); + fs::create_dir_all(&dir).unwrap(); + let path = dir.join(format!("{operation_id}.json")); + fs::write( + &path, + serde_json::to_string_pretty(&json!({ + "schema_version": 1, + "operation_id": operation_id, + "started_at": "1970-01-01T00:00:00Z", + "kind": "graph_create", + "graph_id": graph_id, + "graph_uri": derived_graph_uri(config_dir, graph_id), + "desired_schema_digest": desired_schema_digest, + })) + .unwrap(), + ) + .unwrap(); + path + } + + #[tokio::test] + async fn sweep_removes_sidecar_when_root_absent() { + let dir = fixture(); + write_applyable_state(dir.path()); + let sidecar = write_create_sidecar(dir.path(), "knowledge", "irrelevant", "01ROW1"); + + let out = apply_config_dir(dir.path()).await; + assert!(out.ok, "{:?}", out.diagnostics); + // Row 1: nothing moved; intent removed, run proceeds normally. + assert!(!sidecar.exists()); + assert!(out.converged); + } + + #[tokio::test] + async fn sweep_rolls_forward_completed_create() { + let dir = fixture(); + init_derived_graph(dir.path()).await; + write_state_resources(dir.path(), &[]); // state predates the create + let desired = validate_config_dir(dir.path()); + let schema_digest = desired.resource_digests["schema.knowledge"].clone(); + let sidecar = write_create_sidecar(dir.path(), "knowledge", &schema_digest, "01ROW4"); + + let out = apply_config_dir(dir.path()).await; + assert!(out.ok, "{:?}", out.diagnostics); + assert!( + out.diagnostics + .iter() + .any(|diagnostic| diagnostic.code == "cluster_recovery_rolled_forward") + ); + // Row 4: ledger converged to observable reality, audit recorded, + // sidecar retired after the CAS landed. + let state = read_state_json(dir.path()); + assert_eq!( + state["applied_revision"]["resources"]["schema.knowledge"]["digest"], + schema_digest + ); + assert!( + state["recovery_records"] + .as_object() + .unwrap() + .values() + .any(|record| record["outcome"] == "rolled_forward" + && record["graph_id"] == "knowledge") + ); + assert!(!sidecar.exists()); + // With the graph rolled forward, the same run converges the catalog. + assert!(out.converged, "{out:?}"); + } + + #[tokio::test] + async fn sweep_completes_already_recorded_create() { + let dir = fixture(); + init_derived_graph(dir.path()).await; + write_applyable_state(dir.path()); // state already records graph+schema + let desired = validate_config_dir(dir.path()); + let sidecar = write_create_sidecar( + dir.path(), + "knowledge", + &desired.resource_digests["schema.knowledge"], + "01ROW2", + ); + + let out = apply_config_dir(dir.path()).await; + assert!(out.ok, "{:?}", out.diagnostics); + // Row 2: outcome was already durable; no audit entry, sidecar retired. + assert!(!sidecar.exists()); + let state = read_state_json(dir.path()); + assert!( + state["recovery_records"] + .as_object() + .is_none_or(|records| records.is_empty()), + "{state}" + ); + } + + #[tokio::test] + async fn sweep_keeps_sidecar_for_incomplete_root() { + let dir = fixture(); + write_applyable_state(dir.path()); + // A root that exists but cannot be opened: the engine's partial-init gap. + let root = dir.path().join(CLUSTER_GRAPHS_DIR).join("knowledge.omni"); + fs::create_dir_all(&root).unwrap(); + fs::write(root.join("_schema.pg"), "junk").unwrap(); + let sidecar = write_create_sidecar(dir.path(), "knowledge", "whatever", "01ROW5"); + + let out = apply_config_dir(dir.path()).await; + assert!(!out.ok); + assert!( + out.diagnostics + .iter() + .any(|diagnostic| diagnostic.code == "graph_create_incomplete") + ); + // Row 5: never auto-delete; sidecar and root stay for the operator, + // and the Error status is persisted by the run's state write. + assert!(sidecar.exists()); + assert!(root.exists()); + let state = read_state_json(dir.path()); + assert_eq!(state["resource_statuses"]["graph.knowledge"]["status"], "error"); + assert!( + state["resource_statuses"]["graph.knowledge"]["conditions"] + .as_array() + .unwrap() + .iter() + .any(|condition| condition == "graph_create_incomplete") + ); + } + + #[tokio::test] + async fn sweep_flags_unexpected_schema_as_pending() { + let dir = fixture(); + write_state_resources(dir.path(), &[]); + // Live graph exists with a schema the sidecar never intended. + let graph_dir = dir.path().join(CLUSTER_GRAPHS_DIR); + fs::create_dir_all(&graph_dir).unwrap(); + Omnigraph::init( + &derived_graph_uri(dir.path(), "knowledge"), + "\nnode Other {\n name: String @key\n}\n", + ) + .await + .unwrap(); + let desired = validate_config_dir(dir.path()); + let sidecar = write_create_sidecar( + dir.path(), + "knowledge", + &desired.resource_digests["schema.knowledge"], + "01ROW6", + ); + + let out = apply_config_dir(dir.path()).await; + assert!(out.ok, "{:?}", out.diagnostics); // warning, not error + assert!( + out.diagnostics + .iter() + .any(|diagnostic| diagnostic.code == "cluster_recovery_pending") + ); + // Row 6: refuse to guess; sidecar kept, Drifted persisted. + assert!(sidecar.exists()); + let state = read_state_json(dir.path()); + assert_eq!( + state["resource_statuses"]["graph.knowledge"]["status"], + "drifted" + ); + assert!( + state["resource_statuses"]["graph.knowledge"]["conditions"] + .as_array() + .unwrap() + .iter() + .any(|condition| condition == "actual_applied_state_pending") + ); + } + + #[test] + fn status_warns_on_pending_recovery_sidecar() { + let dir = fixture(); + write_applyable_state(dir.path()); + write_create_sidecar(dir.path(), "knowledge", "irrelevant", "01STATUS"); + + let out = status_config_dir(dir.path()); + assert!(out.ok, "{:?}", out.diagnostics); + assert!( + out.diagnostics + .iter() + .any(|diagnostic| diagnostic.code == "cluster_recovery_pending" + && diagnostic.severity == DiagnosticSeverity::Warning) + ); + } + #[test] fn plan_annotates_apply_dispositions() { let dir = fixture();