Merge pull request #173 from ModernRelay/feat/cluster-graph-delete-4c

feat(cluster): Stage 4C — gated graph delete; Phase 4 complete
This commit is contained in:
Andrew Altshuler 2026-06-10 14:53:11 +03:00 committed by GitHub
commit 14b85a59de
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 1280 additions and 59 deletions

View file

@ -11,8 +11,8 @@ use omnigraph::db::{Omnigraph, ReadTarget, SnapshotId};
use omnigraph::loader::LoadMode;
use omnigraph::storage::normalize_root_uri;
use omnigraph_cluster::{
ApplyOptions, ApplyOutput, DiagnosticSeverity, ForceUnlockOutput, PlanOutput, StateSyncOutput, StatusOutput,
ValidateOutput, apply_config_dir_with_options, force_unlock_config_dir, import_config_dir, plan_config_dir,
ApplyOptions, ApplyOutput, ApproveOutput, DiagnosticSeverity, ForceUnlockOutput, PlanOutput, StateSyncOutput, StatusOutput,
ValidateOutput, apply_config_dir_with_options, approve_config_dir, force_unlock_config_dir, import_config_dir, plan_config_dir,
refresh_config_dir, status_config_dir, validate_config_dir,
};
use omnigraph_compiler::query::parser::parse_query;
@ -371,6 +371,18 @@ enum ClusterCommand {
#[arg(long)]
json: bool,
},
/// Record a digest-bound approval for a gated (irreversible) change,
/// e.g. a graph delete. Requires the global --as actor.
Approve {
/// Typed resource address of the gated change (e.g. graph.scratch).
resource: String,
/// Cluster config directory containing cluster.yaml.
#[arg(long, default_value = ".")]
config: PathBuf,
/// Emit JSON instead of human text.
#[arg(long)]
json: bool,
},
/// Read the local JSON state ledger without scanning live graph resources.
Status {
/// Cluster config directory containing cluster.yaml.
@ -1011,6 +1023,33 @@ fn finish_cluster_apply(output: &ApplyOutput, json: bool) -> Result<()> {
Ok(())
}
/// Render the result of `cluster approve` and exit non-zero on failure.
///
/// With `json` set, the whole output is emitted through `print_json`.
/// Otherwise a one-line human summary (on success) or a failure banner is
/// printed, followed by the diagnostics. When `output.ok` is false, stdout is
/// flushed and the process exits with status 1, in either mode.
///
/// # Errors
/// Propagates failures from `print_json` and from flushing stdout.
fn finish_cluster_approve(output: &ApproveOutput, json: bool) -> Result<()> {
    if json {
        print_json(output)?;
    } else {
        if output.ok {
            // Every optional field falls back to "?" so a missing value stays
            // visible in the summary instead of leaving a blank gap.
            let operation = output.operation.as_ref().map_or_else(
                || String::from("?"),
                |operation| format!("{operation:?}").to_lowercase(),
            );
            println!(
                "cluster approve: {} {} approved by {} (approval {})",
                operation,
                output.resource.as_deref().unwrap_or("?"),
                output.approved_by.as_deref().unwrap_or("?"),
                output.approval_id.as_deref().unwrap_or("?"),
            );
        } else {
            println!("cluster approve failed");
        }
        // Diagnostics follow the headline in both the success and failure case.
        print_cluster_diagnostics(&output.diagnostics);
    }
    if !output.ok {
        // `process::exit` runs no destructors, so flush stdout explicitly first.
        io::stdout().flush()?;
        std::process::exit(1);
    }
    Ok(())
}
fn finish_cluster_status(output: &StatusOutput, json: bool) -> Result<()> {
if json {
print_json(output)?;
@ -3581,6 +3620,19 @@ async fn main() -> Result<()> {
.await;
finish_cluster_apply(&output, json)?;
}
ClusterCommand::Approve {
resource,
config,
json,
} => {
let Some(approver) = cli.as_actor.as_deref() else {
bail!(
"`cluster approve` requires the global --as <ACTOR> flag: an approval without an approver is meaningless"
);
};
let output = approve_config_dir(config, &resource, approver).await;
finish_cluster_approve(&output, json)?;
}
ClusterCommand::Status { config, json } => {
let output = status_config_dir(config);
finish_cluster_status(&output, json)?;

View file

@ -1358,7 +1358,7 @@ fn cluster_e2e_graph_root_destruction_drifts_then_apply_recreates_empty_graph()
/// (applied), its composite (derived) — shows all four dispositions at once
/// before the graph-plane schema apply closes the loop.
#[test]
fn cluster_e2e_multi_graph_mixed_dispositions_then_converge() {
fn cluster_e2e_multi_graph_mixed_dispositions_then_approve_and_converge() {
let temp = tempdir().unwrap();
write_multi_graph_cluster_fixture(temp.path());
// No manual init: Stage 4A creates both graphs.
@ -1424,22 +1424,29 @@ policies:
let mixed = cluster_json(temp.path(), "apply");
assert_eq!(mixed["ok"], true, "{mixed}");
assert_eq!(mixed["converged"], false, "{mixed}");
// Stage 4C: deletes are gated on a digest-bound approval, one gate per
// subtree (the graph-level approval carries schema + queries).
assert_eq!(
change_for(&mixed, "graph.engineering")["disposition"],
"deferred"
);
assert_eq!(
change_for(&mixed, "schema.engineering")["disposition"],
"deferred"
);
assert_eq!(
change_for(&mixed, "query.engineering.find_service")["disposition"],
"blocked"
);
assert_eq!(
change_for(&mixed, "query.engineering.find_service")["reason"],
"dependency_not_applied"
change_for(&mixed, "graph.engineering")["reason"],
"approval_required"
);
assert_eq!(
change_for(&mixed, "schema.engineering")["reason"],
"approval_required"
);
assert_eq!(
change_for(&mixed, "query.engineering.find_service")["reason"],
"approval_required"
);
let gate_plan = cluster_json(temp.path(), "plan");
let gates = gate_plan["approvals_required"].as_array().unwrap();
assert_eq!(gates.len(), 1, "{gate_plan}");
assert_eq!(gates[0]["resource"], "graph.engineering");
assert_eq!(gates[0]["satisfied"], false);
assert_eq!(
change_for(&mixed, "query.knowledge.find_person")["disposition"],
"applied"
@ -1461,7 +1468,55 @@ policies:
let mut sorted = order.clone();
sorted.sort_unstable();
assert_eq!(order, sorted, "{mixed}");
// Graph deletion cannot converge until stage 4C's approval artifacts.
// The conclusion: an apply without approval stays blocked; the approved
// delete converges the cluster, tombstoning the removed graph.
let still_blocked = cluster_json(temp.path(), "apply");
assert_eq!(still_blocked["converged"], false, "{still_blocked}");
let approve = parse_stdout_json(&output_success(
cli()
.arg("--as")
.arg("andrew")
.arg("cluster")
.arg("approve")
.arg("graph.engineering")
.arg("--config")
.arg(temp.path())
.arg("--json"),
));
assert_eq!(approve["ok"], true, "{approve}");
assert_eq!(approve["approved_by"], "andrew");
let converge = cluster_json(temp.path(), "apply");
assert_eq!(converge["ok"], true, "{converge}");
assert_eq!(converge["converged"], true, "{converge}");
assert!(!temp.path().join("graphs/engineering.omni").exists());
let status = cluster_json(temp.path(), "status");
assert_eq!(status["observations"]["graph.engineering"]["kind"], "tombstone");
let final_plan = cluster_json(temp.path(), "plan");
assert!(
final_plan["changes"].as_array().unwrap().is_empty(),
"{final_plan}"
);
}
/// `cluster approve` invoked without the global `--as` flag must fail, and the
/// error text on stderr must point the operator at `--as`.
#[test]
fn cluster_e2e_approve_requires_actor() {
    let workspace = tempdir().unwrap();
    write_cluster_config_fixture(workspace.path());

    // Deliberately no `--as <actor>` ahead of the subcommand.
    let failed = output_failure(
        cli()
            .arg("cluster")
            .arg("approve")
            .arg("graph.knowledge")
            .arg("--config")
            .arg(workspace.path()),
    );

    let err_text = String::from_utf8_lossy(&failed.stderr);
    assert!(err_text.contains("--as"), "{err_text}");
}
/// Stage 4A headline: a declared graph is created by `cluster apply` itself —

File diff suppressed because it is too large Load diff

View file

@ -16,7 +16,8 @@ use fail::FailScenario;
use omnigraph_cluster::failpoints::ScopedFailPoint;
use omnigraph::db::Omnigraph;
use omnigraph_cluster::{
ApplyOptions, apply_config_dir, apply_config_dir_with_options, validate_config_dir,
ApplyOptions, apply_config_dir, apply_config_dir_with_options, approve_config_dir,
validate_config_dir,
};
use tempfile::tempdir;
@ -467,3 +468,126 @@ async fn schema_crash_after_apply_rolls_state_forward() {
);
scenario.teardown();
}
/// Prepare a cluster directory for the delete crash tests.
///
/// Starts from `seed_applyable_state`, then overwrites `__cluster/state.json`
/// with a ledger that keeps the `knowledge` graph/schema at their seeded
/// digests and additionally lists a stale `old` graph/schema. A root directory
/// for `old` is created on disk and its delete is approved as `test-actor`.
///
/// Returns the id of the recorded approval.
async fn seed_approved_delete(dir: &Path) -> String {
    let digests = seed_applyable_state(dir);
    let graph_digest = digests["graph.knowledge"].clone();
    let schema_digest = digests["schema.knowledge"].clone();

    // Ledger text: `graph.old` / `schema.old` carry placeholder digests.
    let ledger = format!(
        r#"{{
"version": 1,
"state_revision": 1,
"applied_revision": {{
"resources": {{
"graph.knowledge": {{ "digest": "{graph_digest}" }},
"schema.knowledge": {{ "digest": "{schema_digest}" }},
"graph.old": {{ "digest": "3333" }},
"schema.old": {{ "digest": "4444" }}
}}
}}
}}
"#
    );
    fs::write(dir.join("__cluster").join("state.json"), ledger).unwrap();

    // Give the stale graph a root directory on disk with a placeholder file.
    let old_root = dir.join("graphs/old.omni");
    fs::create_dir_all(&old_root).unwrap();
    fs::write(old_root.join("_schema.pg"), "stale").unwrap();

    let approval = approve_config_dir(dir, "graph.old", "test-actor").await;
    assert!(approval.ok, "{:?}", approval.diagnostics);
    approval.approval_id.unwrap()
}
/// Failure injected before the graph root is removed: the root is still there,
/// one recovery sidecar exists and the approval artifact is not consumed. The
/// following apply reports `graph_delete_incomplete`, converges, removes the
/// root and leaves no sidecars behind.
#[tokio::test]
async fn delete_crash_before_removal_reproposes() {
    let scenario = FailScenario::setup();
    let dir = fixture();
    let approval_id = seed_approved_delete(dir.path()).await;

    let old_root = dir.path().join("graphs/old.omni");
    let artifact_path = dir
        .path()
        .join("__cluster/approvals")
        .join(format!("{approval_id}.json"));

    {
        // The failpoint is active only inside this scope.
        let _failpoint = ScopedFailPoint::new("cluster_apply.before_graph_delete", "return");
        let out = apply_config_dir(dir.path()).await;
        assert!(!out.ok);
        assert!(old_root.exists());
        assert_eq!(recovery_sidecars(dir.path()).len(), 1);

        // The approval file must not have been marked as consumed.
        let artifact_text = fs::read_to_string(&artifact_path).unwrap();
        let artifact: serde_json::Value = serde_json::from_str(&artifact_text).unwrap();
        assert!(artifact["consumed_at"].is_null());
    }

    let recovered = apply_config_dir(dir.path()).await;
    assert!(recovered.ok, "{:?}", recovered.diagnostics);
    let flagged_incomplete = recovered
        .diagnostics
        .iter()
        .any(|diagnostic| diagnostic.code == "graph_delete_incomplete");
    assert!(flagged_incomplete);
    assert!(recovered.converged);
    assert!(!old_root.exists());
    assert!(recovery_sidecars(dir.path()).is_empty());

    scenario.teardown();
}
/// Failure injected after the graph root is removed but before state is
/// written: the root is gone, the state file is byte-identical to before, and
/// the single sidecar names the approval. The following apply reports
/// `cluster_recovery_rolled_forward`, converges, and the state then holds a
/// tombstone observation, a consumed approval record and a `graph_delete`
/// recovery record.
#[tokio::test]
async fn delete_crash_after_removal_rolls_forward() {
    let scenario = FailScenario::setup();
    let dir = fixture();
    let approval_id = seed_approved_delete(dir.path()).await;
    let state_before = fs::read(state_path(dir.path())).unwrap();
    let old_root = dir.path().join("graphs/old.omni");

    {
        // The failpoint is active only inside this scope.
        let _failpoint = ScopedFailPoint::new("cluster_apply.after_graph_delete", "return");
        let out = apply_config_dir(dir.path()).await;
        assert!(!out.ok);
        assert!(!out.state_written);
        assert!(!old_root.exists());
        assert_eq!(fs::read(state_path(dir.path())).unwrap(), state_before);

        let sidecars = recovery_sidecars(dir.path());
        assert_eq!(sidecars.len(), 1);
        let sidecar_text = fs::read_to_string(&sidecars[0]).unwrap();
        let sidecar: serde_json::Value = serde_json::from_str(&sidecar_text).unwrap();
        assert_eq!(sidecar["approval_id"], approval_id.as_str());
    }

    let recovered = apply_config_dir(dir.path()).await;
    assert!(recovered.ok, "{:?}", recovered.diagnostics);
    let rolled_forward = recovered
        .diagnostics
        .iter()
        .any(|diagnostic| diagnostic.code == "cluster_recovery_rolled_forward");
    assert!(rolled_forward);
    assert!(recovered.converged);

    let state_text = fs::read_to_string(state_path(dir.path())).unwrap();
    let state: serde_json::Value = serde_json::from_str(&state_text).unwrap();
    assert_eq!(state["observations"]["graph.old"]["kind"], "tombstone");
    assert!(state["approval_records"][&approval_id]["consumed_at"].is_string());
    let has_delete_record = state["recovery_records"]
        .as_object()
        .unwrap()
        .values()
        .any(|record| record["kind"] == "graph_delete");
    assert!(has_delete_record);

    scenario.teardown();
}

View file

@ -1,6 +1,7 @@
# RFC: Cluster Graph & Schema Apply — Phase 4 of the Cluster Control Plane
**Status:** Proposed
**Status:** Landed (4A #170, 4B #171, 4C — all shipped)
**Implementation deviations:** (1) D3 row 8 retires the stale delete sidecar and lets the still-approved delete re-propose and retry, instead of a pending-block — prefix removal is idempotent, so the retry is the repair. (2) The approver/actor flag is the CLI's existing global `--as`, not a dedicated `--actor`/`--by`. (3) Consumed approval artifacts are rewritten with `consumed_at` rather than moved into state — the file and the ledger record both survive independently (axiom 11).
**Date:** 2026-06-10
**Builds on:** cluster Stages 1–3B (shipped: validate/plan/status/refresh/import/force-unlock, config-only `cluster apply` with content-addressed catalog publish, catalog payload verification, failpoint-proven crash/CAS recovery for the apply protocol). Normative context: [cluster-config-specs.md](cluster-config-specs.md), [cluster-axioms.md](cluster-axioms.md), [cluster-config-implementation-spec.md](cluster-config-implementation-spec.md).
**Target release:** unversioned (phased — see Sequencing); no cluster functionality is in a tagged release yet.

View file

@ -8,7 +8,7 @@ This file is the always-on map of the test surface. **Consult it before every ta
|---|---|---|
| `omnigraph` (engine) | `crates/omnigraph/tests/` | Integration tests (21 files), fixture-driven, share `tests/helpers/mod.rs` |
| `omnigraph-cli` | `crates/omnigraph-cli/tests/` | `cli.rs` (unit-ish; includes the `cluster_e2e_*` lifecycle compositions over the spawned binary — lost-state re-import recovery, out-of-band drift, graph-root destruction, multi-graph mixed-disposition convergence), `system_local.rs`, `system_remote.rs`, share `tests/support/mod.rs` |
| `omnigraph-cluster` | mostly in-source `#[cfg(test)] mod tests`; `tests/failpoints.rs` (feature-gated) | Cluster config parser, local JSON state diff, state CAS/lock handling/recovery, read-only validate/plan/status plus explicit refresh/import graph observations, config-only apply (content-addressed payload publish, disposition gating, composite-digest convergence, idempotent re-apply), catalog payload verification (status read-only, refresh drift + self-heal), failpoint crash-mid-apply / CAS-race coverage, Stage 4A graph creation (create executor, recovery sidecars + sweep rows, create crash windows), and Stage 4B schema apply (migration previews in plan, schema executor, schema-apply sweep classification, schema crash windows) |
| `omnigraph-cluster` | mostly in-source `#[cfg(test)] mod tests`; `tests/failpoints.rs` (feature-gated) | Cluster config parser, local JSON state diff, state CAS/lock handling/recovery, read-only validate/plan/status plus explicit refresh/import graph observations, config-only apply (content-addressed payload publish, disposition gating, composite-digest convergence, idempotent re-apply), catalog payload verification (status read-only, refresh drift + self-heal), failpoint crash-mid-apply / CAS-race coverage, Stage 4A graph creation (create executor, recovery sidecars + sweep rows, create crash windows), Stage 4B schema apply (migration previews in plan, schema executor, schema-apply sweep classification, schema crash windows), and Stage 4C gated deletes (digest-bound approvals, delete executor + tombstones, delete sweep rows, delete crash windows) |
| `omnigraph-server` | `crates/omnigraph-server/tests/` | `server.rs` (HTTP-level), `openapi.rs` (OpenAPI drift / regeneration) |
| `omnigraph-compiler` | mostly in-source `#[cfg(test)] mod tests` | Parser, type-checker, IR lowering, lint |

View file

@ -19,7 +19,7 @@ Top-level command families and subcommands. Graph-targeting commands accept eith
| `commit list \| show` | inspect commit graph |
| `schema plan \| apply \| show (alias: get)` | migrations |
| `lint` (alias: `check`) | offline / graph-backed query validation. Replaces `query lint` / `query check`, which are kept as deprecated argv-level shims that print a one-line warning and rewrite to `omnigraph lint` |
| `cluster validate \| plan \| apply \| status \| refresh \| import \| force-unlock` | cluster-control preview. `validate` checks a local `cluster.yaml` folder and referenced schema/query/policy files; `plan` diffs it against local JSON state at `__cluster/state.json` and annotates each change with its apply disposition; `apply` executes the config-only (stored-query/policy) subset into the content-addressed local catalog under `__cluster/resources/` — graph/schema changes are deferred loudly, and nothing applied serves traffic (the server still boots from `omnigraph.yaml`); `status` reads the state ledger; `refresh`/`import` explicitly update local JSON state from read-only graph observations; `force-unlock <LOCK_ID>` manually removes a held local state lock by exact id. No graph-manifest movement, server change, automatic stale-lock breaking, or `plan --refresh` occurs in Stage 3A |
| `cluster validate \| plan \| apply \| approve \| status \| refresh \| import \| force-unlock` | cluster-control preview. `validate` checks a local `cluster.yaml` folder and referenced schema/query/policy files; `plan` diffs it against local JSON state at `__cluster/state.json` and annotates each change with its apply disposition; `apply` executes the config-only (stored-query/policy) subset into the content-addressed local catalog under `__cluster/resources/` — graph/schema changes are deferred loudly, and nothing applied serves traffic (the server still boots from `omnigraph.yaml`); `approve <resource>` records a digest-bound approval for a gated (irreversible) change such as a graph delete, and requires the global `--as <actor>` flag; `status` reads the state ledger; `refresh`/`import` explicitly update local JSON state from read-only graph observations; `force-unlock <LOCK_ID>` manually removes a held local state lock by exact id. No graph-manifest movement, server change, automatic stale-lock breaking, or `plan --refresh` occurs in Stage 3A |
| `optimize` | non-destructive Lance compaction (skips tables with `Blob` columns or uncovered drift; `--json` reports `skipped`) |
| `repair [--confirm] [--force]` | preview or explicitly publish uncovered manifest/head drift. `--confirm` heals verified maintenance drift and exits non-zero if suspicious/unverifiable drift is refused; `--force --confirm` publishes suspicious/unverifiable drift after operator review |
| `cleanup --keep N --older-than 7d --confirm` | destructive version GC |
@ -79,6 +79,7 @@ policy:
omnigraph cluster validate --config ./company-brain
omnigraph cluster plan --config ./company-brain --json
omnigraph cluster apply --config ./company-brain --json
omnigraph cluster approve graph.<id> --config ./company-brain --as <actor>
omnigraph cluster status --config ./company-brain --json
omnigraph cluster refresh --config ./company-brain --json
omnigraph cluster import --config ./company-brain --json

View file

@ -1,6 +1,6 @@
# Cluster Config
**Status:** Stage 4B schema-apply preview.
**Status:** Stage 4C — Phase 4 complete (graph create, schema apply, gated graph delete).
Cluster config is the future control-plane configuration surface for a whole
OmniGraph deployment. In this stage, OmniGraph can validate a local
@ -9,11 +9,10 @@ local JSON state ledger, explicitly refresh/import graph observations into
that ledger, manually remove a held local state lock by exact lock id, and
**apply the executable subset of the plan** — stored-query and policy-bundle
catalog writes, **graph creation** (a declared graph that does not exist yet
is initialized by apply at the derived root), and **schema updates**: a
changed schema is migrated on the live graph by apply itself, soft drops
only. It does not delete graphs (a later stage), perform data-loss
migrations, start servers, or serve anything it applies: the server still
boots from `omnigraph.yaml`.
is initialized by apply at the derived root), **schema updates** (soft drops
only), and — behind an explicit, digest-bound **approval** — **graph
deletion**. It does not perform data-loss schema migrations, start servers,
or serve anything it applies: the server still boots from `omnigraph.yaml`.
## Commands
@ -21,6 +20,7 @@ boots from `omnigraph.yaml`.
omnigraph cluster validate --config ./company-brain
omnigraph cluster plan --config ./company-brain --json
omnigraph cluster apply --config ./company-brain --json
omnigraph cluster approve graph.<id> --config ./company-brain --as <actor>
omnigraph cluster status --config ./company-brain --json
omnigraph cluster refresh --config ./company-brain --json
omnigraph cluster import --config ./company-brain --json
@ -253,7 +253,38 @@ in recovery sidecars and audit entries and threads it to the engine's
schema-apply (so commit attribution and Cedar enforcement — wherever a policy
checker is installed — work unchanged).
Schema deletes (removing a graph) are never executed by this stage. They are
### Approvals and graph deletion
Deleting a graph is the irreversible tier: it requires a recorded human
decision. `cluster plan` lists the gate under `approvals_required` (one gate
per graph — the graph-level approval carries its schema and queries);
`cluster approve graph.<id> --as <actor>` writes a digest-bound artifact to
```text
<config-dir>/__cluster/approvals/<approval-id>.json
```
bound to the exact desired config digest and the change's state digest, so
**any config or state drift after approving invalidates the artifact**
automatically (`approval_stale` warning; it never authorizes a different
change). An unapproved delete blocks with `approval_required`.
An approved delete executes **last** in the apply run: the graph root is
removed recursively, the subtree (graph, schema, its queries) is tombstoned
out of the state ledger with a tombstone observation, and the approval is
consumed — recorded in the state's `approval_records` in the same state
update, and the artifact file rewritten with `consumed_at` (the file is never
deleted: the audit fact survives the loss of either store). A failed run
consumes nothing; the approval stays valid for the retry. Catalog blobs of
the deleted graph's queries stay on disk (GC is a later stage).
Crash recovery for deletes: a completed-but-unrecorded delete is rolled
forward by the sweep (tombstone + approval consumption + audit entry); an
incomplete delete (root still present) is retired with a
`graph_delete_incomplete` warning and simply **re-proposed** — prefix removal
is idempotent, so the still-approved retry is the repair.
Standalone schema deletes are never executed by this stage. They are
reported as `deferred` (warning `apply_unsupported_change`), and query/policy
changes that depend on them are `blocked` (warning `apply_dependency_blocked`, status
`blocked` in state). A partially-applicable plan still exits 0 with warnings;