diff --git a/crates/omnigraph-cli/src/main.rs b/crates/omnigraph-cli/src/main.rs index 971ffff..42bbed8 100644 --- a/crates/omnigraph-cli/src/main.rs +++ b/crates/omnigraph-cli/src/main.rs @@ -11,8 +11,8 @@ use omnigraph::db::{Omnigraph, ReadTarget, SnapshotId}; use omnigraph::loader::LoadMode; use omnigraph::storage::normalize_root_uri; use omnigraph_cluster::{ - DiagnosticSeverity, ForceUnlockOutput, PlanOutput, StateSyncOutput, StatusOutput, - ValidateOutput, force_unlock_config_dir, import_config_dir, plan_config_dir, + ApplyOutput, DiagnosticSeverity, ForceUnlockOutput, PlanOutput, StateSyncOutput, StatusOutput, + ValidateOutput, apply_config_dir, force_unlock_config_dir, import_config_dir, plan_config_dir, refresh_config_dir, status_config_dir, validate_config_dir, }; use omnigraph_compiler::query::parser::parse_query; @@ -361,6 +361,16 @@ enum ClusterCommand { #[arg(long)] json: bool, }, + /// Apply the config-only (query/policy) subset of the plan to the local + /// cluster catalog. Graph/schema changes are deferred to a later stage. + Apply { + /// Cluster config directory containing cluster.yaml. + #[arg(long, default_value = ".")] + config: PathBuf, + /// Emit JSON instead of human text. + #[arg(long)] + json: bool, + }, /// Read the local JSON state ledger without scanning live graph resources. Status { /// Cluster config directory containing cluster.yaml. @@ -804,6 +814,40 @@ fn print_cluster_plan_human(output: &PlanOutput) { print_cluster_diagnostics(&output.diagnostics); } +fn print_cluster_apply_human(output: &ApplyOutput) { + if output.ok { + println!( + "cluster apply: {} applied, {} deferred/blocked", + output.applied_count, output.deferred_count + ); + for change in &output.changes { + match (&change.disposition, change.reason.as_deref()) { + (Some(disposition), Some(reason)) => println!( + " {:?} {} [{disposition:?}: {reason}]", + change.operation, change.resource + ), + (Some(disposition), None) => println!( + " {:?} {} [{disposition:?}]", + change.operation, change.resource + ), + _ => println!(" {:?} {}", change.operation, change.resource), + } + } + if output.changes.is_empty() { + println!(" no changes"); + } + let state = &output.state_observations; + println!( + " state: revision {}, converged: {}, written: {}", + state.state_revision, output.converged, output.state_written + ); + println!(" note: applied = recorded in the cluster catalog; the server still boots from omnigraph.yaml"); + } else { + println!("cluster apply failed"); + } + print_cluster_diagnostics(&output.diagnostics); +} + fn print_cluster_status_human(output: &StatusOutput) { if output.ok { let state = &output.state_observations; @@ -935,6 +979,19 @@ fn finish_cluster_plan(output: &PlanOutput, json: bool) -> Result<()> { Ok(()) } +fn finish_cluster_apply(output: &ApplyOutput, json: bool) -> Result<()> { + if json { + print_json(output)?; + } else { + print_cluster_apply_human(output); + } + if !output.ok { + io::stdout().flush()?; + std::process::exit(1); + } + Ok(()) +} + fn finish_cluster_status(output: &StatusOutput, json: bool) -> Result<()> { if json { print_json(output)?; @@ -3492,6 +3549,10 @@ async fn main() -> Result<()> { let output = plan_config_dir(config); finish_cluster_plan(&output, json)?; } + ClusterCommand::Apply { config, json } => { + let output = apply_config_dir(config); + finish_cluster_apply(&output, json)?; + } ClusterCommand::Status { config, json } => { let output = status_config_dir(config); finish_cluster_status(&output, json)?; diff --git a/crates/omnigraph-cli/tests/cli.rs b/crates/omnigraph-cli/tests/cli.rs index 1dd26a7..9dbf250 100644 --- a/crates/omnigraph-cli/tests/cli.rs +++ b/crates/omnigraph-cli/tests/cli.rs @@ -754,6 +754,142 @@ fn cluster_validate_invalid_config_exits_nonzero() { assert!(stdout.contains("future_phase_field"), "{stdout}"); } +/// Seed an applyable state: schema digest borrowed from `cluster validate`, +/// graph entry present (composite recomputed by apply), queries/policies +/// pending. +fn write_cluster_applyable_state(root: &std::path::Path) -> serde_json::Value { + let validate = parse_stdout_json(&output_success( + cli() + .arg("cluster") + .arg("validate") + .arg("--config") + .arg(root) + .arg("--json"), + )); + let schema_digest = validate["resource_digests"]["schema.knowledge"] + .as_str() + .unwrap() + .to_string(); + let state_dir = root.join("__cluster"); + fs::create_dir_all(&state_dir).unwrap(); + fs::write( + state_dir.join("state.json"), + format!( + r#"{{ + "version": 1, + "state_revision": 1, + "applied_revision": {{ + "resources": {{ + "graph.knowledge": {{ "digest": "seed" }}, + "schema.knowledge": {{ "digest": "{schema_digest}" }} + }} + }} +}} +"# + ), + ) + .unwrap(); + validate +} + +#[test] +fn cluster_apply_json_applies_query_and_policy() { + let temp = tempdir().unwrap(); + write_cluster_config_fixture(temp.path()); + let validate = write_cluster_applyable_state(temp.path()); + + let json = parse_stdout_json(&output_success( + cli() + .arg("cluster") + .arg("apply") + .arg("--config") + .arg(temp.path()) + .arg("--json"), + )); + assert_eq!(json["ok"], true, "{json}"); + assert_eq!(json["applied_count"], 2, "{json}"); + assert_eq!(json["converged"], true, "{json}"); + assert_eq!(json["state_written"], true, "{json}"); + assert_eq!( + json["resource_statuses"]["query.knowledge.find_person"]["status"], + "applied" + ); + + let query_digest = validate["resource_digests"]["query.knowledge.find_person"] + .as_str() + .unwrap(); + let payload = temp + .path() + .join("__cluster/resources/query/knowledge/find_person") + .join(format!("{query_digest}.gq")); + assert!(payload.exists(), "missing payload {}", payload.display()); + + let state: serde_json::Value = serde_json::from_str( + &fs::read_to_string(temp.path().join("__cluster/state.json")).unwrap(), + ) + .unwrap(); + assert_eq!(state["state_revision"], 2); + assert_eq!( + state["applied_revision"]["resources"]["query.knowledge.find_person"]["digest"], + *query_digest + ); +} + +#[test] +fn cluster_apply_missing_state_exits_nonzero() { + let temp = tempdir().unwrap(); + write_cluster_config_fixture(temp.path()); + + let output = output_failure( + cli() + .arg("cluster") + .arg("apply") + .arg("--config") + .arg(temp.path()) + .arg("--json"), + ); + let json = parse_stdout_json(&output); + assert_eq!(json["ok"], false); + assert!( + json["diagnostics"] + .as_array() + .unwrap() + .iter() + .any(|diagnostic| diagnostic["code"] == "state_missing"), + "{json}" + ); + assert!(!temp.path().join("__cluster/resources").exists()); +} + +#[test] +fn cluster_apply_locked_exits_nonzero() { + let temp = tempdir().unwrap(); + write_cluster_config_fixture(temp.path()); + write_cluster_applyable_state(temp.path()); + write_cluster_lock(temp.path(), "held-lock", "plan"); + + let output = output_failure( + cli() + .arg("cluster") + .arg("apply") + .arg("--config") + .arg(temp.path()) + .arg("--json"), + ); + let json = parse_stdout_json(&output); + assert_eq!(json["ok"], false); + assert!( + json["diagnostics"] + .as_array() + .unwrap() + .iter() + .any(|diagnostic| diagnostic["code"] == "state_lock_held"), + "{json}" + ); + assert!(temp.path().join("__cluster/lock.json").exists()); + assert!(!temp.path().join("__cluster/resources").exists()); +} + #[test] fn short_version_flag_prints_current_cli_version() { let output = output_success(cli().arg("-v"));