From d870eaaf3ff3d083061a799f4cf3ec1023b02cbb Mon Sep 17 00:00:00 2001 From: aaltshuler Date: Tue, 9 Jun 2026 23:44:49 +0300 Subject: [PATCH] =?UTF-8?q?test(cli):=20cluster=20lifecycle=20e2e=20?= =?UTF-8?q?=E2=80=94=20real-graph=20import/apply/refresh,=20schema-change?= =?UTF-8?q?=20loop,=20force-unlock=20retry?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three composition tests over the spawned binary against a real derived graph: - import -> plan (dispositions) -> apply -> status -> refresh -> plan-empty, then a query edit round-trip. Pins that refresh and apply recompute the graph composite digest identically — divergence would silently re-open the plan forever and no single-command test would catch it. - The Stage 3A operator workflow across the control/data-plane boundary: cluster apply defers a schema change, omnigraph schema apply executes it, cluster refresh observes it, the next cluster apply re-converges. - Held lock refuses apply, force-unlock clears it, retried apply converges. Co-Authored-By: Claude Fable 5 --- crates/omnigraph-cli/tests/cli.rs | 183 ++++++++++++++++++++++++++++++ 1 file changed, 183 insertions(+) diff --git a/crates/omnigraph-cli/tests/cli.rs b/crates/omnigraph-cli/tests/cli.rs index 9dbf250..30fa796 100644 --- a/crates/omnigraph-cli/tests/cli.rs +++ b/crates/omnigraph-cli/tests/cli.rs @@ -890,6 +890,189 @@ fn cluster_apply_locked_exits_nonzero() { assert!(!temp.path().join("__cluster/resources").exists()); } +fn cluster_json(root: &std::path::Path, command: &str) -> serde_json::Value { + parse_stdout_json(&output_success( + cli() + .arg("cluster") + .arg(command) + .arg("--config") + .arg(root) + .arg("--json"), + )) +} + +/// End-to-end lifecycle against a REAL derived graph: import observes the live +/// graph, plan/apply converge the query+policy catalog, status reports it, +/// refresh re-observes without un-converging, and a query edit round-trips. +/// This is the composition test — every step passes individually elsewhere; +/// this catches the seams (e.g. refresh and apply recomputing the graph +/// composite digest differently would silently re-open the plan forever). +#[test] +fn cluster_e2e_lifecycle_import_apply_status_refresh_converges() { + let temp = tempdir().unwrap(); + write_cluster_config_fixture(temp.path()); + init_cluster_derived_graph(temp.path()); + + let import = cluster_json(temp.path(), "import"); + assert_eq!(import["ok"], true, "{import}"); + assert_eq!(import["state_observations"]["state_revision"], 1); + + let plan = cluster_json(temp.path(), "plan"); + let changes = plan["changes"].as_array().unwrap(); + assert_eq!(changes.len(), 3, "{plan}"); + let disposition_of = |resource: &str| { + changes + .iter() + .find(|change| change["resource"] == resource) + .unwrap_or_else(|| panic!("missing change for {resource}: {plan}"))["disposition"] + .clone() + }; + assert_eq!(disposition_of("graph.knowledge"), "derived"); + assert_eq!(disposition_of("query.knowledge.find_person"), "applied"); + assert_eq!(disposition_of("policy.base"), "applied"); + + let apply = cluster_json(temp.path(), "apply"); + assert_eq!(apply["ok"], true, "{apply}"); + assert_eq!(apply["applied_count"], 2, "{apply}"); + assert_eq!(apply["converged"], true, "{apply}"); + + let status = cluster_json(temp.path(), "status"); + assert_eq!( + status["resource_statuses"]["query.knowledge.find_person"]["status"], + "applied" + ); + assert_eq!(status["resource_statuses"]["policy.base"]["status"], "applied"); + assert!( + status["state_observations"]["applied_config_digest"].is_string(), + "converged apply must record the applied config digest: {status}" + ); + + // Refresh re-observes the live graph; it must not undo apply's work. + let refresh = cluster_json(temp.path(), "refresh"); + assert_eq!(refresh["ok"], true, "{refresh}"); + let replan = cluster_json(temp.path(), "plan"); + assert!( + replan["changes"].as_array().unwrap().is_empty(), + "refresh after a converged apply must not re-open the plan: {replan}" + ); + + // A query edit round-trips: plan update -> apply -> converged again. + fs::write( + temp.path().join("people.gq"), + r#" +query find_person($name: String) { + match { $p: Person { name: $name } } + return { $p.name } +} +"#, + ) + .unwrap(); + let apply_edit = cluster_json(temp.path(), "apply"); + assert_eq!(apply_edit["applied_count"], 1, "{apply_edit}"); + assert_eq!(apply_edit["converged"], true, "{apply_edit}"); + + let final_apply = cluster_json(temp.path(), "apply"); + assert_eq!(final_apply["state_written"], false, "{final_apply}"); + assert!(final_apply["changes"].as_array().unwrap().is_empty()); +} + +/// The operator workflow across the Stage 3A boundary: a schema change is +/// deferred by cluster apply, executed by `omnigraph schema apply` against +/// the graph, picked up by `cluster refresh`, and the next apply re-converges. +#[test] +fn cluster_e2e_schema_change_defers_until_schema_apply_and_refresh() { + let temp = tempdir().unwrap(); + write_cluster_config_fixture(temp.path()); + init_cluster_derived_graph(temp.path()); + let import = cluster_json(temp.path(), "import"); + assert_eq!(import["ok"], true, "{import}"); + let apply = cluster_json(temp.path(), "apply"); + assert_eq!(apply["converged"], true, "{apply}"); + + // Additive schema change: cluster apply must defer it loudly, not act. + fs::write( + temp.path().join("people.pg"), + r#" +node Person { + name: String @key + age: I32? + bio: String? +} +"#, + ) + .unwrap(); + let deferred = cluster_json(temp.path(), "apply"); + assert_eq!(deferred["ok"], true, "{deferred}"); + assert_eq!(deferred["applied_count"], 0, "{deferred}"); + assert_eq!(deferred["converged"], false, "{deferred}"); + assert!( + deferred["diagnostics"] + .as_array() + .unwrap() + .iter() + .any(|diagnostic| diagnostic["code"] == "apply_unsupported_change"), + "{deferred}" + ); + + // The graph-plane tool applies the migration... + output_success( + cli() + .arg("schema") + .arg("apply") + .arg(temp.path().join("graphs/knowledge.omni")) + .arg("--schema") + .arg(temp.path().join("people.pg")) + .arg("--json"), + ); + // ...refresh observes it... + let refresh = cluster_json(temp.path(), "refresh"); + assert_eq!(refresh["ok"], true, "{refresh}"); + // ...and the control plane re-converges. + let reconverge = cluster_json(temp.path(), "apply"); + assert_eq!(reconverge["ok"], true, "{reconverge}"); + assert_eq!(reconverge["converged"], true, "{reconverge}"); + let replan = cluster_json(temp.path(), "plan"); + assert!( + replan["changes"].as_array().unwrap().is_empty(), + "after schema apply + refresh + apply, the plan must be empty: {replan}" + ); +} + +/// Lock-recovery composition: a held lock refuses apply, force-unlock clears +/// it, and the retried apply converges. +#[test] +fn cluster_e2e_force_unlock_unblocks_apply() { + let temp = tempdir().unwrap(); + write_cluster_config_fixture(temp.path()); + write_cluster_applyable_state(temp.path()); + write_cluster_lock(temp.path(), "stuck-lock", "apply"); + + let refused = parse_stdout_json(&output_failure( + cli() + .arg("cluster") + .arg("apply") + .arg("--config") + .arg(temp.path()) + .arg("--json"), + )); + assert_eq!(refused["ok"], false); + + let unlocked = parse_stdout_json(&output_success( + cli() + .arg("cluster") + .arg("force-unlock") + .arg("stuck-lock") + .arg("--config") + .arg(temp.path()) + .arg("--json"), + )); + assert_eq!(unlocked["lock_removed"], true, "{unlocked}"); + + let retried = cluster_json(temp.path(), "apply"); + assert_eq!(retried["ok"], true, "{retried}"); + assert_eq!(retried["converged"], true, "{retried}"); +} + #[test] fn short_version_flag_prints_current_cli_version() { let output = output_success(cli().arg("-v"));