feat(cluster): embed schema migration previews in cluster plan

RFC-004 §D7's data-aware preview: for every schema update, plan opens the
live graph read-only and embeds the engine's migration plan (supported flag
+ typed steps) in the change record; the human renderer prints the steps.
Preview failures (unreachable graph, planner error) degrade to the digest
diff with a schema_preview_unavailable warning — planning never blocks.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
aaltshuler 2026-06-10 13:04:19 +03:00
parent b313075476
commit ca63a9340b
2 changed files with 126 additions and 2 deletions

View file

@ -804,6 +804,17 @@ fn print_cluster_plan_human(output: &PlanOutput) {
);
for change in &output.changes {
println!(" {:?} {}", change.operation, change.resource);
if let Some(migration) = &change.migration {
if !migration.supported {
println!(" migration UNSUPPORTED:");
}
for step in &migration.steps {
println!(
" {}",
serde_json::to_string(step).unwrap_or_else(|_| format!("{step:?}"))
);
}
}
}
if output.changes.is_empty() {
println!(" no changes");

View file

@ -4,7 +4,8 @@ use std::io::{ErrorKind, Write};
use std::path::{Path, PathBuf};
use std::process;
use omnigraph::db::{Omnigraph, ReadTarget};
use omnigraph::db::{Omnigraph, ReadTarget, SchemaApplyOptions};
use omnigraph_compiler::SchemaMigrationPlan;
use omnigraph_compiler::build_catalog;
use omnigraph_compiler::query::parser::parse_query;
use omnigraph_compiler::query::typecheck::typecheck_query_decl;
@ -182,7 +183,7 @@ pub enum ApplyDisposition {
Blocked,
}
#[derive(Debug, Clone, Serialize, PartialEq, Eq)]
#[derive(Debug, Clone, Serialize, PartialEq)]
pub struct PlanChange {
pub resource: String,
pub operation: PlanOperation,
@ -194,6 +195,11 @@ pub struct PlanChange {
pub disposition: Option<ApplyDisposition>,
#[serde(skip_serializing_if = "Option::is_none")]
pub reason: Option<String>,
/// For schema updates: the engine's migration plan against the live
/// graph (RFC-004 §D7's data-aware preview). Absent when the preview is
/// unavailable (warning `schema_preview_unavailable`).
#[serde(skip_serializing_if = "Option::is_none")]
pub migration: Option<SchemaMigrationPlan>,
}
#[derive(Debug, Clone, Serialize, PartialEq, Eq)]
@ -580,6 +586,40 @@ pub async fn plan_config_dir(config_dir: impl AsRef<Path>) -> PlanOutput {
// Plan previews dispositions without sweeping; a pending recovery is
// surfaced as the cluster_recovery_pending warning above instead.
classify_changes(&mut changes, &desired.dependencies, &BTreeSet::new());
// Embed real migration steps for schema updates so plan is a data-aware
// preview; failures degrade to the digest diff with a warning.
for change in &mut changes {
if change.operation != PlanOperation::Update {
continue;
}
let ResourceKind::Schema(graph_id) = resource_kind(&change.resource) else {
continue;
};
let graph_uri = display_path(
&desired
.config_dir
.join(CLUSTER_GRAPHS_DIR)
.join(format!("{graph_id}.omni")),
);
let source_path = desired
.resources
.iter()
.find(|resource| resource.address == change.resource)
.and_then(|resource| resource.path.clone());
let preview = match source_path {
Some(path) => preview_schema_migration(&graph_uri, &path).await,
None => Err("no schema source recorded".to_string()),
};
match preview {
Ok(migration) => change.migration = Some(migration),
Err(err) => diagnostics.push(Diagnostic::warning(
"schema_preview_unavailable",
change.resource.clone(),
format!("could not preview the schema migration: {err}"),
)),
}
}
let blast_radius = compute_blast_radius(&changes, &desired.dependencies);
let approvals_required = compute_approvals(&changes);
let ok = !has_errors(&diagnostics);
@ -2332,6 +2372,23 @@ async fn observe_declared_graphs(desired: &DesiredCluster, state: &mut ClusterSt
graph_error_count
}
/// RFC-004 §D7: the data-aware preview — the engine's migration plan for a
/// desired schema against the live graph, computed read-only (no lock).
async fn preview_schema_migration(
graph_uri: &str,
schema_path: &str,
) -> Result<SchemaMigrationPlan, String> {
let source = fs::read_to_string(schema_path).map_err(|err| err.to_string())?;
let db = Omnigraph::open_read_only(graph_uri)
.await
.map_err(|err| err.to_string())?;
let preview = db
.preview_schema_apply_with_options(&source, SchemaApplyOptions::default())
.await
.map_err(|err| err.to_string())?;
Ok(preview.plan)
}
struct LiveGraphObservation {
manifest_version: u64,
schema_digest: String,
@ -2736,6 +2793,7 @@ fn diff_resources(
after_digest: Some(after.clone()),
disposition: None,
reason: None,
migration: None,
}),
Some(before) if before != after => changes.push(PlanChange {
resource: address.clone(),
@ -2744,6 +2802,7 @@ fn diff_resources(
after_digest: Some(after.clone()),
disposition: None,
reason: None,
migration: None,
}),
Some(_) => {}
}
@ -2757,6 +2816,7 @@ fn diff_resources(
after_digest: None,
disposition: None,
reason: None,
migration: None,
});
}
}
@ -5500,6 +5560,59 @@ graphs:
assert_eq!(state["resource_statuses"]["graph.knowledge"]["status"], "error");
}
#[tokio::test]
async fn plan_embeds_migration_preview_for_schema_update() {
let dir = fixture();
init_derived_graph(dir.path()).await;
write_applyable_state(dir.path());
fs::write(
dir.path().join("people.pg"),
"\nnode Person {\n name: String @key\n age: I32?\n bio: String?\n}\n",
)
.unwrap();
let out = plan_config_dir(dir.path()).await;
assert!(out.ok, "{:?}", out.diagnostics);
let schema_change = out
.changes
.iter()
.find(|change| change.resource == "schema.knowledge")
.unwrap();
let migration = schema_change.migration.as_ref().expect("preview embedded");
assert!(migration.supported);
assert!(
serde_json::to_string(&migration.steps)
.unwrap()
.contains("add_property"),
"{migration:?}"
);
}
#[tokio::test]
async fn plan_warns_when_preview_unavailable() {
let dir = fixture();
write_applyable_state(dir.path()); // digests recorded, but no live root
fs::write(
dir.path().join("people.pg"),
"\nnode Person {\n name: String @key\n age: I32?\n bio: String?\n}\n",
)
.unwrap();
let out = plan_config_dir(dir.path()).await;
assert!(out.ok, "{:?}", out.diagnostics);
let schema_change = out
.changes
.iter()
.find(|change| change.resource == "schema.knowledge")
.unwrap();
assert!(schema_change.migration.is_none());
assert!(
out.diagnostics
.iter()
.any(|diagnostic| diagnostic.code == "schema_preview_unavailable")
);
}
#[test]
fn status_warns_on_pending_recovery_sidecar() {
let dir = fixture();