test(cli): cluster e2e hardening — lost-state recovery, out-of-band drift, root destruction, multi-graph convergence (#166)

Four lifecycle compositions over the spawned binary that pin spec claims no
single-command test proves:

- Lost ledger: delete state.json -> re-import from the live graph -> re-apply
  converges onto the same content-addressed blobs (axiom 5's reconstructable-
  state resilience edge, end to end).
- Out-of-band schema apply (the Sarah/Bob violation): refresh marks
  graph/schema Drifted with schema_mismatch, status and plan surface it, and
  cluster apply refuses to silently correct it — state keeps the LIVE schema
  digest (drift correction is gated, axiom 8).
- Destroyed graph root: refresh records graph_missing drift and drops
  graph/schema digests while preserving query/policy; plan proposes deferred
  creates only; apply moves nothing and the catalog stays intact.
- Two graphs (one live, one not yet created) + a graph-spanning policy + a
  cluster-scoped policy: a single apply yields all four dispositions at once
  (applied/derived/deferred/blocked, deterministically ordered), then the
  second graph appears, refresh observes it, and apply converges.

Helpers: init_named_cluster_graph generalizes init_cluster_derived_graph;
write_multi_graph_cluster_fixture builds the two-graph config.

Co-authored-by: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
Andrew Altshuler 2026-06-10 00:59:20 +03:00 committed by GitHub
parent 2d1c25d3fa
commit b6d228ff54
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 365 additions and 3 deletions

View file

@ -145,14 +145,18 @@ policies:
}
fn init_cluster_derived_graph(root: &std::path::Path) {
init_named_cluster_graph(root, "knowledge", "people.pg");
}
fn init_named_cluster_graph(root: &std::path::Path, graph_id: &str, schema_file: &str) {
let graph_dir = root.join("graphs");
fs::create_dir_all(&graph_dir).unwrap();
output_success(
cli()
.arg("init")
.arg("--schema")
.arg(root.join("people.pg"))
.arg(graph_dir.join("knowledge.omni")),
.arg(root.join(schema_file))
.arg(graph_dir.join(format!("{graph_id}.omni"))),
);
}
@ -1073,6 +1077,364 @@ fn cluster_e2e_force_unlock_unblocks_apply() {
assert_eq!(retried["converged"], true, "{retried}");
}
/// Two-graph fixture: `knowledge` (people) + `engineering` (services), a
/// policy spanning both graphs, and a cluster-scoped policy with no graph
/// dependencies.
fn write_multi_graph_cluster_fixture(root: &std::path::Path) {
write_cluster_config_fixture(root);
fs::write(
root.join("services.pg"),
r#"
node Service {
name: String @key
}
"#,
)
.unwrap();
fs::write(
root.join("services.gq"),
r#"
query find_service($name: String) {
match { $s: Service { name: $name } }
return { $s.name }
}
"#,
)
.unwrap();
fs::write(root.join("cluster_wide.policy.yaml"), "rules: []\n").unwrap();
fs::write(root.join("shared.policy.yaml"), "rules: []\n").unwrap();
fs::write(
root.join("cluster.yaml"),
r#"
version: 1
metadata:
name: company-brain
state:
backend: cluster
lock: true
graphs:
knowledge:
schema: ./people.pg
queries:
find_person:
file: ./people.gq
engineering:
schema: ./services.pg
queries:
find_service:
file: ./services.gq
policies:
shared:
file: ./shared.policy.yaml
applies_to: [knowledge, engineering]
cluster_wide:
file: ./cluster_wide.policy.yaml
applies_to: [cluster]
"#,
)
.unwrap();
}
fn change_for<'j>(json: &'j serde_json::Value, resource: &str) -> &'j serde_json::Value {
json["changes"]
.as_array()
.unwrap()
.iter()
.find(|change| change["resource"] == resource)
.unwrap_or_else(|| panic!("missing change for {resource}: {json}"))
}
/// The spec's resilience claim — "state is reconstructable from the
/// self-describing cluster" — proven end to end: lose the ledger, re-import
/// from the live graph, re-apply, and converge onto the same content-addressed
/// catalog blobs.
#[test]
fn cluster_e2e_lost_state_reimport_recovers_catalog() {
let temp = tempdir().unwrap();
write_cluster_config_fixture(temp.path());
init_cluster_derived_graph(temp.path());
let import = cluster_json(temp.path(), "import");
assert_eq!(import["ok"], true, "{import}");
let apply = cluster_json(temp.path(), "apply");
assert_eq!(apply["converged"], true, "{apply}");
let query_digest = change_for(&apply, "query.knowledge.find_person")["after_digest"]
.as_str()
.unwrap()
.to_string();
let blob = temp
.path()
.join("__cluster/resources/query/knowledge/find_person")
.join(format!("{query_digest}.gq"));
let blob_content = fs::read_to_string(&blob).unwrap();
// Disaster: the state ledger is lost.
fs::remove_file(temp.path().join("__cluster/state.json")).unwrap();
let reimport = cluster_json(temp.path(), "import");
assert_eq!(reimport["ok"], true, "{reimport}");
assert_eq!(reimport["state_observations"]["state_revision"], 1);
// Import observes graph/schema only; query/policy digests are not invented.
assert!(
reimport["resource_digests"]
.get("query.knowledge.find_person")
.is_none(),
"{reimport}"
);
let plan = cluster_json(temp.path(), "plan");
assert_eq!(
change_for(&plan, "query.knowledge.find_person")["disposition"],
"applied"
);
assert_eq!(change_for(&plan, "policy.base")["disposition"], "applied");
let reapply = cluster_json(temp.path(), "apply");
assert_eq!(reapply["ok"], true, "{reapply}");
assert_eq!(reapply["converged"], true, "{reapply}");
assert!(
reapply["state_observations"]["applied_config_digest"].is_string(),
"{reapply}"
);
// The catalog blob was reused, not rewritten with different content.
assert_eq!(fs::read_to_string(&blob).unwrap(), blob_content);
let replan = cluster_json(temp.path(), "plan");
assert!(replan["changes"].as_array().unwrap().is_empty(), "{replan}");
}
/// The Sarah/Bob violation made visible: a schema change applied directly to
/// the graph (no config change) must surface as drift through refresh, status,
/// and plan — and apply must never silently "correct" it.
#[test]
fn cluster_e2e_out_of_band_schema_change_surfaces_as_drift() {
let temp = tempdir().unwrap();
write_cluster_config_fixture(temp.path());
init_cluster_derived_graph(temp.path());
let import = cluster_json(temp.path(), "import");
assert_eq!(import["ok"], true, "{import}");
let apply = cluster_json(temp.path(), "apply");
assert_eq!(apply["converged"], true, "{apply}");
// Out-of-band: the live graph evolves, cluster.yaml stays put.
fs::write(
temp.path().join("people_v2.pg"),
r#"
node Person {
name: String @key
age: I32?
bio: String?
}
"#,
)
.unwrap();
output_success(
cli()
.arg("schema")
.arg("apply")
.arg(temp.path().join("graphs/knowledge.omni"))
.arg("--schema")
.arg(temp.path().join("people_v2.pg"))
.arg("--json"),
);
let refresh = cluster_json(temp.path(), "refresh");
assert_eq!(refresh["ok"], true, "{refresh}");
assert_eq!(
refresh["resource_statuses"]["schema.knowledge"]["status"],
"drifted"
);
assert_eq!(
refresh["resource_statuses"]["graph.knowledge"]["status"],
"drifted"
);
assert_eq!(
refresh["observations"]["graph.knowledge"]["schema_matches_desired"],
false
);
let status = cluster_json(temp.path(), "status");
assert_eq!(
status["resource_statuses"]["schema.knowledge"]["status"],
"drifted"
);
let plan = cluster_json(temp.path(), "plan");
assert_eq!(change_for(&plan, "schema.knowledge")["disposition"], "deferred");
assert_eq!(change_for(&plan, "graph.knowledge")["disposition"], "deferred");
let live_schema_digest = change_for(&plan, "schema.knowledge")["before_digest"]
.as_str()
.unwrap()
.to_string();
let drift_apply = cluster_json(temp.path(), "apply");
assert_eq!(drift_apply["applied_count"], 0, "{drift_apply}");
assert_eq!(drift_apply["converged"], false, "{drift_apply}");
// Apply must not have "corrected" the drift: state still records the LIVE
// schema digest, not the desired one.
let state: serde_json::Value = serde_json::from_str(
&fs::read_to_string(temp.path().join("__cluster/state.json")).unwrap(),
)
.unwrap();
assert_eq!(
state["applied_revision"]["resources"]["schema.knowledge"]["digest"],
live_schema_digest
);
}
/// Disaster input fails closed: a destroyed graph root drifts the ledger,
/// the plan proposes deferred creates, and apply moves nothing.
#[test]
fn cluster_e2e_graph_root_destruction_drifts_and_apply_moves_nothing() {
let temp = tempdir().unwrap();
write_cluster_config_fixture(temp.path());
init_cluster_derived_graph(temp.path());
let import = cluster_json(temp.path(), "import");
assert_eq!(import["ok"], true, "{import}");
let apply = cluster_json(temp.path(), "apply");
assert_eq!(apply["converged"], true, "{apply}");
let query_digest = change_for(&apply, "query.knowledge.find_person")["after_digest"]
.as_str()
.unwrap()
.to_string();
fs::remove_dir_all(temp.path().join("graphs/knowledge.omni")).unwrap();
// Missing root is drift, not an error.
let refresh = cluster_json(temp.path(), "refresh");
assert_eq!(refresh["ok"], true, "{refresh}");
assert_eq!(
refresh["resource_statuses"]["graph.knowledge"]["status"],
"drifted"
);
assert!(
refresh["resource_statuses"]["graph.knowledge"]["conditions"]
.as_array()
.unwrap()
.iter()
.any(|condition| condition == "graph_missing"),
"{refresh}"
);
// Graph/schema digests removed; query/policy digests preserved.
assert!(refresh["resource_digests"].get("graph.knowledge").is_none());
assert!(refresh["resource_digests"].get("schema.knowledge").is_none());
assert!(
refresh["resource_digests"]
.get("query.knowledge.find_person")
.is_some(),
"{refresh}"
);
let plan = cluster_json(temp.path(), "plan");
assert_eq!(change_for(&plan, "graph.knowledge")["operation"], "create");
assert_eq!(change_for(&plan, "graph.knowledge")["disposition"], "deferred");
assert_eq!(change_for(&plan, "schema.knowledge")["disposition"], "deferred");
// Converged-then-destroyed: query/policy are already in state at the
// desired digests, so they are not changes at all.
assert_eq!(plan["changes"].as_array().unwrap().len(), 2, "{plan}");
let disaster_apply = cluster_json(temp.path(), "apply");
assert_eq!(disaster_apply["applied_count"], 0, "{disaster_apply}");
assert_eq!(disaster_apply["converged"], false, "{disaster_apply}");
let state: serde_json::Value = serde_json::from_str(
&fs::read_to_string(temp.path().join("__cluster/state.json")).unwrap(),
)
.unwrap();
assert_eq!(
state["applied_revision"]["resources"]["query.knowledge.find_person"]["digest"],
query_digest
);
assert!(
temp.path()
.join("__cluster/resources/query/knowledge/find_person")
.join(format!("{query_digest}.gq"))
.exists()
);
}
/// The disposition matrix as a system: one apply over two graphs (one live,
/// one not yet created) plus graph-spanning and cluster-scoped policies must
/// produce all four dispositions at once — then converge after the second
/// graph appears.
#[test]
fn cluster_e2e_multi_graph_mixed_dispositions_then_converge() {
let temp = tempdir().unwrap();
write_multi_graph_cluster_fixture(temp.path());
init_cluster_derived_graph(temp.path()); // knowledge only
let import = cluster_json(temp.path(), "import");
assert_eq!(import["ok"], true, "{import}");
let apply = cluster_json(temp.path(), "apply");
assert_eq!(apply["ok"], true, "{apply}");
assert_eq!(apply["converged"], false, "{apply}");
assert_eq!(apply["applied_count"], 2, "{apply}");
assert_eq!(
change_for(&apply, "query.knowledge.find_person")["disposition"],
"applied"
);
assert_eq!(
change_for(&apply, "policy.cluster_wide")["disposition"],
"applied"
);
assert_eq!(
change_for(&apply, "query.engineering.find_service")["disposition"],
"blocked"
);
assert_eq!(
change_for(&apply, "query.engineering.find_service")["reason"],
"dependency_missing"
);
// One missing dependency graph blocks the whole spanning policy.
assert_eq!(change_for(&apply, "policy.shared")["disposition"], "blocked");
assert_eq!(
change_for(&apply, "graph.engineering")["disposition"],
"deferred"
);
assert_eq!(
change_for(&apply, "schema.engineering")["disposition"],
"deferred"
);
assert_eq!(
change_for(&apply, "graph.knowledge")["disposition"],
"derived"
);
assert_eq!(
apply["resource_statuses"]["policy.shared"]["status"],
"blocked"
);
// Deterministic ordering: changes sorted by resource address.
let order: Vec<&str> = apply["changes"]
.as_array()
.unwrap()
.iter()
.map(|change| change["resource"].as_str().unwrap())
.collect();
let mut sorted = order.clone();
sorted.sort_unstable();
assert_eq!(order, sorted, "{apply}");
// The second graph appears; refresh observes it; apply converges.
init_named_cluster_graph(temp.path(), "engineering", "services.pg");
let refresh = cluster_json(temp.path(), "refresh");
assert_eq!(refresh["ok"], true, "{refresh}");
let converge = cluster_json(temp.path(), "apply");
assert_eq!(converge["ok"], true, "{converge}");
assert_eq!(converge["converged"], true, "{converge}");
assert_eq!(
change_for(&converge, "query.engineering.find_service")["disposition"],
"applied"
);
assert_eq!(change_for(&converge, "policy.shared")["disposition"], "applied");
let final_plan = cluster_json(temp.path(), "plan");
assert!(
final_plan["changes"].as_array().unwrap().is_empty(),
"{final_plan}"
);
}
#[test]
fn short_version_flag_prints_current_cli_version() {
let output = output_success(cli().arg("-v"));

View file

@ -7,7 +7,7 @@ This file is the always-on map of the test surface. **Consult it before every ta
| Crate | Path | Style |
|---|---|---|
| `omnigraph` (engine) | `crates/omnigraph/tests/` | Integration tests (21 files), fixture-driven, share `tests/helpers/mod.rs` |
| `omnigraph-cli` | `crates/omnigraph-cli/tests/` | `cli.rs` (unit-ish), `system_local.rs`, `system_remote.rs`, share `tests/support/mod.rs` |
| `omnigraph-cli` | `crates/omnigraph-cli/tests/` | `cli.rs` (unit-ish; includes the `cluster_e2e_*` lifecycle compositions over the spawned binary — lost-state re-import recovery, out-of-band drift, graph-root destruction, multi-graph mixed-disposition convergence), `system_local.rs`, `system_remote.rs`, share `tests/support/mod.rs` |
| `omnigraph-cluster` | mostly in-source `#[cfg(test)] mod tests` | Cluster config parser, local JSON state diff, state CAS/lock handling/recovery, read-only validate/plan/status plus explicit refresh/import graph observations, and config-only apply (content-addressed payload publish, disposition gating, composite-digest convergence, idempotent re-apply) |
| `omnigraph-server` | `crates/omnigraph-server/tests/` | `server.rs` (HTTP-level), `openapi.rs` (OpenAPI drift / regeneration) |
| `omnigraph-compiler` | mostly in-source `#[cfg(test)] mod tests` | Parser, type-checker, IR lowering, lint |