diff --git a/crates/omnigraph-cluster/src/lib.rs b/crates/omnigraph-cluster/src/lib.rs index af4ac93..f8a56e7 100644 --- a/crates/omnigraph-cluster/src/lib.rs +++ b/crates/omnigraph-cluster/src/lib.rs @@ -450,7 +450,8 @@ struct RecoverySidecar { #[serde(rename_all = "snake_case")] enum RecoverySidecarKind { GraphCreate, - // SchemaApply and GraphDelete arrive with stages 4B/4C. + SchemaApply, + // GraphDelete arrives with stage 4C. } #[derive(Debug, Default)] @@ -2090,6 +2091,9 @@ async fn sweep_recovery_sidecars( RecoverySidecarKind::GraphCreate => { sweep_graph_create_sidecar(path, sidecar, state, diagnostics, &mut outcome).await; } + RecoverySidecarKind::SchemaApply => { + sweep_schema_apply_sidecar(path, sidecar, state, diagnostics, &mut outcome).await; + } } } outcome @@ -2214,6 +2218,102 @@ async fn sweep_graph_create_sidecar( } } +async fn sweep_schema_apply_sidecar( + path: PathBuf, + sidecar: RecoverySidecar, + state: &mut ClusterState, + diagnostics: &mut Vec, + outcome: &mut SweepOutcome, +) { + let graph_address = graph_address(&sidecar.graph_id); + let schema_addr = schema_address(&sidecar.graph_id); + + // Digest-based classification: robust to unrelated manifest movement; + // the sidecar's version pins stay forensic. + let live_digest = match Omnigraph::open_read_only(&sidecar.graph_uri).await { + Ok(db) => sha256_hex(db.schema_source().as_bytes()), + Err(err) => { + // Cannot verify the interrupted operation — refuse to guess. + diagnostics.push(Diagnostic::warning( + "cluster_recovery_pending", + graph_address.clone(), + format!( + "an interrupted schema apply cannot be verified (graph '{}' did not open: {err}); graph-moving work is blocked until repaired", + sidecar.graph_uri + ), + )); + outcome.pending_graphs.insert(sidecar.graph_id.clone()); + return; + } + }; + + let recorded = state + .applied_revision + .resources + .get(&schema_addr) + .map(|resource| resource.digest.clone()); + if recorded.as_deref() == Some(live_digest.as_str()) { + // Ledger consistent with the live graph (the apply never landed, or + // landed and was recorded): the sidecar is stale intent — retire it. + outcome.completed_sidecars.push(path); + } else if live_digest == sidecar.desired_schema_digest { + // RFC-004 §D3 row 3: the schema apply completed on the graph; roll + // the cluster state forward to observable reality. + state.applied_revision.resources.insert( + schema_addr.clone(), + StateResource { + digest: live_digest.clone(), + }, + ); + let query_digests = state_query_digests_for_graph(state, &sidecar.graph_id); + let composite = graph_digest(&sidecar.graph_id, Some(&live_digest), Some(&query_digests)); + state + .applied_revision + .resources + .insert(graph_address.clone(), StateResource { digest: composite }); + set_resource_status_applied(state, &graph_address); + set_resource_status_applied(state, &schema_addr); + state.recovery_records.insert( + sidecar.operation_id.clone(), + json!({ + "kind": "schema_apply", + "graph_id": sidecar.graph_id, + "outcome": "rolled_forward", + "recovered_at": now_rfc3339(), + "actor": sidecar.actor, + }), + ); + diagnostics.push(Diagnostic::warning( + "cluster_recovery_rolled_forward", + graph_address.clone(), + "an interrupted schema apply had completed on the graph; cluster state was rolled forward to match", + )); + outcome.completed_sidecars.push(path); + } else { + // Row 6: live schema is neither the recorded nor the desired digest. + set_resource_status( + state, + &graph_address, + ResourceLifecycleStatus::Drifted, + "actual_applied_state_pending", + "graph state does not match the interrupted operation; run `cluster refresh` and re-plan", + ); + set_resource_status( + state, + &schema_addr, + ResourceLifecycleStatus::Drifted, + "actual_applied_state_pending", + "graph state does not match the interrupted operation; run `cluster refresh` and re-plan", + ); + diagnostics.push(Diagnostic::warning( + "cluster_recovery_pending", + graph_address.clone(), + "an interrupted schema apply left unexpected graph state; graph-moving work is blocked until repaired", + )); + outcome.pending_graphs.insert(sidecar.graph_id.clone()); + } +} + /// Read-only commands report pending sidecars without acting on them. fn warn_pending_recovery_sidecars(config_dir: &Path, diagnostics: &mut Vec) { let recoveries_dir = config_dir.join(CLUSTER_RECOVERIES_DIR); @@ -5613,6 +5713,135 @@ graphs: ); } + fn write_schema_apply_sidecar( + config_dir: &Path, + graph_id: &str, + desired_schema_digest: &str, + operation_id: &str, + ) -> PathBuf { + let dir = config_dir.join(CLUSTER_RECOVERIES_DIR); + fs::create_dir_all(&dir).unwrap(); + let path = dir.join(format!("{operation_id}.json")); + fs::write( + &path, + serde_json::to_string_pretty(&json!({ + "schema_version": 1, + "operation_id": operation_id, + "started_at": "1970-01-01T00:00:00Z", + "kind": "schema_apply", + "graph_id": graph_id, + "graph_uri": derived_graph_uri(config_dir, graph_id), + "desired_schema_digest": desired_schema_digest, + })) + .unwrap(), + ) + .unwrap(); + path + } + + const SCHEMA_V2: &str = "\nnode Person {\n name: String @key\n age: I32?\n bio: String?\n}\n"; + + #[tokio::test] + async fn sweep_retires_schema_sidecar_when_ledger_consistent() { + let dir = fixture(); + init_derived_graph(dir.path()).await; + write_applyable_state(dir.path()); // state digest == live digest + let sidecar = + write_schema_apply_sidecar(dir.path(), "knowledge", "never-applied", "01SROW1"); + + let out = apply_config_dir(dir.path()).await; + assert!(out.ok, "{:?}", out.diagnostics); + assert!(!sidecar.exists()); + let state = read_state_json(dir.path()); + assert!( + state["recovery_records"] + .as_object() + .is_none_or(|records| records.is_empty()) + ); + } + + #[tokio::test] + async fn sweep_rolls_forward_completed_schema_apply() { + let dir = fixture(); + init_derived_graph(dir.path()).await; + write_applyable_state(dir.path()); + // The schema apply completed on the graph out-of-process... + let graph_uri = derived_graph_uri(dir.path(), "knowledge"); + let db = Omnigraph::open(&graph_uri).await.unwrap(); + db.apply_schema(SCHEMA_V2).await.unwrap(); + // ...the desired config matches it, and the sidecar records the intent. + fs::write(dir.path().join("people.pg"), SCHEMA_V2).unwrap(); + let desired = validate_config_dir(dir.path()); + let v2_digest = desired.resource_digests["schema.knowledge"].clone(); + let sidecar = write_schema_apply_sidecar(dir.path(), "knowledge", &v2_digest, "01SROW3"); + + let out = apply_config_dir(dir.path()).await; + assert!(out.ok, "{:?}", out.diagnostics); + assert!( + out.diagnostics + .iter() + .any(|diagnostic| diagnostic.code == "cluster_recovery_rolled_forward") + ); + assert!(!sidecar.exists()); + let state = read_state_json(dir.path()); + assert_eq!( + state["applied_revision"]["resources"]["schema.knowledge"]["digest"], + v2_digest + ); + assert!( + state["recovery_records"] + .as_object() + .unwrap() + .values() + .any(|record| record["kind"] == "schema_apply" + && record["outcome"] == "rolled_forward") + ); + assert!(out.converged, "{out:?}"); + } + + #[tokio::test] + async fn sweep_flags_unexpected_schema_apply_state_as_pending() { + let dir = fixture(); + init_derived_graph(dir.path()).await; // live = v1 + write_state_resources(dir.path(), &[("schema.knowledge", "stale-digest")]); + // Sidecar intended a digest that is neither live nor recorded. + let sidecar = + write_schema_apply_sidecar(dir.path(), "knowledge", "intended-digest", "01SROW6"); + + let out = apply_config_dir(dir.path()).await; + assert!(out.ok, "{:?}", out.diagnostics); // warnings only + assert!( + out.diagnostics + .iter() + .any(|diagnostic| diagnostic.code == "cluster_recovery_pending") + ); + assert!(sidecar.exists()); + let state = read_state_json(dir.path()); + assert_eq!( + state["resource_statuses"]["schema.knowledge"]["status"], + "drifted" + ); + } + + #[tokio::test] + async fn sweep_keeps_schema_sidecar_for_unopenable_root() { + let dir = fixture(); + write_applyable_state(dir.path()); + let root = dir.path().join(CLUSTER_GRAPHS_DIR).join("knowledge.omni"); + fs::create_dir_all(&root).unwrap(); // exists, won't open + let sidecar = + write_schema_apply_sidecar(dir.path(), "knowledge", "whatever", "01SROWX"); + + let out = apply_config_dir(dir.path()).await; + assert!(out.ok, "{:?}", out.diagnostics); // warning: cannot verify + assert!( + out.diagnostics + .iter() + .any(|diagnostic| diagnostic.code == "cluster_recovery_pending") + ); + assert!(sidecar.exists()); + } + #[test] fn status_warns_on_pending_recovery_sidecar() { let dir = fixture();