Merge pull request #171 from ModernRelay/feat/cluster-schema-apply-4b

feat(cluster): Stage 4B — cluster-driven schema apply
This commit is contained in:
Andrew Altshuler 2026-06-10 14:03:31 +03:00 committed by GitHub
commit f799d4578c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 1017 additions and 231 deletions

View file

@ -11,8 +11,8 @@ use omnigraph::db::{Omnigraph, ReadTarget, SnapshotId};
use omnigraph::loader::LoadMode;
use omnigraph::storage::normalize_root_uri;
use omnigraph_cluster::{
ApplyOutput, DiagnosticSeverity, ForceUnlockOutput, PlanOutput, StateSyncOutput, StatusOutput,
ValidateOutput, apply_config_dir, force_unlock_config_dir, import_config_dir, plan_config_dir,
ApplyOptions, ApplyOutput, DiagnosticSeverity, ForceUnlockOutput, PlanOutput, StateSyncOutput, StatusOutput,
ValidateOutput, apply_config_dir_with_options, force_unlock_config_dir, import_config_dir, plan_config_dir,
refresh_config_dir, status_config_dir, validate_config_dir,
};
use omnigraph_compiler::query::parser::parse_query;
@ -804,6 +804,17 @@ fn print_cluster_plan_human(output: &PlanOutput) {
);
for change in &output.changes {
println!(" {:?} {}", change.operation, change.resource);
if let Some(migration) = &change.migration {
if !migration.supported {
println!(" migration UNSUPPORTED:");
}
for step in &migration.steps {
println!(
" {}",
serde_json::to_string(step).unwrap_or_else(|_| format!("{step:?}"))
);
}
}
}
if output.changes.is_empty() {
println!(" no changes");
@ -3554,11 +3565,20 @@ async fn main() -> Result<()> {
finish_cluster_validate(&output, json)?;
}
ClusterCommand::Plan { config, json } => {
let output = plan_config_dir(config);
let output = plan_config_dir(config).await;
finish_cluster_plan(&output, json)?;
}
ClusterCommand::Apply { config, json } => {
let output = apply_config_dir(config).await;
// The global --as actor attributes graph-moving operations
// (sidecars, audit entries, engine schema-apply commits).
// Cluster config stays unlayered: no omnigraph.yaml fallback.
let output = apply_config_dir_with_options(
config,
ApplyOptions {
actor: cli.as_actor.clone(),
},
)
.await;
finish_cluster_apply(&output, json)?;
}
ClusterCommand::Status { config, json } => {

View file

@ -984,7 +984,7 @@ query find_person($name: String) {
/// deferred by cluster apply, executed by `omnigraph schema apply` against
/// the graph, picked up by `cluster refresh`, and the next apply re-converges.
#[test]
fn cluster_e2e_schema_change_defers_until_schema_apply_and_refresh() {
fn cluster_e2e_schema_change_applied_by_cluster() {
let temp = tempdir().unwrap();
write_cluster_config_fixture(temp.path());
init_cluster_derived_graph(temp.path());
@ -993,7 +993,8 @@ fn cluster_e2e_schema_change_defers_until_schema_apply_and_refresh() {
let apply = cluster_json(temp.path(), "apply");
assert_eq!(apply["converged"], true, "{apply}");
// Additive schema change: cluster apply must defer it loudly, not act.
// Additive schema change: Stage 4B applies it from the cluster — no
// manual schema apply, no refresh round-trip.
fs::write(
temp.path().join("people.pg"),
r#"
@ -1005,40 +1006,39 @@ node Person {
"#,
)
.unwrap();
let deferred = cluster_json(temp.path(), "apply");
assert_eq!(deferred["ok"], true, "{deferred}");
assert_eq!(deferred["applied_count"], 0, "{deferred}");
assert_eq!(deferred["converged"], false, "{deferred}");
// Plan previews the real migration steps (RFC-004 §D7).
let plan = cluster_json(temp.path(), "plan");
let schema_change = change_for(&plan, "schema.knowledge");
assert_eq!(schema_change["disposition"], "applied", "{plan}");
let migration = &schema_change["migration"];
assert_eq!(migration["supported"], true, "{plan}");
assert!(
deferred["diagnostics"]
migration["steps"]
.as_array()
.unwrap()
.iter()
.any(|diagnostic| diagnostic["code"] == "apply_unsupported_change"),
"{deferred}"
.any(|step| step["kind"] == "add_property"),
"{plan}"
);
// The graph-plane tool applies the migration...
output_success(
let evolve = cluster_json(temp.path(), "apply");
assert_eq!(evolve["ok"], true, "{evolve}");
assert_eq!(evolve["converged"], true, "{evolve}");
assert_eq!(change_for(&evolve, "schema.knowledge")["disposition"], "applied");
// The live graph carries the new schema; the plan is empty.
let schema_show = output_success(
cli()
.arg("schema")
.arg("apply")
.arg(temp.path().join("graphs/knowledge.omni"))
.arg("--schema")
.arg(temp.path().join("people.pg"))
.arg("--json"),
.arg("show")
.arg(temp.path().join("graphs/knowledge.omni")),
);
// ...refresh observes it...
let refresh = cluster_json(temp.path(), "refresh");
assert_eq!(refresh["ok"], true, "{refresh}");
// ...and the control plane re-converges.
let reconverge = cluster_json(temp.path(), "apply");
assert_eq!(reconverge["ok"], true, "{reconverge}");
assert_eq!(reconverge["converged"], true, "{reconverge}");
assert!(stdout_string(&schema_show).contains("bio"), "live schema updated");
let replan = cluster_json(temp.path(), "plan");
assert!(
replan["changes"].as_array().unwrap().is_empty(),
"after schema apply + refresh + apply, the plan must be empty: {replan}"
"one cluster apply converges a schema change: {replan}"
);
}
@ -1207,7 +1207,7 @@ fn cluster_e2e_lost_state_reimport_recovers_catalog() {
/// the graph (no config change) must surface as drift through refresh, status,
/// and plan — and apply must never silently "correct" it.
#[test]
fn cluster_e2e_out_of_band_schema_change_surfaces_as_drift() {
fn cluster_e2e_out_of_band_schema_drift_then_apply_converges_it() {
let temp = tempdir().unwrap();
write_cluster_config_fixture(temp.path());
init_cluster_derived_graph(temp.path());
@ -1238,48 +1238,42 @@ node Person {
.arg("--json"),
);
// Drift is visible...
let refresh = cluster_json(temp.path(), "refresh");
assert_eq!(refresh["ok"], true, "{refresh}");
assert_eq!(
refresh["resource_statuses"]["schema.knowledge"]["status"],
"drifted"
);
assert_eq!(
refresh["resource_statuses"]["graph.knowledge"]["status"],
"drifted"
);
assert_eq!(
refresh["observations"]["graph.knowledge"]["schema_matches_desired"],
false
);
let status = cluster_json(temp.path(), "status");
assert_eq!(
status["resource_statuses"]["schema.knowledge"]["status"],
"drifted"
);
// ...the plan proposes converging back to desired, with a migration
// preview (a soft drop of the out-of-band field)...
let plan = cluster_json(temp.path(), "plan");
assert_eq!(change_for(&plan, "schema.knowledge")["disposition"], "deferred");
assert_eq!(change_for(&plan, "graph.knowledge")["disposition"], "deferred");
let live_schema_digest = change_for(&plan, "schema.knowledge")["before_digest"]
.as_str()
.unwrap()
.to_string();
let drift_apply = cluster_json(temp.path(), "apply");
assert_eq!(drift_apply["applied_count"], 0, "{drift_apply}");
assert_eq!(drift_apply["converged"], false, "{drift_apply}");
// Apply must not have "corrected" the drift: state still records the LIVE
// schema digest, not the desired one.
let state: serde_json::Value = serde_json::from_str(
&fs::read_to_string(temp.path().join("__cluster/state.json")).unwrap(),
)
.unwrap();
assert_eq!(
state["applied_revision"]["resources"]["schema.knowledge"]["digest"],
live_schema_digest
let schema_change = change_for(&plan, "schema.knowledge");
assert_eq!(schema_change["disposition"], "applied", "{plan}");
assert!(
schema_change["migration"]["steps"]
.as_array()
.unwrap()
.iter()
.any(|step| step["kind"] == "drop_property" && step["mode"] == "soft"),
"{plan}"
);
// ...and apply converges the live schema back (axiom 8: drift correction
// is gated like any change; a soft migration is the recoverable tier).
let converge = cluster_json(temp.path(), "apply");
assert_eq!(converge["ok"], true, "{converge}");
assert_eq!(converge["converged"], true, "{converge}");
let schema_show = output_success(
cli()
.arg("schema")
.arg("show")
.arg(temp.path().join("graphs/knowledge.omni")),
);
assert!(
!stdout_string(&schema_show).contains("bio"),
"out-of-band field soft-dropped back to desired"
);
let replan = cluster_json(temp.path(), "plan");
assert!(replan["changes"].as_array().unwrap().is_empty(), "{replan}");
}
/// Disaster input fails closed: a destroyed graph root drifts the ledger,
@ -1393,12 +1387,32 @@ fn cluster_e2e_multi_graph_mixed_dispositions_then_converge() {
assert!(temp.path().join("graphs/knowledge.omni").exists());
assert!(temp.path().join("graphs/engineering.omni").exists());
// Mixed run: a knowledge schema update (4B territory — deferred) gates
// its query update (blocked), while an engineering query update is
// independent (applied) and re-derives its composite.
// Mixed run: a graph REMOVAL (4C territory — deferred) gates its query
// delete (blocked), while a knowledge query update is independent
// (applied) and re-derives its composite. All four dispositions at once.
fs::write(
temp.path().join("people.pg"),
"\nnode Person {\n name: String @key\n age: I32?\n bio: String?\n}\n",
temp.path().join("cluster.yaml"),
r#"
version: 1
metadata:
name: company-brain
state:
backend: cluster
lock: true
graphs:
knowledge:
schema: ./people.pg
queries:
find_person:
file: ./people.gq
policies:
shared:
file: ./shared.policy.yaml
applies_to: [knowledge]
cluster_wide:
file: ./cluster_wide.policy.yaml
applies_to: [cluster]
"#,
)
.unwrap();
fs::write(
@ -1406,31 +1420,35 @@ fn cluster_e2e_multi_graph_mixed_dispositions_then_converge() {
"\nquery find_person($name: String) {\n match { $p: Person { name: $name } }\n return { $p.name }\n}\n",
)
.unwrap();
fs::write(
temp.path().join("services.gq"),
"\nquery find_service($name: String) {\n match { $s: Service { name: $name } }\n return { $s.name, $s.name }\n}\n",
)
.unwrap();
let mixed = cluster_json(temp.path(), "apply");
assert_eq!(mixed["ok"], true, "{mixed}");
assert_eq!(mixed["converged"], false, "{mixed}");
assert_eq!(change_for(&mixed, "schema.knowledge")["disposition"], "deferred");
assert_eq!(change_for(&mixed, "graph.knowledge")["disposition"], "deferred");
assert_eq!(
change_for(&mixed, "query.knowledge.find_person")["disposition"],
"blocked"
change_for(&mixed, "graph.engineering")["disposition"],
"deferred"
);
assert_eq!(
change_for(&mixed, "query.knowledge.find_person")["reason"],
"dependency_not_applied"
change_for(&mixed, "schema.engineering")["disposition"],
"deferred"
);
assert_eq!(
change_for(&mixed, "query.engineering.find_service")["disposition"],
"applied"
"blocked"
);
assert_eq!(
change_for(&mixed, "graph.engineering")["disposition"],
change_for(&mixed, "query.engineering.find_service")["reason"],
"dependency_not_applied"
);
assert_eq!(
change_for(&mixed, "query.knowledge.find_person")["disposition"],
"applied"
);
// policy.shared's applies_to narrowed, but its FILE digest is unchanged
// — applies_to lives in cluster.yaml (the config digest), so it is not a
// resource change.
assert_eq!(
change_for(&mixed, "graph.knowledge")["disposition"],
"derived"
);
// Deterministic ordering: changes sorted by resource address.
@ -1443,27 +1461,7 @@ fn cluster_e2e_multi_graph_mixed_dispositions_then_converge() {
let mut sorted = order.clone();
sorted.sort_unstable();
assert_eq!(order, sorted, "{mixed}");
// The graph-plane tool applies the schema; refresh observes; converge.
output_success(
cli()
.arg("schema")
.arg("apply")
.arg(temp.path().join("graphs/knowledge.omni"))
.arg("--schema")
.arg(temp.path().join("people.pg"))
.arg("--json"),
);
let refresh = cluster_json(temp.path(), "refresh");
assert_eq!(refresh["ok"], true, "{refresh}");
let converge = cluster_json(temp.path(), "apply");
assert_eq!(converge["converged"], true, "{converge}");
let final_plan = cluster_json(temp.path(), "plan");
assert!(
final_plan["changes"].as_array().unwrap().is_empty(),
"{final_plan}"
);
// Graph deletion cannot converge until stage 4C's approval artifacts.
}
/// Stage 4A headline: a declared graph is created by `cluster apply` itself —

File diff suppressed because it is too large Load diff

View file

@ -14,7 +14,10 @@ use std::path::{Path, PathBuf};
use fail::FailScenario;
use omnigraph_cluster::failpoints::ScopedFailPoint;
use omnigraph_cluster::{apply_config_dir, validate_config_dir};
use omnigraph::db::Omnigraph;
use omnigraph_cluster::{
ApplyOptions, apply_config_dir, apply_config_dir_with_options, validate_config_dir,
};
use tempfile::tempdir;
const SCHEMA: &str = r#"
@ -345,3 +348,122 @@ async fn create_crash_after_init_rolls_state_forward() {
);
scenario.teardown();
}
const SCHEMA_V2: &str = r#"
node Person {
name: String @key
age: I32?
bio: String?
}
"#;
async fn converge_with_live_graph(dir: &Path) {
let graph_dir = dir.join("graphs");
fs::create_dir_all(&graph_dir).unwrap();
Omnigraph::init(
graph_dir.join("knowledge.omni").to_string_lossy().as_ref(),
SCHEMA,
)
.await
.unwrap();
seed_applyable_state(dir);
let out = apply_config_dir(dir).await;
assert!(out.ok && out.converged, "{:?}", out.diagnostics);
}
async fn live_schema_digest(dir: &Path) -> String {
let uri = dir.join("graphs/knowledge.omni");
let db = Omnigraph::open_read_only(uri.to_string_lossy().as_ref())
.await
.unwrap();
use sha2::{Digest, Sha256};
let digest = Sha256::digest(db.schema_source().as_bytes());
digest.iter().map(|byte| format!("{byte:02x}")).collect()
}
/// Crash before the engine schema apply: sidecar (with actor) survives, the
/// live schema and ledger are untouched; the next run's sweep retires the
/// stale intent and the same run applies and converges.
#[tokio::test]
async fn schema_crash_before_apply_recovers_via_sweep() {
let scenario = FailScenario::setup();
let dir = fixture();
converge_with_live_graph(dir.path()).await;
let pre_digest = live_schema_digest(dir.path()).await;
fs::write(dir.path().join("people.pg"), SCHEMA_V2).unwrap();
{
let _failpoint = ScopedFailPoint::new("cluster_apply.before_schema_apply", "return");
let out = apply_config_dir_with_options(
dir.path(),
ApplyOptions {
actor: Some("test-actor".to_string()),
},
)
.await;
assert!(!out.ok);
assert_eq!(out.actor.as_deref(), Some("test-actor"));
let sidecars = recovery_sidecars(dir.path());
assert_eq!(sidecars.len(), 1);
let sidecar: serde_json::Value =
serde_json::from_str(&fs::read_to_string(&sidecars[0]).unwrap()).unwrap();
assert_eq!(sidecar["kind"], "schema_apply");
assert_eq!(sidecar["actor"], "test-actor");
// Nothing moved.
assert_eq!(live_schema_digest(dir.path()).await, pre_digest);
}
let recovered = apply_config_dir(dir.path()).await;
assert!(recovered.ok, "{:?}", recovered.diagnostics);
assert!(recovered.converged);
assert!(recovery_sidecars(dir.path()).is_empty());
assert_ne!(live_schema_digest(dir.path()).await, pre_digest);
scenario.teardown();
}
/// Crash after the engine schema apply, before the state CAS: the manifest
/// moved, the ledger is stale, nothing acknowledged; the next run's sweep
/// rolls the ledger forward with an audit entry and the run converges.
#[tokio::test]
async fn schema_crash_after_apply_rolls_state_forward() {
let scenario = FailScenario::setup();
let dir = fixture();
converge_with_live_graph(dir.path()).await;
fs::write(dir.path().join("people.pg"), SCHEMA_V2).unwrap();
let state_before = fs::read(state_path(dir.path())).unwrap();
let desired = validate_config_dir(dir.path());
let v2_digest = desired.resource_digests["schema.knowledge"].clone();
{
let _failpoint = ScopedFailPoint::new("cluster_apply.after_schema_apply", "return");
let out = apply_config_dir(dir.path()).await;
assert!(!out.ok);
assert!(!out.state_written);
// The live schema moved; the ledger is byte-identical (no ack).
assert_eq!(live_schema_digest(dir.path()).await, v2_digest);
assert_eq!(fs::read(state_path(dir.path())).unwrap(), state_before);
let sidecars = recovery_sidecars(dir.path());
assert_eq!(sidecars.len(), 1);
let sidecar: serde_json::Value =
serde_json::from_str(&fs::read_to_string(&sidecars[0]).unwrap()).unwrap();
assert!(sidecar["expected_manifest_version"].is_number(), "{sidecar}");
}
let recovered = apply_config_dir(dir.path()).await;
assert!(recovered.ok, "{:?}", recovered.diagnostics);
assert!(
recovered
.diagnostics
.iter()
.any(|diagnostic| diagnostic.code == "cluster_recovery_rolled_forward")
);
assert!(recovered.converged);
assert!(recovery_sidecars(dir.path()).is_empty());
let state: serde_json::Value =
serde_json::from_str(&fs::read_to_string(state_path(dir.path())).unwrap()).unwrap();
assert_eq!(
state["applied_revision"]["resources"]["schema.knowledge"]["digest"],
v2_digest
);
scenario.teardown();
}

View file

@ -8,7 +8,7 @@ This file is the always-on map of the test surface. **Consult it before every ta
|---|---|---|
| `omnigraph` (engine) | `crates/omnigraph/tests/` | Integration tests (21 files), fixture-driven, share `tests/helpers/mod.rs` |
| `omnigraph-cli` | `crates/omnigraph-cli/tests/` | `cli.rs` (unit-ish; includes the `cluster_e2e_*` lifecycle compositions over the spawned binary — lost-state re-import recovery, out-of-band drift, graph-root destruction, multi-graph mixed-disposition convergence), `system_local.rs`, `system_remote.rs`, share `tests/support/mod.rs` |
| `omnigraph-cluster` | mostly in-source `#[cfg(test)] mod tests`; `tests/failpoints.rs` (feature-gated) | Cluster config parser, local JSON state diff, state CAS/lock handling/recovery, read-only validate/plan/status plus explicit refresh/import graph observations, config-only apply (content-addressed payload publish, disposition gating, composite-digest convergence, idempotent re-apply), catalog payload verification (status read-only, refresh drift + self-heal), failpoint crash-mid-apply / CAS-race coverage, and Stage 4A graph creation (create executor, recovery sidecars + sweep rows, create crash windows) |
| `omnigraph-cluster` | mostly in-source `#[cfg(test)] mod tests`; `tests/failpoints.rs` (feature-gated) | Cluster config parser, local JSON state diff, state CAS/lock handling/recovery, read-only validate/plan/status plus explicit refresh/import graph observations, config-only apply (content-addressed payload publish, disposition gating, composite-digest convergence, idempotent re-apply), catalog payload verification (status read-only, refresh drift + self-heal), failpoint crash-mid-apply / CAS-race coverage, Stage 4A graph creation (create executor, recovery sidecars + sweep rows, create crash windows), and Stage 4B schema apply (migration previews in plan, schema executor, schema-apply sweep classification, schema crash windows) |
| `omnigraph-server` | `crates/omnigraph-server/tests/` | `server.rs` (HTTP-level), `openapi.rs` (OpenAPI drift / regeneration) |
| `omnigraph-compiler` | mostly in-source `#[cfg(test)] mod tests` | Parser, type-checker, IR lowering, lint |

View file

@ -1,6 +1,6 @@
# Cluster Config
**Status:** Stage 4A graph-create apply preview.
**Status:** Stage 4B schema-apply preview.
Cluster config is the future control-plane configuration surface for a whole
OmniGraph deployment. In this stage, OmniGraph can validate a local
@ -8,11 +8,12 @@ OmniGraph deployment. In this stage, OmniGraph can validate a local
local JSON state ledger, explicitly refresh/import graph observations into
that ledger, manually remove a held local state lock by exact lock id, and
**apply the executable subset of the plan** — stored-query and policy-bundle
catalog writes, and **graph creation**: a declared graph that does not exist
yet is initialized by apply itself at the derived root. It does not change
existing schemas (deferred to a later stage), move existing graph manifests,
start servers, or serve anything it applies: the server still boots from
`omnigraph.yaml`.
catalog writes, **graph creation** (a declared graph that does not exist yet
is initialized by apply at the derived root), and **schema updates**: a
changed schema is migrated on the live graph by apply itself, soft drops
only. It does not delete graphs (a later stage), perform data-loss
migrations, start servers, or serve anything it applies: the server still
boots from `omnigraph.yaml`.
## Commands
@ -156,7 +157,8 @@ condition in `reason`).
## Apply
`cluster apply` executes the executable subset of the plan — stored-query and
policy-bundle changes, and graph creates. There is no confirm flag: `cluster plan` is the preview,
policy-bundle changes, graph creates, and schema updates. There is no confirm
flag: `cluster plan` is the preview,
and apply recomputes the same diff under the state lock before executing, so a
stale preview can never be applied. Apply requires an existing `state.json`
(`state_missing` directs you to `cluster import` first).
@ -212,7 +214,46 @@ will execute it, producing an **empty** graph at the root. The data was
already lost when the root vanished; the create is visible in the plan
(disposition `applied`) before anything runs.
Schema changes to existing graphs are never executed by this stage. They are
### Schema updates
A `schema.<id>` update (the declared schema differs from what state records)
is executed by apply via the engine's schema-apply, after graph creates and
before catalog writes — so a query change that depends on the new schema
applies in the same run. Each schema apply is sidecar-fenced like a create:
pre-operation manifest version recorded, post-operation version written back,
sidecar retired only after the state update lands; the recovery sweep
classifies survivors by schema digest (consistent ledger → retired; completed
on the graph → state rolled forward with an audit entry; anything else →
`drifted`/`actual_applied_state_pending`, kept).
Migrations run with **soft drops only** — a removed property disappears from
the current version while prior versions retain the data (reversible until
`cleanup`). Data-loss migrations (`allow_data_loss`) are not reachable from
cluster apply until the approval-artifact stage. Unsupported migrations
(e.g. changing a property's type), engine lock contention, or graphs with
user branches fail loudly as `schema_apply_failed` with the engine's message;
dependent changes are demoted to `blocked` and graph-moving work stops for
the run.
`cluster plan` previews schema updates with the engine's real migration plan:
each schema change carries a `migration` field (`supported` + typed steps),
and the human output prints the steps. If the live graph cannot be opened the
preview degrades to the digest diff with a `schema_preview_unavailable`
warning.
**Drift is converged, not just reported.** A schema changed out-of-band on
the live graph shows up as `drifted` after `refresh`, and the next plan
proposes migrating it back to the declared schema — apply executes that like
any other soft migration. Drift correction is gated by the same rules as any
change; nothing about it is hidden (the plan shows the steps, including soft
drops of out-of-band fields).
**Attribution.** `cluster apply --as <actor>` records the operator identity
in recovery sidecars and audit entries and threads it to the engine's
schema-apply (so commit attribution and Cedar enforcement — wherever a policy
checker is installed — work unchanged).
Schema deletes (removing a graph) are never executed by this stage. They are
reported as `deferred` (warning `apply_unsupported_change`), and query/policy
changes that depend on them are `blocked` (warning `apply_dependency_blocked`, status
`blocked` in state). A partially-applicable plan still exits 0 with warnings;