Merge pull request #165 from ModernRelay/feat/cluster-apply-stage3a

feat(cluster): Stage 3A — config-only cluster apply
This commit is contained in:
Andrew Altshuler 2026-06-10 00:52:54 +03:00 committed by GitHub
commit 2d1c25d3fa
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 1693 additions and 20 deletions

View file

@ -11,8 +11,8 @@ use omnigraph::db::{Omnigraph, ReadTarget, SnapshotId};
use omnigraph::loader::LoadMode;
use omnigraph::storage::normalize_root_uri;
use omnigraph_cluster::{
DiagnosticSeverity, ForceUnlockOutput, PlanOutput, StateSyncOutput, StatusOutput,
ValidateOutput, force_unlock_config_dir, import_config_dir, plan_config_dir,
ApplyOutput, DiagnosticSeverity, ForceUnlockOutput, PlanOutput, StateSyncOutput, StatusOutput,
ValidateOutput, apply_config_dir, force_unlock_config_dir, import_config_dir, plan_config_dir,
refresh_config_dir, status_config_dir, validate_config_dir,
};
use omnigraph_compiler::query::parser::parse_query;
@ -361,6 +361,16 @@ enum ClusterCommand {
#[arg(long)]
json: bool,
},
/// Apply the config-only (query/policy) subset of the plan to the local
/// cluster catalog. Graph/schema changes are deferred to a later stage.
Apply {
/// Cluster config directory containing cluster.yaml.
#[arg(long, default_value = ".")]
config: PathBuf,
/// Emit JSON instead of human text.
#[arg(long)]
json: bool,
},
/// Read the local JSON state ledger without scanning live graph resources.
Status {
/// Cluster config directory containing cluster.yaml.
@ -804,6 +814,48 @@ fn print_cluster_plan_human(output: &PlanOutput) {
print_cluster_diagnostics(&output.diagnostics);
}
fn print_cluster_apply_human(output: &ApplyOutput) {
if output.ok {
println!(
"cluster apply: {} applied, {} deferred/blocked",
output.applied_count, output.deferred_count
);
} else {
println!("cluster apply failed");
}
// The change list prints on failure too: an operator debugging a partial
// apply (payload or state-write error) needs to see what was attempted.
print_cluster_apply_changes(&output.changes);
if output.ok {
let state = &output.state_observations;
println!(
" state: revision {}, converged: {}, written: {}",
state.state_revision, output.converged, output.state_written
);
println!(" note: applied = recorded in the cluster catalog; the server still boots from omnigraph.yaml");
}
print_cluster_diagnostics(&output.diagnostics);
}
fn print_cluster_apply_changes(changes: &[omnigraph_cluster::PlanChange]) {
for change in changes {
match (&change.disposition, change.reason.as_deref()) {
(Some(disposition), Some(reason)) => println!(
" {:?} {} [{disposition:?}: {reason}]",
change.operation, change.resource
),
(Some(disposition), None) => println!(
" {:?} {} [{disposition:?}]",
change.operation, change.resource
),
_ => println!(" {:?} {}", change.operation, change.resource),
}
}
if changes.is_empty() {
println!(" no changes");
}
}
fn print_cluster_status_human(output: &StatusOutput) {
if output.ok {
let state = &output.state_observations;
@ -935,6 +987,19 @@ fn finish_cluster_plan(output: &PlanOutput, json: bool) -> Result<()> {
Ok(())
}
fn finish_cluster_apply(output: &ApplyOutput, json: bool) -> Result<()> {
if json {
print_json(output)?;
} else {
print_cluster_apply_human(output);
}
if !output.ok {
io::stdout().flush()?;
std::process::exit(1);
}
Ok(())
}
fn finish_cluster_status(output: &StatusOutput, json: bool) -> Result<()> {
if json {
print_json(output)?;
@ -3492,6 +3557,10 @@ async fn main() -> Result<()> {
let output = plan_config_dir(config);
finish_cluster_plan(&output, json)?;
}
ClusterCommand::Apply { config, json } => {
let output = apply_config_dir(config);
finish_cluster_apply(&output, json)?;
}
ClusterCommand::Status { config, json } => {
let output = status_config_dir(config);
finish_cluster_status(&output, json)?;

View file

@ -754,6 +754,325 @@ fn cluster_validate_invalid_config_exits_nonzero() {
assert!(stdout.contains("future_phase_field"), "{stdout}");
}
/// Seed an applyable state: schema digest borrowed from `cluster validate`,
/// graph entry present (composite recomputed by apply), queries/policies
/// pending.
fn write_cluster_applyable_state(root: &std::path::Path) -> serde_json::Value {
let validate = parse_stdout_json(&output_success(
cli()
.arg("cluster")
.arg("validate")
.arg("--config")
.arg(root)
.arg("--json"),
));
let schema_digest = validate["resource_digests"]["schema.knowledge"]
.as_str()
.unwrap()
.to_string();
let state_dir = root.join("__cluster");
fs::create_dir_all(&state_dir).unwrap();
fs::write(
state_dir.join("state.json"),
format!(
r#"{{
"version": 1,
"state_revision": 1,
"applied_revision": {{
"resources": {{
"graph.knowledge": {{ "digest": "seed" }},
"schema.knowledge": {{ "digest": "{schema_digest}" }}
}}
}}
}}
"#
),
)
.unwrap();
validate
}
#[test]
fn cluster_apply_json_applies_query_and_policy() {
let temp = tempdir().unwrap();
write_cluster_config_fixture(temp.path());
let validate = write_cluster_applyable_state(temp.path());
let json = parse_stdout_json(&output_success(
cli()
.arg("cluster")
.arg("apply")
.arg("--config")
.arg(temp.path())
.arg("--json"),
));
assert_eq!(json["ok"], true, "{json}");
assert_eq!(json["applied_count"], 2, "{json}");
assert_eq!(json["converged"], true, "{json}");
assert_eq!(json["state_written"], true, "{json}");
assert_eq!(
json["resource_statuses"]["query.knowledge.find_person"]["status"],
"applied"
);
let query_digest = validate["resource_digests"]["query.knowledge.find_person"]
.as_str()
.unwrap();
let payload = temp
.path()
.join("__cluster/resources/query/knowledge/find_person")
.join(format!("{query_digest}.gq"));
assert!(payload.exists(), "missing payload {}", payload.display());
let state: serde_json::Value = serde_json::from_str(
&fs::read_to_string(temp.path().join("__cluster/state.json")).unwrap(),
)
.unwrap();
assert_eq!(state["state_revision"], 2);
assert_eq!(
state["applied_revision"]["resources"]["query.knowledge.find_person"]["digest"],
*query_digest
);
}
#[test]
fn cluster_apply_missing_state_exits_nonzero() {
let temp = tempdir().unwrap();
write_cluster_config_fixture(temp.path());
let output = output_failure(
cli()
.arg("cluster")
.arg("apply")
.arg("--config")
.arg(temp.path())
.arg("--json"),
);
let json = parse_stdout_json(&output);
assert_eq!(json["ok"], false);
assert!(
json["diagnostics"]
.as_array()
.unwrap()
.iter()
.any(|diagnostic| diagnostic["code"] == "state_missing"),
"{json}"
);
assert!(!temp.path().join("__cluster/resources").exists());
}
#[test]
fn cluster_apply_locked_exits_nonzero() {
let temp = tempdir().unwrap();
write_cluster_config_fixture(temp.path());
write_cluster_applyable_state(temp.path());
write_cluster_lock(temp.path(), "held-lock", "plan");
let output = output_failure(
cli()
.arg("cluster")
.arg("apply")
.arg("--config")
.arg(temp.path())
.arg("--json"),
);
let json = parse_stdout_json(&output);
assert_eq!(json["ok"], false);
assert!(
json["diagnostics"]
.as_array()
.unwrap()
.iter()
.any(|diagnostic| diagnostic["code"] == "state_lock_held"),
"{json}"
);
assert!(temp.path().join("__cluster/lock.json").exists());
assert!(!temp.path().join("__cluster/resources").exists());
}
fn cluster_json(root: &std::path::Path, command: &str) -> serde_json::Value {
parse_stdout_json(&output_success(
cli()
.arg("cluster")
.arg(command)
.arg("--config")
.arg(root)
.arg("--json"),
))
}
/// End-to-end lifecycle against a REAL derived graph: import observes the live
/// graph, plan/apply converge the query+policy catalog, status reports it,
/// refresh re-observes without un-converging, and a query edit round-trips.
/// This is the composition test — every step passes individually elsewhere;
/// this catches the seams (e.g. refresh and apply recomputing the graph
/// composite digest differently would silently re-open the plan forever).
#[test]
fn cluster_e2e_lifecycle_import_apply_status_refresh_converges() {
let temp = tempdir().unwrap();
write_cluster_config_fixture(temp.path());
init_cluster_derived_graph(temp.path());
let import = cluster_json(temp.path(), "import");
assert_eq!(import["ok"], true, "{import}");
assert_eq!(import["state_observations"]["state_revision"], 1);
let plan = cluster_json(temp.path(), "plan");
let changes = plan["changes"].as_array().unwrap();
assert_eq!(changes.len(), 3, "{plan}");
let disposition_of = |resource: &str| {
changes
.iter()
.find(|change| change["resource"] == resource)
.unwrap_or_else(|| panic!("missing change for {resource}: {plan}"))["disposition"]
.clone()
};
assert_eq!(disposition_of("graph.knowledge"), "derived");
assert_eq!(disposition_of("query.knowledge.find_person"), "applied");
assert_eq!(disposition_of("policy.base"), "applied");
let apply = cluster_json(temp.path(), "apply");
assert_eq!(apply["ok"], true, "{apply}");
assert_eq!(apply["applied_count"], 2, "{apply}");
assert_eq!(apply["converged"], true, "{apply}");
let status = cluster_json(temp.path(), "status");
assert_eq!(
status["resource_statuses"]["query.knowledge.find_person"]["status"],
"applied"
);
assert_eq!(status["resource_statuses"]["policy.base"]["status"], "applied");
assert!(
status["state_observations"]["applied_config_digest"].is_string(),
"converged apply must record the applied config digest: {status}"
);
// Refresh re-observes the live graph; it must not undo apply's work.
let refresh = cluster_json(temp.path(), "refresh");
assert_eq!(refresh["ok"], true, "{refresh}");
let replan = cluster_json(temp.path(), "plan");
assert!(
replan["changes"].as_array().unwrap().is_empty(),
"refresh after a converged apply must not re-open the plan: {replan}"
);
// A query edit round-trips: plan update -> apply -> converged again.
fs::write(
temp.path().join("people.gq"),
r#"
query find_person($name: String) {
match { $p: Person { name: $name } }
return { $p.name }
}
"#,
)
.unwrap();
let apply_edit = cluster_json(temp.path(), "apply");
assert_eq!(apply_edit["applied_count"], 1, "{apply_edit}");
assert_eq!(apply_edit["converged"], true, "{apply_edit}");
let final_apply = cluster_json(temp.path(), "apply");
assert_eq!(final_apply["state_written"], false, "{final_apply}");
assert!(final_apply["changes"].as_array().unwrap().is_empty());
}
/// The operator workflow across the Stage 3A boundary: a schema change is
/// deferred by cluster apply, executed by `omnigraph schema apply` against
/// the graph, picked up by `cluster refresh`, and the next apply re-converges.
#[test]
fn cluster_e2e_schema_change_defers_until_schema_apply_and_refresh() {
let temp = tempdir().unwrap();
write_cluster_config_fixture(temp.path());
init_cluster_derived_graph(temp.path());
let import = cluster_json(temp.path(), "import");
assert_eq!(import["ok"], true, "{import}");
let apply = cluster_json(temp.path(), "apply");
assert_eq!(apply["converged"], true, "{apply}");
// Additive schema change: cluster apply must defer it loudly, not act.
fs::write(
temp.path().join("people.pg"),
r#"
node Person {
name: String @key
age: I32?
bio: String?
}
"#,
)
.unwrap();
let deferred = cluster_json(temp.path(), "apply");
assert_eq!(deferred["ok"], true, "{deferred}");
assert_eq!(deferred["applied_count"], 0, "{deferred}");
assert_eq!(deferred["converged"], false, "{deferred}");
assert!(
deferred["diagnostics"]
.as_array()
.unwrap()
.iter()
.any(|diagnostic| diagnostic["code"] == "apply_unsupported_change"),
"{deferred}"
);
// The graph-plane tool applies the migration...
output_success(
cli()
.arg("schema")
.arg("apply")
.arg(temp.path().join("graphs/knowledge.omni"))
.arg("--schema")
.arg(temp.path().join("people.pg"))
.arg("--json"),
);
// ...refresh observes it...
let refresh = cluster_json(temp.path(), "refresh");
assert_eq!(refresh["ok"], true, "{refresh}");
// ...and the control plane re-converges.
let reconverge = cluster_json(temp.path(), "apply");
assert_eq!(reconverge["ok"], true, "{reconverge}");
assert_eq!(reconverge["converged"], true, "{reconverge}");
let replan = cluster_json(temp.path(), "plan");
assert!(
replan["changes"].as_array().unwrap().is_empty(),
"after schema apply + refresh + apply, the plan must be empty: {replan}"
);
}
/// Lock-recovery composition: a held lock refuses apply, force-unlock clears
/// it, and the retried apply converges.
#[test]
fn cluster_e2e_force_unlock_unblocks_apply() {
let temp = tempdir().unwrap();
write_cluster_config_fixture(temp.path());
write_cluster_applyable_state(temp.path());
write_cluster_lock(temp.path(), "stuck-lock", "apply");
let refused = parse_stdout_json(&output_failure(
cli()
.arg("cluster")
.arg("apply")
.arg("--config")
.arg(temp.path())
.arg("--json"),
));
assert_eq!(refused["ok"], false);
let unlocked = parse_stdout_json(&output_success(
cli()
.arg("cluster")
.arg("force-unlock")
.arg("stuck-lock")
.arg("--config")
.arg(temp.path())
.arg("--json"),
));
assert_eq!(unlocked["lock_removed"], true, "{unlocked}");
let retried = cluster_json(temp.path(), "apply");
assert_eq!(retried["ok"], true, "{retried}");
assert_eq!(retried["converged"], true, "{retried}");
}
#[test]
fn short_version_flag_prints_current_cli_version() {
let output = output_success(cli().arg("-v"));

File diff suppressed because it is too large Load diff

View file

@ -8,7 +8,7 @@ This file is the always-on map of the test surface. **Consult it before every ta
|---|---|---|
| `omnigraph` (engine) | `crates/omnigraph/tests/` | Integration tests (21 files), fixture-driven, share `tests/helpers/mod.rs` |
| `omnigraph-cli` | `crates/omnigraph-cli/tests/` | `cli.rs` (unit-ish), `system_local.rs`, `system_remote.rs`, share `tests/support/mod.rs` |
| `omnigraph-cluster` | mostly in-source `#[cfg(test)] mod tests` | Cluster config parser, local JSON state diff, state CAS/lock handling/recovery, read-only validate/plan/status plus explicit refresh/import graph observations |
| `omnigraph-cluster` | mostly in-source `#[cfg(test)] mod tests` | Cluster config parser, local JSON state diff, state CAS/lock handling/recovery, read-only validate/plan/status plus explicit refresh/import graph observations, and config-only apply (content-addressed payload publish, disposition gating, composite-digest convergence, idempotent re-apply) |
| `omnigraph-server` | `crates/omnigraph-server/tests/` | `server.rs` (HTTP-level), `openapi.rs` (OpenAPI drift / regeneration) |
| `omnigraph-compiler` | mostly in-source `#[cfg(test)] mod tests` | Parser, type-checker, IR lowering, lint |

View file

@ -19,7 +19,7 @@ Top-level command families and subcommands. Graph-targeting commands accept eith
| `commit list \| show` | inspect commit graph |
| `schema plan \| apply \| show (alias: get)` | migrations |
| `lint` (alias: `check`) | offline / graph-backed query validation. Replaces `query lint` / `query check`, which are kept as deprecated argv-level shims that print a one-line warning and rewrite to `omnigraph lint` |
| `cluster validate \| plan \| status \| refresh \| import \| force-unlock` | cluster-control preview. `validate` checks a local `cluster.yaml` folder and referenced schema/query/policy files; `plan` diffs it against local JSON state at `__cluster/state.json`; `status` reads the state ledger; `refresh`/`import` explicitly update local JSON state from read-only graph observations; `force-unlock <LOCK_ID>` manually removes a held local state lock by exact id. No apply, graph-resource mutation, server change, automatic stale-lock breaking, or `plan --refresh` occurs in Stage 2C |
| `cluster validate \| plan \| apply \| status \| refresh \| import \| force-unlock` | cluster-control preview. `validate` checks a local `cluster.yaml` folder and referenced schema/query/policy files; `plan` diffs it against local JSON state at `__cluster/state.json` and annotates each change with its apply disposition; `apply` executes the config-only (stored-query/policy) subset into the content-addressed local catalog under `__cluster/resources/` — graph/schema changes are deferred loudly, and nothing applied serves traffic (the server still boots from `omnigraph.yaml`); `status` reads the state ledger; `refresh`/`import` explicitly update local JSON state from read-only graph observations; `force-unlock <LOCK_ID>` manually removes a held local state lock by exact id. No graph-manifest movement, server change, automatic stale-lock breaking, or `plan --refresh` occurs in Stage 3A |
| `optimize` | non-destructive Lance compaction (skips tables with `Blob` columns or uncovered drift; `--json` reports `skipped`) |
| `repair [--confirm] [--force]` | preview or explicitly publish uncovered manifest/head drift. `--confirm` heals verified maintenance drift and exits non-zero if suspicious/unverifiable drift is refused; `--force --confirm` publishes suspicious/unverifiable drift after operator review |
| `cleanup --keep N --older-than 7d --confirm` | destructive version GC |
@ -78,6 +78,7 @@ policy:
```bash
omnigraph cluster validate --config ./company-brain
omnigraph cluster plan --config ./company-brain --json
omnigraph cluster apply --config ./company-brain --json
omnigraph cluster status --config ./company-brain --json
omnigraph cluster refresh --config ./company-brain --json
omnigraph cluster import --config ./company-brain --json
@ -85,16 +86,20 @@ omnigraph cluster force-unlock <LOCK_ID> --config ./company-brain --json
```
`--config` is a directory containing `cluster.yaml`; it defaults to `.`.
Stage 2C accepts graphs, schemas, stored queries, and policy bundle file
Stage 3A accepts graphs, schemas, stored queries, and policy bundle file
references. `cluster plan` reads local JSON state from
`<config-dir>/__cluster/state.json`; a missing file means empty state. Plan,
refresh, and import acquire `__cluster/lock.json` by default and release it
before returning. `cluster status` reads state only and reports any existing
apply, refresh, and import acquire `__cluster/lock.json` by default and release
it before returning. `cluster apply` executes only stored-query/policy catalog
writes (content-addressed under `__cluster/resources/`) and requires an
existing `state.json`; graph/schema changes are deferred with warnings, and
applied resources do not serve traffic — the server still boots from
`omnigraph.yaml`. `cluster status` reads state only and reports any existing
lock metadata. `force-unlock` removes a lock only when the supplied id exactly
matches the lock file. `refresh` requires an existing `state.json`; `import`
creates one only when it is missing. Both observe declared graphs read-only at
`<config-dir>/graphs/<graph-id>.omni`. External state backends, apply,
automatic stale-lock breaking, `plan --refresh`, pipelines, UI specs,
`<config-dir>/graphs/<graph-id>.omni`. External state backends, graph/schema
apply, automatic stale-lock breaking, `plan --refresh`, pipelines, UI specs,
embeddings, aliases, and bindings are reserved for later stages. See
[cluster-config.md](cluster-config.md).

View file

@ -1,19 +1,23 @@
# Cluster Config
**Status:** Stage 2C state-lock recovery preview.
**Status:** Stage 3A config-only apply preview.
Cluster config is the future control-plane configuration surface for a whole
OmniGraph deployment. In this stage, OmniGraph can validate a local
`cluster.yaml` folder, produce a deterministic read-only plan, inspect the
local JSON state ledger, and explicitly refresh/import graph observations into
that ledger. It can also manually remove a held local state lock by exact lock
id. It does not apply desired changes, start servers, or write graph resources.
local JSON state ledger, explicitly refresh/import graph observations into
that ledger, manually remove a held local state lock by exact lock id, and
**apply the config-only subset of the plan** — stored-query and policy-bundle
catalog writes. It does not move graph manifests, change schemas, start
servers, or serve anything it applies: the server still boots from
`omnigraph.yaml`.
## Commands
```bash
omnigraph cluster validate --config ./company-brain
omnigraph cluster plan --config ./company-brain --json
omnigraph cluster apply --config ./company-brain --json
omnigraph cluster status --config ./company-brain --json
omnigraph cluster refresh --config ./company-brain --json
omnigraph cluster import --config ./company-brain --json
@ -38,7 +42,7 @@ of the two files.
## Supported `cluster.yaml`
Stage 2C accepts only the read-only resource subset:
Stage 3A accepts only this resource subset:
```yaml
version: 1
@ -64,9 +68,9 @@ policies:
`metadata.name` is a display label. `state.backend` may be omitted or set to
`cluster`; external state backends are reserved for a later stage. `state.lock`
defaults to `true`. When enabled, `cluster plan`, `cluster refresh`, and
`cluster import` briefly acquire `<config-dir>/__cluster/lock.json`, then remove
it before returning. `cluster status` never acquires the lock; it only reports
defaults to `true`. When enabled, `cluster plan`, `cluster apply`,
`cluster refresh`, and `cluster import` briefly acquire
`<config-dir>/__cluster/lock.json`, then remove it before returning. `cluster status` never acquires the lock; it only reports
whether one is present. `cluster force-unlock` is the only lock-removal command;
it requires the exact lock id and should be run only after confirming no cluster
operation is active.
@ -138,8 +142,53 @@ successful `plan` instead reports `lock_acquired: true` and an
`acquired_lock_id`, then releases the lock before returning. The command never
writes `state.json` and does not scan live graphs. Use explicit
`cluster refresh` / `cluster import` when the state ledger should be updated
from live observations. Apply and live drift scans during plan are later-stage
work.
from live observations. Live drift scans during plan are later-stage work.
Each plan change carries a `disposition` field — an honest preview of what
`cluster apply` will do with it in this stage: `applied` (executes), `derived`
(a `graph.<id>` composite-digest update that converges automatically once its
query digests land), `deferred` (graph/schema change, later phase), or
`blocked` (query/policy gated by an unapplied or missing dependency, with the
condition in `reason`).
## Apply
`cluster apply` executes the config-only subset of the plan — stored-query and
policy-bundle changes. There is no confirm flag: `cluster plan` is the preview,
and apply recomputes the same diff under the state lock before executing, so a
stale preview can never be applied. Apply requires an existing `state.json`
(`state_missing` directs you to `cluster import` first).
For each applied create/update, the resource payload is written
content-addressed into the local catalog:
```text
<config-dir>/__cluster/resources/query/<graph>/<name>/<digest>.gq
<config-dir>/__cluster/resources/policy/<name>/<digest>.yaml
```
Extensions are fixed per kind regardless of the source file's name. Payloads
are written before the state update because `state.json` is the publish point:
if the final CAS-checked state write fails, no success is reported and the
digest-named blobs already written are inert — re-running apply is the repair.
Deletes remove the resource from state; their old payload blobs stay on disk
(garbage collection is a later stage). Re-running a converged apply is a no-op:
no state write, no revision change (`state_written: false`).
**Applied means recorded in the cluster catalog — nothing more.** The server
still boots from `omnigraph.yaml`; no query or policy applied here serves
traffic until the server-boot stage ships, as an explicit per-deployment mode
switch.
Graph and schema changes are never executed by this stage. They are reported
as `deferred` (warning `apply_unsupported_change`), and query/policy changes
that depend on them are `blocked` (warning `apply_dependency_blocked`, status
`blocked` in state). A partially-applicable plan still exits 0 with warnings;
the JSON `converged` field is the automation signal for "state now matches the
desired revision". The applied `config_digest` is only recorded when apply
fully converges. The `graph.<id>` composite digest is recomputed from state's
own schema/query digests after each apply, so applied query changes converge
without graph movement.
## Status