From a1ba4dc413941d7da87477285c3200945929b677 Mon Sep 17 00:00:00 2001 From: aaltshuler Date: Wed, 10 Jun 2026 13:12:15 +0300 Subject: [PATCH] feat(cluster): execute schema applies in cluster apply MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Stage 4B (RFC-004 §D1/§D5): schema.<graph> Update changes classify Applied and execute after graph creates, sequentially and sidecar-fenced — read-write open (the engine's own recovery runs first), pre-op manifest pin recorded, apply_schema_as with allow_data_loss: false (soft drops only; hard drops wait for 4C's approval artifacts), post-op pin rewritten into the sidecar, sidecar retired only after the final state CAS. Queries gated on a same-plan schema update unblock (the migration lands first in the same run); failures — unsupported migrations, lock contention, user branches — surface as schema_apply_failed with the engine's message, demote dependents via the origin-aware demotion helper, and stop further graph-moving work. Schema evolution is now fully cluster-driven (the defer -> manual schema apply -> refresh loop is gone), and out-of-band schema drift is converged back by apply as an ordinary soft migration (axiom 8: drift correction is gated like any change; the recoverable tier needs no approval) — both pinned by reworked e2es. The multi-graph mixed e2e's deferred row is now delete-shaped, pre-staging the 4C surface. Actor: cluster apply accepts the CLI's global --as via the new ApplyOptions / apply_config_dir_with_options (apply_config_dir delegates unchanged); the actor is echoed in ApplyOutput and recorded in sidecars and audit entries, and threads to apply_schema_as so Cedar fires wherever a checker is installed. 
Co-Authored-By: Claude Fable 5 --- crates/omnigraph-cli/src/main.rs | 15 +- crates/omnigraph-cli/tests/cli.rs | 196 +++++++------ crates/omnigraph-cluster/src/lib.rs | 419 ++++++++++++++++++++++------ 3 files changed, 450 insertions(+), 180 deletions(-) diff --git a/crates/omnigraph-cli/src/main.rs b/crates/omnigraph-cli/src/main.rs index de87309..942bb27 100644 --- a/crates/omnigraph-cli/src/main.rs +++ b/crates/omnigraph-cli/src/main.rs @@ -11,8 +11,8 @@ use omnigraph::db::{Omnigraph, ReadTarget, SnapshotId}; use omnigraph::loader::LoadMode; use omnigraph::storage::normalize_root_uri; use omnigraph_cluster::{ - ApplyOutput, DiagnosticSeverity, ForceUnlockOutput, PlanOutput, StateSyncOutput, StatusOutput, - ValidateOutput, apply_config_dir, force_unlock_config_dir, import_config_dir, plan_config_dir, + ApplyOptions, ApplyOutput, DiagnosticSeverity, ForceUnlockOutput, PlanOutput, StateSyncOutput, StatusOutput, + ValidateOutput, apply_config_dir_with_options, force_unlock_config_dir, import_config_dir, plan_config_dir, refresh_config_dir, status_config_dir, validate_config_dir, }; use omnigraph_compiler::query::parser::parse_query; @@ -3569,7 +3569,16 @@ async fn main() -> Result<()> { finish_cluster_plan(&output, json)?; } ClusterCommand::Apply { config, json } => { - let output = apply_config_dir(config).await; + // The global --as actor attributes graph-moving operations + // (sidecars, audit entries, engine schema-apply commits). + // Cluster config stays unlayered: no omnigraph.yaml fallback. 
+ let output = apply_config_dir_with_options( + config, + ApplyOptions { + actor: cli.as_actor.clone(), + }, + ) + .await; finish_cluster_apply(&output, json)?; } ClusterCommand::Status { config, json } => { diff --git a/crates/omnigraph-cli/tests/cli.rs b/crates/omnigraph-cli/tests/cli.rs index 7ab7ca9..1805e29 100644 --- a/crates/omnigraph-cli/tests/cli.rs +++ b/crates/omnigraph-cli/tests/cli.rs @@ -984,7 +984,7 @@ query find_person($name: String) { /// deferred by cluster apply, executed by `omnigraph schema apply` against /// the graph, picked up by `cluster refresh`, and the next apply re-converges. #[test] -fn cluster_e2e_schema_change_defers_until_schema_apply_and_refresh() { +fn cluster_e2e_schema_change_applied_by_cluster() { let temp = tempdir().unwrap(); write_cluster_config_fixture(temp.path()); init_cluster_derived_graph(temp.path()); @@ -993,7 +993,8 @@ fn cluster_e2e_schema_change_defers_until_schema_apply_and_refresh() { let apply = cluster_json(temp.path(), "apply"); assert_eq!(apply["converged"], true, "{apply}"); - // Additive schema change: cluster apply must defer it loudly, not act. + // Additive schema change: Stage 4B applies it from the cluster — no + // manual schema apply, no refresh round-trip. fs::write( temp.path().join("people.pg"), r#" @@ -1005,40 +1006,39 @@ node Person { "#, ) .unwrap(); - let deferred = cluster_json(temp.path(), "apply"); - assert_eq!(deferred["ok"], true, "{deferred}"); - assert_eq!(deferred["applied_count"], 0, "{deferred}"); - assert_eq!(deferred["converged"], false, "{deferred}"); + + // Plan previews the real migration steps (RFC-004 §D7). 
+ let plan = cluster_json(temp.path(), "plan"); + let schema_change = change_for(&plan, "schema.knowledge"); + assert_eq!(schema_change["disposition"], "applied", "{plan}"); + let migration = &schema_change["migration"]; + assert_eq!(migration["supported"], true, "{plan}"); assert!( - deferred["diagnostics"] + migration["steps"] .as_array() .unwrap() .iter() - .any(|diagnostic| diagnostic["code"] == "apply_unsupported_change"), - "{deferred}" + .any(|step| step["kind"] == "add_property"), + "{plan}" ); - // The graph-plane tool applies the migration... - output_success( + let evolve = cluster_json(temp.path(), "apply"); + assert_eq!(evolve["ok"], true, "{evolve}"); + assert_eq!(evolve["converged"], true, "{evolve}"); + assert_eq!(change_for(&evolve, "schema.knowledge")["disposition"], "applied"); + + // The live graph carries the new schema; the plan is empty. + let schema_show = output_success( cli() .arg("schema") - .arg("apply") - .arg(temp.path().join("graphs/knowledge.omni")) - .arg("--schema") - .arg(temp.path().join("people.pg")) - .arg("--json"), + .arg("show") + .arg(temp.path().join("graphs/knowledge.omni")), ); - // ...refresh observes it... - let refresh = cluster_json(temp.path(), "refresh"); - assert_eq!(refresh["ok"], true, "{refresh}"); - // ...and the control plane re-converges. 
- let reconverge = cluster_json(temp.path(), "apply"); - assert_eq!(reconverge["ok"], true, "{reconverge}"); - assert_eq!(reconverge["converged"], true, "{reconverge}"); + assert!(stdout_string(&schema_show).contains("bio"), "live schema updated"); let replan = cluster_json(temp.path(), "plan"); assert!( replan["changes"].as_array().unwrap().is_empty(), - "after schema apply + refresh + apply, the plan must be empty: {replan}" + "one cluster apply converges a schema change: {replan}" ); } @@ -1207,7 +1207,7 @@ fn cluster_e2e_lost_state_reimport_recovers_catalog() { /// the graph (no config change) must surface as drift through refresh, status, /// and plan — and apply must never silently "correct" it. #[test] -fn cluster_e2e_out_of_band_schema_change_surfaces_as_drift() { +fn cluster_e2e_out_of_band_schema_drift_then_apply_converges_it() { let temp = tempdir().unwrap(); write_cluster_config_fixture(temp.path()); init_cluster_derived_graph(temp.path()); @@ -1238,48 +1238,42 @@ node Person { .arg("--json"), ); + // Drift is visible... let refresh = cluster_json(temp.path(), "refresh"); - assert_eq!(refresh["ok"], true, "{refresh}"); assert_eq!( refresh["resource_statuses"]["schema.knowledge"]["status"], "drifted" ); - assert_eq!( - refresh["resource_statuses"]["graph.knowledge"]["status"], - "drifted" - ); - assert_eq!( - refresh["observations"]["graph.knowledge"]["schema_matches_desired"], - false - ); - - let status = cluster_json(temp.path(), "status"); - assert_eq!( - status["resource_statuses"]["schema.knowledge"]["status"], - "drifted" - ); - + // ...the plan proposes converging back to desired, with a migration + // preview (a soft drop of the out-of-band field)... 
let plan = cluster_json(temp.path(), "plan"); - assert_eq!(change_for(&plan, "schema.knowledge")["disposition"], "deferred"); - assert_eq!(change_for(&plan, "graph.knowledge")["disposition"], "deferred"); - let live_schema_digest = change_for(&plan, "schema.knowledge")["before_digest"] - .as_str() - .unwrap() - .to_string(); - - let drift_apply = cluster_json(temp.path(), "apply"); - assert_eq!(drift_apply["applied_count"], 0, "{drift_apply}"); - assert_eq!(drift_apply["converged"], false, "{drift_apply}"); - // Apply must not have "corrected" the drift: state still records the LIVE - // schema digest, not the desired one. - let state: serde_json::Value = serde_json::from_str( - &fs::read_to_string(temp.path().join("__cluster/state.json")).unwrap(), - ) - .unwrap(); - assert_eq!( - state["applied_revision"]["resources"]["schema.knowledge"]["digest"], - live_schema_digest + let schema_change = change_for(&plan, "schema.knowledge"); + assert_eq!(schema_change["disposition"], "applied", "{plan}"); + assert!( + schema_change["migration"]["steps"] + .as_array() + .unwrap() + .iter() + .any(|step| step["kind"] == "drop_property" && step["mode"] == "soft"), + "{plan}" ); + // ...and apply converges the live schema back (axiom 8: drift correction + // is gated like any change; a soft migration is the recoverable tier). 
+ let converge = cluster_json(temp.path(), "apply"); + assert_eq!(converge["ok"], true, "{converge}"); + assert_eq!(converge["converged"], true, "{converge}"); + let schema_show = output_success( + cli() + .arg("schema") + .arg("show") + .arg(temp.path().join("graphs/knowledge.omni")), + ); + assert!( + !stdout_string(&schema_show).contains("bio"), + "out-of-band field soft-dropped back to desired" + ); + let replan = cluster_json(temp.path(), "plan"); + assert!(replan["changes"].as_array().unwrap().is_empty(), "{replan}"); } /// Disaster input fails closed: a destroyed graph root drifts the ledger, @@ -1393,12 +1387,32 @@ fn cluster_e2e_multi_graph_mixed_dispositions_then_converge() { assert!(temp.path().join("graphs/knowledge.omni").exists()); assert!(temp.path().join("graphs/engineering.omni").exists()); - // Mixed run: a knowledge schema update (4B territory — deferred) gates - // its query update (blocked), while an engineering query update is - // independent (applied) and re-derives its composite. + // Mixed run: a graph REMOVAL (4C territory — deferred) gates its query + // delete (blocked), while a knowledge query update is independent + // (applied) and re-derives its composite. All four dispositions at once. 
fs::write( - temp.path().join("people.pg"), - "\nnode Person {\n name: String @key\n age: I32?\n bio: String?\n}\n", + temp.path().join("cluster.yaml"), + r#" +version: 1 +metadata: + name: company-brain +state: + backend: cluster + lock: true +graphs: + knowledge: + schema: ./people.pg + queries: + find_person: + file: ./people.gq +policies: + shared: + file: ./shared.policy.yaml + applies_to: [knowledge] + cluster_wide: + file: ./cluster_wide.policy.yaml + applies_to: [cluster] +"#, ) .unwrap(); fs::write( @@ -1406,31 +1420,35 @@ fn cluster_e2e_multi_graph_mixed_dispositions_then_converge() { "\nquery find_person($name: String) {\n match { $p: Person { name: $name } }\n return { $p.name }\n}\n", ) .unwrap(); - fs::write( - temp.path().join("services.gq"), - "\nquery find_service($name: String) {\n match { $s: Service { name: $name } }\n return { $s.name, $s.name }\n}\n", - ) - .unwrap(); let mixed = cluster_json(temp.path(), "apply"); assert_eq!(mixed["ok"], true, "{mixed}"); assert_eq!(mixed["converged"], false, "{mixed}"); - assert_eq!(change_for(&mixed, "schema.knowledge")["disposition"], "deferred"); - assert_eq!(change_for(&mixed, "graph.knowledge")["disposition"], "deferred"); assert_eq!( - change_for(&mixed, "query.knowledge.find_person")["disposition"], - "blocked" + change_for(&mixed, "graph.engineering")["disposition"], + "deferred" ); assert_eq!( - change_for(&mixed, "query.knowledge.find_person")["reason"], - "dependency_not_applied" + change_for(&mixed, "schema.engineering")["disposition"], + "deferred" ); assert_eq!( change_for(&mixed, "query.engineering.find_service")["disposition"], - "applied" + "blocked" ); assert_eq!( - change_for(&mixed, "graph.engineering")["disposition"], + change_for(&mixed, "query.engineering.find_service")["reason"], + "dependency_not_applied" + ); + assert_eq!( + change_for(&mixed, "query.knowledge.find_person")["disposition"], + "applied" + ); + // policy.shared's applies_to narrowed, but its FILE digest is unchanged + 
// — applies_to lives in cluster.yaml (the config digest), so it is not a + // resource change. + assert_eq!( + change_for(&mixed, "graph.knowledge")["disposition"], "derived" ); // Deterministic ordering: changes sorted by resource address. @@ -1443,27 +1461,7 @@ fn cluster_e2e_multi_graph_mixed_dispositions_then_converge() { let mut sorted = order.clone(); sorted.sort_unstable(); assert_eq!(order, sorted, "{mixed}"); - - // The graph-plane tool applies the schema; refresh observes; converge. - output_success( - cli() - .arg("schema") - .arg("apply") - .arg(temp.path().join("graphs/knowledge.omni")) - .arg("--schema") - .arg(temp.path().join("people.pg")) - .arg("--json"), - ); - let refresh = cluster_json(temp.path(), "refresh"); - assert_eq!(refresh["ok"], true, "{refresh}"); - let converge = cluster_json(temp.path(), "apply"); - assert_eq!(converge["converged"], true, "{converge}"); - - let final_plan = cluster_json(temp.path(), "plan"); - assert!( - final_plan["changes"].as_array().unwrap().is_empty(), - "{final_plan}" - ); + // Graph deletion cannot converge until stage 4C's approval artifacts. } /// Stage 4A headline: a declared graph is created by `cluster apply` itself — diff --git a/crates/omnigraph-cluster/src/lib.rs b/crates/omnigraph-cluster/src/lib.rs index f8a56e7..11ebcd9 100644 --- a/crates/omnigraph-cluster/src/lib.rs +++ b/crates/omnigraph-cluster/src/lib.rs @@ -274,6 +274,8 @@ pub struct ForceUnlockOutput { pub struct ApplyOutput { pub ok: bool, pub config_dir: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub actor: Option, pub desired_revision: DesiredRevision, pub state_observations: StateObservations, /// Every planned change, with `disposition`/`reason` always populated. @@ -651,12 +653,29 @@ pub async fn plan_config_dir(config_dir: impl AsRef) -> PlanOutput { /// state is the publish point: a failure after payload writes leaves inert /// digest-named blobs and no success acknowledgement; re-running apply is the /// repair. 
+/// Options for `cluster apply`. `actor` attributes graph-moving operations +/// (recorded in sidecars and audit entries, threaded to the engine's +/// `apply_schema_as` so Cedar enforcement fires wherever a policy checker is +/// installed). +#[derive(Debug, Clone, Default)] +pub struct ApplyOptions { + pub actor: Option, +} + pub async fn apply_config_dir(config_dir: impl AsRef) -> ApplyOutput { + apply_config_dir_with_options(config_dir, ApplyOptions::default()).await +} + +pub async fn apply_config_dir_with_options( + config_dir: impl AsRef, + options: ApplyOptions, +) -> ApplyOutput { let outcome = load_desired(config_dir.as_ref()); let mut diagnostics = outcome.diagnostics; let backend = LocalStateBackend::new(&outcome.config_dir); let mut observations = backend.observations(); + let actor_for_output = options.actor.clone(); let early_return = |config_dir: String, config_digest: Option, observations: StateObservations, @@ -666,6 +685,7 @@ pub async fn apply_config_dir(config_dir: impl AsRef) -> ApplyOutput { ApplyOutput { ok: !has_errors(&diagnostics), config_dir, + actor: actor_for_output.clone(), desired_revision: DesiredRevision { config_digest, }, @@ -821,18 +841,18 @@ pub async fn apply_config_dir(config_dir: impl AsRef) -> ApplyOutput { }) .filter_map(|change| change.resource.strip_prefix("graph.").map(str::to_string)) .collect(); - let mut completed_create_sidecars: Vec = Vec::new(); - let mut failed_graphs: BTreeSet = BTreeSet::new(); - let mut creates_aborted = false; + let mut completed_op_sidecars: Vec = Vec::new(); + let mut failed_graphs: BTreeMap = BTreeMap::new(); + let mut graph_moving_aborted = false; for graph_id in &graph_creates_to_run { - if creates_aborted { + if graph_moving_aborted { // A prior create failed: stop graph-moving work (loud partials). 
diagnostics.push(Diagnostic::warning( "graph_create_skipped", graph_address(graph_id), "skipped after an earlier graph create failed in this run", )); - failed_graphs.insert(graph_id.clone()); + failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::GraphCreate); continue; } let Some(desired_graph) = desired.graphs.iter().find(|graph| &graph.id == graph_id) @@ -849,7 +869,7 @@ pub async fn apply_config_dir(config_dir: impl AsRef) -> ApplyOutput { schema_version: 1, operation_id: Ulid::new().to_string(), started_at: now_rfc3339(), - actor: None, + actor: options.actor.clone(), kind: RecoverySidecarKind::GraphCreate, graph_id: graph_id.clone(), graph_uri: graph_uri.clone(), @@ -862,8 +882,8 @@ pub async fn apply_config_dir(config_dir: impl AsRef) -> ApplyOutput { Ok(path) => path, Err(diagnostic) => { diagnostics.push(diagnostic); - failed_graphs.insert(graph_id.clone()); - creates_aborted = true; + failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::GraphCreate); + graph_moving_aborted = true; continue; } }; @@ -871,8 +891,8 @@ pub async fn apply_config_dir(config_dir: impl AsRef) -> ApplyOutput { // Simulated crash before the init: the sidecar stays for the // sweep (row 1: root absent -> intent removed next run). 
diagnostics.push(diagnostic); - failed_graphs.insert(graph_id.clone()); - creates_aborted = true; + failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::GraphCreate); + graph_moving_aborted = true; continue; } // Re-read + re-verify the schema source under the lock — the same @@ -911,8 +931,8 @@ pub async fn apply_config_dir(config_dir: impl AsRef) -> ApplyOutput { Err(diagnostic) => { diagnostics.push(diagnostic); let _ = fs::remove_file(&sidecar_path); // nothing moved - failed_graphs.insert(graph_id.clone()); - creates_aborted = true; + failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::GraphCreate); + graph_moving_aborted = true; continue; } }; @@ -926,8 +946,8 @@ pub async fn apply_config_dir(config_dir: impl AsRef) -> ApplyOutput { )); // The sidecar stays: the sweep classifies whether the failed // init left a partial root (row 5) or nothing (row 1). - failed_graphs.insert(graph_id.clone()); - creates_aborted = true; + failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::GraphCreate); + graph_moving_aborted = true; continue; } } @@ -955,8 +975,174 @@ pub async fn apply_config_dir(config_dir: impl AsRef) -> ApplyOutput { diagnostics, ); } - completed_create_sidecars.push(sidecar_path); + completed_op_sidecars.push(sidecar_path); } + + // Schema applies execute next (RFC-004 §D5): the first cluster operation + // that moves an EXISTING graph manifest, sidecar-fenced the same way. 
+ let schema_updates_to_run: Vec = changes + .iter() + .filter(|change| { + change.disposition == Some(ApplyDisposition::Applied) + && change.operation == PlanOperation::Update + && matches!(resource_kind(&change.resource), ResourceKind::Schema(_)) + }) + .filter_map(|change| change.resource.strip_prefix("schema.").map(str::to_string)) + .collect(); + for graph_id in &schema_updates_to_run { + if graph_moving_aborted { + diagnostics.push(Diagnostic::warning( + "schema_apply_skipped", + schema_address(graph_id), + "skipped after an earlier graph-moving operation failed in this run", + )); + failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::SchemaApply); + continue; + } + let Some(desired_graph) = desired.graphs.iter().find(|graph| &graph.id == graph_id) + else { + continue; + }; + let graph_uri = display_path( + &desired + .config_dir + .join(CLUSTER_GRAPHS_DIR) + .join(format!("{graph_id}.omni")), + ); + // Read-write open: the engine's own recovery sweep runs here, which + // is exactly what we want before moving its manifest. 
+ let db = match Omnigraph::open(&graph_uri).await { + Ok(db) => db, + Err(err) => { + diagnostics.push(Diagnostic::error( + "schema_apply_failed", + schema_address(graph_id), + format!("could not open graph at '{graph_uri}': {err}"), + )); + failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::SchemaApply); + graph_moving_aborted = true; + continue; + } + }; + let observed_manifest_version = match db.snapshot_of(ReadTarget::branch("main")).await { + Ok(snapshot) => Some(snapshot.version()), + Err(_) => None, + }; + let mut sidecar = RecoverySidecar { + schema_version: 1, + operation_id: Ulid::new().to_string(), + started_at: now_rfc3339(), + actor: options.actor.clone(), + kind: RecoverySidecarKind::SchemaApply, + graph_id: graph_id.clone(), + graph_uri: graph_uri.clone(), + observed_manifest_version, + expected_manifest_version: None, + desired_schema_digest: desired_graph.schema_digest.clone(), + state_cas_base: expected_cas.clone(), + }; + let sidecar_path = match backend.write_recovery_sidecar(&sidecar) { + Ok(path) => path, + Err(diagnostic) => { + diagnostics.push(diagnostic); + failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::SchemaApply); + graph_moving_aborted = true; + continue; + } + }; + if let Err(diagnostic) = failpoints::maybe_fail("cluster_apply.before_schema_apply") { + // Simulated crash before the engine call: the sidecar stays; the + // sweep retires it next run (ledger still consistent with live). + diagnostics.push(diagnostic); + failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::SchemaApply); + graph_moving_aborted = true; + continue; + } + // Re-read + digest-verify the desired schema source under the lock. 
+ let schema_source = source_paths + .get(schema_address(graph_id).as_str()) + .ok_or_else(|| { + Diagnostic::error( + "schema_apply_failed", + schema_address(graph_id), + "no schema source recorded for graph", + ) + }) + .and_then(|path| { + fs::read_to_string(Path::new(path)).map_err(|err| { + Diagnostic::error( + "schema_apply_failed", + schema_address(graph_id), + format!("could not read schema source '{path}': {err}"), + ) + }) + }) + .and_then(|source| { + if sha256_hex(source.as_bytes()) == desired_graph.schema_digest { + Ok(source) + } else { + Err(Diagnostic::error( + "resource_content_changed", + schema_address(graph_id), + "schema source changed while apply was running; re-run `cluster apply`", + )) + } + }); + let schema_source = match schema_source { + Ok(source) => source, + Err(diagnostic) => { + diagnostics.push(diagnostic); + let _ = fs::remove_file(&sidecar_path); // nothing moved + failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::SchemaApply); + graph_moving_aborted = true; + continue; + } + }; + // Soft drops only: allow_data_loss stays false until the approval + // artifacts of stage 4C exist (RFC-004 §D4). + match db + .apply_schema_as( + &schema_source, + SchemaApplyOptions::default(), + options.actor.as_deref(), + ) + .await + { + Ok(result) => { + sidecar.expected_manifest_version = Some(result.manifest_version); + if let Err(diagnostic) = backend.write_recovery_sidecar(&sidecar) { + diagnostics.push(diagnostic); + } + } + Err(err) => { + diagnostics.push(Diagnostic::error( + "schema_apply_failed", + schema_address(graph_id), + format!("schema apply failed on '{graph_uri}': {err}"), + )); + // Sidecar stays; the sweep retires it (live digest unchanged + // == ledger consistent) or flags real movement. + failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::SchemaApply); + graph_moving_aborted = true; + continue; + } + } + // Crash point: the manifest moved, the ledger does not record it yet. 
+ // A failure here acknowledges nothing; the sweep rolls forward. + if let Err(diagnostic) = failpoints::maybe_fail("cluster_apply.after_schema_apply") { + diagnostics.push(diagnostic); + return early_return( + display_path(&desired.config_dir), + Some(desired.config_digest), + observations, + changes, + state.resource_statuses, + diagnostics, + ); + } + completed_op_sidecars.push(sidecar_path); + } + if !failed_graphs.is_empty() { demote_dependents_of_failed_graphs(&mut changes, &failed_graphs, &desired.dependencies); } @@ -1116,7 +1302,7 @@ pub async fn apply_config_dir(config_dir: impl AsRef) -> ApplyOutput { for sidecar_path in sweep .completed_sidecars .iter() - .chain(completed_create_sidecars.iter()) + .chain(completed_op_sidecars.iter()) { let _ = fs::remove_file(sidecar_path); } @@ -1148,6 +1334,7 @@ pub async fn apply_config_dir(config_dir: impl AsRef) -> ApplyOutput { ApplyOutput { ok: !has_errors(&diagnostics), config_dir: display_path(&desired.config_dir), + actor: options.actor.clone(), desired_revision: DesiredRevision { config_digest: Some(desired.config_digest), }, @@ -3006,9 +3193,10 @@ fn classify_changes( PlanOperation::Create => { schema_creates.insert(graph); } - // Schema updates (4B) and deletes (4C) are still pending in - // this stage and block dependents. - _ => { + // Schema updates execute in-run before catalog writes (4B) + // and no longer block dependents; deletes (4C) still do. + PlanOperation::Update => {} + PlanOperation::Delete => { schema_pending.insert(graph); } }, @@ -3042,7 +3230,12 @@ fn classify_changes( // Applied with the graph create — the init carries it. (ApplyDisposition::Applied, None) } - PlanOperation::Create if graph_creates.contains(&graph) => { + PlanOperation::Update if !pending_recovery.contains(&graph) => { + // Stage 4B: schema updates execute via the engine's + // schema apply (soft drops only; allow_data_loss is 4C). 
+ (ApplyDisposition::Applied, None) + } + PlanOperation::Create | PlanOperation::Update => { (ApplyDisposition::Blocked, Some("cluster_recovery_pending")) } _ => (ApplyDisposition::Deferred, Some("apply_unsupported_kind")), @@ -3112,13 +3305,20 @@ fn classify_changes( } } -/// After a graph create fails mid-run, every change that depended on that -/// graph (its schema, its queries, policies referencing it) flips from -/// Applied to Blocked so the output and the persisted statuses tell the -/// truth about what this run actually executed. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum FailedGraphOrigin { + GraphCreate, + SchemaApply, +} + +/// After a graph-moving operation fails mid-run, every change that depended +/// on that graph flips from Applied to Blocked so the output and the +/// persisted statuses tell the truth about what this run actually executed. +/// The originating change carries the failure code; dependents carry +/// `dependency_not_applied`. fn demote_dependents_of_failed_graphs( changes: &mut [PlanChange], - failed: &BTreeSet, + failed: &BTreeMap, dependencies: &[Dependency], ) { for change in changes.iter_mut() { @@ -3126,11 +3326,17 @@ fn demote_dependents_of_failed_graphs( continue; } let demote_reason = match resource_kind(&change.resource) { - ResourceKind::Graph(graph) if failed.contains(&graph) => Some("graph_create_failed"), - ResourceKind::Schema(graph) if failed.contains(&graph) => { - Some("dependency_not_applied") - } - ResourceKind::Query { graph, .. 
} if failed.contains(&graph) => { + ResourceKind::Graph(graph) => match failed.get(&graph) { + Some(FailedGraphOrigin::GraphCreate) => Some("graph_create_failed"), + Some(FailedGraphOrigin::SchemaApply) => Some("dependency_not_applied"), + None => None, + }, + ResourceKind::Schema(graph) => match failed.get(&graph) { + Some(FailedGraphOrigin::SchemaApply) => Some("schema_apply_failed"), + Some(FailedGraphOrigin::GraphCreate) => Some("dependency_not_applied"), + None => None, + }, + ResourceKind::Query { graph, .. } if failed.contains_key(&graph) => { Some("dependency_not_applied") } ResourceKind::Policy(_) => { @@ -3139,7 +3345,7 @@ fn demote_dependents_of_failed_graphs( && dep .to .strip_prefix("graph.") - .is_some_and(|graph| failed.contains(graph)) + .is_some_and(|graph| failed.contains_key(graph)) }); blocked.then_some("dependency_not_applied") } @@ -4849,19 +5055,22 @@ graphs: } #[tokio::test] - async fn apply_defers_schema_change_and_blocks_dependent_query() { + async fn apply_schema_update_and_dependent_query_in_one_run() { let dir = fixture(); + init_derived_graph(dir.path()).await; write_applyable_state(dir.path()); - // Change the schema after seeding state: schema.knowledge now differs. + // Schema update + a query update that depends on the new field: one + // apply executes the schema migration first, then the catalog write. 
+ fs::write(dir.path().join("people.pg"), SCHEMA_V2).unwrap(); fs::write( - dir.path().join("people.pg"), - "\nnode Person {\n name: String @key\n age: I32?\n bio: String?\n}\n", + dir.path().join("people.gq"), + "\nquery find_person($name: String) {\n match { $p: Person { name: $name } }\n return { $p.name, $p.bio }\n}\n", ) .unwrap(); let out = apply_config_dir(dir.path()).await; assert!(out.ok, "{:?}", out.diagnostics); - assert!(!out.converged); + assert!(out.converged, "{out:?}"); let by_resource: BTreeMap<&str, &PlanChange> = out .changes .iter() @@ -4869,58 +5078,112 @@ graphs: .collect(); assert_eq!( by_resource["schema.knowledge"].disposition, - Some(ApplyDisposition::Deferred) - ); - assert_eq!( - by_resource["graph.knowledge"].disposition, - Some(ApplyDisposition::Deferred) + Some(ApplyDisposition::Applied) ); assert_eq!( by_resource["query.knowledge.find_person"].disposition, + Some(ApplyDisposition::Applied) + ); + assert_eq!( + by_resource["graph.knowledge"].disposition, + Some(ApplyDisposition::Derived) + ); + // The live graph carries the new schema. + let db = Omnigraph::open_read_only(&derived_graph_uri(dir.path(), "knowledge")) + .await + .unwrap(); + let desired = validate_config_dir(dir.path()); + assert_eq!( + sha256_hex(db.schema_source().as_bytes()), + desired.resource_digests["schema.knowledge"] + ); + let state = read_state_json(dir.path()); + assert_eq!( + state["applied_revision"]["resources"]["schema.knowledge"]["digest"], + desired.resource_digests["schema.knowledge"] + ); + // Sidecar retired after the CAS landed. + assert!( + !dir.path().join(CLUSTER_RECOVERIES_DIR).exists() + || fs::read_dir(dir.path().join(CLUSTER_RECOVERIES_DIR)) + .unwrap() + .next() + .is_none() + ); + } + + #[tokio::test] + async fn apply_unsupported_schema_change_fails_loudly() { + let dir = fixture(); + init_derived_graph(dir.path()).await; + write_applyable_state(dir.path()); + // Property type changes are unsupported by the engine planner. 
+ fs::write( + dir.path().join("people.pg"), + "\nnode Person {\n name: String @key\n age: I64?\n}\n", + ) + .unwrap(); + + let out = apply_config_dir(dir.path()).await; + assert!(!out.ok); + assert!(out.diagnostics.iter().any(|diagnostic| { + diagnostic.code == "schema_apply_failed" + && diagnostic.message.contains("changing property type") + })); + let by_resource: BTreeMap<&str, &PlanChange> = out + .changes + .iter() + .map(|change| (change.resource.as_str(), change)) + .collect(); + assert_eq!( + by_resource["schema.knowledge"].disposition, Some(ApplyDisposition::Blocked) ); assert_eq!( - by_resource["query.knowledge.find_person"].reason.as_deref(), - Some("dependency_not_applied") + by_resource["schema.knowledge"].reason.as_deref(), + Some("schema_apply_failed") ); - // Policy is independent of the schema and still applies. - assert_eq!( - by_resource["policy.base"].disposition, - Some(ApplyDisposition::Applied) - ); - assert!( - out.diagnostics - .iter() - .any(|diagnostic| diagnostic.code == "apply_unsupported_change") - ); - assert!( - out.diagnostics - .iter() - .any(|diagnostic| diagnostic.code == "apply_dependency_blocked") - ); - + // The live schema and the ledger are unchanged. let state = read_state_json(dir.path()); + let desired = validate_config_dir(dir.path()); + assert_ne!( + state["applied_revision"]["resources"]["schema.knowledge"]["digest"], + desired.resource_digests["schema.knowledge"] + ); + // Second run: the sweep retires the stale sidecar (ledger consistent) + // and the run fails just as loudly — idempotent loudness. 
+ let second = apply_config_dir(dir.path()).await; + assert!(!second.ok); + assert!( + second + .diagnostics + .iter() + .any(|diagnostic| diagnostic.code == "schema_apply_failed") + ); + } + + #[tokio::test] + async fn apply_blocks_schema_update_while_recovery_pending() { + let dir = fixture(); + init_derived_graph(dir.path()).await; + write_state_resources(dir.path(), &[("schema.knowledge", "stale-digest")]); + fs::write(dir.path().join("people.pg"), SCHEMA_V2).unwrap(); + // A pending sidecar whose intent matches neither live nor recorded. + write_schema_apply_sidecar(dir.path(), "knowledge", "intended-digest", "01PENDS"); + + let out = apply_config_dir(dir.path()).await; + let by_resource: BTreeMap<&str, &PlanChange> = out + .changes + .iter() + .map(|change| (change.resource.as_str(), change)) + .collect(); assert_eq!( - state["resource_statuses"]["query.knowledge.find_person"]["status"], - "blocked" + by_resource["schema.knowledge"].disposition, + Some(ApplyDisposition::Blocked) ); - // The blocked query wrote no payload and no state digest. - assert!( - state["applied_revision"]["resources"] - .get("query.knowledge.find_person") - .is_none() - ); - assert!( - !dir.path() - .join(CLUSTER_RESOURCES_DIR) - .join("query") - .exists() - ); - // Not converged: the applied config digest must not be claimed. - assert!( - state["applied_revision"] - .get("config_digest") - .is_none_or(serde_json::Value::is_null) + assert_eq!( + by_resource["schema.knowledge"].reason.as_deref(), + Some("cluster_recovery_pending") ); }