From b313075476a5c49d747b425b6bc6bd245258a93e Mon Sep 17 00:00:00 2001 From: aaltshuler Date: Wed, 10 Jun 2026 13:02:12 +0300 Subject: [PATCH 1/6] refactor(cluster): make plan_config_dir async Mechanical conversion ahead of Stage 4B (plan will preview schema migrations against live graphs): signature, CLI dispatch, and test callers. Zero behavior change. Co-Authored-By: Claude Fable 5 --- crates/omnigraph-cli/src/main.rs | 2 +- crates/omnigraph-cluster/src/lib.rs | 74 ++++++++++++++--------------- 2 files changed, 38 insertions(+), 38 deletions(-) diff --git a/crates/omnigraph-cli/src/main.rs b/crates/omnigraph-cli/src/main.rs index 08c1fab..7d09f36 100644 --- a/crates/omnigraph-cli/src/main.rs +++ b/crates/omnigraph-cli/src/main.rs @@ -3554,7 +3554,7 @@ async fn main() -> Result<()> { finish_cluster_validate(&output, json)?; } ClusterCommand::Plan { config, json } => { - let output = plan_config_dir(config); + let output = plan_config_dir(config).await; finish_cluster_plan(&output, json)?; } ClusterCommand::Apply { config, json } => { diff --git a/crates/omnigraph-cluster/src/lib.rs b/crates/omnigraph-cluster/src/lib.rs index 863691c..7f92a67 100644 --- a/crates/omnigraph-cluster/src/lib.rs +++ b/crates/omnigraph-cluster/src/lib.rs @@ -499,7 +499,7 @@ pub fn validate_config_dir(config_dir: impl AsRef) -> ValidateOutput { } } -pub fn plan_config_dir(config_dir: impl AsRef) -> PlanOutput { +pub async fn plan_config_dir(config_dir: impl AsRef) -> PlanOutput { let outcome = load_desired(config_dir.as_ref()); let mut diagnostics = outcome.diagnostics; let backend = LocalStateBackend::new(&outcome.config_dir); @@ -3681,10 +3681,10 @@ graphs: ); } - #[test] - fn missing_state_plans_creates() { + #[tokio::test] + async fn missing_state_plans_creates() { let dir = fixture(); - let out = plan_config_dir(dir.path()); + let out = plan_config_dir(dir.path()).await; assert!(out.ok, "{:?}", out.diagnostics); assert!(!out.state_observations.state_found); 
assert!(!out.state_observations.locked); @@ -3698,10 +3698,10 @@ graphs: assert!(!dir.path().join(CLUSTER_LOCK_FILE).exists()); } - #[test] - fn config_digest_ignores_yaml_comments_and_formatting() { + #[tokio::test] + async fn config_digest_ignores_yaml_comments_and_formatting() { let dir = fixture(); - let first = plan_config_dir(dir.path()); + let first = plan_config_dir(dir.path()).await; assert!(first.ok, "{:?}", first.diagnostics); fs::write( @@ -3724,7 +3724,7 @@ policies: ) .unwrap(); - let second = plan_config_dir(dir.path()); + let second = plan_config_dir(dir.path()).await; assert!(second.ok, "{:?}", second.diagnostics); assert_eq!( first.desired_revision.config_digest, @@ -3732,10 +3732,10 @@ policies: ); } - #[test] - fn existing_state_plans_update_and_delete_deterministically() { + #[tokio::test] + async fn existing_state_plans_update_and_delete_deterministically() { let dir = fixture(); - let first = plan_config_dir(dir.path()); + let first = plan_config_dir(dir.path()).await; let state_dir = dir.path().join("__cluster"); fs::create_dir_all(&state_dir).unwrap(); fs::write( @@ -3755,7 +3755,7 @@ policies: ) .unwrap(); - let out = plan_config_dir(dir.path()); + let out = plan_config_dir(dir.path()).await; assert!(out.ok, "{:?}", out.diagnostics); let rendered: Vec<_> = out .changes @@ -3773,8 +3773,8 @@ policies: ); } - #[test] - fn old_minimal_state_json_still_plans_with_default_revision() { + #[tokio::test] + async fn old_minimal_state_json_still_plans_with_default_revision() { let dir = fixture(); let state_dir = dir.path().join(CLUSTER_STATE_DIR); fs::create_dir_all(&state_dir).unwrap(); @@ -3792,7 +3792,7 @@ policies: ) .unwrap(); - let out = plan_config_dir(dir.path()); + let out = plan_config_dir(dir.path()).await; assert!(out.ok, "{:?}", out.diagnostics); assert_eq!(out.state_observations.state_revision, 0); assert!(out.state_observations.state_cas.is_some()); @@ -4018,12 +4018,12 @@ graphs: assert!(dir.path().join(CLUSTER_LOCK_FILE).exists()); 
} - #[test] - fn plan_succeeds_after_force_unlock() { + #[tokio::test] + async fn plan_succeeds_after_force_unlock() { let dir = fixture(); write_lock_file(dir.path(), "held-lock", "plan"); - let locked = plan_config_dir(dir.path()); + let locked = plan_config_dir(dir.path()).await; assert!(!locked.ok); assert!( locked @@ -4035,12 +4035,12 @@ graphs: let unlocked = force_unlock_config_dir(dir.path(), "held-lock"); assert!(unlocked.ok, "{:?}", unlocked.diagnostics); - let out = plan_config_dir(dir.path()); + let out = plan_config_dir(dir.path()).await; assert!(out.ok, "{:?}", out.diagnostics); } - #[test] - fn plan_reports_state_cas_revision_and_removes_lock() { + #[tokio::test] + async fn plan_reports_state_cas_revision_and_removes_lock() { let dir = fixture(); let state_dir = dir.path().join(CLUSTER_STATE_DIR); fs::create_dir_all(&state_dir).unwrap(); @@ -4056,7 +4056,7 @@ graphs: }"#; fs::write(state_dir.join("state.json"), state).unwrap(); - let out = plan_config_dir(dir.path()); + let out = plan_config_dir(dir.path()).await; assert!(out.ok, "{:?}", out.diagnostics); assert_eq!(out.state_observations.state_revision, 7); assert_eq!( @@ -4073,8 +4073,8 @@ graphs: ); } - #[test] - fn existing_lock_makes_plan_fail() { + #[tokio::test] + async fn existing_lock_makes_plan_fail() { let dir = fixture(); let state_dir = dir.path().join(CLUSTER_STATE_DIR); fs::create_dir_all(&state_dir).unwrap(); @@ -4090,7 +4090,7 @@ graphs: ) .unwrap(); - let out = plan_config_dir(dir.path()); + let out = plan_config_dir(dir.path()).await; assert!(!out.ok); assert!(out.state_observations.locked); assert_eq!(out.state_observations.lock_id.as_deref(), Some("held-lock")); @@ -4111,8 +4111,8 @@ graphs: })); } - #[test] - fn state_lock_false_bypasses_lock_with_warning() { + #[tokio::test] + async fn state_lock_false_bypasses_lock_with_warning() { let dir = fixture(); fs::write( dir.path().join(CLUSTER_CONFIG_FILE), @@ -4128,7 +4128,7 @@ graphs: ) .unwrap(); - let out = 
plan_config_dir(dir.path()); + let out = plan_config_dir(dir.path()).await; assert!(out.ok, "{:?}", out.diagnostics); assert!(!out.state_observations.locked); assert!(!out.state_observations.lock_acquired); @@ -4153,15 +4153,15 @@ graphs: assert_eq!(out.diagnostics[0].code, "unsupported_state_backend"); } - #[test] - fn external_state_backend_plan_rejected() { + #[tokio::test] + async fn external_state_backend_plan_rejected() { let dir = fixture(); fs::write( dir.path().join(CLUSTER_CONFIG_FILE), "version: 1\nstate:\n backend: s3://bucket/state\ngraphs: {}\n", ) .unwrap(); - let out = plan_config_dir(dir.path()); + let out = plan_config_dir(dir.path()).await; assert!(!out.ok); assert!( out.diagnostics @@ -4304,7 +4304,7 @@ graphs: assert!(!out.resource_digests.contains_key("graph.knowledge")); assert_eq!(out.observations["graph.knowledge"]["exists"], false); - let plan = plan_config_dir(dir.path()); + let plan = plan_config_dir(dir.path()).await; assert!(plan.ok, "{:?}", plan.diagnostics); assert!(plan.changes.iter().any(|change| { change.resource == "graph.knowledge" && change.operation == PlanOperation::Create @@ -4342,7 +4342,7 @@ graphs: false ); - let plan = plan_config_dir(dir.path()); + let plan = plan_config_dir(dir.path()).await; assert!(plan.ok, "{:?}", plan.diagnostics); assert!(plan.changes.iter().any(|change| { change.resource == "schema.knowledge" && change.operation == PlanOperation::Update @@ -5233,7 +5233,7 @@ graphs: let refresh = refresh_config_dir(dir.path()).await; assert!(refresh.ok, "{:?}", refresh.diagnostics); - let plan = plan_config_dir(dir.path()); + let plan = plan_config_dir(dir.path()).await; let query_change = plan .changes .iter() @@ -5516,10 +5516,10 @@ graphs: ); } - #[test] - fn plan_annotates_apply_dispositions() { + #[tokio::test] + async fn plan_annotates_apply_dispositions() { let dir = fixture(); - let out = plan_config_dir(dir.path()); + let out = plan_config_dir(dir.path()).await; assert!(out.ok, "{:?}", out.diagnostics); 
let by_resource: BTreeMap<&str, &PlanChange> = out .changes From ca63a9340b64bcbe90a8b60e3d90af5794cc616f Mon Sep 17 00:00:00 2001 From: aaltshuler Date: Wed, 10 Jun 2026 13:04:19 +0300 Subject: [PATCH 2/6] feat(cluster): embed schema migration previews in cluster plan MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit RFC-004 §D7's data-aware preview: for every schema update, plan opens the live graph read-only and embeds the engine's migration plan (supported flag + typed steps) in the change record; the human renderer prints the steps. Preview failures (unreachable graph, planner error) degrade to the digest diff with a schema_preview_unavailable warning — planning never blocks. Co-Authored-By: Claude Fable 5 --- crates/omnigraph-cli/src/main.rs | 11 +++ crates/omnigraph-cluster/src/lib.rs | 117 +++++++++++++++++++++++++++- 2 files changed, 126 insertions(+), 2 deletions(-) diff --git a/crates/omnigraph-cli/src/main.rs b/crates/omnigraph-cli/src/main.rs index 7d09f36..de87309 100644 --- a/crates/omnigraph-cli/src/main.rs +++ b/crates/omnigraph-cli/src/main.rs @@ -804,6 +804,17 @@ fn print_cluster_plan_human(output: &PlanOutput) { ); for change in &output.changes { println!(" {:?} {}", change.operation, change.resource); + if let Some(migration) = &change.migration { + if !migration.supported { + println!(" migration UNSUPPORTED:"); + } + for step in &migration.steps { + println!( + " {}", + serde_json::to_string(step).unwrap_or_else(|_| format!("{step:?}")) + ); + } + } } if output.changes.is_empty() { println!(" no changes"); diff --git a/crates/omnigraph-cluster/src/lib.rs b/crates/omnigraph-cluster/src/lib.rs index 7f92a67..af4ac93 100644 --- a/crates/omnigraph-cluster/src/lib.rs +++ b/crates/omnigraph-cluster/src/lib.rs @@ -4,7 +4,8 @@ use std::io::{ErrorKind, Write}; use std::path::{Path, PathBuf}; use std::process; -use omnigraph::db::{Omnigraph, ReadTarget}; +use omnigraph::db::{Omnigraph, ReadTarget, 
SchemaApplyOptions}; +use omnigraph_compiler::SchemaMigrationPlan; use omnigraph_compiler::build_catalog; use omnigraph_compiler::query::parser::parse_query; use omnigraph_compiler::query::typecheck::typecheck_query_decl; @@ -182,7 +183,7 @@ pub enum ApplyDisposition { Blocked, } -#[derive(Debug, Clone, Serialize, PartialEq, Eq)] +#[derive(Debug, Clone, Serialize, PartialEq)] pub struct PlanChange { pub resource: String, pub operation: PlanOperation, @@ -194,6 +195,11 @@ pub struct PlanChange { pub disposition: Option, #[serde(skip_serializing_if = "Option::is_none")] pub reason: Option, + /// For schema updates: the engine's migration plan against the live + /// graph (RFC-004 §D7's data-aware preview). Absent when the preview is + /// unavailable (warning `schema_preview_unavailable`). + #[serde(skip_serializing_if = "Option::is_none")] + pub migration: Option, } #[derive(Debug, Clone, Serialize, PartialEq, Eq)] @@ -580,6 +586,40 @@ pub async fn plan_config_dir(config_dir: impl AsRef) -> PlanOutput { // Plan previews dispositions without sweeping; a pending recovery is // surfaced as the cluster_recovery_pending warning above instead. classify_changes(&mut changes, &desired.dependencies, &BTreeSet::new()); + + // Embed real migration steps for schema updates so plan is a data-aware + // preview; failures degrade to the digest diff with a warning. 
+ for change in &mut changes { + if change.operation != PlanOperation::Update { + continue; + } + let ResourceKind::Schema(graph_id) = resource_kind(&change.resource) else { + continue; + }; + let graph_uri = display_path( + &desired + .config_dir + .join(CLUSTER_GRAPHS_DIR) + .join(format!("{graph_id}.omni")), + ); + let source_path = desired + .resources + .iter() + .find(|resource| resource.address == change.resource) + .and_then(|resource| resource.path.clone()); + let preview = match source_path { + Some(path) => preview_schema_migration(&graph_uri, &path).await, + None => Err("no schema source recorded".to_string()), + }; + match preview { + Ok(migration) => change.migration = Some(migration), + Err(err) => diagnostics.push(Diagnostic::warning( + "schema_preview_unavailable", + change.resource.clone(), + format!("could not preview the schema migration: {err}"), + )), + } + } let blast_radius = compute_blast_radius(&changes, &desired.dependencies); let approvals_required = compute_approvals(&changes); let ok = !has_errors(&diagnostics); @@ -2332,6 +2372,23 @@ async fn observe_declared_graphs(desired: &DesiredCluster, state: &mut ClusterSt graph_error_count } +/// RFC-004 §D7: the data-aware preview — the engine's migration plan for a +/// desired schema against the live graph, computed read-only (no lock). 
+async fn preview_schema_migration( + graph_uri: &str, + schema_path: &str, +) -> Result { + let source = fs::read_to_string(schema_path).map_err(|err| err.to_string())?; + let db = Omnigraph::open_read_only(graph_uri) + .await + .map_err(|err| err.to_string())?; + let preview = db + .preview_schema_apply_with_options(&source, SchemaApplyOptions::default()) + .await + .map_err(|err| err.to_string())?; + Ok(preview.plan) +} + struct LiveGraphObservation { manifest_version: u64, schema_digest: String, @@ -2736,6 +2793,7 @@ fn diff_resources( after_digest: Some(after.clone()), disposition: None, reason: None, + migration: None, }), Some(before) if before != after => changes.push(PlanChange { resource: address.clone(), @@ -2744,6 +2802,7 @@ fn diff_resources( after_digest: Some(after.clone()), disposition: None, reason: None, + migration: None, }), Some(_) => {} } @@ -2757,6 +2816,7 @@ fn diff_resources( after_digest: None, disposition: None, reason: None, + migration: None, }); } } @@ -5500,6 +5560,59 @@ graphs: assert_eq!(state["resource_statuses"]["graph.knowledge"]["status"], "error"); } + #[tokio::test] + async fn plan_embeds_migration_preview_for_schema_update() { + let dir = fixture(); + init_derived_graph(dir.path()).await; + write_applyable_state(dir.path()); + fs::write( + dir.path().join("people.pg"), + "\nnode Person {\n name: String @key\n age: I32?\n bio: String?\n}\n", + ) + .unwrap(); + + let out = plan_config_dir(dir.path()).await; + assert!(out.ok, "{:?}", out.diagnostics); + let schema_change = out + .changes + .iter() + .find(|change| change.resource == "schema.knowledge") + .unwrap(); + let migration = schema_change.migration.as_ref().expect("preview embedded"); + assert!(migration.supported); + assert!( + serde_json::to_string(&migration.steps) + .unwrap() + .contains("add_property"), + "{migration:?}" + ); + } + + #[tokio::test] + async fn plan_warns_when_preview_unavailable() { + let dir = fixture(); + write_applyable_state(dir.path()); // 
digests recorded, but no live root + fs::write( + dir.path().join("people.pg"), + "\nnode Person {\n name: String @key\n age: I32?\n bio: String?\n}\n", + ) + .unwrap(); + + let out = plan_config_dir(dir.path()).await; + assert!(out.ok, "{:?}", out.diagnostics); + let schema_change = out + .changes + .iter() + .find(|change| change.resource == "schema.knowledge") + .unwrap(); + assert!(schema_change.migration.is_none()); + assert!( + out.diagnostics + .iter() + .any(|diagnostic| diagnostic.code == "schema_preview_unavailable") + ); + } + #[test] fn status_warns_on_pending_recovery_sidecar() { let dir = fixture(); From 0571c05ebb9a928fe96b6733656751a3e7102d2f Mon Sep 17 00:00:00 2001 From: aaltshuler Date: Wed, 10 Jun 2026 13:05:42 +0300 Subject: [PATCH 3/6] feat(cluster): schema-apply recovery sidecar kind and sweep RecoverySidecarKind::SchemaApply with digest-based sweep classification (robust to unrelated manifest movement; version pins stay forensic): ledger-consistent -> sidecar retired (RFC-004 rows 1+2); live digest matches the intended schema, state stale -> roll forward with composite recompute and a recovery_records audit entry (row 3); unverifiable or unexpected digests -> pending, kept, graph-moving work blocked (rows 1-unopenable/6). Co-Authored-By: Claude Fable 5 --- crates/omnigraph-cluster/src/lib.rs | 231 +++++++++++++++++++++++++++- 1 file changed, 230 insertions(+), 1 deletion(-) diff --git a/crates/omnigraph-cluster/src/lib.rs b/crates/omnigraph-cluster/src/lib.rs index af4ac93..f8a56e7 100644 --- a/crates/omnigraph-cluster/src/lib.rs +++ b/crates/omnigraph-cluster/src/lib.rs @@ -450,7 +450,8 @@ struct RecoverySidecar { #[serde(rename_all = "snake_case")] enum RecoverySidecarKind { GraphCreate, - // SchemaApply and GraphDelete arrive with stages 4B/4C. + SchemaApply, + // GraphDelete arrives with stage 4C. 
} #[derive(Debug, Default)] @@ -2090,6 +2091,9 @@ async fn sweep_recovery_sidecars( RecoverySidecarKind::GraphCreate => { sweep_graph_create_sidecar(path, sidecar, state, diagnostics, &mut outcome).await; } + RecoverySidecarKind::SchemaApply => { + sweep_schema_apply_sidecar(path, sidecar, state, diagnostics, &mut outcome).await; + } } } outcome @@ -2214,6 +2218,102 @@ async fn sweep_graph_create_sidecar( } } +async fn sweep_schema_apply_sidecar( + path: PathBuf, + sidecar: RecoverySidecar, + state: &mut ClusterState, + diagnostics: &mut Vec, + outcome: &mut SweepOutcome, +) { + let graph_address = graph_address(&sidecar.graph_id); + let schema_addr = schema_address(&sidecar.graph_id); + + // Digest-based classification: robust to unrelated manifest movement; + // the sidecar's version pins stay forensic. + let live_digest = match Omnigraph::open_read_only(&sidecar.graph_uri).await { + Ok(db) => sha256_hex(db.schema_source().as_bytes()), + Err(err) => { + // Cannot verify the interrupted operation — refuse to guess. + diagnostics.push(Diagnostic::warning( + "cluster_recovery_pending", + graph_address.clone(), + format!( + "an interrupted schema apply cannot be verified (graph '{}' did not open: {err}); graph-moving work is blocked until repaired", + sidecar.graph_uri + ), + )); + outcome.pending_graphs.insert(sidecar.graph_id.clone()); + return; + } + }; + + let recorded = state + .applied_revision + .resources + .get(&schema_addr) + .map(|resource| resource.digest.clone()); + if recorded.as_deref() == Some(live_digest.as_str()) { + // Ledger consistent with the live graph (the apply never landed, or + // landed and was recorded): the sidecar is stale intent — retire it. + outcome.completed_sidecars.push(path); + } else if live_digest == sidecar.desired_schema_digest { + // RFC-004 §D3 row 3: the schema apply completed on the graph; roll + // the cluster state forward to observable reality. 
+ state.applied_revision.resources.insert( + schema_addr.clone(), + StateResource { + digest: live_digest.clone(), + }, + ); + let query_digests = state_query_digests_for_graph(state, &sidecar.graph_id); + let composite = graph_digest(&sidecar.graph_id, Some(&live_digest), Some(&query_digests)); + state + .applied_revision + .resources + .insert(graph_address.clone(), StateResource { digest: composite }); + set_resource_status_applied(state, &graph_address); + set_resource_status_applied(state, &schema_addr); + state.recovery_records.insert( + sidecar.operation_id.clone(), + json!({ + "kind": "schema_apply", + "graph_id": sidecar.graph_id, + "outcome": "rolled_forward", + "recovered_at": now_rfc3339(), + "actor": sidecar.actor, + }), + ); + diagnostics.push(Diagnostic::warning( + "cluster_recovery_rolled_forward", + graph_address.clone(), + "an interrupted schema apply had completed on the graph; cluster state was rolled forward to match", + )); + outcome.completed_sidecars.push(path); + } else { + // Row 6: live schema is neither the recorded nor the desired digest. + set_resource_status( + state, + &graph_address, + ResourceLifecycleStatus::Drifted, + "actual_applied_state_pending", + "graph state does not match the interrupted operation; run `cluster refresh` and re-plan", + ); + set_resource_status( + state, + &schema_addr, + ResourceLifecycleStatus::Drifted, + "actual_applied_state_pending", + "graph state does not match the interrupted operation; run `cluster refresh` and re-plan", + ); + diagnostics.push(Diagnostic::warning( + "cluster_recovery_pending", + graph_address.clone(), + "an interrupted schema apply left unexpected graph state; graph-moving work is blocked until repaired", + )); + outcome.pending_graphs.insert(sidecar.graph_id.clone()); + } +} + /// Read-only commands report pending sidecars without acting on them. 
fn warn_pending_recovery_sidecars(config_dir: &Path, diagnostics: &mut Vec) { let recoveries_dir = config_dir.join(CLUSTER_RECOVERIES_DIR); @@ -5613,6 +5713,135 @@ graphs: ); } + fn write_schema_apply_sidecar( + config_dir: &Path, + graph_id: &str, + desired_schema_digest: &str, + operation_id: &str, + ) -> PathBuf { + let dir = config_dir.join(CLUSTER_RECOVERIES_DIR); + fs::create_dir_all(&dir).unwrap(); + let path = dir.join(format!("{operation_id}.json")); + fs::write( + &path, + serde_json::to_string_pretty(&json!({ + "schema_version": 1, + "operation_id": operation_id, + "started_at": "1970-01-01T00:00:00Z", + "kind": "schema_apply", + "graph_id": graph_id, + "graph_uri": derived_graph_uri(config_dir, graph_id), + "desired_schema_digest": desired_schema_digest, + })) + .unwrap(), + ) + .unwrap(); + path + } + + const SCHEMA_V2: &str = "\nnode Person {\n name: String @key\n age: I32?\n bio: String?\n}\n"; + + #[tokio::test] + async fn sweep_retires_schema_sidecar_when_ledger_consistent() { + let dir = fixture(); + init_derived_graph(dir.path()).await; + write_applyable_state(dir.path()); // state digest == live digest + let sidecar = + write_schema_apply_sidecar(dir.path(), "knowledge", "never-applied", "01SROW1"); + + let out = apply_config_dir(dir.path()).await; + assert!(out.ok, "{:?}", out.diagnostics); + assert!(!sidecar.exists()); + let state = read_state_json(dir.path()); + assert!( + state["recovery_records"] + .as_object() + .is_none_or(|records| records.is_empty()) + ); + } + + #[tokio::test] + async fn sweep_rolls_forward_completed_schema_apply() { + let dir = fixture(); + init_derived_graph(dir.path()).await; + write_applyable_state(dir.path()); + // The schema apply completed on the graph out-of-process... + let graph_uri = derived_graph_uri(dir.path(), "knowledge"); + let db = Omnigraph::open(&graph_uri).await.unwrap(); + db.apply_schema(SCHEMA_V2).await.unwrap(); + // ...the desired config matches it, and the sidecar records the intent. 
+ fs::write(dir.path().join("people.pg"), SCHEMA_V2).unwrap(); + let desired = validate_config_dir(dir.path()); + let v2_digest = desired.resource_digests["schema.knowledge"].clone(); + let sidecar = write_schema_apply_sidecar(dir.path(), "knowledge", &v2_digest, "01SROW3"); + + let out = apply_config_dir(dir.path()).await; + assert!(out.ok, "{:?}", out.diagnostics); + assert!( + out.diagnostics + .iter() + .any(|diagnostic| diagnostic.code == "cluster_recovery_rolled_forward") + ); + assert!(!sidecar.exists()); + let state = read_state_json(dir.path()); + assert_eq!( + state["applied_revision"]["resources"]["schema.knowledge"]["digest"], + v2_digest + ); + assert!( + state["recovery_records"] + .as_object() + .unwrap() + .values() + .any(|record| record["kind"] == "schema_apply" + && record["outcome"] == "rolled_forward") + ); + assert!(out.converged, "{out:?}"); + } + + #[tokio::test] + async fn sweep_flags_unexpected_schema_apply_state_as_pending() { + let dir = fixture(); + init_derived_graph(dir.path()).await; // live = v1 + write_state_resources(dir.path(), &[("schema.knowledge", "stale-digest")]); + // Sidecar intended a digest that is neither live nor recorded. 
+ let sidecar = + write_schema_apply_sidecar(dir.path(), "knowledge", "intended-digest", "01SROW6"); + + let out = apply_config_dir(dir.path()).await; + assert!(out.ok, "{:?}", out.diagnostics); // warnings only + assert!( + out.diagnostics + .iter() + .any(|diagnostic| diagnostic.code == "cluster_recovery_pending") + ); + assert!(sidecar.exists()); + let state = read_state_json(dir.path()); + assert_eq!( + state["resource_statuses"]["schema.knowledge"]["status"], + "drifted" + ); + } + + #[tokio::test] + async fn sweep_keeps_schema_sidecar_for_unopenable_root() { + let dir = fixture(); + write_applyable_state(dir.path()); + let root = dir.path().join(CLUSTER_GRAPHS_DIR).join("knowledge.omni"); + fs::create_dir_all(&root).unwrap(); // exists, won't open + let sidecar = + write_schema_apply_sidecar(dir.path(), "knowledge", "whatever", "01SROWX"); + + let out = apply_config_dir(dir.path()).await; + assert!(out.ok, "{:?}", out.diagnostics); // warning: cannot verify + assert!( + out.diagnostics + .iter() + .any(|diagnostic| diagnostic.code == "cluster_recovery_pending") + ); + assert!(sidecar.exists()); + } + #[test] fn status_warns_on_pending_recovery_sidecar() { let dir = fixture(); From a1ba4dc413941d7da87477285c3200945929b677 Mon Sep 17 00:00:00 2001 From: aaltshuler Date: Wed, 10 Jun 2026 13:12:15 +0300 Subject: [PATCH 4/6] feat(cluster): execute schema applies in cluster apply MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Stage 4B (RFC-004 §D1/§D5): schema. Update changes classify Applied and execute after graph creates, sequentially and sidecar-fenced — read-write open (the engine's own recovery runs first), pre-op manifest pin recorded, apply_schema_as with allow_data_loss: false (soft drops only; hard drops wait for 4C's approval artifacts), post-op pin rewritten into the sidecar, sidecar retired only after the final state CAS. 
Queries gated on a same-plan schema update unblock (the migration lands first in the same run); failures — unsupported migrations, lock contention, user branches — surface as schema_apply_failed with the engine's message, demote dependents via the origin-aware demotion helper, and stop further graph-moving work. Schema evolution is now fully cluster-driven (the defer -> manual schema apply -> refresh loop is gone), and out-of-band schema drift is converged back by apply as an ordinary soft migration (axiom 8: drift correction is gated like any change; the recoverable tier needs no approval) — both pinned by reworked e2es. The multi-graph mixed e2e's deferred row is now delete-shaped, pre-staging the 4C surface. Actor: cluster apply accepts the CLI's global --as via the new ApplyOptions / apply_config_dir_with_options (apply_config_dir delegates unchanged); the actor is echoed in ApplyOutput and recorded in sidecars and audit entries, and threads to apply_schema_as so Cedar fires wherever a checker is installed. 
Co-Authored-By: Claude Fable 5 --- crates/omnigraph-cli/src/main.rs | 15 +- crates/omnigraph-cli/tests/cli.rs | 196 +++++++------ crates/omnigraph-cluster/src/lib.rs | 419 ++++++++++++++++++++++------ 3 files changed, 450 insertions(+), 180 deletions(-) diff --git a/crates/omnigraph-cli/src/main.rs b/crates/omnigraph-cli/src/main.rs index de87309..942bb27 100644 --- a/crates/omnigraph-cli/src/main.rs +++ b/crates/omnigraph-cli/src/main.rs @@ -11,8 +11,8 @@ use omnigraph::db::{Omnigraph, ReadTarget, SnapshotId}; use omnigraph::loader::LoadMode; use omnigraph::storage::normalize_root_uri; use omnigraph_cluster::{ - ApplyOutput, DiagnosticSeverity, ForceUnlockOutput, PlanOutput, StateSyncOutput, StatusOutput, - ValidateOutput, apply_config_dir, force_unlock_config_dir, import_config_dir, plan_config_dir, + ApplyOptions, ApplyOutput, DiagnosticSeverity, ForceUnlockOutput, PlanOutput, StateSyncOutput, StatusOutput, + ValidateOutput, apply_config_dir_with_options, force_unlock_config_dir, import_config_dir, plan_config_dir, refresh_config_dir, status_config_dir, validate_config_dir, }; use omnigraph_compiler::query::parser::parse_query; @@ -3569,7 +3569,16 @@ async fn main() -> Result<()> { finish_cluster_plan(&output, json)?; } ClusterCommand::Apply { config, json } => { - let output = apply_config_dir(config).await; + // The global --as actor attributes graph-moving operations + // (sidecars, audit entries, engine schema-apply commits). + // Cluster config stays unlayered: no omnigraph.yaml fallback. 
+ let output = apply_config_dir_with_options( + config, + ApplyOptions { + actor: cli.as_actor.clone(), + }, + ) + .await; finish_cluster_apply(&output, json)?; } ClusterCommand::Status { config, json } => { diff --git a/crates/omnigraph-cli/tests/cli.rs b/crates/omnigraph-cli/tests/cli.rs index 7ab7ca9..1805e29 100644 --- a/crates/omnigraph-cli/tests/cli.rs +++ b/crates/omnigraph-cli/tests/cli.rs @@ -984,7 +984,7 @@ query find_person($name: String) { /// deferred by cluster apply, executed by `omnigraph schema apply` against /// the graph, picked up by `cluster refresh`, and the next apply re-converges. #[test] -fn cluster_e2e_schema_change_defers_until_schema_apply_and_refresh() { +fn cluster_e2e_schema_change_applied_by_cluster() { let temp = tempdir().unwrap(); write_cluster_config_fixture(temp.path()); init_cluster_derived_graph(temp.path()); @@ -993,7 +993,8 @@ fn cluster_e2e_schema_change_defers_until_schema_apply_and_refresh() { let apply = cluster_json(temp.path(), "apply"); assert_eq!(apply["converged"], true, "{apply}"); - // Additive schema change: cluster apply must defer it loudly, not act. + // Additive schema change: Stage 4B applies it from the cluster — no + // manual schema apply, no refresh round-trip. fs::write( temp.path().join("people.pg"), r#" @@ -1005,40 +1006,39 @@ node Person { "#, ) .unwrap(); - let deferred = cluster_json(temp.path(), "apply"); - assert_eq!(deferred["ok"], true, "{deferred}"); - assert_eq!(deferred["applied_count"], 0, "{deferred}"); - assert_eq!(deferred["converged"], false, "{deferred}"); + + // Plan previews the real migration steps (RFC-004 §D7). 
+ let plan = cluster_json(temp.path(), "plan"); + let schema_change = change_for(&plan, "schema.knowledge"); + assert_eq!(schema_change["disposition"], "applied", "{plan}"); + let migration = &schema_change["migration"]; + assert_eq!(migration["supported"], true, "{plan}"); assert!( - deferred["diagnostics"] + migration["steps"] .as_array() .unwrap() .iter() - .any(|diagnostic| diagnostic["code"] == "apply_unsupported_change"), - "{deferred}" + .any(|step| step["kind"] == "add_property"), + "{plan}" ); - // The graph-plane tool applies the migration... - output_success( + let evolve = cluster_json(temp.path(), "apply"); + assert_eq!(evolve["ok"], true, "{evolve}"); + assert_eq!(evolve["converged"], true, "{evolve}"); + assert_eq!(change_for(&evolve, "schema.knowledge")["disposition"], "applied"); + + // The live graph carries the new schema; the plan is empty. + let schema_show = output_success( cli() .arg("schema") - .arg("apply") - .arg(temp.path().join("graphs/knowledge.omni")) - .arg("--schema") - .arg(temp.path().join("people.pg")) - .arg("--json"), + .arg("show") + .arg(temp.path().join("graphs/knowledge.omni")), ); - // ...refresh observes it... - let refresh = cluster_json(temp.path(), "refresh"); - assert_eq!(refresh["ok"], true, "{refresh}"); - // ...and the control plane re-converges. 
- let reconverge = cluster_json(temp.path(), "apply"); - assert_eq!(reconverge["ok"], true, "{reconverge}"); - assert_eq!(reconverge["converged"], true, "{reconverge}"); + assert!(stdout_string(&schema_show).contains("bio"), "live schema updated"); let replan = cluster_json(temp.path(), "plan"); assert!( replan["changes"].as_array().unwrap().is_empty(), - "after schema apply + refresh + apply, the plan must be empty: {replan}" + "one cluster apply converges a schema change: {replan}" ); } @@ -1207,7 +1207,7 @@ fn cluster_e2e_lost_state_reimport_recovers_catalog() { /// the graph (no config change) must surface as drift through refresh, status, /// and plan — and apply must never silently "correct" it. #[test] -fn cluster_e2e_out_of_band_schema_change_surfaces_as_drift() { +fn cluster_e2e_out_of_band_schema_drift_then_apply_converges_it() { let temp = tempdir().unwrap(); write_cluster_config_fixture(temp.path()); init_cluster_derived_graph(temp.path()); @@ -1238,48 +1238,42 @@ node Person { .arg("--json"), ); + // Drift is visible... let refresh = cluster_json(temp.path(), "refresh"); - assert_eq!(refresh["ok"], true, "{refresh}"); assert_eq!( refresh["resource_statuses"]["schema.knowledge"]["status"], "drifted" ); - assert_eq!( - refresh["resource_statuses"]["graph.knowledge"]["status"], - "drifted" - ); - assert_eq!( - refresh["observations"]["graph.knowledge"]["schema_matches_desired"], - false - ); - - let status = cluster_json(temp.path(), "status"); - assert_eq!( - status["resource_statuses"]["schema.knowledge"]["status"], - "drifted" - ); - + // ...the plan proposes converging back to desired, with a migration + // preview (a soft drop of the out-of-band field)... 
let plan = cluster_json(temp.path(), "plan"); - assert_eq!(change_for(&plan, "schema.knowledge")["disposition"], "deferred"); - assert_eq!(change_for(&plan, "graph.knowledge")["disposition"], "deferred"); - let live_schema_digest = change_for(&plan, "schema.knowledge")["before_digest"] - .as_str() - .unwrap() - .to_string(); - - let drift_apply = cluster_json(temp.path(), "apply"); - assert_eq!(drift_apply["applied_count"], 0, "{drift_apply}"); - assert_eq!(drift_apply["converged"], false, "{drift_apply}"); - // Apply must not have "corrected" the drift: state still records the LIVE - // schema digest, not the desired one. - let state: serde_json::Value = serde_json::from_str( - &fs::read_to_string(temp.path().join("__cluster/state.json")).unwrap(), - ) - .unwrap(); - assert_eq!( - state["applied_revision"]["resources"]["schema.knowledge"]["digest"], - live_schema_digest + let schema_change = change_for(&plan, "schema.knowledge"); + assert_eq!(schema_change["disposition"], "applied", "{plan}"); + assert!( + schema_change["migration"]["steps"] + .as_array() + .unwrap() + .iter() + .any(|step| step["kind"] == "drop_property" && step["mode"] == "soft"), + "{plan}" ); + // ...and apply converges the live schema back (axiom 8: drift correction + // is gated like any change; a soft migration is the recoverable tier). 
+ let converge = cluster_json(temp.path(), "apply"); + assert_eq!(converge["ok"], true, "{converge}"); + assert_eq!(converge["converged"], true, "{converge}"); + let schema_show = output_success( + cli() + .arg("schema") + .arg("show") + .arg(temp.path().join("graphs/knowledge.omni")), + ); + assert!( + !stdout_string(&schema_show).contains("bio"), + "out-of-band field soft-dropped back to desired" + ); + let replan = cluster_json(temp.path(), "plan"); + assert!(replan["changes"].as_array().unwrap().is_empty(), "{replan}"); } /// Disaster input fails closed: a destroyed graph root drifts the ledger, @@ -1393,12 +1387,32 @@ fn cluster_e2e_multi_graph_mixed_dispositions_then_converge() { assert!(temp.path().join("graphs/knowledge.omni").exists()); assert!(temp.path().join("graphs/engineering.omni").exists()); - // Mixed run: a knowledge schema update (4B territory — deferred) gates - // its query update (blocked), while an engineering query update is - // independent (applied) and re-derives its composite. + // Mixed run: a graph REMOVAL (4C territory — deferred) gates its query + // delete (blocked), while a knowledge query update is independent + // (applied) and re-derives its composite. All four dispositions at once. 
fs::write( - temp.path().join("people.pg"), - "\nnode Person {\n name: String @key\n age: I32?\n bio: String?\n}\n", + temp.path().join("cluster.yaml"), + r#" +version: 1 +metadata: + name: company-brain +state: + backend: cluster + lock: true +graphs: + knowledge: + schema: ./people.pg + queries: + find_person: + file: ./people.gq +policies: + shared: + file: ./shared.policy.yaml + applies_to: [knowledge] + cluster_wide: + file: ./cluster_wide.policy.yaml + applies_to: [cluster] +"#, ) .unwrap(); fs::write( @@ -1406,31 +1420,35 @@ fn cluster_e2e_multi_graph_mixed_dispositions_then_converge() { "\nquery find_person($name: String) {\n match { $p: Person { name: $name } }\n return { $p.name }\n}\n", ) .unwrap(); - fs::write( - temp.path().join("services.gq"), - "\nquery find_service($name: String) {\n match { $s: Service { name: $name } }\n return { $s.name, $s.name }\n}\n", - ) - .unwrap(); let mixed = cluster_json(temp.path(), "apply"); assert_eq!(mixed["ok"], true, "{mixed}"); assert_eq!(mixed["converged"], false, "{mixed}"); - assert_eq!(change_for(&mixed, "schema.knowledge")["disposition"], "deferred"); - assert_eq!(change_for(&mixed, "graph.knowledge")["disposition"], "deferred"); assert_eq!( - change_for(&mixed, "query.knowledge.find_person")["disposition"], - "blocked" + change_for(&mixed, "graph.engineering")["disposition"], + "deferred" ); assert_eq!( - change_for(&mixed, "query.knowledge.find_person")["reason"], - "dependency_not_applied" + change_for(&mixed, "schema.engineering")["disposition"], + "deferred" ); assert_eq!( change_for(&mixed, "query.engineering.find_service")["disposition"], - "applied" + "blocked" ); assert_eq!( - change_for(&mixed, "graph.engineering")["disposition"], + change_for(&mixed, "query.engineering.find_service")["reason"], + "dependency_not_applied" + ); + assert_eq!( + change_for(&mixed, "query.knowledge.find_person")["disposition"], + "applied" + ); + // policy.shared's applies_to narrowed, but its FILE digest is unchanged + 
// — applies_to lives in cluster.yaml (the config digest), so it is not a + // resource change. + assert_eq!( + change_for(&mixed, "graph.knowledge")["disposition"], "derived" ); // Deterministic ordering: changes sorted by resource address. @@ -1443,27 +1461,7 @@ fn cluster_e2e_multi_graph_mixed_dispositions_then_converge() { let mut sorted = order.clone(); sorted.sort_unstable(); assert_eq!(order, sorted, "{mixed}"); - - // The graph-plane tool applies the schema; refresh observes; converge. - output_success( - cli() - .arg("schema") - .arg("apply") - .arg(temp.path().join("graphs/knowledge.omni")) - .arg("--schema") - .arg(temp.path().join("people.pg")) - .arg("--json"), - ); - let refresh = cluster_json(temp.path(), "refresh"); - assert_eq!(refresh["ok"], true, "{refresh}"); - let converge = cluster_json(temp.path(), "apply"); - assert_eq!(converge["converged"], true, "{converge}"); - - let final_plan = cluster_json(temp.path(), "plan"); - assert!( - final_plan["changes"].as_array().unwrap().is_empty(), - "{final_plan}" - ); + // Graph deletion cannot converge until stage 4C's approval artifacts. } /// Stage 4A headline: a declared graph is created by `cluster apply` itself — diff --git a/crates/omnigraph-cluster/src/lib.rs b/crates/omnigraph-cluster/src/lib.rs index f8a56e7..11ebcd9 100644 --- a/crates/omnigraph-cluster/src/lib.rs +++ b/crates/omnigraph-cluster/src/lib.rs @@ -274,6 +274,8 @@ pub struct ForceUnlockOutput { pub struct ApplyOutput { pub ok: bool, pub config_dir: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub actor: Option, pub desired_revision: DesiredRevision, pub state_observations: StateObservations, /// Every planned change, with `disposition`/`reason` always populated. @@ -651,12 +653,29 @@ pub async fn plan_config_dir(config_dir: impl AsRef) -> PlanOutput { /// state is the publish point: a failure after payload writes leaves inert /// digest-named blobs and no success acknowledgement; re-running apply is the /// repair. 
+/// Options for `cluster apply`. `actor` attributes graph-moving operations +/// (recorded in sidecars and audit entries, threaded to the engine's +/// `apply_schema_as` so Cedar enforcement fires wherever a policy checker is +/// installed). +#[derive(Debug, Clone, Default)] +pub struct ApplyOptions { + pub actor: Option, +} + pub async fn apply_config_dir(config_dir: impl AsRef) -> ApplyOutput { + apply_config_dir_with_options(config_dir, ApplyOptions::default()).await +} + +pub async fn apply_config_dir_with_options( + config_dir: impl AsRef, + options: ApplyOptions, +) -> ApplyOutput { let outcome = load_desired(config_dir.as_ref()); let mut diagnostics = outcome.diagnostics; let backend = LocalStateBackend::new(&outcome.config_dir); let mut observations = backend.observations(); + let actor_for_output = options.actor.clone(); let early_return = |config_dir: String, config_digest: Option, observations: StateObservations, @@ -666,6 +685,7 @@ pub async fn apply_config_dir(config_dir: impl AsRef) -> ApplyOutput { ApplyOutput { ok: !has_errors(&diagnostics), config_dir, + actor: actor_for_output.clone(), desired_revision: DesiredRevision { config_digest, }, @@ -821,18 +841,18 @@ pub async fn apply_config_dir(config_dir: impl AsRef) -> ApplyOutput { }) .filter_map(|change| change.resource.strip_prefix("graph.").map(str::to_string)) .collect(); - let mut completed_create_sidecars: Vec = Vec::new(); - let mut failed_graphs: BTreeSet = BTreeSet::new(); - let mut creates_aborted = false; + let mut completed_op_sidecars: Vec = Vec::new(); + let mut failed_graphs: BTreeMap = BTreeMap::new(); + let mut graph_moving_aborted = false; for graph_id in &graph_creates_to_run { - if creates_aborted { + if graph_moving_aborted { // A prior create failed: stop graph-moving work (loud partials). 
diagnostics.push(Diagnostic::warning( "graph_create_skipped", graph_address(graph_id), "skipped after an earlier graph create failed in this run", )); - failed_graphs.insert(graph_id.clone()); + failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::GraphCreate); continue; } let Some(desired_graph) = desired.graphs.iter().find(|graph| &graph.id == graph_id) @@ -849,7 +869,7 @@ pub async fn apply_config_dir(config_dir: impl AsRef) -> ApplyOutput { schema_version: 1, operation_id: Ulid::new().to_string(), started_at: now_rfc3339(), - actor: None, + actor: options.actor.clone(), kind: RecoverySidecarKind::GraphCreate, graph_id: graph_id.clone(), graph_uri: graph_uri.clone(), @@ -862,8 +882,8 @@ pub async fn apply_config_dir(config_dir: impl AsRef) -> ApplyOutput { Ok(path) => path, Err(diagnostic) => { diagnostics.push(diagnostic); - failed_graphs.insert(graph_id.clone()); - creates_aborted = true; + failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::GraphCreate); + graph_moving_aborted = true; continue; } }; @@ -871,8 +891,8 @@ pub async fn apply_config_dir(config_dir: impl AsRef) -> ApplyOutput { // Simulated crash before the init: the sidecar stays for the // sweep (row 1: root absent -> intent removed next run). 
diagnostics.push(diagnostic); - failed_graphs.insert(graph_id.clone()); - creates_aborted = true; + failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::GraphCreate); + graph_moving_aborted = true; continue; } // Re-read + re-verify the schema source under the lock — the same @@ -911,8 +931,8 @@ pub async fn apply_config_dir(config_dir: impl AsRef) -> ApplyOutput { Err(diagnostic) => { diagnostics.push(diagnostic); let _ = fs::remove_file(&sidecar_path); // nothing moved - failed_graphs.insert(graph_id.clone()); - creates_aborted = true; + failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::GraphCreate); + graph_moving_aborted = true; continue; } }; @@ -926,8 +946,8 @@ pub async fn apply_config_dir(config_dir: impl AsRef) -> ApplyOutput { )); // The sidecar stays: the sweep classifies whether the failed // init left a partial root (row 5) or nothing (row 1). - failed_graphs.insert(graph_id.clone()); - creates_aborted = true; + failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::GraphCreate); + graph_moving_aborted = true; continue; } } @@ -955,8 +975,174 @@ pub async fn apply_config_dir(config_dir: impl AsRef) -> ApplyOutput { diagnostics, ); } - completed_create_sidecars.push(sidecar_path); + completed_op_sidecars.push(sidecar_path); } + + // Schema applies execute next (RFC-004 §D5): the first cluster operation + // that moves an EXISTING graph manifest, sidecar-fenced the same way. 
+ let schema_updates_to_run: Vec = changes + .iter() + .filter(|change| { + change.disposition == Some(ApplyDisposition::Applied) + && change.operation == PlanOperation::Update + && matches!(resource_kind(&change.resource), ResourceKind::Schema(_)) + }) + .filter_map(|change| change.resource.strip_prefix("schema.").map(str::to_string)) + .collect(); + for graph_id in &schema_updates_to_run { + if graph_moving_aborted { + diagnostics.push(Diagnostic::warning( + "schema_apply_skipped", + schema_address(graph_id), + "skipped after an earlier graph-moving operation failed in this run", + )); + failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::SchemaApply); + continue; + } + let Some(desired_graph) = desired.graphs.iter().find(|graph| &graph.id == graph_id) + else { + continue; + }; + let graph_uri = display_path( + &desired + .config_dir + .join(CLUSTER_GRAPHS_DIR) + .join(format!("{graph_id}.omni")), + ); + // Read-write open: the engine's own recovery sweep runs here, which + // is exactly what we want before moving its manifest. 
+ let db = match Omnigraph::open(&graph_uri).await { + Ok(db) => db, + Err(err) => { + diagnostics.push(Diagnostic::error( + "schema_apply_failed", + schema_address(graph_id), + format!("could not open graph at '{graph_uri}': {err}"), + )); + failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::SchemaApply); + graph_moving_aborted = true; + continue; + } + }; + let observed_manifest_version = match db.snapshot_of(ReadTarget::branch("main")).await { + Ok(snapshot) => Some(snapshot.version()), + Err(_) => None, + }; + let mut sidecar = RecoverySidecar { + schema_version: 1, + operation_id: Ulid::new().to_string(), + started_at: now_rfc3339(), + actor: options.actor.clone(), + kind: RecoverySidecarKind::SchemaApply, + graph_id: graph_id.clone(), + graph_uri: graph_uri.clone(), + observed_manifest_version, + expected_manifest_version: None, + desired_schema_digest: desired_graph.schema_digest.clone(), + state_cas_base: expected_cas.clone(), + }; + let sidecar_path = match backend.write_recovery_sidecar(&sidecar) { + Ok(path) => path, + Err(diagnostic) => { + diagnostics.push(diagnostic); + failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::SchemaApply); + graph_moving_aborted = true; + continue; + } + }; + if let Err(diagnostic) = failpoints::maybe_fail("cluster_apply.before_schema_apply") { + // Simulated crash before the engine call: the sidecar stays; the + // sweep retires it next run (ledger still consistent with live). + diagnostics.push(diagnostic); + failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::SchemaApply); + graph_moving_aborted = true; + continue; + } + // Re-read + digest-verify the desired schema source under the lock. 
+ let schema_source = source_paths + .get(schema_address(graph_id).as_str()) + .ok_or_else(|| { + Diagnostic::error( + "schema_apply_failed", + schema_address(graph_id), + "no schema source recorded for graph", + ) + }) + .and_then(|path| { + fs::read_to_string(Path::new(path)).map_err(|err| { + Diagnostic::error( + "schema_apply_failed", + schema_address(graph_id), + format!("could not read schema source '{path}': {err}"), + ) + }) + }) + .and_then(|source| { + if sha256_hex(source.as_bytes()) == desired_graph.schema_digest { + Ok(source) + } else { + Err(Diagnostic::error( + "resource_content_changed", + schema_address(graph_id), + "schema source changed while apply was running; re-run `cluster apply`", + )) + } + }); + let schema_source = match schema_source { + Ok(source) => source, + Err(diagnostic) => { + diagnostics.push(diagnostic); + let _ = fs::remove_file(&sidecar_path); // nothing moved + failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::SchemaApply); + graph_moving_aborted = true; + continue; + } + }; + // Soft drops only: allow_data_loss stays false until the approval + // artifacts of stage 4C exist (RFC-004 §D4). + match db + .apply_schema_as( + &schema_source, + SchemaApplyOptions::default(), + options.actor.as_deref(), + ) + .await + { + Ok(result) => { + sidecar.expected_manifest_version = Some(result.manifest_version); + if let Err(diagnostic) = backend.write_recovery_sidecar(&sidecar) { + diagnostics.push(diagnostic); + } + } + Err(err) => { + diagnostics.push(Diagnostic::error( + "schema_apply_failed", + schema_address(graph_id), + format!("schema apply failed on '{graph_uri}': {err}"), + )); + // Sidecar stays; the sweep retires it (live digest unchanged + // == ledger consistent) or flags real movement. + failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::SchemaApply); + graph_moving_aborted = true; + continue; + } + } + // Crash point: the manifest moved, the ledger does not record it yet. 
+ // A failure here acknowledges nothing; the sweep rolls forward. + if let Err(diagnostic) = failpoints::maybe_fail("cluster_apply.after_schema_apply") { + diagnostics.push(diagnostic); + return early_return( + display_path(&desired.config_dir), + Some(desired.config_digest), + observations, + changes, + state.resource_statuses, + diagnostics, + ); + } + completed_op_sidecars.push(sidecar_path); + } + if !failed_graphs.is_empty() { demote_dependents_of_failed_graphs(&mut changes, &failed_graphs, &desired.dependencies); } @@ -1116,7 +1302,7 @@ pub async fn apply_config_dir(config_dir: impl AsRef) -> ApplyOutput { for sidecar_path in sweep .completed_sidecars .iter() - .chain(completed_create_sidecars.iter()) + .chain(completed_op_sidecars.iter()) { let _ = fs::remove_file(sidecar_path); } @@ -1148,6 +1334,7 @@ pub async fn apply_config_dir(config_dir: impl AsRef) -> ApplyOutput { ApplyOutput { ok: !has_errors(&diagnostics), config_dir: display_path(&desired.config_dir), + actor: options.actor.clone(), desired_revision: DesiredRevision { config_digest: Some(desired.config_digest), }, @@ -3006,9 +3193,10 @@ fn classify_changes( PlanOperation::Create => { schema_creates.insert(graph); } - // Schema updates (4B) and deletes (4C) are still pending in - // this stage and block dependents. - _ => { + // Schema updates execute in-run before catalog writes (4B) + // and no longer block dependents; deletes (4C) still do. + PlanOperation::Update => {} + PlanOperation::Delete => { schema_pending.insert(graph); } }, @@ -3042,7 +3230,12 @@ fn classify_changes( // Applied with the graph create — the init carries it. (ApplyDisposition::Applied, None) } - PlanOperation::Create if graph_creates.contains(&graph) => { + PlanOperation::Update if !pending_recovery.contains(&graph) => { + // Stage 4B: schema updates execute via the engine's + // schema apply (soft drops only; allow_data_loss is 4C). 
+ (ApplyDisposition::Applied, None) + } + PlanOperation::Create | PlanOperation::Update => { (ApplyDisposition::Blocked, Some("cluster_recovery_pending")) } _ => (ApplyDisposition::Deferred, Some("apply_unsupported_kind")), @@ -3112,13 +3305,20 @@ fn classify_changes( } } -/// After a graph create fails mid-run, every change that depended on that -/// graph (its schema, its queries, policies referencing it) flips from -/// Applied to Blocked so the output and the persisted statuses tell the -/// truth about what this run actually executed. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum FailedGraphOrigin { + GraphCreate, + SchemaApply, +} + +/// After a graph-moving operation fails mid-run, every change that depended +/// on that graph flips from Applied to Blocked so the output and the +/// persisted statuses tell the truth about what this run actually executed. +/// The originating change carries the failure code; dependents carry +/// `dependency_not_applied`. fn demote_dependents_of_failed_graphs( changes: &mut [PlanChange], - failed: &BTreeSet, + failed: &BTreeMap, dependencies: &[Dependency], ) { for change in changes.iter_mut() { @@ -3126,11 +3326,17 @@ fn demote_dependents_of_failed_graphs( continue; } let demote_reason = match resource_kind(&change.resource) { - ResourceKind::Graph(graph) if failed.contains(&graph) => Some("graph_create_failed"), - ResourceKind::Schema(graph) if failed.contains(&graph) => { - Some("dependency_not_applied") - } - ResourceKind::Query { graph, .. 
} if failed.contains(&graph) => { + ResourceKind::Graph(graph) => match failed.get(&graph) { + Some(FailedGraphOrigin::GraphCreate) => Some("graph_create_failed"), + Some(FailedGraphOrigin::SchemaApply) => Some("dependency_not_applied"), + None => None, + }, + ResourceKind::Schema(graph) => match failed.get(&graph) { + Some(FailedGraphOrigin::SchemaApply) => Some("schema_apply_failed"), + Some(FailedGraphOrigin::GraphCreate) => Some("dependency_not_applied"), + None => None, + }, + ResourceKind::Query { graph, .. } if failed.contains_key(&graph) => { Some("dependency_not_applied") } ResourceKind::Policy(_) => { @@ -3139,7 +3345,7 @@ fn demote_dependents_of_failed_graphs( && dep .to .strip_prefix("graph.") - .is_some_and(|graph| failed.contains(graph)) + .is_some_and(|graph| failed.contains_key(graph)) }); blocked.then_some("dependency_not_applied") } @@ -4849,19 +5055,22 @@ graphs: } #[tokio::test] - async fn apply_defers_schema_change_and_blocks_dependent_query() { + async fn apply_schema_update_and_dependent_query_in_one_run() { let dir = fixture(); + init_derived_graph(dir.path()).await; write_applyable_state(dir.path()); - // Change the schema after seeding state: schema.knowledge now differs. + // Schema update + a query update that depends on the new field: one + // apply executes the schema migration first, then the catalog write. 
+ fs::write(dir.path().join("people.pg"), SCHEMA_V2).unwrap(); fs::write( - dir.path().join("people.pg"), - "\nnode Person {\n name: String @key\n age: I32?\n bio: String?\n}\n", + dir.path().join("people.gq"), + "\nquery find_person($name: String) {\n match { $p: Person { name: $name } }\n return { $p.name, $p.bio }\n}\n", ) .unwrap(); let out = apply_config_dir(dir.path()).await; assert!(out.ok, "{:?}", out.diagnostics); - assert!(!out.converged); + assert!(out.converged, "{out:?}"); let by_resource: BTreeMap<&str, &PlanChange> = out .changes .iter() @@ -4869,58 +5078,112 @@ graphs: .collect(); assert_eq!( by_resource["schema.knowledge"].disposition, - Some(ApplyDisposition::Deferred) - ); - assert_eq!( - by_resource["graph.knowledge"].disposition, - Some(ApplyDisposition::Deferred) + Some(ApplyDisposition::Applied) ); assert_eq!( by_resource["query.knowledge.find_person"].disposition, + Some(ApplyDisposition::Applied) + ); + assert_eq!( + by_resource["graph.knowledge"].disposition, + Some(ApplyDisposition::Derived) + ); + // The live graph carries the new schema. + let db = Omnigraph::open_read_only(&derived_graph_uri(dir.path(), "knowledge")) + .await + .unwrap(); + let desired = validate_config_dir(dir.path()); + assert_eq!( + sha256_hex(db.schema_source().as_bytes()), + desired.resource_digests["schema.knowledge"] + ); + let state = read_state_json(dir.path()); + assert_eq!( + state["applied_revision"]["resources"]["schema.knowledge"]["digest"], + desired.resource_digests["schema.knowledge"] + ); + // Sidecar retired after the CAS landed. + assert!( + !dir.path().join(CLUSTER_RECOVERIES_DIR).exists() + || fs::read_dir(dir.path().join(CLUSTER_RECOVERIES_DIR)) + .unwrap() + .next() + .is_none() + ); + } + + #[tokio::test] + async fn apply_unsupported_schema_change_fails_loudly() { + let dir = fixture(); + init_derived_graph(dir.path()).await; + write_applyable_state(dir.path()); + // Property type changes are unsupported by the engine planner. 
+ fs::write( + dir.path().join("people.pg"), + "\nnode Person {\n name: String @key\n age: I64?\n}\n", + ) + .unwrap(); + + let out = apply_config_dir(dir.path()).await; + assert!(!out.ok); + assert!(out.diagnostics.iter().any(|diagnostic| { + diagnostic.code == "schema_apply_failed" + && diagnostic.message.contains("changing property type") + })); + let by_resource: BTreeMap<&str, &PlanChange> = out + .changes + .iter() + .map(|change| (change.resource.as_str(), change)) + .collect(); + assert_eq!( + by_resource["schema.knowledge"].disposition, Some(ApplyDisposition::Blocked) ); assert_eq!( - by_resource["query.knowledge.find_person"].reason.as_deref(), - Some("dependency_not_applied") + by_resource["schema.knowledge"].reason.as_deref(), + Some("schema_apply_failed") ); - // Policy is independent of the schema and still applies. - assert_eq!( - by_resource["policy.base"].disposition, - Some(ApplyDisposition::Applied) - ); - assert!( - out.diagnostics - .iter() - .any(|diagnostic| diagnostic.code == "apply_unsupported_change") - ); - assert!( - out.diagnostics - .iter() - .any(|diagnostic| diagnostic.code == "apply_dependency_blocked") - ); - + // The live schema and the ledger are unchanged. let state = read_state_json(dir.path()); + let desired = validate_config_dir(dir.path()); + assert_ne!( + state["applied_revision"]["resources"]["schema.knowledge"]["digest"], + desired.resource_digests["schema.knowledge"] + ); + // Second run: the sweep retires the stale sidecar (ledger consistent) + // and the run fails just as loudly — idempotent loudness. 
+ let second = apply_config_dir(dir.path()).await; + assert!(!second.ok); + assert!( + second + .diagnostics + .iter() + .any(|diagnostic| diagnostic.code == "schema_apply_failed") + ); + } + + #[tokio::test] + async fn apply_blocks_schema_update_while_recovery_pending() { + let dir = fixture(); + init_derived_graph(dir.path()).await; + write_state_resources(dir.path(), &[("schema.knowledge", "stale-digest")]); + fs::write(dir.path().join("people.pg"), SCHEMA_V2).unwrap(); + // A pending sidecar whose intent matches neither live nor recorded. + write_schema_apply_sidecar(dir.path(), "knowledge", "intended-digest", "01PENDS"); + + let out = apply_config_dir(dir.path()).await; + let by_resource: BTreeMap<&str, &PlanChange> = out + .changes + .iter() + .map(|change| (change.resource.as_str(), change)) + .collect(); assert_eq!( - state["resource_statuses"]["query.knowledge.find_person"]["status"], - "blocked" + by_resource["schema.knowledge"].disposition, + Some(ApplyDisposition::Blocked) ); - // The blocked query wrote no payload and no state digest. - assert!( - state["applied_revision"]["resources"] - .get("query.knowledge.find_person") - .is_none() - ); - assert!( - !dir.path() - .join(CLUSTER_RESOURCES_DIR) - .join("query") - .exists() - ); - // Not converged: the applied config digest must not be claimed. - assert!( - state["applied_revision"] - .get("config_digest") - .is_none_or(serde_json::Value::is_null) + assert_eq!( + by_resource["schema.knowledge"].reason.as_deref(), + Some("cluster_recovery_pending") ); } From 80cae4e8e1cf947b17f57826d6cd43b1bf71c54a Mon Sep 17 00:00:00 2001 From: aaltshuler Date: Wed, 10 Jun 2026 13:13:15 +0300 Subject: [PATCH 5/6] test(cluster): failpoint coverage for schema-apply crash windows - Crash before the engine call: sidecar (carrying the --as actor) survives, live schema and ledger untouched, no ack; the next run's sweep retires the stale intent and the same run applies and converges. 
- Crash after the engine call, before the state CAS: the manifest moved with the post-op pin in the sidecar, state.json byte-identical; the next run's sweep rolls the ledger forward with a schema_apply audit entry and the run converges. Co-Authored-By: Claude Fable 5 --- crates/omnigraph-cluster/tests/failpoints.rs | 124 ++++++++++++++++++- 1 file changed, 123 insertions(+), 1 deletion(-) diff --git a/crates/omnigraph-cluster/tests/failpoints.rs b/crates/omnigraph-cluster/tests/failpoints.rs index ec8ddfb..cc91b85 100644 --- a/crates/omnigraph-cluster/tests/failpoints.rs +++ b/crates/omnigraph-cluster/tests/failpoints.rs @@ -14,7 +14,10 @@ use std::path::{Path, PathBuf}; use fail::FailScenario; use omnigraph_cluster::failpoints::ScopedFailPoint; -use omnigraph_cluster::{apply_config_dir, validate_config_dir}; +use omnigraph::db::Omnigraph; +use omnigraph_cluster::{ + ApplyOptions, apply_config_dir, apply_config_dir_with_options, validate_config_dir, +}; use tempfile::tempdir; const SCHEMA: &str = r#" @@ -345,3 +348,122 @@ async fn create_crash_after_init_rolls_state_forward() { ); scenario.teardown(); } + +const SCHEMA_V2: &str = r#" +node Person { + name: String @key + age: I32? + bio: String? 
+} +"#; + +async fn converge_with_live_graph(dir: &Path) { + let graph_dir = dir.join("graphs"); + fs::create_dir_all(&graph_dir).unwrap(); + Omnigraph::init( + graph_dir.join("knowledge.omni").to_string_lossy().as_ref(), + SCHEMA, + ) + .await + .unwrap(); + seed_applyable_state(dir); + let out = apply_config_dir(dir).await; + assert!(out.ok && out.converged, "{:?}", out.diagnostics); +} + +async fn live_schema_digest(dir: &Path) -> String { + let uri = dir.join("graphs/knowledge.omni"); + let db = Omnigraph::open_read_only(uri.to_string_lossy().as_ref()) + .await + .unwrap(); + use sha2::{Digest, Sha256}; + let digest = Sha256::digest(db.schema_source().as_bytes()); + digest.iter().map(|byte| format!("{byte:02x}")).collect() +} + +/// Crash before the engine schema apply: sidecar (with actor) survives, the +/// live schema and ledger are untouched; the next run's sweep retires the +/// stale intent and the same run applies and converges. +#[tokio::test] +async fn schema_crash_before_apply_recovers_via_sweep() { + let scenario = FailScenario::setup(); + let dir = fixture(); + converge_with_live_graph(dir.path()).await; + let pre_digest = live_schema_digest(dir.path()).await; + fs::write(dir.path().join("people.pg"), SCHEMA_V2).unwrap(); + + { + let _failpoint = ScopedFailPoint::new("cluster_apply.before_schema_apply", "return"); + let out = apply_config_dir_with_options( + dir.path(), + ApplyOptions { + actor: Some("test-actor".to_string()), + }, + ) + .await; + assert!(!out.ok); + assert_eq!(out.actor.as_deref(), Some("test-actor")); + let sidecars = recovery_sidecars(dir.path()); + assert_eq!(sidecars.len(), 1); + let sidecar: serde_json::Value = + serde_json::from_str(&fs::read_to_string(&sidecars[0]).unwrap()).unwrap(); + assert_eq!(sidecar["kind"], "schema_apply"); + assert_eq!(sidecar["actor"], "test-actor"); + // Nothing moved. 
+ assert_eq!(live_schema_digest(dir.path()).await, pre_digest); + } + + let recovered = apply_config_dir(dir.path()).await; + assert!(recovered.ok, "{:?}", recovered.diagnostics); + assert!(recovered.converged); + assert!(recovery_sidecars(dir.path()).is_empty()); + assert_ne!(live_schema_digest(dir.path()).await, pre_digest); + scenario.teardown(); +} + +/// Crash after the engine schema apply, before the state CAS: the manifest +/// moved, the ledger is stale, nothing acknowledged; the next run's sweep +/// rolls the ledger forward with an audit entry and the run converges. +#[tokio::test] +async fn schema_crash_after_apply_rolls_state_forward() { + let scenario = FailScenario::setup(); + let dir = fixture(); + converge_with_live_graph(dir.path()).await; + fs::write(dir.path().join("people.pg"), SCHEMA_V2).unwrap(); + let state_before = fs::read(state_path(dir.path())).unwrap(); + let desired = validate_config_dir(dir.path()); + let v2_digest = desired.resource_digests["schema.knowledge"].clone(); + + { + let _failpoint = ScopedFailPoint::new("cluster_apply.after_schema_apply", "return"); + let out = apply_config_dir(dir.path()).await; + assert!(!out.ok); + assert!(!out.state_written); + // The live schema moved; the ledger is byte-identical (no ack). 
+ assert_eq!(live_schema_digest(dir.path()).await, v2_digest); + assert_eq!(fs::read(state_path(dir.path())).unwrap(), state_before); + let sidecars = recovery_sidecars(dir.path()); + assert_eq!(sidecars.len(), 1); + let sidecar: serde_json::Value = + serde_json::from_str(&fs::read_to_string(&sidecars[0]).unwrap()).unwrap(); + assert!(sidecar["expected_manifest_version"].is_number(), "{sidecar}"); + } + + let recovered = apply_config_dir(dir.path()).await; + assert!(recovered.ok, "{:?}", recovered.diagnostics); + assert!( + recovered + .diagnostics + .iter() + .any(|diagnostic| diagnostic.code == "cluster_recovery_rolled_forward") + ); + assert!(recovered.converged); + assert!(recovery_sidecars(dir.path()).is_empty()); + let state: serde_json::Value = + serde_json::from_str(&fs::read_to_string(state_path(dir.path())).unwrap()).unwrap(); + assert_eq!( + state["applied_revision"]["resources"]["schema.knowledge"]["digest"], + v2_digest + ); + scenario.teardown(); +} From f217352c93d84513dc538e5a560c3dd438decf8a Mon Sep 17 00:00:00 2001 From: aaltshuler Date: Wed, 10 Jun 2026 13:14:20 +0300 Subject: [PATCH 6/6] docs(cluster): document Stage 4B schema apply Co-Authored-By: Claude Fable 5 --- docs/dev/testing.md | 2 +- docs/user/cluster-config.md | 57 +++++++++++++++++++++++++++++++------ 2 files changed, 50 insertions(+), 9 deletions(-) diff --git a/docs/dev/testing.md b/docs/dev/testing.md index c171f53..5402ccf 100644 --- a/docs/dev/testing.md +++ b/docs/dev/testing.md @@ -8,7 +8,7 @@ This file is the always-on map of the test surface. 
**Consult it before every ta |---|---|---| | `omnigraph` (engine) | `crates/omnigraph/tests/` | Integration tests (21 files), fixture-driven, share `tests/helpers/mod.rs` | | `omnigraph-cli` | `crates/omnigraph-cli/tests/` | `cli.rs` (unit-ish; includes the `cluster_e2e_*` lifecycle compositions over the spawned binary — lost-state re-import recovery, out-of-band drift, graph-root destruction, multi-graph mixed-disposition convergence), `system_local.rs`, `system_remote.rs`, share `tests/support/mod.rs` | -| `omnigraph-cluster` | mostly in-source `#[cfg(test)] mod tests`; `tests/failpoints.rs` (feature-gated) | Cluster config parser, local JSON state diff, state CAS/lock handling/recovery, read-only validate/plan/status plus explicit refresh/import graph observations, config-only apply (content-addressed payload publish, disposition gating, composite-digest convergence, idempotent re-apply), catalog payload verification (status read-only, refresh drift + self-heal), failpoint crash-mid-apply / CAS-race coverage, and Stage 4A graph creation (create executor, recovery sidecars + sweep rows, create crash windows) | +| `omnigraph-cluster` | mostly in-source `#[cfg(test)] mod tests`; `tests/failpoints.rs` (feature-gated) | Cluster config parser, local JSON state diff, state CAS/lock handling/recovery, read-only validate/plan/status plus explicit refresh/import graph observations, config-only apply (content-addressed payload publish, disposition gating, composite-digest convergence, idempotent re-apply), catalog payload verification (status read-only, refresh drift + self-heal), failpoint crash-mid-apply / CAS-race coverage, Stage 4A graph creation (create executor, recovery sidecars + sweep rows, create crash windows), and Stage 4B schema apply (migration previews in plan, schema executor, schema-apply sweep classification, schema crash windows) | | `omnigraph-server` | `crates/omnigraph-server/tests/` | `server.rs` (HTTP-level), `openapi.rs` (OpenAPI drift / 
regeneration) | | `omnigraph-compiler` | mostly in-source `#[cfg(test)] mod tests` | Parser, type-checker, IR lowering, lint | diff --git a/docs/user/cluster-config.md b/docs/user/cluster-config.md index 7ff49e8..9de305a 100644 --- a/docs/user/cluster-config.md +++ b/docs/user/cluster-config.md @@ -1,6 +1,6 @@ # Cluster Config -**Status:** Stage 4A graph-create apply preview. +**Status:** Stage 4B schema-apply preview. Cluster config is the future control-plane configuration surface for a whole OmniGraph deployment. In this stage, OmniGraph can validate a local @@ -8,11 +8,12 @@ OmniGraph deployment. In this stage, OmniGraph can validate a local local JSON state ledger, explicitly refresh/import graph observations into that ledger, manually remove a held local state lock by exact lock id, and **apply the executable subset of the plan** — stored-query and policy-bundle -catalog writes, and **graph creation**: a declared graph that does not exist -yet is initialized by apply itself at the derived root. It does not change -existing schemas (deferred to a later stage), move existing graph manifests, -start servers, or serve anything it applies: the server still boots from -`omnigraph.yaml`. +catalog writes, **graph creation** (a declared graph that does not exist yet +is initialized by apply at the derived root), and **schema updates**: a +changed schema is migrated on the live graph by apply itself, soft drops +only. It does not delete graphs (a later stage), perform data-loss +migrations, start servers, or serve anything it applies: the server still +boots from `omnigraph.yaml`. ## Commands @@ -156,7 +157,8 @@ condition in `reason`). ## Apply `cluster apply` executes the executable subset of the plan — stored-query and -policy-bundle changes, and graph creates. There is no confirm flag: `cluster plan` is the preview, +policy-bundle changes, graph creates, and schema updates. 
There is no confirm +flag: `cluster plan` is the preview, and apply recomputes the same diff under the state lock before executing, so a stale preview can never be applied. Apply requires an existing `state.json` (`state_missing` directs you to `cluster import` first). @@ -212,7 +214,46 @@ will execute it, producing an **empty** graph at the root. The data was already lost when the root vanished; the create is visible in the plan (disposition `applied`) before anything runs. -Schema changes to existing graphs are never executed by this stage. They are +### Schema updates + +A `schema.<graph>` update (the declared schema differs from what state records) +is executed by apply via the engine's schema-apply, after graph creates and +before catalog writes — so a query change that depends on the new schema +applies in the same run. Each schema apply is sidecar-fenced like a create: +pre-operation manifest version recorded, post-operation version written back, +sidecar retired only after the state update lands; the recovery sweep +classifies survivors by schema digest (consistent ledger → retired; completed +on the graph → state rolled forward with an audit entry; anything else → +`drifted`/`actual_applied_state_pending`, kept). + +Migrations run with **soft drops only** — a removed property disappears from +the current version while prior versions retain the data (reversible until +`cleanup`). Data-loss migrations (`allow_data_loss`) are not reachable from +cluster apply until the approval-artifact stage. Unsupported migrations +(e.g. changing a property's type), engine lock contention, or graphs with +user branches fail loudly as `schema_apply_failed` with the engine's message; +dependent changes are demoted to `blocked` and graph-moving work stops for +the run. + +`cluster plan` previews schema updates with the engine's real migration plan: +each schema change carries a `migration` field (`supported` + typed steps), +and the human output prints the steps. 
If the live graph cannot be opened the +preview degrades to the digest diff with a `schema_preview_unavailable` +warning. + +**Drift is converged, not just reported.** A schema changed out-of-band on +the live graph shows up as `drifted` after `refresh`, and the next plan +proposes migrating it back to the declared schema — apply executes that like +any other soft migration. Drift correction is gated by the same rules as any +change; nothing about it is hidden (the plan shows the steps, including soft +drops of out-of-band fields). + +**Attribution.** `cluster apply --as <actor>` records the operator identity +in recovery sidecars and audit entries and threads it to the engine's +schema-apply (so commit attribution and Cedar enforcement — wherever a policy +checker is installed — work unchanged). + +Schema deletes (removing a graph) are never executed by this stage. They are reported as `deferred` (warning `apply_unsupported_change`), and query/policy changes that depend on them are `blocked` (warning `apply_dependency_blocked`, status `blocked` in state). A partially-applicable plan still exits 0 with warnings;