feat(cluster): execute schema applies in cluster apply

Stage 4B (RFC-004 §D1/§D5): schema.<id> Update changes classify Applied and
execute after graph creates, sequentially and sidecar-fenced — read-write
open (the engine's own recovery runs first), pre-op manifest pin recorded,
apply_schema_as with allow_data_loss: false (soft drops only; hard drops wait
for 4C's approval artifacts), post-op pin rewritten into the sidecar, sidecar
retired only after the final state CAS. Queries gated on a same-plan schema
update unblock (the migration lands first in the same run); failures —
unsupported migrations, lock contention, user branches — surface as
schema_apply_failed with the engine's message, demote dependents via the
origin-aware demotion helper, and stop further graph-moving work.

Schema evolution is now fully cluster-driven (the defer -> manual schema
apply -> refresh loop is gone), and out-of-band schema drift is converged
back by apply as an ordinary soft migration (axiom 8: drift correction is
gated like any change; the recoverable tier needs no approval) — both pinned
by reworked e2es. The multi-graph mixed e2e's deferred row is now
delete-shaped, pre-staging the 4C surface.

Actor: cluster apply accepts the CLI's global --as via the new ApplyOptions /
apply_config_dir_with_options (apply_config_dir delegates unchanged); the
actor is echoed in ApplyOutput and recorded in sidecars and audit entries,
and threads to apply_schema_as so Cedar fires wherever a checker is
installed.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
aaltshuler 2026-06-10 13:12:15 +03:00
parent 0571c05ebb
commit a1ba4dc413
3 changed files with 450 additions and 180 deletions

View file

@ -11,8 +11,8 @@ use omnigraph::db::{Omnigraph, ReadTarget, SnapshotId};
use omnigraph::loader::LoadMode;
use omnigraph::storage::normalize_root_uri;
use omnigraph_cluster::{
ApplyOutput, DiagnosticSeverity, ForceUnlockOutput, PlanOutput, StateSyncOutput, StatusOutput,
ValidateOutput, apply_config_dir, force_unlock_config_dir, import_config_dir, plan_config_dir,
ApplyOptions, ApplyOutput, DiagnosticSeverity, ForceUnlockOutput, PlanOutput, StateSyncOutput, StatusOutput,
ValidateOutput, apply_config_dir_with_options, force_unlock_config_dir, import_config_dir, plan_config_dir,
refresh_config_dir, status_config_dir, validate_config_dir,
};
use omnigraph_compiler::query::parser::parse_query;
@ -3569,7 +3569,16 @@ async fn main() -> Result<()> {
finish_cluster_plan(&output, json)?;
}
ClusterCommand::Apply { config, json } => {
let output = apply_config_dir(config).await;
// The global --as actor attributes graph-moving operations
// (sidecars, audit entries, engine schema-apply commits).
// Cluster config stays unlayered: no omnigraph.yaml fallback.
let output = apply_config_dir_with_options(
config,
ApplyOptions {
actor: cli.as_actor.clone(),
},
)
.await;
finish_cluster_apply(&output, json)?;
}
ClusterCommand::Status { config, json } => {

View file

@ -984,7 +984,7 @@ query find_person($name: String) {
/// applied by cluster apply itself (Stage 4B) — no manual `omnigraph schema apply`
/// against the graph, no `cluster refresh` round-trip — and the follow-up plan is empty.
#[test]
fn cluster_e2e_schema_change_defers_until_schema_apply_and_refresh() {
fn cluster_e2e_schema_change_applied_by_cluster() {
let temp = tempdir().unwrap();
write_cluster_config_fixture(temp.path());
init_cluster_derived_graph(temp.path());
@ -993,7 +993,8 @@ fn cluster_e2e_schema_change_defers_until_schema_apply_and_refresh() {
let apply = cluster_json(temp.path(), "apply");
assert_eq!(apply["converged"], true, "{apply}");
// Additive schema change: cluster apply must defer it loudly, not act.
// Additive schema change: Stage 4B applies it from the cluster — no
// manual schema apply, no refresh round-trip.
fs::write(
temp.path().join("people.pg"),
r#"
@ -1005,40 +1006,39 @@ node Person {
"#,
)
.unwrap();
let deferred = cluster_json(temp.path(), "apply");
assert_eq!(deferred["ok"], true, "{deferred}");
assert_eq!(deferred["applied_count"], 0, "{deferred}");
assert_eq!(deferred["converged"], false, "{deferred}");
// Plan previews the real migration steps (RFC-004 §D7).
let plan = cluster_json(temp.path(), "plan");
let schema_change = change_for(&plan, "schema.knowledge");
assert_eq!(schema_change["disposition"], "applied", "{plan}");
let migration = &schema_change["migration"];
assert_eq!(migration["supported"], true, "{plan}");
assert!(
deferred["diagnostics"]
migration["steps"]
.as_array()
.unwrap()
.iter()
.any(|diagnostic| diagnostic["code"] == "apply_unsupported_change"),
"{deferred}"
.any(|step| step["kind"] == "add_property"),
"{plan}"
);
// The graph-plane tool applies the migration...
output_success(
let evolve = cluster_json(temp.path(), "apply");
assert_eq!(evolve["ok"], true, "{evolve}");
assert_eq!(evolve["converged"], true, "{evolve}");
assert_eq!(change_for(&evolve, "schema.knowledge")["disposition"], "applied");
// The live graph carries the new schema; the plan is empty.
let schema_show = output_success(
cli()
.arg("schema")
.arg("apply")
.arg(temp.path().join("graphs/knowledge.omni"))
.arg("--schema")
.arg(temp.path().join("people.pg"))
.arg("--json"),
.arg("show")
.arg(temp.path().join("graphs/knowledge.omni")),
);
// ...refresh observes it...
let refresh = cluster_json(temp.path(), "refresh");
assert_eq!(refresh["ok"], true, "{refresh}");
// ...and the control plane re-converges.
let reconverge = cluster_json(temp.path(), "apply");
assert_eq!(reconverge["ok"], true, "{reconverge}");
assert_eq!(reconverge["converged"], true, "{reconverge}");
assert!(stdout_string(&schema_show).contains("bio"), "live schema updated");
let replan = cluster_json(temp.path(), "plan");
assert!(
replan["changes"].as_array().unwrap().is_empty(),
"after schema apply + refresh + apply, the plan must be empty: {replan}"
"one cluster apply converges a schema change: {replan}"
);
}
@ -1207,7 +1207,7 @@ fn cluster_e2e_lost_state_reimport_recovers_catalog() {
/// the graph (no config change) must surface as drift through refresh, status,
/// and plan — and apply then converges it back as an ordinary soft migration.
#[test]
fn cluster_e2e_out_of_band_schema_change_surfaces_as_drift() {
fn cluster_e2e_out_of_band_schema_drift_then_apply_converges_it() {
let temp = tempdir().unwrap();
write_cluster_config_fixture(temp.path());
init_cluster_derived_graph(temp.path());
@ -1238,48 +1238,42 @@ node Person {
.arg("--json"),
);
// Drift is visible...
let refresh = cluster_json(temp.path(), "refresh");
assert_eq!(refresh["ok"], true, "{refresh}");
assert_eq!(
refresh["resource_statuses"]["schema.knowledge"]["status"],
"drifted"
);
assert_eq!(
refresh["resource_statuses"]["graph.knowledge"]["status"],
"drifted"
);
assert_eq!(
refresh["observations"]["graph.knowledge"]["schema_matches_desired"],
false
);
let status = cluster_json(temp.path(), "status");
assert_eq!(
status["resource_statuses"]["schema.knowledge"]["status"],
"drifted"
);
// ...the plan proposes converging back to desired, with a migration
// preview (a soft drop of the out-of-band field)...
let plan = cluster_json(temp.path(), "plan");
assert_eq!(change_for(&plan, "schema.knowledge")["disposition"], "deferred");
assert_eq!(change_for(&plan, "graph.knowledge")["disposition"], "deferred");
let live_schema_digest = change_for(&plan, "schema.knowledge")["before_digest"]
.as_str()
.unwrap()
.to_string();
let drift_apply = cluster_json(temp.path(), "apply");
assert_eq!(drift_apply["applied_count"], 0, "{drift_apply}");
assert_eq!(drift_apply["converged"], false, "{drift_apply}");
// Apply must not have "corrected" the drift: state still records the LIVE
// schema digest, not the desired one.
let state: serde_json::Value = serde_json::from_str(
&fs::read_to_string(temp.path().join("__cluster/state.json")).unwrap(),
)
.unwrap();
assert_eq!(
state["applied_revision"]["resources"]["schema.knowledge"]["digest"],
live_schema_digest
let schema_change = change_for(&plan, "schema.knowledge");
assert_eq!(schema_change["disposition"], "applied", "{plan}");
assert!(
schema_change["migration"]["steps"]
.as_array()
.unwrap()
.iter()
.any(|step| step["kind"] == "drop_property" && step["mode"] == "soft"),
"{plan}"
);
// ...and apply converges the live schema back (axiom 8: drift correction
// is gated like any change; a soft migration is the recoverable tier).
let converge = cluster_json(temp.path(), "apply");
assert_eq!(converge["ok"], true, "{converge}");
assert_eq!(converge["converged"], true, "{converge}");
let schema_show = output_success(
cli()
.arg("schema")
.arg("show")
.arg(temp.path().join("graphs/knowledge.omni")),
);
assert!(
!stdout_string(&schema_show).contains("bio"),
"out-of-band field soft-dropped back to desired"
);
let replan = cluster_json(temp.path(), "plan");
assert!(replan["changes"].as_array().unwrap().is_empty(), "{replan}");
}
/// Disaster input fails closed: a destroyed graph root drifts the ledger,
@ -1393,12 +1387,32 @@ fn cluster_e2e_multi_graph_mixed_dispositions_then_converge() {
assert!(temp.path().join("graphs/knowledge.omni").exists());
assert!(temp.path().join("graphs/engineering.omni").exists());
// Mixed run: a knowledge schema update (4B territory — deferred) gates
// its query update (blocked), while an engineering query update is
// independent (applied) and re-derives its composite.
// Mixed run: a graph REMOVAL (4C territory — deferred) gates its query
// delete (blocked), while a knowledge query update is independent
// (applied) and re-derives its composite. All four dispositions at once.
fs::write(
temp.path().join("people.pg"),
"\nnode Person {\n name: String @key\n age: I32?\n bio: String?\n}\n",
temp.path().join("cluster.yaml"),
r#"
version: 1
metadata:
name: company-brain
state:
backend: cluster
lock: true
graphs:
knowledge:
schema: ./people.pg
queries:
find_person:
file: ./people.gq
policies:
shared:
file: ./shared.policy.yaml
applies_to: [knowledge]
cluster_wide:
file: ./cluster_wide.policy.yaml
applies_to: [cluster]
"#,
)
.unwrap();
fs::write(
@ -1406,31 +1420,35 @@ fn cluster_e2e_multi_graph_mixed_dispositions_then_converge() {
"\nquery find_person($name: String) {\n match { $p: Person { name: $name } }\n return { $p.name }\n}\n",
)
.unwrap();
fs::write(
temp.path().join("services.gq"),
"\nquery find_service($name: String) {\n match { $s: Service { name: $name } }\n return { $s.name, $s.name }\n}\n",
)
.unwrap();
let mixed = cluster_json(temp.path(), "apply");
assert_eq!(mixed["ok"], true, "{mixed}");
assert_eq!(mixed["converged"], false, "{mixed}");
assert_eq!(change_for(&mixed, "schema.knowledge")["disposition"], "deferred");
assert_eq!(change_for(&mixed, "graph.knowledge")["disposition"], "deferred");
assert_eq!(
change_for(&mixed, "query.knowledge.find_person")["disposition"],
"blocked"
change_for(&mixed, "graph.engineering")["disposition"],
"deferred"
);
assert_eq!(
change_for(&mixed, "query.knowledge.find_person")["reason"],
"dependency_not_applied"
change_for(&mixed, "schema.engineering")["disposition"],
"deferred"
);
assert_eq!(
change_for(&mixed, "query.engineering.find_service")["disposition"],
"applied"
"blocked"
);
assert_eq!(
change_for(&mixed, "graph.engineering")["disposition"],
change_for(&mixed, "query.engineering.find_service")["reason"],
"dependency_not_applied"
);
assert_eq!(
change_for(&mixed, "query.knowledge.find_person")["disposition"],
"applied"
);
// policy.shared's applies_to narrowed, but its FILE digest is unchanged
// — applies_to lives in cluster.yaml (the config digest), so it is not a
// resource change.
assert_eq!(
change_for(&mixed, "graph.knowledge")["disposition"],
"derived"
);
// Deterministic ordering: changes sorted by resource address.
@ -1443,27 +1461,7 @@ fn cluster_e2e_multi_graph_mixed_dispositions_then_converge() {
let mut sorted = order.clone();
sorted.sort_unstable();
assert_eq!(order, sorted, "{mixed}");
// The graph-plane tool applies the schema; refresh observes; converge.
output_success(
cli()
.arg("schema")
.arg("apply")
.arg(temp.path().join("graphs/knowledge.omni"))
.arg("--schema")
.arg(temp.path().join("people.pg"))
.arg("--json"),
);
let refresh = cluster_json(temp.path(), "refresh");
assert_eq!(refresh["ok"], true, "{refresh}");
let converge = cluster_json(temp.path(), "apply");
assert_eq!(converge["converged"], true, "{converge}");
let final_plan = cluster_json(temp.path(), "plan");
assert!(
final_plan["changes"].as_array().unwrap().is_empty(),
"{final_plan}"
);
// Graph deletion cannot converge until stage 4C's approval artifacts.
}
/// Stage 4A headline: a declared graph is created by `cluster apply` itself —

View file

@ -274,6 +274,8 @@ pub struct ForceUnlockOutput {
pub struct ApplyOutput {
pub ok: bool,
pub config_dir: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub actor: Option<String>,
pub desired_revision: DesiredRevision,
pub state_observations: StateObservations,
/// Every planned change, with `disposition`/`reason` always populated.
@ -651,12 +653,29 @@ pub async fn plan_config_dir(config_dir: impl AsRef<Path>) -> PlanOutput {
/// state is the publish point: a failure after payload writes leaves inert
/// digest-named blobs and no success acknowledgement; re-running apply is the
/// repair.
///
/// Convenience wrapper: delegates to [`apply_config_dir_with_options`] with
/// [`ApplyOptions::default`], i.e. with no actor attribution.
pub async fn apply_config_dir(config_dir: impl AsRef<Path>) -> ApplyOutput {
    apply_config_dir_with_options(config_dir, ApplyOptions::default()).await
}

/// Options for `cluster apply`.
///
/// Kept as a struct (rather than extra positional parameters) so the apply
/// entry point can take further options later without another signature
/// change. `Default` yields an apply with no actor.
#[derive(Debug, Clone, Default)]
pub struct ApplyOptions {
    /// Actor that graph-moving operations are attributed to: recorded in
    /// sidecars and audit entries, and threaded to the engine's
    /// `apply_schema_as` so Cedar enforcement fires wherever a policy checker
    /// is installed. `None` leaves the operations unattributed.
    pub actor: Option<String>,
}
pub async fn apply_config_dir_with_options(
config_dir: impl AsRef<Path>,
options: ApplyOptions,
) -> ApplyOutput {
let outcome = load_desired(config_dir.as_ref());
let mut diagnostics = outcome.diagnostics;
let backend = LocalStateBackend::new(&outcome.config_dir);
let mut observations = backend.observations();
let actor_for_output = options.actor.clone();
let early_return = |config_dir: String,
config_digest: Option<String>,
observations: StateObservations,
@ -666,6 +685,7 @@ pub async fn apply_config_dir(config_dir: impl AsRef<Path>) -> ApplyOutput {
ApplyOutput {
ok: !has_errors(&diagnostics),
config_dir,
actor: actor_for_output.clone(),
desired_revision: DesiredRevision {
config_digest,
},
@ -821,18 +841,18 @@ pub async fn apply_config_dir(config_dir: impl AsRef<Path>) -> ApplyOutput {
})
.filter_map(|change| change.resource.strip_prefix("graph.").map(str::to_string))
.collect();
let mut completed_create_sidecars: Vec<PathBuf> = Vec::new();
let mut failed_graphs: BTreeSet<String> = BTreeSet::new();
let mut creates_aborted = false;
let mut completed_op_sidecars: Vec<PathBuf> = Vec::new();
let mut failed_graphs: BTreeMap<String, FailedGraphOrigin> = BTreeMap::new();
let mut graph_moving_aborted = false;
for graph_id in &graph_creates_to_run {
if creates_aborted {
if graph_moving_aborted {
// A prior create failed: stop graph-moving work (loud partials).
diagnostics.push(Diagnostic::warning(
"graph_create_skipped",
graph_address(graph_id),
"skipped after an earlier graph create failed in this run",
));
failed_graphs.insert(graph_id.clone());
failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::GraphCreate);
continue;
}
let Some(desired_graph) = desired.graphs.iter().find(|graph| &graph.id == graph_id)
@ -849,7 +869,7 @@ pub async fn apply_config_dir(config_dir: impl AsRef<Path>) -> ApplyOutput {
schema_version: 1,
operation_id: Ulid::new().to_string(),
started_at: now_rfc3339(),
actor: None,
actor: options.actor.clone(),
kind: RecoverySidecarKind::GraphCreate,
graph_id: graph_id.clone(),
graph_uri: graph_uri.clone(),
@ -862,8 +882,8 @@ pub async fn apply_config_dir(config_dir: impl AsRef<Path>) -> ApplyOutput {
Ok(path) => path,
Err(diagnostic) => {
diagnostics.push(diagnostic);
failed_graphs.insert(graph_id.clone());
creates_aborted = true;
failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::GraphCreate);
graph_moving_aborted = true;
continue;
}
};
@ -871,8 +891,8 @@ pub async fn apply_config_dir(config_dir: impl AsRef<Path>) -> ApplyOutput {
// Simulated crash before the init: the sidecar stays for the
// sweep (row 1: root absent -> intent removed next run).
diagnostics.push(diagnostic);
failed_graphs.insert(graph_id.clone());
creates_aborted = true;
failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::GraphCreate);
graph_moving_aborted = true;
continue;
}
// Re-read + re-verify the schema source under the lock — the same
@ -911,8 +931,8 @@ pub async fn apply_config_dir(config_dir: impl AsRef<Path>) -> ApplyOutput {
Err(diagnostic) => {
diagnostics.push(diagnostic);
let _ = fs::remove_file(&sidecar_path); // nothing moved
failed_graphs.insert(graph_id.clone());
creates_aborted = true;
failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::GraphCreate);
graph_moving_aborted = true;
continue;
}
};
@ -926,8 +946,8 @@ pub async fn apply_config_dir(config_dir: impl AsRef<Path>) -> ApplyOutput {
));
// The sidecar stays: the sweep classifies whether the failed
// init left a partial root (row 5) or nothing (row 1).
failed_graphs.insert(graph_id.clone());
creates_aborted = true;
failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::GraphCreate);
graph_moving_aborted = true;
continue;
}
}
@ -955,8 +975,174 @@ pub async fn apply_config_dir(config_dir: impl AsRef<Path>) -> ApplyOutput {
diagnostics,
);
}
completed_create_sidecars.push(sidecar_path);
completed_op_sidecars.push(sidecar_path);
}
// Schema applies execute next (RFC-004 §D5): the first cluster operation
// that moves an EXISTING graph manifest, sidecar-fenced the same way.
let schema_updates_to_run: Vec<String> = changes
.iter()
.filter(|change| {
change.disposition == Some(ApplyDisposition::Applied)
&& change.operation == PlanOperation::Update
&& matches!(resource_kind(&change.resource), ResourceKind::Schema(_))
})
.filter_map(|change| change.resource.strip_prefix("schema.").map(str::to_string))
.collect();
for graph_id in &schema_updates_to_run {
if graph_moving_aborted {
diagnostics.push(Diagnostic::warning(
"schema_apply_skipped",
schema_address(graph_id),
"skipped after an earlier graph-moving operation failed in this run",
));
failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::SchemaApply);
continue;
}
let Some(desired_graph) = desired.graphs.iter().find(|graph| &graph.id == graph_id)
else {
continue;
};
let graph_uri = display_path(
&desired
.config_dir
.join(CLUSTER_GRAPHS_DIR)
.join(format!("{graph_id}.omni")),
);
// Read-write open: the engine's own recovery sweep runs here, which
// is exactly what we want before moving its manifest.
let db = match Omnigraph::open(&graph_uri).await {
Ok(db) => db,
Err(err) => {
diagnostics.push(Diagnostic::error(
"schema_apply_failed",
schema_address(graph_id),
format!("could not open graph at '{graph_uri}': {err}"),
));
failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::SchemaApply);
graph_moving_aborted = true;
continue;
}
};
let observed_manifest_version = match db.snapshot_of(ReadTarget::branch("main")).await {
Ok(snapshot) => Some(snapshot.version()),
Err(_) => None,
};
let mut sidecar = RecoverySidecar {
schema_version: 1,
operation_id: Ulid::new().to_string(),
started_at: now_rfc3339(),
actor: options.actor.clone(),
kind: RecoverySidecarKind::SchemaApply,
graph_id: graph_id.clone(),
graph_uri: graph_uri.clone(),
observed_manifest_version,
expected_manifest_version: None,
desired_schema_digest: desired_graph.schema_digest.clone(),
state_cas_base: expected_cas.clone(),
};
let sidecar_path = match backend.write_recovery_sidecar(&sidecar) {
Ok(path) => path,
Err(diagnostic) => {
diagnostics.push(diagnostic);
failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::SchemaApply);
graph_moving_aborted = true;
continue;
}
};
if let Err(diagnostic) = failpoints::maybe_fail("cluster_apply.before_schema_apply") {
// Simulated crash before the engine call: the sidecar stays; the
// sweep retires it next run (ledger still consistent with live).
diagnostics.push(diagnostic);
failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::SchemaApply);
graph_moving_aborted = true;
continue;
}
// Re-read + digest-verify the desired schema source under the lock.
let schema_source = source_paths
.get(schema_address(graph_id).as_str())
.ok_or_else(|| {
Diagnostic::error(
"schema_apply_failed",
schema_address(graph_id),
"no schema source recorded for graph",
)
})
.and_then(|path| {
fs::read_to_string(Path::new(path)).map_err(|err| {
Diagnostic::error(
"schema_apply_failed",
schema_address(graph_id),
format!("could not read schema source '{path}': {err}"),
)
})
})
.and_then(|source| {
if sha256_hex(source.as_bytes()) == desired_graph.schema_digest {
Ok(source)
} else {
Err(Diagnostic::error(
"resource_content_changed",
schema_address(graph_id),
"schema source changed while apply was running; re-run `cluster apply`",
))
}
});
let schema_source = match schema_source {
Ok(source) => source,
Err(diagnostic) => {
diagnostics.push(diagnostic);
let _ = fs::remove_file(&sidecar_path); // nothing moved
failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::SchemaApply);
graph_moving_aborted = true;
continue;
}
};
// Soft drops only: allow_data_loss stays false until the approval
// artifacts of stage 4C exist (RFC-004 §D4).
match db
.apply_schema_as(
&schema_source,
SchemaApplyOptions::default(),
options.actor.as_deref(),
)
.await
{
Ok(result) => {
sidecar.expected_manifest_version = Some(result.manifest_version);
if let Err(diagnostic) = backend.write_recovery_sidecar(&sidecar) {
diagnostics.push(diagnostic);
}
}
Err(err) => {
diagnostics.push(Diagnostic::error(
"schema_apply_failed",
schema_address(graph_id),
format!("schema apply failed on '{graph_uri}': {err}"),
));
// Sidecar stays; the sweep retires it (live digest unchanged
// == ledger consistent) or flags real movement.
failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::SchemaApply);
graph_moving_aborted = true;
continue;
}
}
// Crash point: the manifest moved, the ledger does not record it yet.
// A failure here acknowledges nothing; the sweep rolls forward.
if let Err(diagnostic) = failpoints::maybe_fail("cluster_apply.after_schema_apply") {
diagnostics.push(diagnostic);
return early_return(
display_path(&desired.config_dir),
Some(desired.config_digest),
observations,
changes,
state.resource_statuses,
diagnostics,
);
}
completed_op_sidecars.push(sidecar_path);
}
if !failed_graphs.is_empty() {
demote_dependents_of_failed_graphs(&mut changes, &failed_graphs, &desired.dependencies);
}
@ -1116,7 +1302,7 @@ pub async fn apply_config_dir(config_dir: impl AsRef<Path>) -> ApplyOutput {
for sidecar_path in sweep
.completed_sidecars
.iter()
.chain(completed_create_sidecars.iter())
.chain(completed_op_sidecars.iter())
{
let _ = fs::remove_file(sidecar_path);
}
@ -1148,6 +1334,7 @@ pub async fn apply_config_dir(config_dir: impl AsRef<Path>) -> ApplyOutput {
ApplyOutput {
ok: !has_errors(&diagnostics),
config_dir: display_path(&desired.config_dir),
actor: options.actor.clone(),
desired_revision: DesiredRevision {
config_digest: Some(desired.config_digest),
},
@ -3006,9 +3193,10 @@ fn classify_changes(
PlanOperation::Create => {
schema_creates.insert(graph);
}
// Schema updates (4B) and deletes (4C) are still pending in
// this stage and block dependents.
_ => {
// Schema updates execute in-run before catalog writes (4B)
// and no longer block dependents; deletes (4C) still do.
PlanOperation::Update => {}
PlanOperation::Delete => {
schema_pending.insert(graph);
}
},
@ -3042,7 +3230,12 @@ fn classify_changes(
// Applied with the graph create — the init carries it.
(ApplyDisposition::Applied, None)
}
PlanOperation::Create if graph_creates.contains(&graph) => {
PlanOperation::Update if !pending_recovery.contains(&graph) => {
// Stage 4B: schema updates execute via the engine's
// schema apply (soft drops only; allow_data_loss is 4C).
(ApplyDisposition::Applied, None)
}
PlanOperation::Create | PlanOperation::Update => {
(ApplyDisposition::Blocked, Some("cluster_recovery_pending"))
}
_ => (ApplyDisposition::Deferred, Some("apply_unsupported_kind")),
@ -3112,13 +3305,20 @@ fn classify_changes(
}
}
/// After a graph create fails mid-run, every change that depended on that
/// graph (its schema, its queries, policies referencing it) flips from
/// Applied to Blocked so the output and the persisted statuses tell the
/// truth about what this run actually executed.
/// Which graph-moving operation failed (or was skipped after an earlier
/// failure) for a graph during this apply run. Stored per graph id in the
/// run's `failed_graphs` map and consumed by
/// `demote_dependents_of_failed_graphs` to decide which change carries the
/// failure code and which carry `dependency_not_applied`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FailedGraphOrigin {
    /// The graph's create failed or was skipped: the `graph.<id>` change is
    /// demoted with `graph_create_failed`, its schema with
    /// `dependency_not_applied`.
    GraphCreate,
    /// The graph's schema apply failed or was skipped: the `schema.<id>`
    /// change is demoted with `schema_apply_failed`, the graph change with
    /// `dependency_not_applied`.
    SchemaApply,
}
/// After a graph-moving operation fails mid-run, every change that depended
/// on that graph flips from Applied to Blocked so the output and the
/// persisted statuses tell the truth about what this run actually executed.
/// The originating change carries the failure code; dependents carry
/// `dependency_not_applied`.
fn demote_dependents_of_failed_graphs(
changes: &mut [PlanChange],
failed: &BTreeSet<String>,
failed: &BTreeMap<String, FailedGraphOrigin>,
dependencies: &[Dependency],
) {
for change in changes.iter_mut() {
@ -3126,11 +3326,17 @@ fn demote_dependents_of_failed_graphs(
continue;
}
let demote_reason = match resource_kind(&change.resource) {
ResourceKind::Graph(graph) if failed.contains(&graph) => Some("graph_create_failed"),
ResourceKind::Schema(graph) if failed.contains(&graph) => {
Some("dependency_not_applied")
}
ResourceKind::Query { graph, .. } if failed.contains(&graph) => {
ResourceKind::Graph(graph) => match failed.get(&graph) {
Some(FailedGraphOrigin::GraphCreate) => Some("graph_create_failed"),
Some(FailedGraphOrigin::SchemaApply) => Some("dependency_not_applied"),
None => None,
},
ResourceKind::Schema(graph) => match failed.get(&graph) {
Some(FailedGraphOrigin::SchemaApply) => Some("schema_apply_failed"),
Some(FailedGraphOrigin::GraphCreate) => Some("dependency_not_applied"),
None => None,
},
ResourceKind::Query { graph, .. } if failed.contains_key(&graph) => {
Some("dependency_not_applied")
}
ResourceKind::Policy(_) => {
@ -3139,7 +3345,7 @@ fn demote_dependents_of_failed_graphs(
&& dep
.to
.strip_prefix("graph.")
.is_some_and(|graph| failed.contains(graph))
.is_some_and(|graph| failed.contains_key(graph))
});
blocked.then_some("dependency_not_applied")
}
@ -4849,19 +5055,22 @@ graphs:
}
#[tokio::test]
async fn apply_defers_schema_change_and_blocks_dependent_query() {
async fn apply_schema_update_and_dependent_query_in_one_run() {
let dir = fixture();
init_derived_graph(dir.path()).await;
write_applyable_state(dir.path());
// Change the schema after seeding state: schema.knowledge now differs.
// Schema update + a query update that depends on the new field: one
// apply executes the schema migration first, then the catalog write.
fs::write(dir.path().join("people.pg"), SCHEMA_V2).unwrap();
fs::write(
dir.path().join("people.pg"),
"\nnode Person {\n name: String @key\n age: I32?\n bio: String?\n}\n",
dir.path().join("people.gq"),
"\nquery find_person($name: String) {\n match { $p: Person { name: $name } }\n return { $p.name, $p.bio }\n}\n",
)
.unwrap();
let out = apply_config_dir(dir.path()).await;
assert!(out.ok, "{:?}", out.diagnostics);
assert!(!out.converged);
assert!(out.converged, "{out:?}");
let by_resource: BTreeMap<&str, &PlanChange> = out
.changes
.iter()
@ -4869,58 +5078,112 @@ graphs:
.collect();
assert_eq!(
by_resource["schema.knowledge"].disposition,
Some(ApplyDisposition::Deferred)
);
assert_eq!(
by_resource["graph.knowledge"].disposition,
Some(ApplyDisposition::Deferred)
Some(ApplyDisposition::Applied)
);
assert_eq!(
by_resource["query.knowledge.find_person"].disposition,
Some(ApplyDisposition::Applied)
);
assert_eq!(
by_resource["graph.knowledge"].disposition,
Some(ApplyDisposition::Derived)
);
// The live graph carries the new schema.
let db = Omnigraph::open_read_only(&derived_graph_uri(dir.path(), "knowledge"))
.await
.unwrap();
let desired = validate_config_dir(dir.path());
assert_eq!(
sha256_hex(db.schema_source().as_bytes()),
desired.resource_digests["schema.knowledge"]
);
let state = read_state_json(dir.path());
assert_eq!(
state["applied_revision"]["resources"]["schema.knowledge"]["digest"],
desired.resource_digests["schema.knowledge"]
);
// Sidecar retired after the CAS landed.
assert!(
!dir.path().join(CLUSTER_RECOVERIES_DIR).exists()
|| fs::read_dir(dir.path().join(CLUSTER_RECOVERIES_DIR))
.unwrap()
.next()
.is_none()
);
}
#[tokio::test]
async fn apply_unsupported_schema_change_fails_loudly() {
let dir = fixture();
init_derived_graph(dir.path()).await;
write_applyable_state(dir.path());
// Property type changes are unsupported by the engine planner.
fs::write(
dir.path().join("people.pg"),
"\nnode Person {\n name: String @key\n age: I64?\n}\n",
)
.unwrap();
let out = apply_config_dir(dir.path()).await;
assert!(!out.ok);
assert!(out.diagnostics.iter().any(|diagnostic| {
diagnostic.code == "schema_apply_failed"
&& diagnostic.message.contains("changing property type")
}));
let by_resource: BTreeMap<&str, &PlanChange> = out
.changes
.iter()
.map(|change| (change.resource.as_str(), change))
.collect();
assert_eq!(
by_resource["schema.knowledge"].disposition,
Some(ApplyDisposition::Blocked)
);
assert_eq!(
by_resource["query.knowledge.find_person"].reason.as_deref(),
Some("dependency_not_applied")
by_resource["schema.knowledge"].reason.as_deref(),
Some("schema_apply_failed")
);
// Policy is independent of the schema and still applies.
assert_eq!(
by_resource["policy.base"].disposition,
Some(ApplyDisposition::Applied)
);
assert!(
out.diagnostics
.iter()
.any(|diagnostic| diagnostic.code == "apply_unsupported_change")
);
assert!(
out.diagnostics
.iter()
.any(|diagnostic| diagnostic.code == "apply_dependency_blocked")
);
// The live schema and the ledger are unchanged.
let state = read_state_json(dir.path());
let desired = validate_config_dir(dir.path());
assert_ne!(
state["applied_revision"]["resources"]["schema.knowledge"]["digest"],
desired.resource_digests["schema.knowledge"]
);
// Second run: the sweep retires the stale sidecar (ledger consistent)
// and the run fails just as loudly — idempotent loudness.
let second = apply_config_dir(dir.path()).await;
assert!(!second.ok);
assert!(
second
.diagnostics
.iter()
.any(|diagnostic| diagnostic.code == "schema_apply_failed")
);
}
#[tokio::test]
async fn apply_blocks_schema_update_while_recovery_pending() {
let dir = fixture();
init_derived_graph(dir.path()).await;
write_state_resources(dir.path(), &[("schema.knowledge", "stale-digest")]);
fs::write(dir.path().join("people.pg"), SCHEMA_V2).unwrap();
// A pending sidecar whose intent matches neither live nor recorded.
write_schema_apply_sidecar(dir.path(), "knowledge", "intended-digest", "01PENDS");
let out = apply_config_dir(dir.path()).await;
let by_resource: BTreeMap<&str, &PlanChange> = out
.changes
.iter()
.map(|change| (change.resource.as_str(), change))
.collect();
assert_eq!(
state["resource_statuses"]["query.knowledge.find_person"]["status"],
"blocked"
by_resource["schema.knowledge"].disposition,
Some(ApplyDisposition::Blocked)
);
// The blocked query wrote no payload and no state digest.
assert!(
state["applied_revision"]["resources"]
.get("query.knowledge.find_person")
.is_none()
);
assert!(
!dir.path()
.join(CLUSTER_RESOURCES_DIR)
.join("query")
.exists()
);
// Not converged: the applied config digest must not be claimed.
assert!(
state["applied_revision"]
.get("config_digest")
.is_none_or(serde_json::Value::is_null)
assert_eq!(
by_resource["schema.knowledge"].reason.as_deref(),
Some("cluster_recovery_pending")
);
}