diff --git a/crates/omnigraph-cli/tests/cli.rs b/crates/omnigraph-cli/tests/cli.rs index 9dbf250..30fa796 100644 --- a/crates/omnigraph-cli/tests/cli.rs +++ b/crates/omnigraph-cli/tests/cli.rs @@ -890,6 +890,189 @@ fn cluster_apply_locked_exits_nonzero() { assert!(!temp.path().join("__cluster/resources").exists()); } +fn cluster_json(root: &std::path::Path, command: &str) -> serde_json::Value { + parse_stdout_json(&output_success( + cli() + .arg("cluster") + .arg(command) + .arg("--config") + .arg(root) + .arg("--json"), + )) +} + +/// End-to-end lifecycle against a REAL derived graph: import observes the live +/// graph, plan/apply converge the query+policy catalog, status reports it, +/// refresh re-observes without un-converging, and a query edit round-trips. +/// This is the composition test — every step passes individually elsewhere; +/// this catches the seams (e.g. refresh and apply recomputing the graph +/// composite digest differently would silently re-open the plan forever). +#[test] +fn cluster_e2e_lifecycle_import_apply_status_refresh_converges() { + let temp = tempdir().unwrap(); + write_cluster_config_fixture(temp.path()); + init_cluster_derived_graph(temp.path()); + + let import = cluster_json(temp.path(), "import"); + assert_eq!(import["ok"], true, "{import}"); + assert_eq!(import["state_observations"]["state_revision"], 1); + + let plan = cluster_json(temp.path(), "plan"); + let changes = plan["changes"].as_array().unwrap(); + assert_eq!(changes.len(), 3, "{plan}"); + let disposition_of = |resource: &str| { + changes + .iter() + .find(|change| change["resource"] == resource) + .unwrap_or_else(|| panic!("missing change for {resource}: {plan}"))["disposition"] + .clone() + }; + assert_eq!(disposition_of("graph.knowledge"), "derived"); + assert_eq!(disposition_of("query.knowledge.find_person"), "applied"); + assert_eq!(disposition_of("policy.base"), "applied"); + + let apply = cluster_json(temp.path(), "apply"); + assert_eq!(apply["ok"], true, "{apply}"); + assert_eq!(apply["applied_count"], 2, "{apply}"); + assert_eq!(apply["converged"], true, "{apply}"); + + let status = cluster_json(temp.path(), "status"); + assert_eq!( + status["resource_statuses"]["query.knowledge.find_person"]["status"], + "applied" + ); + assert_eq!(status["resource_statuses"]["policy.base"]["status"], "applied"); + assert!( + status["state_observations"]["applied_config_digest"].is_string(), + "converged apply must record the applied config digest: {status}" + ); + + // Refresh re-observes the live graph; it must not undo apply's work. + let refresh = cluster_json(temp.path(), "refresh"); + assert_eq!(refresh["ok"], true, "{refresh}"); + let replan = cluster_json(temp.path(), "plan"); + assert!( + replan["changes"].as_array().unwrap().is_empty(), + "refresh after a converged apply must not re-open the plan: {replan}" + ); + + // A query edit round-trips: plan update -> apply -> converged again. + fs::write( + temp.path().join("people.gq"), + r#" +query find_person($name: String) { + match { $p: Person { name: $name } } + return { $p.name } +} +"#, + ) + .unwrap(); + let apply_edit = cluster_json(temp.path(), "apply"); + assert_eq!(apply_edit["applied_count"], 1, "{apply_edit}"); + assert_eq!(apply_edit["converged"], true, "{apply_edit}"); + + let final_apply = cluster_json(temp.path(), "apply"); + assert_eq!(final_apply["state_written"], false, "{final_apply}"); + assert!(final_apply["changes"].as_array().unwrap().is_empty()); +} + +/// The operator workflow across the Stage 3A boundary: a schema change is +/// deferred by cluster apply, executed by `omnigraph schema apply` against +/// the graph, picked up by `cluster refresh`, and the next apply re-converges. +#[test] +fn cluster_e2e_schema_change_defers_until_schema_apply_and_refresh() { + let temp = tempdir().unwrap(); + write_cluster_config_fixture(temp.path()); + init_cluster_derived_graph(temp.path()); + let import = cluster_json(temp.path(), "import"); + assert_eq!(import["ok"], true, "{import}"); + let apply = cluster_json(temp.path(), "apply"); + assert_eq!(apply["converged"], true, "{apply}"); + + // Additive schema change: cluster apply must defer it loudly, not act. + fs::write( + temp.path().join("people.pg"), + r#" +node Person { + name: String @key + age: I32? + bio: String? +} +"#, + ) + .unwrap(); + let deferred = cluster_json(temp.path(), "apply"); + assert_eq!(deferred["ok"], true, "{deferred}"); + assert_eq!(deferred["applied_count"], 0, "{deferred}"); + assert_eq!(deferred["converged"], false, "{deferred}"); + assert!( + deferred["diagnostics"] + .as_array() + .unwrap() + .iter() + .any(|diagnostic| diagnostic["code"] == "apply_unsupported_change"), + "{deferred}" + ); + + // The graph-plane tool applies the migration... + output_success( + cli() + .arg("schema") + .arg("apply") + .arg(temp.path().join("graphs/knowledge.omni")) + .arg("--schema") + .arg(temp.path().join("people.pg")) + .arg("--json"), + ); + // ...refresh observes it... + let refresh = cluster_json(temp.path(), "refresh"); + assert_eq!(refresh["ok"], true, "{refresh}"); + // ...and the control plane re-converges. + let reconverge = cluster_json(temp.path(), "apply"); + assert_eq!(reconverge["ok"], true, "{reconverge}"); + assert_eq!(reconverge["converged"], true, "{reconverge}"); + let replan = cluster_json(temp.path(), "plan"); + assert!( + replan["changes"].as_array().unwrap().is_empty(), + "after schema apply + refresh + apply, the plan must be empty: {replan}" + ); +} + +/// Lock-recovery composition: a held lock refuses apply, force-unlock clears +/// it, and the retried apply converges. +#[test] +fn cluster_e2e_force_unlock_unblocks_apply() { + let temp = tempdir().unwrap(); + write_cluster_config_fixture(temp.path()); + write_cluster_applyable_state(temp.path()); + write_cluster_lock(temp.path(), "stuck-lock", "apply"); + + let refused = parse_stdout_json(&output_failure( + cli() + .arg("cluster") + .arg("apply") + .arg("--config") + .arg(temp.path()) + .arg("--json"), + )); + assert_eq!(refused["ok"], false); + + let unlocked = parse_stdout_json(&output_success( + cli() + .arg("cluster") + .arg("force-unlock") + .arg("stuck-lock") + .arg("--config") + .arg(temp.path()) + .arg("--json"), + )); + assert_eq!(unlocked["lock_removed"], true, "{unlocked}"); + + let retried = cluster_json(temp.path(), "apply"); + assert_eq!(retried["ok"], true, "{retried}"); + assert_eq!(retried["converged"], true, "{retried}"); +} + #[test] fn short_version_flag_prints_current_cli_version() { let output = output_success(cli().arg("-v"));