From 7d70811df1eceedf4f7b51a9b4e340e6a740237c Mon Sep 17 00:00:00 2001 From: aaltshuler Date: Wed, 10 Jun 2026 18:07:29 +0300 Subject: [PATCH] test(cli): comprehensive full-cycle cluster e2e with a live server MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two system tests composing the whole Phase 1-5 surface with real binaries: - local_cluster_full_lifecycle_declare_serve_evolve_delete: declare two graphs -> one apply creates and converges them -> the --cluster server serves both stored queries -> schema+query evolve in one apply (migration previewed in plan) -> restart serves the new shape -> out-of-band schema drift observed by refresh and converged back by apply (rogue field soft-dropped) -> approved graph delete -> restart serves the survivor and 404s the tombstoned graph -> final plan empty. Catches composition regressions where each stage passes its own tests but the lifecycle breaks (the composite_flow.rs principle at the control-plane level). - local_cluster_serving_enforces_applied_policy_bindings: applied policy bundles gate serving per their bindings over HTTP with bearer-resolved actors — the cluster-bound bundle owns graph_list (admin 200, reader 403, anonymous 401), the graph-bound bundle owns invoke_query (reader gets rows; denied invocation is the documented anti-probing 404). Co-Authored-By: Claude Fable 5 --- crates/omnigraph-cli/tests/support/mod.rs | 9 + crates/omnigraph-cli/tests/system_local.rs | 403 +++++++++++++++++++++ docs/dev/testing.md | 2 +- 3 files changed, 413 insertions(+), 1 deletion(-) diff --git a/crates/omnigraph-cli/tests/support/mod.rs b/crates/omnigraph-cli/tests/support/mod.rs index 5c17182..855d8e0 100644 --- a/crates/omnigraph-cli/tests/support/mod.rs +++ b/crates/omnigraph-cli/tests/support/mod.rs @@ -218,6 +218,15 @@ pub fn spawn_server_with_cluster(cluster_dir: &Path) -> TestServer { spawn_server_process(command) } +pub fn spawn_server_with_cluster_env(cluster_dir: &Path, envs: &[(&str, &str)]) -> TestServer { + let mut command = server_process(); + command.arg("--cluster").arg(cluster_dir); + for (name, value) in envs { + command.env(name, value); + } + spawn_server_process(command) +} + pub fn spawn_server_with_config_env(config: &Path, envs: &[(&str, &str)]) -> TestServer { let mut command = server_process(); command.arg("--config").arg(config); diff --git a/crates/omnigraph-cli/tests/system_local.rs b/crates/omnigraph-cli/tests/system_local.rs index 81476b0..14b8890 100644 --- a/crates/omnigraph-cli/tests/system_local.rs +++ b/crates/omnigraph-cli/tests/system_local.rs @@ -1714,3 +1714,406 @@ graphs: let body: serde_json::Value = response.json().unwrap(); assert!(body.to_string().contains("Ada"), "{body}"); } + +// ---- Comprehensive full-cycle cluster e2e (Phases 1-5 composed) ---- + +fn cluster_cli(dir: &std::path::Path, args: &[&str]) -> serde_json::Value { + let mut command = cli(); + command.arg("cluster"); + for arg in args { + command.arg(arg); + } + let output = command + .arg("--config") + .arg(dir) + .arg("--json") + .output() + .unwrap(); + let stdout = String::from_utf8_lossy(&output.stdout); + serde_json::from_str(stdout.trim()).unwrap_or_else(|err| { + panic!( + "cluster {args:?} produced unparseable output ({err}): stdout={stdout} stderr={}", + String::from_utf8_lossy(&output.stderr) + ) + }) +} + +fn write_two_graph_cluster(dir: &std::path::Path) { + std::fs::write( + dir.join("people.pg"), + "\nnode Person {\n name: String @key\n}\n", + ) + .unwrap(); + std::fs::write( + dir.join("services.pg"), + "\nnode Service {\n name: String @key\n}\n", + ) + .unwrap(); + std::fs::write( + dir.join("people.gq"), + "\nquery find_person($name: String) {\n match { $p: Person { name: $name } }\n return { $p.name }\n}\n", + ) + .unwrap(); + std::fs::write( + dir.join("services.gq"), + "\nquery find_service($name: String) {\n match { $s: Service { name: $name } }\n return { $s.name }\n}\n", + ) + .unwrap(); + std::fs::write( + dir.join("cluster.yaml"), + r#" +version: 1 +graphs: + knowledge: + schema: ./people.pg + queries: + find_person: + file: ./people.gq + engineering: + schema: ./services.pg + queries: + find_service: + file: ./services.gq +"#, + ) + .unwrap(); +} + +fn seed_graph(dir: &std::path::Path, graph: &str, row: &str) { + let data = dir.join(format!("{graph}-seed.jsonl")); + std::fs::write(&data, row).unwrap(); + let output = cli() + .arg("load") + .arg("--data") + .arg(&data) + .arg(dir.join(format!("graphs/{graph}.omni"))) + .output() + .unwrap(); + assert!( + output.status.success(), + "seed {graph} failed: {}", + String::from_utf8_lossy(&output.stderr) + ); +} + +fn invoke_query( + client: &Client, + base_url: &str, + graph: &str, + query: &str, + params: serde_json::Value, +) -> (u16, serde_json::Value) { + let response = client + .post(format!("{base_url}/graphs/{graph}/queries/{query}")) + .json(&serde_json::json!({ "params": params })) + .send() + .unwrap(); + let status = response.status().as_u16(); + let body = response.json().unwrap_or(serde_json::Value::Null); + (status, body) +} + +/// The whole control-plane story in one test: declare two graphs → converge +/// (apply creates them) → serve → evolve schema+query in one apply → restart +/// serves the new shape → out-of-band drift converged back → approved graph +/// delete → restart serves the survivor only → plan empty. +#[test] +fn local_cluster_full_lifecycle_declare_serve_evolve_delete() { + let temp = tempfile::tempdir().unwrap(); + let dir = temp.path(); + write_two_graph_cluster(dir); + + // Phase 1-2: declare + record. + assert_eq!(cluster_cli(dir, &["import"])["ok"], true); + // Phase 3-4: one apply creates both graphs and publishes the catalog. + let converge = cluster_cli(dir, &["apply"]); + assert_eq!(converge["converged"], true, "{converge}"); + seed_graph(dir, "knowledge", "{\"type\":\"Person\",\"data\":{\"name\":\"Ada\"}}\n"); + seed_graph(dir, "engineering", "{\"type\":\"Service\",\"data\":{\"name\":\"billing\"}}\n"); + + // Phase 5: serve the applied revision. + let client = Client::new(); + { + let server = spawn_server_with_cluster(dir); + let (status, body) = invoke_query( + &client, + &server.base_url, + "knowledge", + "find_person", + serde_json::json!({"name": "Ada"}), + ); + assert_eq!(status, 200, "{body}"); + assert_eq!(body["rows"][0]["p.name"], "Ada", "{body}"); + let (status, body) = invoke_query( + &client, + &server.base_url, + "engineering", + "find_service", + serde_json::json!({"name": "billing"}), + ); + assert_eq!(status, 200, "{body}"); + assert_eq!(body["rows"][0]["s.name"], "billing", "{body}"); + } + + // Evolve: schema gains a field, the query returns it — one apply, with + // the migration previewed in plan. + std::fs::write( + dir.join("people.pg"), + "\nnode Person {\n name: String @key\n bio: String?\n}\n", + ) + .unwrap(); + std::fs::write( + dir.join("people.gq"), + "\nquery find_person($name: String) {\n match { $p: Person { name: $name } }\n return { $p.name, $p.bio }\n}\n", + ) + .unwrap(); + let plan = cluster_cli(dir, &["plan"]); + let schema_change = plan["changes"] + .as_array() + .unwrap() + .iter() + .find(|change| change["resource"] == "schema.knowledge") + .unwrap(); + assert_eq!(schema_change["migration"]["supported"], true, "{plan}"); + let evolve = cluster_cli(dir, &["apply"]); + assert_eq!(evolve["converged"], true, "{evolve}"); + + // Restart: the server serves the evolved shape. + { + let server = spawn_server_with_cluster(dir); + let (status, body) = invoke_query( + &client, + &server.base_url, + "knowledge", + "find_person", + serde_json::json!({"name": "Ada"}), + ); + assert_eq!(status, 200, "{body}"); + assert!( + body["columns"] + .as_array() + .unwrap() + .iter() + .any(|column| column == "p.bio"), + "evolved query must project the new field: {body}" + ); + } + + // Out-of-band drift: the live graph evolves behind the cluster's back; + // refresh observes it, apply converges it back to the declared schema. + std::fs::write( + dir.join("rogue.pg"), + "\nnode Person {\n name: String @key\n bio: String?\n rogue: String?\n}\n", + ) + .unwrap(); + let output = cli() + .arg("schema") + .arg("apply") + .arg(dir.join("graphs/knowledge.omni")) + .arg("--schema") + .arg(dir.join("rogue.pg")) + .arg("--json") + .output() + .unwrap(); + assert!(output.status.success(), "out-of-band schema apply failed"); + let refresh = cluster_cli(dir, &["refresh"]); + assert_eq!( + refresh["resource_statuses"]["schema.knowledge"]["status"], + "drifted", + "{refresh}" + ); + let heal = cluster_cli(dir, &["apply"]); + assert_eq!(heal["converged"], true, "{heal}"); + let schema_show = cli() + .arg("schema") + .arg("show") + .arg(dir.join("graphs/knowledge.omni")) + .output() + .unwrap(); + assert!( + !String::from_utf8_lossy(&schema_show.stdout).contains("rogue"), + "drift must be soft-dropped back to the declared schema" + ); + + // Retire engineering: gated delete, then the server serves the survivor. + std::fs::write( + dir.join("cluster.yaml"), + r#" +version: 1 +graphs: + knowledge: + schema: ./people.pg + queries: + find_person: + file: ./people.gq +"#, + ) + .unwrap(); + let blocked = cluster_cli(dir, &["apply"]); + assert_eq!(blocked["converged"], false, "{blocked}"); + let approve_output = cli() + .arg("--as") + .arg("andrew") + .arg("cluster") + .arg("approve") + .arg("graph.engineering") + .arg("--config") + .arg(dir) + .arg("--json") + .output() + .unwrap(); + assert!(approve_output.status.success(), "approve failed"); + let delete = cluster_cli(dir, &["apply"]); + assert_eq!(delete["converged"], true, "{delete}"); + assert!(!dir.join("graphs/engineering.omni").exists()); + + { + let server = spawn_server_with_cluster(dir); + let (status, body) = invoke_query( + &client, + &server.base_url, + "knowledge", + "find_person", + serde_json::json!({"name": "Ada"}), + ); + assert_eq!(status, 200, "{body}"); + let response = client + .post(format!( + "{}/graphs/engineering/queries/find_service", + server.base_url + )) + .json(&serde_json::json!({"params": {"name": "billing"}})) + .send() + .unwrap(); + assert_eq!( + response.status().as_u16(), + 404, + "a deleted graph must vanish from the serving surface" + ); + } + + // The story ends converged: nothing left to do. + let final_plan = cluster_cli(dir, &["plan"]); + assert!( + final_plan["changes"].as_array().unwrap().is_empty(), + "{final_plan}" + ); +} + +/// Applied policy bundles gate serving per their bindings: the cluster-bound +/// bundle owns the management surface (graph_list), the graph-bound bundle +/// owns query invocation — enforced over HTTP with bearer-resolved actors. +#[test] +fn local_cluster_serving_enforces_applied_policy_bindings() { + let temp = tempfile::tempdir().unwrap(); + let dir = temp.path(); + std::fs::write( + dir.join("people.pg"), + "\nnode Person {\n name: String @key\n}\n", + ) + .unwrap(); + std::fs::write( + dir.join("people.gq"), + "\nquery find_person($name: String) {\n match { $p: Person { name: $name } }\n return { $p.name }\n}\n", + ) + .unwrap(); + std::fs::write( + dir.join("graph.policy.yaml"), + r#" +version: 1 +groups: + readers: ["act-reader"] +protected_branches: [main] +rules: + - id: allow-invoke + allow: + actors: { group: readers } + actions: [invoke_query] + - id: allow-read + allow: + actors: { group: readers } + actions: [read] + branch_scope: any +"#, + ) + .unwrap(); + std::fs::write( + dir.join("server.policy.yaml"), + r#" +version: 1 +kind: server +groups: + admins: ["act-admin"] +rules: + - id: allow-list + allow: + actors: { group: admins } + actions: [graph_list] +"#, + ) + .unwrap(); + std::fs::write( + dir.join("cluster.yaml"), + r#" +version: 1 +graphs: + knowledge: + schema: ./people.pg + queries: + find_person: + file: ./people.gq +policies: + graph_rules: + file: ./graph.policy.yaml + applies_to: [knowledge] + server_rules: + file: ./server.policy.yaml + applies_to: [cluster] +"#, + ) + .unwrap(); + assert_eq!(cluster_cli(dir, &["import"])["ok"], true); + let converge = cluster_cli(dir, &["apply"]); + assert_eq!(converge["converged"], true, "{converge}"); + seed_graph(dir, "knowledge", "{\"type\":\"Person\",\"data\":{\"name\":\"Ada\"}}\n"); + + let server = spawn_server_with_cluster_env( + dir, + &[( + "OMNIGRAPH_SERVER_BEARER_TOKENS_JSON", + r#"{"act-admin":"admin-token","act-reader":"reader-token"}"#, + )], + ); + let client = Client::new(); + let get_graphs = |token: Option<&str>| { + let mut request = client.get(format!("{}/graphs", server.base_url)); + if let Some(token) = token { + request = request.bearer_auth(token); + } + request.send().unwrap().status().as_u16() + }; + // Management surface: cluster-bound bundle, admins only. + assert_eq!(get_graphs(Some("admin-token")), 200); + assert_eq!(get_graphs(Some("reader-token")), 403); + assert_eq!(get_graphs(None), 401); + + // Query invocation: graph-bound bundle, readers only. + let invoke = |token: &str| { + client + .post(format!( + "{}/graphs/knowledge/queries/find_person", + server.base_url + )) + .bearer_auth(token) + .json(&serde_json::json!({"params": {"name": "Ada"}})) + .send() + .unwrap() + }; + let response = invoke("reader-token"); + assert_eq!(response.status().as_u16(), 200); + let body: serde_json::Value = response.json().unwrap(); + assert_eq!(body["rows"][0]["p.name"], "Ada", "{body}"); + // Denied invocation is deliberately 404, indistinguishable from an + // unknown query — the server's anti-probing contract. + assert_eq!(invoke("admin-token").status().as_u16(), 404); +} diff --git a/docs/dev/testing.md b/docs/dev/testing.md index eba70c9..9de80f7 100644 --- a/docs/dev/testing.md +++ b/docs/dev/testing.md @@ -7,7 +7,7 @@ This file is the always-on map of the test surface. **Consult it before every ta | Crate | Path | Style | |---|---|---| | `omnigraph` (engine) | `crates/omnigraph/tests/` | Integration tests (21 files), fixture-driven, share `tests/helpers/mod.rs` | -| `omnigraph-cli` | `crates/omnigraph-cli/tests/` | `cli.rs` (unit-ish; includes the `cluster_e2e_*` lifecycle compositions over the spawned binary — lost-state re-import recovery, out-of-band drift, graph-root destruction, multi-graph mixed-disposition convergence), `system_local.rs`, `system_remote.rs`, share `tests/support/mod.rs` | +| `omnigraph-cli` | `crates/omnigraph-cli/tests/` | `cli.rs` (unit-ish; includes the `cluster_e2e_*` lifecycle compositions over the spawned binary — lost-state re-import recovery, out-of-band drift, graph-root destruction, multi-graph mixed-disposition convergence), `system_local.rs` (incl. the full-cycle cluster lifecycle with a spawned `--cluster` server — declare→serve→evolve→drift-heal→approved-delete — and applied-policy enforcement over HTTP), `system_remote.rs`, share `tests/support/mod.rs` | | `omnigraph-cluster` | mostly in-source `#[cfg(test)] mod tests`; `tests/failpoints.rs` (feature-gated) | Cluster config parser, local JSON state diff, state CAS/lock handling/recovery, read-only validate/plan/status plus explicit refresh/import graph observations, config-only apply (content-addressed payload publish, disposition gating, composite-digest convergence, idempotent re-apply), catalog payload verification (status read-only, refresh drift + self-heal), failpoint crash-mid-apply / CAS-race coverage, Stage 4A graph creation (create executor, recovery sidecars + sweep rows, create crash windows), Stage 4B schema apply (migration previews in plan, schema executor, schema-apply sweep classification, schema crash windows), Stage 4C gated deletes (digest-bound approvals, delete executor + tombstones, delete sweep rows, delete crash windows), and 5A policy binding metadata (applies_to in the applied revision, binding-change diffing + convergence, pre-5A backfill), and the 5B serving-snapshot read API (converged read, refusal rows) | | `omnigraph-server` | `crates/omnigraph-server/tests/` | `server.rs` (HTTP-level; incl. cluster-mode boot — converged-dir serving, policy binding wiring, boot refusals), `openapi.rs` (OpenAPI drift / regeneration) | | `omnigraph-compiler` | mostly in-source `#[cfg(test)] mod tests` | Parser, type-checker, IR lowering, lint |