test(cli): comprehensive full-cycle cluster e2e with a live server

Two system tests composing the whole Phase 1-5 surface with real binaries:

- local_cluster_full_lifecycle_declare_serve_evolve_delete: declare two
  graphs -> one apply creates and converges them -> the --cluster server
  serves both stored queries -> schema+query evolve in one apply (migration
  previewed in plan) -> restart serves the new shape -> out-of-band schema
  drift observed by refresh and converged back by apply (rogue field
  soft-dropped) -> approved graph delete -> restart serves the survivor and
  404s the tombstoned graph -> final plan empty. Catches composition
  regressions where each stage passes its own tests but the lifecycle
  breaks (the composite_flow.rs principle at the control-plane level).

- local_cluster_serving_enforces_applied_policy_bindings: applied policy
  bundles gate serving per their bindings over HTTP with bearer-resolved
  actors — the cluster-bound bundle owns graph_list (admin 200, reader 403,
  anonymous 401), the graph-bound bundle owns invoke_query (reader gets
  rows; denied invocation is the documented anti-probing 404).

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
aaltshuler 2026-06-10 18:07:29 +03:00
parent af6a1096b0
commit 7d70811df1
3 changed files with 413 additions and 1 deletions

View file

@ -218,6 +218,15 @@ pub fn spawn_server_with_cluster(cluster_dir: &Path) -> TestServer {
spawn_server_process(command)
}
pub fn spawn_server_with_cluster_env(cluster_dir: &Path, envs: &[(&str, &str)]) -> TestServer {
let mut command = server_process();
command.arg("--cluster").arg(cluster_dir);
for (name, value) in envs {
command.env(name, value);
}
spawn_server_process(command)
}
pub fn spawn_server_with_config_env(config: &Path, envs: &[(&str, &str)]) -> TestServer {
let mut command = server_process();
command.arg("--config").arg(config);

View file

@ -1714,3 +1714,406 @@ graphs:
let body: serde_json::Value = response.json().unwrap();
assert!(body.to_string().contains("Ada"), "{body}");
}
// ---- Comprehensive full-cycle cluster e2e (Phases 1-5 composed) ----
fn cluster_cli(dir: &std::path::Path, args: &[&str]) -> serde_json::Value {
let mut command = cli();
command.arg("cluster");
for arg in args {
command.arg(arg);
}
let output = command
.arg("--config")
.arg(dir)
.arg("--json")
.output()
.unwrap();
let stdout = String::from_utf8_lossy(&output.stdout);
serde_json::from_str(stdout.trim()).unwrap_or_else(|err| {
panic!(
"cluster {args:?} produced unparseable output ({err}): stdout={stdout} stderr={}",
String::from_utf8_lossy(&output.stderr)
)
})
}
fn write_two_graph_cluster(dir: &std::path::Path) {
std::fs::write(
dir.join("people.pg"),
"\nnode Person {\n name: String @key\n}\n",
)
.unwrap();
std::fs::write(
dir.join("services.pg"),
"\nnode Service {\n name: String @key\n}\n",
)
.unwrap();
std::fs::write(
dir.join("people.gq"),
"\nquery find_person($name: String) {\n match { $p: Person { name: $name } }\n return { $p.name }\n}\n",
)
.unwrap();
std::fs::write(
dir.join("services.gq"),
"\nquery find_service($name: String) {\n match { $s: Service { name: $name } }\n return { $s.name }\n}\n",
)
.unwrap();
std::fs::write(
dir.join("cluster.yaml"),
r#"
version: 1
graphs:
knowledge:
schema: ./people.pg
queries:
find_person:
file: ./people.gq
engineering:
schema: ./services.pg
queries:
find_service:
file: ./services.gq
"#,
)
.unwrap();
}
fn seed_graph(dir: &std::path::Path, graph: &str, row: &str) {
let data = dir.join(format!("{graph}-seed.jsonl"));
std::fs::write(&data, row).unwrap();
let output = cli()
.arg("load")
.arg("--data")
.arg(&data)
.arg(dir.join(format!("graphs/{graph}.omni")))
.output()
.unwrap();
assert!(
output.status.success(),
"seed {graph} failed: {}",
String::from_utf8_lossy(&output.stderr)
);
}
fn invoke_query(
client: &Client,
base_url: &str,
graph: &str,
query: &str,
params: serde_json::Value,
) -> (u16, serde_json::Value) {
let response = client
.post(format!("{base_url}/graphs/{graph}/queries/{query}"))
.json(&serde_json::json!({ "params": params }))
.send()
.unwrap();
let status = response.status().as_u16();
let body = response.json().unwrap_or(serde_json::Value::Null);
(status, body)
}
/// The whole control-plane story in one test: declare two graphs → converge
/// (apply creates them) → serve → evolve schema+query in one apply → restart
/// serves the new shape → out-of-band drift converged back → approved graph
/// delete → restart serves the survivor only → plan empty.
#[test]
fn local_cluster_full_lifecycle_declare_serve_evolve_delete() {
let temp = tempfile::tempdir().unwrap();
let dir = temp.path();
write_two_graph_cluster(dir);
// Phase 1-2: declare + record.
assert_eq!(cluster_cli(dir, &["import"])["ok"], true);
// Phase 3-4: one apply creates both graphs and publishes the catalog.
let converge = cluster_cli(dir, &["apply"]);
assert_eq!(converge["converged"], true, "{converge}");
seed_graph(dir, "knowledge", "{\"type\":\"Person\",\"data\":{\"name\":\"Ada\"}}\n");
seed_graph(dir, "engineering", "{\"type\":\"Service\",\"data\":{\"name\":\"billing\"}}\n");
// Phase 5: serve the applied revision.
let client = Client::new();
{
let server = spawn_server_with_cluster(dir);
let (status, body) = invoke_query(
&client,
&server.base_url,
"knowledge",
"find_person",
serde_json::json!({"name": "Ada"}),
);
assert_eq!(status, 200, "{body}");
assert_eq!(body["rows"][0]["p.name"], "Ada", "{body}");
let (status, body) = invoke_query(
&client,
&server.base_url,
"engineering",
"find_service",
serde_json::json!({"name": "billing"}),
);
assert_eq!(status, 200, "{body}");
assert_eq!(body["rows"][0]["s.name"], "billing", "{body}");
}
// Evolve: schema gains a field, the query returns it — one apply, with
// the migration previewed in plan.
std::fs::write(
dir.join("people.pg"),
"\nnode Person {\n name: String @key\n bio: String?\n}\n",
)
.unwrap();
std::fs::write(
dir.join("people.gq"),
"\nquery find_person($name: String) {\n match { $p: Person { name: $name } }\n return { $p.name, $p.bio }\n}\n",
)
.unwrap();
let plan = cluster_cli(dir, &["plan"]);
let schema_change = plan["changes"]
.as_array()
.unwrap()
.iter()
.find(|change| change["resource"] == "schema.knowledge")
.unwrap();
assert_eq!(schema_change["migration"]["supported"], true, "{plan}");
let evolve = cluster_cli(dir, &["apply"]);
assert_eq!(evolve["converged"], true, "{evolve}");
// Restart: the server serves the evolved shape.
{
let server = spawn_server_with_cluster(dir);
let (status, body) = invoke_query(
&client,
&server.base_url,
"knowledge",
"find_person",
serde_json::json!({"name": "Ada"}),
);
assert_eq!(status, 200, "{body}");
assert!(
body["columns"]
.as_array()
.unwrap()
.iter()
.any(|column| column == "p.bio"),
"evolved query must project the new field: {body}"
);
}
// Out-of-band drift: the live graph evolves behind the cluster's back;
// refresh observes it, apply converges it back to the declared schema.
std::fs::write(
dir.join("rogue.pg"),
"\nnode Person {\n name: String @key\n bio: String?\n rogue: String?\n}\n",
)
.unwrap();
let output = cli()
.arg("schema")
.arg("apply")
.arg(dir.join("graphs/knowledge.omni"))
.arg("--schema")
.arg(dir.join("rogue.pg"))
.arg("--json")
.output()
.unwrap();
assert!(output.status.success(), "out-of-band schema apply failed");
let refresh = cluster_cli(dir, &["refresh"]);
assert_eq!(
refresh["resource_statuses"]["schema.knowledge"]["status"],
"drifted",
"{refresh}"
);
let heal = cluster_cli(dir, &["apply"]);
assert_eq!(heal["converged"], true, "{heal}");
let schema_show = cli()
.arg("schema")
.arg("show")
.arg(dir.join("graphs/knowledge.omni"))
.output()
.unwrap();
assert!(
!String::from_utf8_lossy(&schema_show.stdout).contains("rogue"),
"drift must be soft-dropped back to the declared schema"
);
// Retire engineering: gated delete, then the server serves the survivor.
std::fs::write(
dir.join("cluster.yaml"),
r#"
version: 1
graphs:
knowledge:
schema: ./people.pg
queries:
find_person:
file: ./people.gq
"#,
)
.unwrap();
let blocked = cluster_cli(dir, &["apply"]);
assert_eq!(blocked["converged"], false, "{blocked}");
let approve_output = cli()
.arg("--as")
.arg("andrew")
.arg("cluster")
.arg("approve")
.arg("graph.engineering")
.arg("--config")
.arg(dir)
.arg("--json")
.output()
.unwrap();
assert!(approve_output.status.success(), "approve failed");
let delete = cluster_cli(dir, &["apply"]);
assert_eq!(delete["converged"], true, "{delete}");
assert!(!dir.join("graphs/engineering.omni").exists());
{
let server = spawn_server_with_cluster(dir);
let (status, body) = invoke_query(
&client,
&server.base_url,
"knowledge",
"find_person",
serde_json::json!({"name": "Ada"}),
);
assert_eq!(status, 200, "{body}");
let response = client
.post(format!(
"{}/graphs/engineering/queries/find_service",
server.base_url
))
.json(&serde_json::json!({"params": {"name": "billing"}}))
.send()
.unwrap();
assert_eq!(
response.status().as_u16(),
404,
"a deleted graph must vanish from the serving surface"
);
}
// The story ends converged: nothing left to do.
let final_plan = cluster_cli(dir, &["plan"]);
assert!(
final_plan["changes"].as_array().unwrap().is_empty(),
"{final_plan}"
);
}
/// Applied policy bundles gate serving per their bindings: the cluster-bound
/// bundle owns the management surface (graph_list), the graph-bound bundle
/// owns query invocation — enforced over HTTP with bearer-resolved actors.
#[test]
fn local_cluster_serving_enforces_applied_policy_bindings() {
let temp = tempfile::tempdir().unwrap();
let dir = temp.path();
std::fs::write(
dir.join("people.pg"),
"\nnode Person {\n name: String @key\n}\n",
)
.unwrap();
std::fs::write(
dir.join("people.gq"),
"\nquery find_person($name: String) {\n match { $p: Person { name: $name } }\n return { $p.name }\n}\n",
)
.unwrap();
std::fs::write(
dir.join("graph.policy.yaml"),
r#"
version: 1
groups:
readers: ["act-reader"]
protected_branches: [main]
rules:
- id: allow-invoke
allow:
actors: { group: readers }
actions: [invoke_query]
- id: allow-read
allow:
actors: { group: readers }
actions: [read]
branch_scope: any
"#,
)
.unwrap();
std::fs::write(
dir.join("server.policy.yaml"),
r#"
version: 1
kind: server
groups:
admins: ["act-admin"]
rules:
- id: allow-list
allow:
actors: { group: admins }
actions: [graph_list]
"#,
)
.unwrap();
std::fs::write(
dir.join("cluster.yaml"),
r#"
version: 1
graphs:
knowledge:
schema: ./people.pg
queries:
find_person:
file: ./people.gq
policies:
graph_rules:
file: ./graph.policy.yaml
applies_to: [knowledge]
server_rules:
file: ./server.policy.yaml
applies_to: [cluster]
"#,
)
.unwrap();
assert_eq!(cluster_cli(dir, &["import"])["ok"], true);
let converge = cluster_cli(dir, &["apply"]);
assert_eq!(converge["converged"], true, "{converge}");
seed_graph(dir, "knowledge", "{\"type\":\"Person\",\"data\":{\"name\":\"Ada\"}}\n");
let server = spawn_server_with_cluster_env(
dir,
&[(
"OMNIGRAPH_SERVER_BEARER_TOKENS_JSON",
r#"{"act-admin":"admin-token","act-reader":"reader-token"}"#,
)],
);
let client = Client::new();
let get_graphs = |token: Option<&str>| {
let mut request = client.get(format!("{}/graphs", server.base_url));
if let Some(token) = token {
request = request.bearer_auth(token);
}
request.send().unwrap().status().as_u16()
};
// Management surface: cluster-bound bundle, admins only.
assert_eq!(get_graphs(Some("admin-token")), 200);
assert_eq!(get_graphs(Some("reader-token")), 403);
assert_eq!(get_graphs(None), 401);
// Query invocation: graph-bound bundle, readers only.
let invoke = |token: &str| {
client
.post(format!(
"{}/graphs/knowledge/queries/find_person",
server.base_url
))
.bearer_auth(token)
.json(&serde_json::json!({"params": {"name": "Ada"}}))
.send()
.unwrap()
};
let response = invoke("reader-token");
assert_eq!(response.status().as_u16(), 200);
let body: serde_json::Value = response.json().unwrap();
assert_eq!(body["rows"][0]["p.name"], "Ada", "{body}");
// Denied invocation is deliberately 404, indistinguishable from an
// unknown query — the server's anti-probing contract.
assert_eq!(invoke("admin-token").status().as_u16(), 404);
}

View file

@ -7,7 +7,7 @@ This file is the always-on map of the test surface. **Consult it before every ta
| Crate | Path | Style |
|---|---|---|
| `omnigraph` (engine) | `crates/omnigraph/tests/` | Integration tests (21 files), fixture-driven, share `tests/helpers/mod.rs` |
| `omnigraph-cli` | `crates/omnigraph-cli/tests/` | `cli.rs` (unit-ish; includes the `cluster_e2e_*` lifecycle compositions over the spawned binary — lost-state re-import recovery, out-of-band drift, graph-root destruction, multi-graph mixed-disposition convergence), `system_local.rs`, `system_remote.rs`, share `tests/support/mod.rs` |
| `omnigraph-cli` | `crates/omnigraph-cli/tests/` | `cli.rs` (unit-ish; includes the `cluster_e2e_*` lifecycle compositions over the spawned binary — lost-state re-import recovery, out-of-band drift, graph-root destruction, multi-graph mixed-disposition convergence), `system_local.rs` (incl. the full-cycle cluster lifecycle with a spawned `--cluster` server — declare→serve→evolve→drift-heal→approved-delete — and applied-policy enforcement over HTTP), `system_remote.rs`, share `tests/support/mod.rs` |
| `omnigraph-cluster` | mostly in-source `#[cfg(test)] mod tests`; `tests/failpoints.rs` (feature-gated) | Cluster config parser, local JSON state diff, state CAS/lock handling/recovery, read-only validate/plan/status plus explicit refresh/import graph observations, config-only apply (content-addressed payload publish, disposition gating, composite-digest convergence, idempotent re-apply), catalog payload verification (status read-only, refresh drift + self-heal), failpoint crash-mid-apply / CAS-race coverage, Stage 4A graph creation (create executor, recovery sidecars + sweep rows, create crash windows), Stage 4B schema apply (migration previews in plan, schema executor, schema-apply sweep classification, schema crash windows), Stage 4C gated deletes (digest-bound approvals, delete executor + tombstones, delete sweep rows, delete crash windows), and 5A policy binding metadata (applies_to in the applied revision, binding-change diffing + convergence, pre-5A backfill), and the 5B serving-snapshot read API (converged read, refusal rows) |
| `omnigraph-server` | `crates/omnigraph-server/tests/` | `server.rs` (HTTP-level; incl. cluster-mode boot — converged-dir serving, policy binding wiring, boot refusals), `openapi.rs` (OpenAPI drift / regeneration) |
| `omnigraph-compiler` | mostly in-source `#[cfg(test)] mod tests` | Parser, type-checker, IR lowering, lint |