feat(cluster): add read-only validate and plan

This commit is contained in:
aaltshuler 2026-06-08 20:07:39 +03:00
parent ab5f3b878a
commit 043b02e617
12 changed files with 1764 additions and 33 deletions

View file

@ -15,6 +15,7 @@ path = "src/main.rs"
[dependencies]
omnigraph = { package = "omnigraph-engine", path = "../omnigraph", version = "0.6.1" }
omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.6.1" }
omnigraph-cluster = { path = "../omnigraph-cluster", version = "0.6.1" }
omnigraph-policy = { path = "../omnigraph-policy", version = "0.6.1" }
omnigraph-server = { path = "../omnigraph-server", version = "0.6.1" }
clap = { workspace = true }

View file

@ -10,6 +10,9 @@ use color_eyre::eyre::{Result, bail};
use omnigraph::db::{Omnigraph, ReadTarget, SnapshotId};
use omnigraph::loader::LoadMode;
use omnigraph::storage::normalize_root_uri;
use omnigraph_cluster::{
DiagnosticSeverity, PlanOutput, ValidateOutput, plan_config_dir, validate_config_dir,
};
use omnigraph_compiler::query::parser::parse_query;
use omnigraph_compiler::schema::parser::parse_schema;
use omnigraph_compiler::{
@ -305,6 +308,11 @@ enum Command {
#[arg(long)]
json: bool,
},
/// Validate and plan read-only cluster configuration.
Cluster {
#[command(subcommand)]
command: ClusterCommand,
},
/// Manage graphs on a multi-graph server (MR-668)
Graphs {
#[command(subcommand)]
@ -312,6 +320,28 @@ enum Command {
},
}
#[derive(Debug, Subcommand)]
enum ClusterCommand {
/// Validate cluster.yaml and referenced schemas, queries, and policy files.
Validate {
/// Cluster config directory containing cluster.yaml.
#[arg(long, default_value = ".")]
config: PathBuf,
/// Emit JSON instead of human text.
#[arg(long)]
json: bool,
},
/// Produce a read-only plan by diffing cluster.yaml against __cluster/state.json.
Plan {
/// Cluster config directory containing cluster.yaml.
#[arg(long, default_value = ".")]
config: PathBuf,
/// Emit JSON instead of human text.
#[arg(long)]
json: bool,
},
}
/// Operations on the graph registry of a multi-graph server (MR-668).
///
/// All operations target a remote multi-graph server URL (http:// or
@ -683,6 +713,77 @@ fn print_json<T: Serialize>(value: &T) -> Result<()> {
Ok(())
}
fn print_cluster_validate_human(output: &ValidateOutput) {
if output.ok {
println!(
"cluster config valid: {} resource(s), {} dependency edge(s)",
output.resources.len(),
output.dependencies.len()
);
} else {
println!("cluster config invalid");
}
print_cluster_diagnostics(&output.diagnostics);
}
fn print_cluster_plan_human(output: &PlanOutput) {
if output.ok {
println!(
"cluster plan: {} change(s), {} approval gate(s)",
output.changes.len(),
output.approvals_required.len()
);
for change in &output.changes {
println!(" {:?} {}", change.operation, change.resource);
}
if output.changes.is_empty() {
println!(" no changes");
}
} else {
println!("cluster plan failed");
}
print_cluster_diagnostics(&output.diagnostics);
}
fn print_cluster_diagnostics(diagnostics: &[omnigraph_cluster::Diagnostic]) {
for diagnostic in diagnostics {
let label = match diagnostic.severity {
DiagnosticSeverity::Error => "ERROR",
DiagnosticSeverity::Warning => "WARN ",
};
println!(
"{label} {} {}: {}",
diagnostic.code, diagnostic.path, diagnostic.message
);
}
}
fn finish_cluster_validate(output: &ValidateOutput, json: bool) -> Result<()> {
if json {
print_json(output)?;
} else {
print_cluster_validate_human(output);
}
if !output.ok {
io::stdout().flush()?;
std::process::exit(1);
}
Ok(())
}
fn finish_cluster_plan(output: &PlanOutput, json: bool) -> Result<()> {
if json {
print_json(output)?;
} else {
print_cluster_plan_human(output);
}
if !output.ok {
io::stdout().flush()?;
std::process::exit(1);
}
Ok(())
}
fn is_remote_uri(uri: &str) -> bool {
uri.starts_with("http://") || uri.starts_with("https://")
}
@ -801,13 +902,11 @@ struct ResolvedPolicyContext {
fn resolve_policy_context(config: &OmnigraphConfig) -> Result<ResolvedPolicyContext> {
let selected = config.resolve_policy_tooling_graph_selection()?;
let policy_file = config
.resolve_policy_file_for(selected)
.ok_or_else(|| {
color_eyre::eyre::eyre!(
"policy.file or graphs.<name>.policy.file must be set in omnigraph.yaml"
)
})?;
let policy_file = config.resolve_policy_file_for(selected).ok_or_else(|| {
color_eyre::eyre::eyre!(
"policy.file or graphs.<name>.policy.file must be set in omnigraph.yaml"
)
})?;
let graph_id = match selected {
Some(name) => graph_resource_id_for_selection(Some(name), ""),
None => graph_resource_id_for_selection(None, "default"),
@ -2166,16 +2265,14 @@ fn rewrite_deprecated_argv(args: Vec<OsString>) -> Vec<OsString> {
}
if let Some(sub) = args.get(1).and_then(|s| s.to_str()) {
match sub {
"read" => eprintln!(
"warning: `omnigraph read` is deprecated; use `omnigraph query` instead"
),
"read" => {
eprintln!("warning: `omnigraph read` is deprecated; use `omnigraph query` instead")
}
"change" => eprintln!(
"warning: `omnigraph change` is deprecated; use `omnigraph mutate` instead"
),
"check" => {
eprintln!(
"warning: `omnigraph check` is deprecated; use `omnigraph lint` instead"
);
eprintln!("warning: `omnigraph check` is deprecated; use `omnigraph lint` instead");
// Rewrite the top-level subcommand to `lint`; pass through the rest.
let mut out = Vec::with_capacity(args.len());
out.push(args[0].clone());
@ -3111,6 +3208,16 @@ async fn main() -> Result<()> {
}
}
}
Command::Cluster { command } => match command {
ClusterCommand::Validate { config, json } => {
let output = validate_config_dir(config);
finish_cluster_validate(&output, json)?;
}
ClusterCommand::Plan { config, json } => {
let output = plan_config_dir(config);
finish_cluster_plan(&output, json)?;
}
},
Command::Graphs { command } => match command {
GraphsCommand::List {
uri,
@ -3157,8 +3264,8 @@ mod tests {
use super::{
DEFAULT_BEARER_TOKEN_ENV, apply_bearer_token, bearer_token_from_env_file,
legacy_change_request_body, load_cli_config, load_env_file_into_process,
normalize_bearer_token, parse_env_assignment, resolve_policy_context,
resolve_cli_graph, resolve_remote_bearer_token,
normalize_bearer_token, parse_env_assignment, resolve_cli_graph, resolve_policy_context,
resolve_remote_bearer_token,
};
use omnigraph_server::load_config;
use reqwest::header::AUTHORIZATION;
@ -3420,7 +3527,8 @@ graphs:
}
#[test]
fn graph_identity_resolve_policy_context_named_cli_graph_uses_graph_key_not_project_name_or_uri() {
fn graph_identity_resolve_policy_context_named_cli_graph_uses_graph_key_not_project_name_or_uri()
{
let temp = tempdir().unwrap();
let config_path = temp.path().join("omnigraph.yaml");
fs::write(

View file

@ -78,6 +78,52 @@ policy:
(config, policy)
}
fn write_cluster_config_fixture(root: &std::path::Path) {
fs::write(
root.join("people.pg"),
r#"
node Person {
name: String @key
age: I32?
}
"#,
)
.unwrap();
fs::write(
root.join("people.gq"),
r#"
query find_person($name: String) {
match { $p: Person { name: $name } }
return { $p.name, $p.age }
}
"#,
)
.unwrap();
fs::write(root.join("base.policy.yaml"), "rules: []\n").unwrap();
fs::write(
root.join("cluster.yaml"),
r#"
version: 1
metadata:
name: company-brain
state:
backend: cluster
lock: true
graphs:
knowledge:
schema: ./people.pg
queries:
find_person:
file: ./people.gq
policies:
base:
file: ./base.policy.yaml
applies_to: [knowledge]
"#,
)
.unwrap();
}
#[test]
fn version_command_prints_current_cli_version() {
let output = output_success(cli().arg("version"));
@ -89,6 +135,105 @@ fn version_command_prints_current_cli_version() {
);
}
#[test]
fn cluster_validate_config_success() {
let temp = tempdir().unwrap();
write_cluster_config_fixture(temp.path());
let output = output_success(
cli()
.arg("cluster")
.arg("validate")
.arg("--config")
.arg(temp.path()),
);
let stdout = stdout_string(&output);
assert!(stdout.contains("cluster config valid"), "{stdout}");
}
#[test]
fn cluster_validate_json_is_stable() {
let temp = tempdir().unwrap();
write_cluster_config_fixture(temp.path());
let json = parse_stdout_json(&output_success(
cli()
.arg("cluster")
.arg("validate")
.arg("--config")
.arg(temp.path())
.arg("--json"),
));
assert_eq!(json["ok"], true);
assert!(json["resource_digests"]["graph.knowledge"].is_string());
assert!(json["resource_digests"]["query.knowledge.find_person"].is_string());
assert_eq!(json["dependencies"][0]["from"], "policy.base");
assert_eq!(json["dependencies"][0]["to"], "graph.knowledge");
}
#[test]
fn cluster_plan_json_reads_inferred_local_state() {
let temp = tempdir().unwrap();
write_cluster_config_fixture(temp.path());
let state_dir = temp.path().join("__cluster");
fs::create_dir_all(&state_dir).unwrap();
fs::write(
state_dir.join("state.json"),
r#"
{
"version": 1,
"applied_revision": {
"config_digest": "old",
"resources": {
"graph.knowledge": { "digest": "old-graph" },
"policy.old": { "digest": "old-policy" }
}
}
}
"#,
)
.unwrap();
let json = parse_stdout_json(&output_success(
cli()
.arg("cluster")
.arg("plan")
.arg("--config")
.arg(temp.path())
.arg("--json"),
));
assert_eq!(json["ok"], true);
assert_eq!(json["state_observations"]["state_found"], true);
assert!(
json["changes"]
.as_array()
.unwrap()
.iter()
.any(|change| change["resource"] == "policy.old" && change["operation"] == "delete"),
"plan should read state and delete stale resources: {json}"
);
}
#[test]
fn cluster_validate_invalid_config_exits_nonzero() {
let temp = tempdir().unwrap();
fs::write(
temp.path().join("cluster.yaml"),
"version: 1\ngraphs: {}\npipelines: {}\n",
)
.unwrap();
let output = output_failure(
cli()
.arg("cluster")
.arg("validate")
.arg("--config")
.arg(temp.path()),
);
let stdout = stdout_string(&output);
assert!(stdout.contains("future_phase_field"), "{stdout}");
}
#[test]
fn short_version_flag_prints_current_cli_version() {
let output = output_success(cli().arg("-v"));
@ -798,8 +943,7 @@ fn deprecated_read_and_change_subcommands_emit_warnings() {
let output = cli().arg("read").output().unwrap();
let stderr = String::from_utf8(output.stderr).unwrap();
assert!(
stderr.contains("`omnigraph read` is deprecated")
&& stderr.contains("`omnigraph query`"),
stderr.contains("`omnigraph read` is deprecated") && stderr.contains("`omnigraph query`"),
"expected `omnigraph read` deprecation warning; got: {stderr}"
);
@ -2394,9 +2538,19 @@ fn queries_validate_exits_zero_on_clean_registry() {
);
let config = graph.write_config(
"omnigraph.yaml",
&queries_test_config(&graph.path().to_string_lossy(), "find_person", "find_person.gq"),
&queries_test_config(
&graph.path().to_string_lossy(),
"find_person",
"find_person.gq",
),
);
let output = output_success(
cli()
.arg("queries")
.arg("validate")
.arg("--config")
.arg(&config),
);
let output = output_success(cli().arg("queries").arg("validate").arg("--config").arg(&config));
let stdout = stdout_string(&output);
assert!(stdout.contains("OK"), "stdout:\n{stdout}");
}
@ -2405,12 +2559,21 @@ fn queries_validate_exits_zero_on_clean_registry() {
fn queries_validate_exits_nonzero_on_type_broken_query() {
let graph = SystemGraph::loaded();
// `Widget` is not in the fixture schema.
graph.write_query("ghost.gq", "query ghost() { match { $w: Widget } return { $w.name } }");
graph.write_query(
"ghost.gq",
"query ghost() { match { $w: Widget } return { $w.name } }",
);
let config = graph.write_config(
"omnigraph.yaml",
&queries_test_config(&graph.path().to_string_lossy(), "ghost", "ghost.gq"),
);
let output = output_failure(cli().arg("queries").arg("validate").arg("--config").arg(&config));
let output = output_failure(
cli()
.arg("queries")
.arg("validate")
.arg("--config")
.arg(&config),
);
let stdout = stdout_string(&output);
assert!(
stdout.contains("ghost"),
@ -2444,7 +2607,13 @@ fn queries_list_prints_registered_query() {
graph.path().to_string_lossy().replace('\'', "''")
),
);
let output = output_success(cli().arg("queries").arg("list").arg("--config").arg(&config));
let output = output_success(
cli()
.arg("queries")
.arg("list")
.arg("--config")
.arg(&config),
);
let stdout = stdout_string(&output);
assert!(stdout.contains("find_person"), "stdout:\n{stdout}");
assert!(
@ -2480,7 +2649,13 @@ fn queries_list_requires_graph_selection_for_per_graph_only_registries() {
),
);
let output = output_failure(cli().arg("queries").arg("list").arg("--config").arg(&config));
let output = output_failure(
cli()
.arg("queries")
.arg("list")
.arg("--config")
.arg(&config),
);
let stderr = String::from_utf8_lossy(&output.stderr);
assert!(
stderr.contains("local") && stderr.contains("--target local"),
@ -2505,7 +2680,13 @@ fn queries_list_without_graph_selection_lists_top_level_registry() {
),
);
let output = output_success(cli().arg("queries").arg("list").arg("--config").arg(&config));
let output = output_success(
cli()
.arg("queries")
.arg("list")
.arg("--config")
.arg(&config),
);
let stdout = stdout_string(&output);
assert!(stdout.contains("top_find"), "stdout:\n{stdout}");
}
@ -2524,7 +2705,11 @@ fn queries_list_unknown_target_errors() {
);
let config = graph.write_config(
"omnigraph.yaml",
&queries_test_config(&graph.path().to_string_lossy(), "find_person", "find_person.gq"),
&queries_test_config(
&graph.path().to_string_lossy(),
"find_person",
"find_person.gq",
),
);
let output = output_failure(
cli()
@ -2566,7 +2751,7 @@ fn queries_commands_reject_named_graph_with_populated_top_level_block() {
" file: ./find_person.gq\n",
"cli:\n",
" graph: local\n",
"queries:\n", // populated top-level block: the coherence violation
"queries:\n", // populated top-level block: the coherence violation
" legacy:\n",
" file: ./legacy.gq\n",
"policy: {{}}\n",
@ -2592,8 +2777,14 @@ fn queries_validate_exits_nonzero_on_duplicate_tool_name() {
// collision — `queries validate` must fail (offline, before the engine
// opens) and name both queries plus the contested tool.
let graph = SystemGraph::loaded();
graph.write_query("a.gq", "query a() { match { $p: Person } return { $p.name } }");
graph.write_query("b.gq", "query b() { match { $p: Person } return { $p.name } }");
graph.write_query(
"a.gq",
"query a() { match { $p: Person } return { $p.name } }",
);
graph.write_query(
"b.gq",
"query b() { match { $p: Person } return { $p.name } }",
);
let config = graph.write_config(
"omnigraph.yaml",
&format!(
@ -2615,7 +2806,13 @@ fn queries_validate_exits_nonzero_on_duplicate_tool_name() {
graph.path().to_string_lossy().replace('\'', "''")
),
);
let output = output_failure(cli().arg("queries").arg("validate").arg("--config").arg(&config));
let output = output_failure(
cli()
.arg("queries")
.arg("validate")
.arg("--config")
.arg(&config),
);
let stderr = String::from_utf8_lossy(&output.stderr);
assert!(
stderr.contains("dup") && stderr.contains("'a'") && stderr.contains("'b'"),
@ -2635,7 +2832,10 @@ fn queries_validate_positional_uri_ignores_default_graph() {
);
// `Widget` is not in the fixture schema — the default graph's per-graph
// query would break validate if it were (wrongly) selected.
graph.write_query("broken.gq", "query broken() { match { $w: Widget } return { $w.name } }");
graph.write_query(
"broken.gq",
"query broken() { match { $w: Widget } return { $w.name } }",
);
let config = graph.write_config(
"omnigraph.yaml",
concat!(