From b6d228ff54a27ceeb7d13de93ee1ef8df94c87b5 Mon Sep 17 00:00:00 2001 From: Andrew Altshuler Date: Wed, 10 Jun 2026 00:59:20 +0300 Subject: [PATCH] =?UTF-8?q?test(cli):=20cluster=20e2e=20hardening=20?= =?UTF-8?q?=E2=80=94=20lost-state=20recovery,=20out-of-band=20drift,=20roo?= =?UTF-8?q?t=20destruction,=20multi-graph=20convergence=20(#166)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Four lifecycle compositions over the spawned binary that pin spec claims no single-command test proves: - Lost ledger: delete state.json -> re-import from the live graph -> re-apply converges onto the same content-addressed blobs (axiom 5's reconstructable-state resilience edge, end to end). - Out-of-band schema apply (the Sarah/Bob violation): refresh marks graph/schema Drifted with schema_mismatch, status and plan surface it, and cluster apply refuses to silently correct it — state keeps the LIVE schema digest (drift correction is gated, axiom 8). - Destroyed graph root: refresh records graph_missing drift and drops graph/schema digests while preserving query/policy; plan proposes deferred creates only; apply moves nothing and the catalog stays intact. - Two graphs (one live, one not yet created) + a graph-spanning policy + a cluster-scoped policy: a single apply yields all four dispositions at once (applied/derived/deferred/blocked, deterministically ordered), then the second graph appears, refresh observes it, and apply converges. Helpers: init_named_cluster_graph generalizes init_cluster_derived_graph; write_multi_graph_cluster_fixture builds the two-graph config. 
Co-authored-by: Claude Fable 5 --- crates/omnigraph-cli/tests/cli.rs | 366 +++++++++++++++++++++++++++++- docs/dev/testing.md | 2 +- 2 files changed, 365 insertions(+), 3 deletions(-) diff --git a/crates/omnigraph-cli/tests/cli.rs b/crates/omnigraph-cli/tests/cli.rs index 30fa796..f60ffbe 100644 --- a/crates/omnigraph-cli/tests/cli.rs +++ b/crates/omnigraph-cli/tests/cli.rs @@ -145,14 +145,18 @@ policies: } fn init_cluster_derived_graph(root: &std::path::Path) { + init_named_cluster_graph(root, "knowledge", "people.pg"); +} + +fn init_named_cluster_graph(root: &std::path::Path, graph_id: &str, schema_file: &str) { let graph_dir = root.join("graphs"); fs::create_dir_all(&graph_dir).unwrap(); output_success( cli() .arg("init") .arg("--schema") - .arg(root.join("people.pg")) - .arg(graph_dir.join("knowledge.omni")), + .arg(root.join(schema_file)) + .arg(graph_dir.join(format!("{graph_id}.omni"))), ); } @@ -1073,6 +1077,364 @@ fn cluster_e2e_force_unlock_unblocks_apply() { assert_eq!(retried["converged"], true, "{retried}"); } +/// Two-graph fixture: `knowledge` (people) + `engineering` (services), a +/// policy spanning both graphs, and a cluster-scoped policy with no graph +/// dependencies. 
+fn write_multi_graph_cluster_fixture(root: &std::path::Path) { + write_cluster_config_fixture(root); + fs::write( + root.join("services.pg"), + r#" +node Service { + name: String @key +} +"#, + ) + .unwrap(); + fs::write( + root.join("services.gq"), + r#" +query find_service($name: String) { + match { $s: Service { name: $name } } + return { $s.name } +} +"#, + ) + .unwrap(); + fs::write(root.join("cluster_wide.policy.yaml"), "rules: []\n").unwrap(); + fs::write(root.join("shared.policy.yaml"), "rules: []\n").unwrap(); + fs::write( + root.join("cluster.yaml"), + r#" +version: 1 +metadata: + name: company-brain +state: + backend: cluster + lock: true +graphs: + knowledge: + schema: ./people.pg + queries: + find_person: + file: ./people.gq + engineering: + schema: ./services.pg + queries: + find_service: + file: ./services.gq +policies: + shared: + file: ./shared.policy.yaml + applies_to: [knowledge, engineering] + cluster_wide: + file: ./cluster_wide.policy.yaml + applies_to: [cluster] +"#, + ) + .unwrap(); +} + +fn change_for<'j>(json: &'j serde_json::Value, resource: &str) -> &'j serde_json::Value { + json["changes"] + .as_array() + .unwrap() + .iter() + .find(|change| change["resource"] == resource) + .unwrap_or_else(|| panic!("missing change for {resource}: {json}")) +} + +/// The spec's resilience claim — "state is reconstructable from the +/// self-describing cluster" — proven end to end: lose the ledger, re-import +/// from the live graph, re-apply, and converge onto the same content-addressed +/// catalog blobs. 
+#[test] +fn cluster_e2e_lost_state_reimport_recovers_catalog() { + let temp = tempdir().unwrap(); + write_cluster_config_fixture(temp.path()); + init_cluster_derived_graph(temp.path()); + let import = cluster_json(temp.path(), "import"); + assert_eq!(import["ok"], true, "{import}"); + let apply = cluster_json(temp.path(), "apply"); + assert_eq!(apply["converged"], true, "{apply}"); + + let query_digest = change_for(&apply, "query.knowledge.find_person")["after_digest"] + .as_str() + .unwrap() + .to_string(); + let blob = temp + .path() + .join("__cluster/resources/query/knowledge/find_person") + .join(format!("{query_digest}.gq")); + let blob_content = fs::read_to_string(&blob).unwrap(); + + // Disaster: the state ledger is lost. + fs::remove_file(temp.path().join("__cluster/state.json")).unwrap(); + + let reimport = cluster_json(temp.path(), "import"); + assert_eq!(reimport["ok"], true, "{reimport}"); + assert_eq!(reimport["state_observations"]["state_revision"], 1); + // Import observes graph/schema only; query/policy digests are not invented. + assert!( + reimport["resource_digests"] + .get("query.knowledge.find_person") + .is_none(), + "{reimport}" + ); + + let plan = cluster_json(temp.path(), "plan"); + assert_eq!( + change_for(&plan, "query.knowledge.find_person")["disposition"], + "applied" + ); + assert_eq!(change_for(&plan, "policy.base")["disposition"], "applied"); + + let reapply = cluster_json(temp.path(), "apply"); + assert_eq!(reapply["ok"], true, "{reapply}"); + assert_eq!(reapply["converged"], true, "{reapply}"); + assert!( + reapply["state_observations"]["applied_config_digest"].is_string(), + "{reapply}" + ); + // The catalog blob was reused, not rewritten with different content. 
+ assert_eq!(fs::read_to_string(&blob).unwrap(), blob_content); + + let replan = cluster_json(temp.path(), "plan"); + assert!(replan["changes"].as_array().unwrap().is_empty(), "{replan}"); +} + +/// The Sarah/Bob violation made visible: a schema change applied directly to +/// the graph (no config change) must surface as drift through refresh, status, +/// and plan — and apply must never silently "correct" it. +#[test] +fn cluster_e2e_out_of_band_schema_change_surfaces_as_drift() { + let temp = tempdir().unwrap(); + write_cluster_config_fixture(temp.path()); + init_cluster_derived_graph(temp.path()); + let import = cluster_json(temp.path(), "import"); + assert_eq!(import["ok"], true, "{import}"); + let apply = cluster_json(temp.path(), "apply"); + assert_eq!(apply["converged"], true, "{apply}"); + + // Out-of-band: the live graph evolves, cluster.yaml stays put. + fs::write( + temp.path().join("people_v2.pg"), + r#" +node Person { + name: String @key + age: I32? + bio: String? +} +"#, + ) + .unwrap(); + output_success( + cli() + .arg("schema") + .arg("apply") + .arg(temp.path().join("graphs/knowledge.omni")) + .arg("--schema") + .arg(temp.path().join("people_v2.pg")) + .arg("--json"), + ); + + let refresh = cluster_json(temp.path(), "refresh"); + assert_eq!(refresh["ok"], true, "{refresh}"); + assert_eq!( + refresh["resource_statuses"]["schema.knowledge"]["status"], + "drifted" + ); + assert_eq!( + refresh["resource_statuses"]["graph.knowledge"]["status"], + "drifted" + ); + assert_eq!( + refresh["observations"]["graph.knowledge"]["schema_matches_desired"], + false + ); + + let status = cluster_json(temp.path(), "status"); + assert_eq!( + status["resource_statuses"]["schema.knowledge"]["status"], + "drifted" + ); + + let plan = cluster_json(temp.path(), "plan"); + assert_eq!(change_for(&plan, "schema.knowledge")["disposition"], "deferred"); + assert_eq!(change_for(&plan, "graph.knowledge")["disposition"], "deferred"); + let live_schema_digest = change_for(&plan, 
"schema.knowledge")["before_digest"] + .as_str() + .unwrap() + .to_string(); + + let drift_apply = cluster_json(temp.path(), "apply"); + assert_eq!(drift_apply["applied_count"], 0, "{drift_apply}"); + assert_eq!(drift_apply["converged"], false, "{drift_apply}"); + // Apply must not have "corrected" the drift: state still records the LIVE + // schema digest, not the desired one. + let state: serde_json::Value = serde_json::from_str( + &fs::read_to_string(temp.path().join("__cluster/state.json")).unwrap(), + ) + .unwrap(); + assert_eq!( + state["applied_revision"]["resources"]["schema.knowledge"]["digest"], + live_schema_digest + ); +} + +/// Disaster input fails closed: a destroyed graph root drifts the ledger, +/// the plan proposes deferred creates, and apply moves nothing. +#[test] +fn cluster_e2e_graph_root_destruction_drifts_and_apply_moves_nothing() { + let temp = tempdir().unwrap(); + write_cluster_config_fixture(temp.path()); + init_cluster_derived_graph(temp.path()); + let import = cluster_json(temp.path(), "import"); + assert_eq!(import["ok"], true, "{import}"); + let apply = cluster_json(temp.path(), "apply"); + assert_eq!(apply["converged"], true, "{apply}"); + let query_digest = change_for(&apply, "query.knowledge.find_person")["after_digest"] + .as_str() + .unwrap() + .to_string(); + + fs::remove_dir_all(temp.path().join("graphs/knowledge.omni")).unwrap(); + + // Missing root is drift, not an error. + let refresh = cluster_json(temp.path(), "refresh"); + assert_eq!(refresh["ok"], true, "{refresh}"); + assert_eq!( + refresh["resource_statuses"]["graph.knowledge"]["status"], + "drifted" + ); + assert!( + refresh["resource_statuses"]["graph.knowledge"]["conditions"] + .as_array() + .unwrap() + .iter() + .any(|condition| condition == "graph_missing"), + "{refresh}" + ); + // Graph/schema digests removed; query/policy digests preserved. 
+ assert!(refresh["resource_digests"].get("graph.knowledge").is_none()); + assert!(refresh["resource_digests"].get("schema.knowledge").is_none()); + assert!( + refresh["resource_digests"] + .get("query.knowledge.find_person") + .is_some(), + "{refresh}" + ); + + let plan = cluster_json(temp.path(), "plan"); + assert_eq!(change_for(&plan, "graph.knowledge")["operation"], "create"); + assert_eq!(change_for(&plan, "graph.knowledge")["disposition"], "deferred"); + assert_eq!(change_for(&plan, "schema.knowledge")["disposition"], "deferred"); + // Converged-then-destroyed: query/policy are already in state at the + // desired digests, so they are not changes at all. + assert_eq!(plan["changes"].as_array().unwrap().len(), 2, "{plan}"); + + let disaster_apply = cluster_json(temp.path(), "apply"); + assert_eq!(disaster_apply["applied_count"], 0, "{disaster_apply}"); + assert_eq!(disaster_apply["converged"], false, "{disaster_apply}"); + let state: serde_json::Value = serde_json::from_str( + &fs::read_to_string(temp.path().join("__cluster/state.json")).unwrap(), + ) + .unwrap(); + assert_eq!( + state["applied_revision"]["resources"]["query.knowledge.find_person"]["digest"], + query_digest + ); + assert!( + temp.path() + .join("__cluster/resources/query/knowledge/find_person") + .join(format!("{query_digest}.gq")) + .exists() + ); +} + +/// The disposition matrix as a system: one apply over two graphs (one live, +/// one not yet created) plus graph-spanning and cluster-scoped policies must +/// produce all four dispositions at once — then converge after the second +/// graph appears. 
+#[test] +fn cluster_e2e_multi_graph_mixed_dispositions_then_converge() { + let temp = tempdir().unwrap(); + write_multi_graph_cluster_fixture(temp.path()); + init_cluster_derived_graph(temp.path()); // knowledge only + + let import = cluster_json(temp.path(), "import"); + assert_eq!(import["ok"], true, "{import}"); + + let apply = cluster_json(temp.path(), "apply"); + assert_eq!(apply["ok"], true, "{apply}"); + assert_eq!(apply["converged"], false, "{apply}"); + assert_eq!(apply["applied_count"], 2, "{apply}"); + assert_eq!( + change_for(&apply, "query.knowledge.find_person")["disposition"], + "applied" + ); + assert_eq!( + change_for(&apply, "policy.cluster_wide")["disposition"], + "applied" + ); + assert_eq!( + change_for(&apply, "query.engineering.find_service")["disposition"], + "blocked" + ); + assert_eq!( + change_for(&apply, "query.engineering.find_service")["reason"], + "dependency_missing" + ); + // One missing dependency graph blocks the whole spanning policy. + assert_eq!(change_for(&apply, "policy.shared")["disposition"], "blocked"); + assert_eq!( + change_for(&apply, "graph.engineering")["disposition"], + "deferred" + ); + assert_eq!( + change_for(&apply, "schema.engineering")["disposition"], + "deferred" + ); + assert_eq!( + change_for(&apply, "graph.knowledge")["disposition"], + "derived" + ); + assert_eq!( + apply["resource_statuses"]["policy.shared"]["status"], + "blocked" + ); + // Deterministic ordering: changes sorted by resource address. + let order: Vec<&str> = apply["changes"] + .as_array() + .unwrap() + .iter() + .map(|change| change["resource"].as_str().unwrap()) + .collect(); + let mut sorted = order.clone(); + sorted.sort_unstable(); + assert_eq!(order, sorted, "{apply}"); + + // The second graph appears; refresh observes it; apply converges. 
+ init_named_cluster_graph(temp.path(), "engineering", "services.pg"); + let refresh = cluster_json(temp.path(), "refresh"); + assert_eq!(refresh["ok"], true, "{refresh}"); + + let converge = cluster_json(temp.path(), "apply"); + assert_eq!(converge["ok"], true, "{converge}"); + assert_eq!(converge["converged"], true, "{converge}"); + assert_eq!( + change_for(&converge, "query.engineering.find_service")["disposition"], + "applied" + ); + assert_eq!(change_for(&converge, "policy.shared")["disposition"], "applied"); + + let final_plan = cluster_json(temp.path(), "plan"); + assert!( + final_plan["changes"].as_array().unwrap().is_empty(), + "{final_plan}" + ); +} + #[test] fn short_version_flag_prints_current_cli_version() { let output = output_success(cli().arg("-v")); diff --git a/docs/dev/testing.md b/docs/dev/testing.md index 1f818e9..1eebeb2 100644 --- a/docs/dev/testing.md +++ b/docs/dev/testing.md @@ -7,7 +7,7 @@ This file is the always-on map of the test surface. **Consult it before every ta | Crate | Path | Style | |---|---|---| | `omnigraph` (engine) | `crates/omnigraph/tests/` | Integration tests (21 files), fixture-driven, share `tests/helpers/mod.rs` | -| `omnigraph-cli` | `crates/omnigraph-cli/tests/` | `cli.rs` (unit-ish), `system_local.rs`, `system_remote.rs`, share `tests/support/mod.rs` | +| `omnigraph-cli` | `crates/omnigraph-cli/tests/` | `cli.rs` (unit-ish; includes the `cluster_e2e_*` lifecycle compositions over the spawned binary — lost-state re-import recovery, out-of-band drift, graph-root destruction, multi-graph mixed-disposition convergence), `system_local.rs`, `system_remote.rs`, share `tests/support/mod.rs` | | `omnigraph-cluster` | mostly in-source `#[cfg(test)] mod tests` | Cluster config parser, local JSON state diff, state CAS/lock handling/recovery, read-only validate/plan/status plus explicit refresh/import graph observations, and config-only apply (content-addressed payload publish, disposition gating, composite-digest convergence, 
idempotent re-apply) | | `omnigraph-server` | `crates/omnigraph-server/tests/` | `server.rs` (HTTP-level), `openapi.rs` (OpenAPI drift / regeneration) | | `omnigraph-compiler` | mostly in-source `#[cfg(test)] mod tests` | Parser, type-checker, IR lowering, lint |