diff --git a/crates/omnigraph-cli/src/main.rs b/crates/omnigraph-cli/src/main.rs index 7d09f36..de87309 100644 --- a/crates/omnigraph-cli/src/main.rs +++ b/crates/omnigraph-cli/src/main.rs @@ -804,6 +804,17 @@ fn print_cluster_plan_human(output: &PlanOutput) { ); for change in &output.changes { println!(" {:?} {}", change.operation, change.resource); + if let Some(migration) = &change.migration { + if !migration.supported { + println!(" migration UNSUPPORTED:"); + } + for step in &migration.steps { + println!( + " {}", + serde_json::to_string(step).unwrap_or_else(|_| format!("{step:?}")) + ); + } + } } if output.changes.is_empty() { println!(" no changes"); diff --git a/crates/omnigraph-cluster/src/lib.rs b/crates/omnigraph-cluster/src/lib.rs index 7f92a67..af4ac93 100644 --- a/crates/omnigraph-cluster/src/lib.rs +++ b/crates/omnigraph-cluster/src/lib.rs @@ -4,7 +4,8 @@ use std::io::{ErrorKind, Write}; use std::path::{Path, PathBuf}; use std::process; -use omnigraph::db::{Omnigraph, ReadTarget}; +use omnigraph::db::{Omnigraph, ReadTarget, SchemaApplyOptions}; +use omnigraph_compiler::SchemaMigrationPlan; use omnigraph_compiler::build_catalog; use omnigraph_compiler::query::parser::parse_query; use omnigraph_compiler::query::typecheck::typecheck_query_decl; @@ -182,7 +183,7 @@ pub enum ApplyDisposition { Blocked, } -#[derive(Debug, Clone, Serialize, PartialEq, Eq)] +#[derive(Debug, Clone, Serialize, PartialEq)] pub struct PlanChange { pub resource: String, pub operation: PlanOperation, @@ -194,6 +195,11 @@ pub struct PlanChange { pub disposition: Option, #[serde(skip_serializing_if = "Option::is_none")] pub reason: Option, + /// For schema updates: the engine's migration plan against the live + /// graph (RFC-004 §D7's data-aware preview). Absent when the preview is + /// unavailable (warning `schema_preview_unavailable`). + #[serde(skip_serializing_if = "Option::is_none")] + pub migration: Option, } #[derive(Debug, Clone, Serialize, PartialEq, Eq)] @@ -580,6 +586,40 @@ pub async fn plan_config_dir(config_dir: impl AsRef) -> PlanOutput { // Plan previews dispositions without sweeping; a pending recovery is // surfaced as the cluster_recovery_pending warning above instead. classify_changes(&mut changes, &desired.dependencies, &BTreeSet::new()); + + // Embed real migration steps for schema updates so plan is a data-aware + // preview; failures degrade to the digest diff with a warning. + for change in &mut changes { + if change.operation != PlanOperation::Update { + continue; + } + let ResourceKind::Schema(graph_id) = resource_kind(&change.resource) else { + continue; + }; + let graph_uri = display_path( + &desired + .config_dir + .join(CLUSTER_GRAPHS_DIR) + .join(format!("{graph_id}.omni")), + ); + let source_path = desired + .resources + .iter() + .find(|resource| resource.address == change.resource) + .and_then(|resource| resource.path.clone()); + let preview = match source_path { + Some(path) => preview_schema_migration(&graph_uri, &path).await, + None => Err("no schema source recorded".to_string()), + }; + match preview { + Ok(migration) => change.migration = Some(migration), + Err(err) => diagnostics.push(Diagnostic::warning( + "schema_preview_unavailable", + change.resource.clone(), + format!("could not preview the schema migration: {err}"), + )), + } + } let blast_radius = compute_blast_radius(&changes, &desired.dependencies); let approvals_required = compute_approvals(&changes); let ok = !has_errors(&diagnostics); @@ -2332,6 +2372,23 @@ async fn observe_declared_graphs(desired: &DesiredCluster, state: &mut ClusterSt graph_error_count } +/// RFC-004 §D7: the data-aware preview — the engine's migration plan for a +/// desired schema against the live graph, computed read-only (no lock). +async fn preview_schema_migration( + graph_uri: &str, + schema_path: &str, +) -> Result { + let source = fs::read_to_string(schema_path).map_err(|err| err.to_string())?; + let db = Omnigraph::open_read_only(graph_uri) + .await + .map_err(|err| err.to_string())?; + let preview = db + .preview_schema_apply_with_options(&source, SchemaApplyOptions::default()) + .await + .map_err(|err| err.to_string())?; + Ok(preview.plan) +} + struct LiveGraphObservation { manifest_version: u64, schema_digest: String, @@ -2736,6 +2793,7 @@ fn diff_resources( after_digest: Some(after.clone()), disposition: None, reason: None, + migration: None, }), Some(before) if before != after => changes.push(PlanChange { resource: address.clone(), @@ -2744,6 +2802,7 @@ fn diff_resources( after_digest: Some(after.clone()), disposition: None, reason: None, + migration: None, }), Some(_) => {} } @@ -2757,6 +2816,7 @@ fn diff_resources( after_digest: None, disposition: None, reason: None, + migration: None, }); } } @@ -5500,6 +5560,59 @@ graphs: assert_eq!(state["resource_statuses"]["graph.knowledge"]["status"], "error"); } + #[tokio::test] + async fn plan_embeds_migration_preview_for_schema_update() { + let dir = fixture(); + init_derived_graph(dir.path()).await; + write_applyable_state(dir.path()); + fs::write( + dir.path().join("people.pg"), + "\nnode Person {\n name: String @key\n age: I32?\n bio: String?\n}\n", + ) + .unwrap(); + + let out = plan_config_dir(dir.path()).await; + assert!(out.ok, "{:?}", out.diagnostics); + let schema_change = out + .changes + .iter() + .find(|change| change.resource == "schema.knowledge") + .unwrap(); + let migration = schema_change.migration.as_ref().expect("preview embedded"); + assert!(migration.supported); + assert!( + serde_json::to_string(&migration.steps) + .unwrap() + .contains("add_property"), + "{migration:?}" + ); + } + + #[tokio::test] + async fn plan_warns_when_preview_unavailable() { + let dir = fixture(); + write_applyable_state(dir.path()); // digests recorded, but no live root + fs::write( + dir.path().join("people.pg"), + "\nnode Person {\n name: String @key\n age: I32?\n bio: String?\n}\n", + ) + .unwrap(); + + let out = plan_config_dir(dir.path()).await; + assert!(out.ok, "{:?}", out.diagnostics); + let schema_change = out + .changes + .iter() + .find(|change| change.resource == "schema.knowledge") + .unwrap(); + assert!(schema_change.migration.is_none()); + assert!( + out.diagnostics + .iter() + .any(|diagnostic| diagnostic.code == "schema_preview_unavailable") + ); + } + #[test] fn status_warns_on_pending_recovery_sidecar() { let dir = fixture();