test(cli): cluster lifecycle e2e — real-graph import/apply/refresh, schema-change loop, force-unlock retry

Three composition tests over the spawned binary against a real derived graph:

- import -> plan (dispositions) -> apply -> status -> refresh -> plan-empty,
  then a query edit round-trip. Pins that refresh and apply recompute the
  graph composite digest identically — divergence would silently re-open
  the plan forever and no single-command test would catch it.
- The Stage 3A operator workflow across the control/data-plane boundary:
  cluster apply defers a schema change, omnigraph schema apply executes it,
  cluster refresh observes it, the next cluster apply re-converges.
- Held lock refuses apply, force-unlock clears it, retried apply converges.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
aaltshuler 2026-06-09 23:44:49 +03:00
parent 40a21e4e77
commit d870eaaf3f

View file

@ -890,6 +890,189 @@ fn cluster_apply_locked_exits_nonzero() {
assert!(!temp.path().join("__cluster/resources").exists());
}
/// Invokes `cluster <command> --config <root> --json` on the CLI under test
/// and returns its stdout parsed as a JSON value.
///
/// The invocation is routed through `output_success` (presumably asserting a
/// successful exit, as the name suggests), so commands that are expected to
/// fail need to be driven manually instead of through this helper.
fn cluster_json(root: &std::path::Path, command: &str) -> serde_json::Value {
    let output = output_success(
        cli()
            .arg("cluster")
            .arg(command)
            .arg("--config")
            .arg(root)
            .arg("--json"),
    );
    parse_stdout_json(&output)
}
/// Composition test for the whole cluster lifecycle against a derived graph
/// set up by `init_cluster_derived_graph`: import, plan, apply, status,
/// refresh, re-plan, then a query edit that is applied and re-converged.
///
/// Each step is covered in isolation elsewhere; the point here is the seams
/// between commands. In particular, if refresh and apply computed the graph
/// composite digest differently, the plan after refresh would never come back
/// empty — the re-plan assertion below is what pins that.
#[test]
fn cluster_e2e_lifecycle_import_apply_status_refresh_converges() {
    let workspace = tempdir().unwrap();
    let root = workspace.path();
    write_cluster_config_fixture(root);
    init_cluster_derived_graph(root);

    // Import succeeds and reports the first state revision.
    let import = cluster_json(root, "import");
    assert_eq!(import["ok"], true, "{import}");
    assert_eq!(import["state_observations"]["state_revision"], 1);

    // The plan lists exactly three changes, each with a known disposition.
    let plan = cluster_json(root, "plan");
    let planned = plan["changes"].as_array().unwrap();
    assert_eq!(planned.len(), 3, "{plan}");
    for (resource, expected) in [
        ("graph.knowledge", "derived"),
        ("query.knowledge.find_person", "applied"),
        ("policy.base", "applied"),
    ] {
        let entry = planned
            .iter()
            .find(|entry| entry["resource"] == resource)
            .unwrap_or_else(|| panic!("missing change for {resource}: {plan}"));
        assert_eq!(entry["disposition"], expected);
    }

    // Apply converges with two applied resources.
    let apply = cluster_json(root, "apply");
    assert_eq!(apply["ok"], true, "{apply}");
    assert_eq!(apply["applied_count"], 2, "{apply}");
    assert_eq!(apply["converged"], true, "{apply}");

    // Status reflects both applied resources and the recorded config digest.
    let status = cluster_json(root, "status");
    let resource_statuses = &status["resource_statuses"];
    assert_eq!(
        resource_statuses["query.knowledge.find_person"]["status"],
        "applied"
    );
    assert_eq!(resource_statuses["policy.base"]["status"], "applied");
    let applied_digest = &status["state_observations"]["applied_config_digest"];
    assert!(
        applied_digest.is_string(),
        "converged apply must record the applied config digest: {status}"
    );

    // Refresh re-observes the graph; the subsequent plan must stay empty.
    let refresh = cluster_json(root, "refresh");
    assert_eq!(refresh["ok"], true, "{refresh}");
    let replan = cluster_json(root, "plan");
    let reopened = replan["changes"].as_array().unwrap();
    assert!(
        reopened.is_empty(),
        "refresh after a converged apply must not re-open the plan: {replan}"
    );

    // Overwrite the query file: the next apply takes exactly one change and
    // converges, and the apply after that has nothing left to do.
    let edited_query = r#"
query find_person($name: String) {
match { $p: Person { name: $name } }
return { $p.name }
}
"#;
    fs::write(root.join("people.gq"), edited_query).unwrap();

    let apply_edit = cluster_json(root, "apply");
    assert_eq!(apply_edit["applied_count"], 1, "{apply_edit}");
    assert_eq!(apply_edit["converged"], true, "{apply_edit}");

    let final_apply = cluster_json(root, "apply");
    assert_eq!(final_apply["state_written"], false, "{final_apply}");
    assert!(final_apply["changes"].as_array().unwrap().is_empty());
}
/// Operator workflow across the Stage 3A control/data-plane boundary.
///
/// After an initial converged apply, the schema file is rewritten with an
/// extra field. `cluster apply` must then succeed without applying anything,
/// report non-convergence, and emit an `apply_unsupported_change` diagnostic.
/// Running `schema apply` against the graph, then `cluster refresh`, then
/// `cluster apply` again must converge and leave an empty plan.
#[test]
fn cluster_e2e_schema_change_defers_until_schema_apply_and_refresh() {
    let workspace = tempdir().unwrap();
    let root = workspace.path();
    let schema_path = root.join("people.pg");
    let graph_path = root.join("graphs/knowledge.omni");
    write_cluster_config_fixture(root);
    init_cluster_derived_graph(root);

    // Baseline: import, then a first apply that converges.
    let import = cluster_json(root, "import");
    assert_eq!(import["ok"], true, "{import}");
    let apply = cluster_json(root, "apply");
    assert_eq!(apply["converged"], true, "{apply}");

    // Rewrite the schema with an additional `bio` field.
    let widened_schema = r#"
node Person {
name: String @key
age: I32?
bio: String?
}
"#;
    fs::write(&schema_path, widened_schema).unwrap();

    // Cluster apply must defer the schema change and say so via a diagnostic.
    let deferred = cluster_json(root, "apply");
    assert_eq!(deferred["ok"], true, "{deferred}");
    assert_eq!(deferred["applied_count"], 0, "{deferred}");
    assert_eq!(deferred["converged"], false, "{deferred}");
    let deferral_reported = deferred["diagnostics"]
        .as_array()
        .unwrap()
        .iter()
        .any(|entry| entry["code"] == "apply_unsupported_change");
    assert!(deferral_reported, "{deferred}");

    // The graph-plane command carries out the schema change itself.
    output_success(
        cli()
            .arg("schema")
            .arg("apply")
            .arg(&graph_path)
            .arg("--schema")
            .arg(&schema_path)
            .arg("--json"),
    );

    // Refresh picks up the new graph state...
    let refresh = cluster_json(root, "refresh");
    assert_eq!(refresh["ok"], true, "{refresh}");

    // ...after which the control plane converges again with nothing pending.
    let reconverge = cluster_json(root, "apply");
    assert_eq!(reconverge["ok"], true, "{reconverge}");
    assert_eq!(reconverge["converged"], true, "{reconverge}");
    let replan = cluster_json(root, "plan");
    let remaining = replan["changes"].as_array().unwrap();
    assert!(
        remaining.is_empty(),
        "after schema apply + refresh + apply, the plan must be empty: {replan}"
    );
}
/// Lock-recovery composition test.
///
/// With a lock written by `write_cluster_lock` (called with "stuck-lock" and
/// "apply"), `cluster apply` must fail and report `ok: false`. After
/// `cluster force-unlock stuck-lock` reports the lock removed, a retried
/// apply must succeed and converge.
#[test]
fn cluster_e2e_force_unlock_unblocks_apply() {
    let workspace = tempdir().unwrap();
    let root = workspace.path();
    write_cluster_config_fixture(root);
    write_cluster_applyable_state(root);
    write_cluster_lock(root, "stuck-lock", "apply");

    // Apply is expected to fail while the lock is in place, so it is driven
    // through `output_failure` rather than the `cluster_json` helper.
    let refusal_output = output_failure(
        cli()
            .arg("cluster")
            .arg("apply")
            .arg("--config")
            .arg(root)
            .arg("--json"),
    );
    let refused = parse_stdout_json(&refusal_output);
    assert_eq!(refused["ok"], false);

    // force-unlock takes the lock identifier as a positional argument.
    let unlock_output = output_success(
        cli()
            .arg("cluster")
            .arg("force-unlock")
            .arg("stuck-lock")
            .arg("--config")
            .arg(root)
            .arg("--json"),
    );
    let unlocked = parse_stdout_json(&unlock_output);
    assert_eq!(unlocked["lock_removed"], true, "{unlocked}");

    // With the lock gone, the same apply goes through and converges.
    let retried = cluster_json(root, "apply");
    assert_eq!(retried["ok"], true, "{retried}");
    assert_eq!(retried["converged"], true, "{retried}");
}
#[test]
fn short_version_flag_prints_current_cli_version() {
let output = output_success(cli().arg("-v"));