From 043b02e6179629fab79b68923c1ddd3bae401138 Mon Sep 17 00:00:00 2001 From: aaltshuler Date: Mon, 8 Jun 2026 20:07:39 +0300 Subject: [PATCH] feat(cluster): add read-only validate and plan --- AGENTS.md | 2 +- Cargo.lock | 14 + Cargo.toml | 1 + crates/omnigraph-cli/Cargo.toml | 1 + crates/omnigraph-cli/src/main.rs | 140 ++- crates/omnigraph-cli/tests/cli.rs | 230 ++++- crates/omnigraph-cluster/Cargo.toml | 20 + crates/omnigraph-cluster/src/lib.rs | 1275 +++++++++++++++++++++++++++ docs/dev/testing.md | 1 + docs/user/cli-reference.md | 17 +- docs/user/cluster-config.md | 95 ++ docs/user/index.md | 1 + 12 files changed, 1764 insertions(+), 33 deletions(-) create mode 100644 crates/omnigraph-cluster/Cargo.toml create mode 100644 crates/omnigraph-cluster/src/lib.rs create mode 100644 docs/user/cluster-config.md diff --git a/AGENTS.md b/AGENTS.md index b876749..26172ff 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -17,7 +17,7 @@ Tools that support `@`-imports (Claude Code) auto-include all three files via th `CLAUDE.md` is a symlink to this file — there is exactly one source of truth. Edit `AGENTS.md`. 
**Version surveyed:** 0.6.1 -**Workspace crates:** `omnigraph-compiler`, `omnigraph` (engine), `omnigraph-policy`, `omnigraph-cli`, `omnigraph-server` +**Workspace crates:** `omnigraph-compiler`, `omnigraph` (engine), `omnigraph-policy`, `omnigraph-cluster`, `omnigraph-cli`, `omnigraph-server` **Storage substrate:** Lance 6.x (columnar, versioned, branchable) **License:** MIT **Toolchain:** Rust stable, edition 2024 diff --git a/Cargo.lock b/Cargo.lock index 3223b9c..2ee6b7d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4550,6 +4550,7 @@ dependencies = [ "color-eyre", "lance", "lance-index", + "omnigraph-cluster", "omnigraph-compiler", "omnigraph-engine", "omnigraph-policy", @@ -4563,6 +4564,19 @@ dependencies = [ "tokio", ] +[[package]] +name = "omnigraph-cluster" +version = "0.6.1" +dependencies = [ + "omnigraph-compiler", + "serde", + "serde_json", + "serde_yaml", + "sha2", + "tempfile", + "thiserror", +] + [[package]] name = "omnigraph-compiler" version = "0.6.1" diff --git a/Cargo.toml b/Cargo.toml index 66bfc01..17990ea 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,6 +4,7 @@ members = [ "crates/omnigraph-compiler", "crates/omnigraph", "crates/omnigraph-cli", + "crates/omnigraph-cluster", "crates/omnigraph-policy", "crates/omnigraph-server", ] diff --git a/crates/omnigraph-cli/Cargo.toml b/crates/omnigraph-cli/Cargo.toml index 641068e..bc50551 100644 --- a/crates/omnigraph-cli/Cargo.toml +++ b/crates/omnigraph-cli/Cargo.toml @@ -15,6 +15,7 @@ path = "src/main.rs" [dependencies] omnigraph = { package = "omnigraph-engine", path = "../omnigraph", version = "0.6.1" } omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.6.1" } +omnigraph-cluster = { path = "../omnigraph-cluster", version = "0.6.1" } omnigraph-policy = { path = "../omnigraph-policy", version = "0.6.1" } omnigraph-server = { path = "../omnigraph-server", version = "0.6.1" } clap = { workspace = true } diff --git a/crates/omnigraph-cli/src/main.rs b/crates/omnigraph-cli/src/main.rs 
index 29b55c4..23f1569 100644 --- a/crates/omnigraph-cli/src/main.rs +++ b/crates/omnigraph-cli/src/main.rs @@ -10,6 +10,9 @@ use color_eyre::eyre::{Result, bail}; use omnigraph::db::{Omnigraph, ReadTarget, SnapshotId}; use omnigraph::loader::LoadMode; use omnigraph::storage::normalize_root_uri; +use omnigraph_cluster::{ + DiagnosticSeverity, PlanOutput, ValidateOutput, plan_config_dir, validate_config_dir, +}; use omnigraph_compiler::query::parser::parse_query; use omnigraph_compiler::schema::parser::parse_schema; use omnigraph_compiler::{ @@ -305,6 +308,11 @@ enum Command { #[arg(long)] json: bool, }, + /// Validate and plan read-only cluster configuration. + Cluster { + #[command(subcommand)] + command: ClusterCommand, + }, /// Manage graphs on a multi-graph server (MR-668) Graphs { #[command(subcommand)] @@ -312,6 +320,28 @@ enum Command { }, } +#[derive(Debug, Subcommand)] +enum ClusterCommand { + /// Validate cluster.yaml and referenced schemas, queries, and policy files. + Validate { + /// Cluster config directory containing cluster.yaml. + #[arg(long, default_value = ".")] + config: PathBuf, + /// Emit JSON instead of human text. + #[arg(long)] + json: bool, + }, + /// Produce a read-only plan by diffing cluster.yaml against __cluster/state.json. + Plan { + /// Cluster config directory containing cluster.yaml. + #[arg(long, default_value = ".")] + config: PathBuf, + /// Emit JSON instead of human text. + #[arg(long)] + json: bool, + }, +} + /// Operations on the graph registry of a multi-graph server (MR-668). 
/// /// All operations target a remote multi-graph server URL (http:// or @@ -683,6 +713,77 @@ fn print_json(value: &T) -> Result<()> { Ok(()) } +fn print_cluster_validate_human(output: &ValidateOutput) { + if output.ok { + println!( + "cluster config valid: {} resource(s), {} dependency edge(s)", + output.resources.len(), + output.dependencies.len() + ); + } else { + println!("cluster config invalid"); + } + print_cluster_diagnostics(&output.diagnostics); +} + +fn print_cluster_plan_human(output: &PlanOutput) { + if output.ok { + println!( + "cluster plan: {} change(s), {} approval gate(s)", + output.changes.len(), + output.approvals_required.len() + ); + for change in &output.changes { + println!(" {:?} {}", change.operation, change.resource); + } + if output.changes.is_empty() { + println!(" no changes"); + } + } else { + println!("cluster plan failed"); + } + print_cluster_diagnostics(&output.diagnostics); +} + +fn print_cluster_diagnostics(diagnostics: &[omnigraph_cluster::Diagnostic]) { + for diagnostic in diagnostics { + let label = match diagnostic.severity { + DiagnosticSeverity::Error => "ERROR", + DiagnosticSeverity::Warning => "WARN ", + }; + println!( + "{label} {} {}: {}", + diagnostic.code, diagnostic.path, diagnostic.message + ); + } +} + +fn finish_cluster_validate(output: &ValidateOutput, json: bool) -> Result<()> { + if json { + print_json(output)?; + } else { + print_cluster_validate_human(output); + } + if !output.ok { + io::stdout().flush()?; + std::process::exit(1); + } + Ok(()) +} + +fn finish_cluster_plan(output: &PlanOutput, json: bool) -> Result<()> { + if json { + print_json(output)?; + } else { + print_cluster_plan_human(output); + } + if !output.ok { + io::stdout().flush()?; + std::process::exit(1); + } + Ok(()) +} + fn is_remote_uri(uri: &str) -> bool { uri.starts_with("http://") || uri.starts_with("https://") } @@ -801,13 +902,11 @@ struct ResolvedPolicyContext { fn resolve_policy_context(config: &OmnigraphConfig) -> Result { let 
selected = config.resolve_policy_tooling_graph_selection()?; - let policy_file = config - .resolve_policy_file_for(selected) - .ok_or_else(|| { - color_eyre::eyre::eyre!( - "policy.file or graphs..policy.file must be set in omnigraph.yaml" - ) - })?; + let policy_file = config.resolve_policy_file_for(selected).ok_or_else(|| { + color_eyre::eyre::eyre!( + "policy.file or graphs..policy.file must be set in omnigraph.yaml" + ) + })?; let graph_id = match selected { Some(name) => graph_resource_id_for_selection(Some(name), ""), None => graph_resource_id_for_selection(None, "default"), @@ -2166,16 +2265,14 @@ fn rewrite_deprecated_argv(args: Vec) -> Vec { } if let Some(sub) = args.get(1).and_then(|s| s.to_str()) { match sub { - "read" => eprintln!( - "warning: `omnigraph read` is deprecated; use `omnigraph query` instead" - ), + "read" => { + eprintln!("warning: `omnigraph read` is deprecated; use `omnigraph query` instead") + } "change" => eprintln!( "warning: `omnigraph change` is deprecated; use `omnigraph mutate` instead" ), "check" => { - eprintln!( - "warning: `omnigraph check` is deprecated; use `omnigraph lint` instead" - ); + eprintln!("warning: `omnigraph check` is deprecated; use `omnigraph lint` instead"); // Rewrite the top-level subcommand to `lint`; pass through the rest. 
let mut out = Vec::with_capacity(args.len()); out.push(args[0].clone()); @@ -3111,6 +3208,16 @@ async fn main() -> Result<()> { } } } + Command::Cluster { command } => match command { + ClusterCommand::Validate { config, json } => { + let output = validate_config_dir(config); + finish_cluster_validate(&output, json)?; + } + ClusterCommand::Plan { config, json } => { + let output = plan_config_dir(config); + finish_cluster_plan(&output, json)?; + } + }, Command::Graphs { command } => match command { GraphsCommand::List { uri, @@ -3157,8 +3264,8 @@ mod tests { use super::{ DEFAULT_BEARER_TOKEN_ENV, apply_bearer_token, bearer_token_from_env_file, legacy_change_request_body, load_cli_config, load_env_file_into_process, - normalize_bearer_token, parse_env_assignment, resolve_policy_context, - resolve_cli_graph, resolve_remote_bearer_token, + normalize_bearer_token, parse_env_assignment, resolve_cli_graph, resolve_policy_context, + resolve_remote_bearer_token, }; use omnigraph_server::load_config; use reqwest::header::AUTHORIZATION; @@ -3420,7 +3527,8 @@ graphs: } #[test] - fn graph_identity_resolve_policy_context_named_cli_graph_uses_graph_key_not_project_name_or_uri() { + fn graph_identity_resolve_policy_context_named_cli_graph_uses_graph_key_not_project_name_or_uri() + { let temp = tempdir().unwrap(); let config_path = temp.path().join("omnigraph.yaml"); fs::write( diff --git a/crates/omnigraph-cli/tests/cli.rs b/crates/omnigraph-cli/tests/cli.rs index 9682d9a..156dd6e 100644 --- a/crates/omnigraph-cli/tests/cli.rs +++ b/crates/omnigraph-cli/tests/cli.rs @@ -78,6 +78,52 @@ policy: (config, policy) } +fn write_cluster_config_fixture(root: &std::path::Path) { + fs::write( + root.join("people.pg"), + r#" +node Person { + name: String @key + age: I32? 
+} +"#, + ) + .unwrap(); + fs::write( + root.join("people.gq"), + r#" +query find_person($name: String) { + match { $p: Person { name: $name } } + return { $p.name, $p.age } +} +"#, + ) + .unwrap(); + fs::write(root.join("base.policy.yaml"), "rules: []\n").unwrap(); + fs::write( + root.join("cluster.yaml"), + r#" +version: 1 +metadata: + name: company-brain +state: + backend: cluster + lock: true +graphs: + knowledge: + schema: ./people.pg + queries: + find_person: + file: ./people.gq +policies: + base: + file: ./base.policy.yaml + applies_to: [knowledge] +"#, + ) + .unwrap(); +} + #[test] fn version_command_prints_current_cli_version() { let output = output_success(cli().arg("version")); @@ -89,6 +135,105 @@ fn version_command_prints_current_cli_version() { ); } +#[test] +fn cluster_validate_config_success() { + let temp = tempdir().unwrap(); + write_cluster_config_fixture(temp.path()); + + let output = output_success( + cli() + .arg("cluster") + .arg("validate") + .arg("--config") + .arg(temp.path()), + ); + let stdout = stdout_string(&output); + assert!(stdout.contains("cluster config valid"), "{stdout}"); +} + +#[test] +fn cluster_validate_json_is_stable() { + let temp = tempdir().unwrap(); + write_cluster_config_fixture(temp.path()); + + let json = parse_stdout_json(&output_success( + cli() + .arg("cluster") + .arg("validate") + .arg("--config") + .arg(temp.path()) + .arg("--json"), + )); + assert_eq!(json["ok"], true); + assert!(json["resource_digests"]["graph.knowledge"].is_string()); + assert!(json["resource_digests"]["query.knowledge.find_person"].is_string()); + assert_eq!(json["dependencies"][0]["from"], "policy.base"); + assert_eq!(json["dependencies"][0]["to"], "graph.knowledge"); +} + +#[test] +fn cluster_plan_json_reads_inferred_local_state() { + let temp = tempdir().unwrap(); + write_cluster_config_fixture(temp.path()); + let state_dir = temp.path().join("__cluster"); + fs::create_dir_all(&state_dir).unwrap(); + fs::write( + 
state_dir.join("state.json"), + r#" +{ + "version": 1, + "applied_revision": { + "config_digest": "old", + "resources": { + "graph.knowledge": { "digest": "old-graph" }, + "policy.old": { "digest": "old-policy" } + } + } +} +"#, + ) + .unwrap(); + + let json = parse_stdout_json(&output_success( + cli() + .arg("cluster") + .arg("plan") + .arg("--config") + .arg(temp.path()) + .arg("--json"), + )); + assert_eq!(json["ok"], true); + assert_eq!(json["state_observations"]["state_found"], true); + assert!( + json["changes"] + .as_array() + .unwrap() + .iter() + .any(|change| change["resource"] == "policy.old" && change["operation"] == "delete"), + "plan should read state and delete stale resources: {json}" + ); +} + +#[test] +fn cluster_validate_invalid_config_exits_nonzero() { + let temp = tempdir().unwrap(); + fs::write( + temp.path().join("cluster.yaml"), + "version: 1\ngraphs: {}\npipelines: {}\n", + ) + .unwrap(); + + let output = output_failure( + cli() + .arg("cluster") + .arg("validate") + .arg("--config") + .arg(temp.path()), + ); + let stdout = stdout_string(&output); + assert!(stdout.contains("future_phase_field"), "{stdout}"); +} + #[test] fn short_version_flag_prints_current_cli_version() { let output = output_success(cli().arg("-v")); @@ -798,8 +943,7 @@ fn deprecated_read_and_change_subcommands_emit_warnings() { let output = cli().arg("read").output().unwrap(); let stderr = String::from_utf8(output.stderr).unwrap(); assert!( - stderr.contains("`omnigraph read` is deprecated") - && stderr.contains("`omnigraph query`"), + stderr.contains("`omnigraph read` is deprecated") && stderr.contains("`omnigraph query`"), "expected `omnigraph read` deprecation warning; got: {stderr}" ); @@ -2394,9 +2538,19 @@ fn queries_validate_exits_zero_on_clean_registry() { ); let config = graph.write_config( "omnigraph.yaml", - &queries_test_config(&graph.path().to_string_lossy(), "find_person", "find_person.gq"), + &queries_test_config( + &graph.path().to_string_lossy(), + 
"find_person", + "find_person.gq", + ), + ); + let output = output_success( + cli() + .arg("queries") + .arg("validate") + .arg("--config") + .arg(&config), ); - let output = output_success(cli().arg("queries").arg("validate").arg("--config").arg(&config)); let stdout = stdout_string(&output); assert!(stdout.contains("OK"), "stdout:\n{stdout}"); } @@ -2405,12 +2559,21 @@ fn queries_validate_exits_zero_on_clean_registry() { fn queries_validate_exits_nonzero_on_type_broken_query() { let graph = SystemGraph::loaded(); // `Widget` is not in the fixture schema. - graph.write_query("ghost.gq", "query ghost() { match { $w: Widget } return { $w.name } }"); + graph.write_query( + "ghost.gq", + "query ghost() { match { $w: Widget } return { $w.name } }", + ); let config = graph.write_config( "omnigraph.yaml", &queries_test_config(&graph.path().to_string_lossy(), "ghost", "ghost.gq"), ); - let output = output_failure(cli().arg("queries").arg("validate").arg("--config").arg(&config)); + let output = output_failure( + cli() + .arg("queries") + .arg("validate") + .arg("--config") + .arg(&config), + ); let stdout = stdout_string(&output); assert!( stdout.contains("ghost"), @@ -2444,7 +2607,13 @@ fn queries_list_prints_registered_query() { graph.path().to_string_lossy().replace('\'', "''") ), ); - let output = output_success(cli().arg("queries").arg("list").arg("--config").arg(&config)); + let output = output_success( + cli() + .arg("queries") + .arg("list") + .arg("--config") + .arg(&config), + ); let stdout = stdout_string(&output); assert!(stdout.contains("find_person"), "stdout:\n{stdout}"); assert!( @@ -2480,7 +2649,13 @@ fn queries_list_requires_graph_selection_for_per_graph_only_registries() { ), ); - let output = output_failure(cli().arg("queries").arg("list").arg("--config").arg(&config)); + let output = output_failure( + cli() + .arg("queries") + .arg("list") + .arg("--config") + .arg(&config), + ); let stderr = String::from_utf8_lossy(&output.stderr); assert!( 
stderr.contains("local") && stderr.contains("--target local"), @@ -2505,7 +2680,13 @@ fn queries_list_without_graph_selection_lists_top_level_registry() { ), ); - let output = output_success(cli().arg("queries").arg("list").arg("--config").arg(&config)); + let output = output_success( + cli() + .arg("queries") + .arg("list") + .arg("--config") + .arg(&config), + ); let stdout = stdout_string(&output); assert!(stdout.contains("top_find"), "stdout:\n{stdout}"); } @@ -2524,7 +2705,11 @@ fn queries_list_unknown_target_errors() { ); let config = graph.write_config( "omnigraph.yaml", - &queries_test_config(&graph.path().to_string_lossy(), "find_person", "find_person.gq"), + &queries_test_config( + &graph.path().to_string_lossy(), + "find_person", + "find_person.gq", + ), ); let output = output_failure( cli() @@ -2566,7 +2751,7 @@ fn queries_commands_reject_named_graph_with_populated_top_level_block() { " file: ./find_person.gq\n", "cli:\n", " graph: local\n", - "queries:\n", // populated top-level block: the coherence violation + "queries:\n", // populated top-level block: the coherence violation " legacy:\n", " file: ./legacy.gq\n", "policy: {{}}\n", @@ -2592,8 +2777,14 @@ fn queries_validate_exits_nonzero_on_duplicate_tool_name() { // collision — `queries validate` must fail (offline, before the engine // opens) and name both queries plus the contested tool. 
let graph = SystemGraph::loaded(); - graph.write_query("a.gq", "query a() { match { $p: Person } return { $p.name } }"); - graph.write_query("b.gq", "query b() { match { $p: Person } return { $p.name } }"); + graph.write_query( + "a.gq", + "query a() { match { $p: Person } return { $p.name } }", + ); + graph.write_query( + "b.gq", + "query b() { match { $p: Person } return { $p.name } }", + ); let config = graph.write_config( "omnigraph.yaml", &format!( @@ -2615,7 +2806,13 @@ fn queries_validate_exits_nonzero_on_duplicate_tool_name() { graph.path().to_string_lossy().replace('\'', "''") ), ); - let output = output_failure(cli().arg("queries").arg("validate").arg("--config").arg(&config)); + let output = output_failure( + cli() + .arg("queries") + .arg("validate") + .arg("--config") + .arg(&config), + ); let stderr = String::from_utf8_lossy(&output.stderr); assert!( stderr.contains("dup") && stderr.contains("'a'") && stderr.contains("'b'"), @@ -2635,7 +2832,10 @@ fn queries_validate_positional_uri_ignores_default_graph() { ); // `Widget` is not in the fixture schema — the default graph's per-graph // query would break validate if it were (wrongly) selected. - graph.write_query("broken.gq", "query broken() { match { $w: Widget } return { $w.name } }"); + graph.write_query( + "broken.gq", + "query broken() { match { $w: Widget } return { $w.name } }", + ); let config = graph.write_config( "omnigraph.yaml", concat!( diff --git a/crates/omnigraph-cluster/Cargo.toml b/crates/omnigraph-cluster/Cargo.toml new file mode 100644 index 0000000..60e7785 --- /dev/null +++ b/crates/omnigraph-cluster/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "omnigraph-cluster" +version = "0.6.1" +edition = "2024" +description = "Read-only cluster configuration validation and planning for Omnigraph." 
+license = "MIT" +repository = "https://github.com/ModernRelay/omnigraph" +homepage = "https://github.com/ModernRelay/omnigraph" +documentation = "https://docs.rs/omnigraph-cluster" + +[dependencies] +omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.6.1" } +serde = { workspace = true } +serde_json = { workspace = true } +serde_yaml = { workspace = true } +sha2 = { workspace = true } +thiserror = { workspace = true } + +[dev-dependencies] +tempfile = { workspace = true } diff --git a/crates/omnigraph-cluster/src/lib.rs b/crates/omnigraph-cluster/src/lib.rs new file mode 100644 index 0000000..861ae22 --- /dev/null +++ b/crates/omnigraph-cluster/src/lib.rs @@ -0,0 +1,1275 @@ +use std::collections::{BTreeMap, BTreeSet}; +use std::fs; +use std::path::{Path, PathBuf}; + +use omnigraph_compiler::build_catalog; +use omnigraph_compiler::query::parser::parse_query; +use omnigraph_compiler::query::typecheck::typecheck_query_decl; +use omnigraph_compiler::schema::parser::parse_schema; +use serde::{Deserialize, Serialize}; +use sha2::{Digest, Sha256}; + +pub const CLUSTER_CONFIG_FILE: &str = "cluster.yaml"; +pub const CLUSTER_STATE_FILE: &str = "__cluster/state.json"; + +#[derive(Debug, Clone, Serialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum DiagnosticSeverity { + Error, + Warning, +} + +#[derive(Debug, Clone, Serialize, PartialEq, Eq)] +pub struct Diagnostic { + pub code: String, + pub severity: DiagnosticSeverity, + pub path: String, + pub message: String, +} + +impl Diagnostic { + fn error(code: impl Into, path: impl Into, message: impl Into) -> Self { + Self { + code: code.into(), + severity: DiagnosticSeverity::Error, + path: path.into(), + message: message.into(), + } + } + + fn warning( + code: impl Into, + path: impl Into, + message: impl Into, + ) -> Self { + Self { + code: code.into(), + severity: DiagnosticSeverity::Warning, + path: path.into(), + message: message.into(), + } + } +} + +#[derive(Debug, Clone, Serialize, 
PartialEq, Eq)] +pub struct ResourceSummary { + pub address: String, + pub kind: String, + pub digest: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub path: Option, +} + +#[derive(Debug, Clone, Serialize, PartialEq, Eq, PartialOrd, Ord)] +pub struct Dependency { + pub from: String, + pub to: String, +} + +#[derive(Debug, Clone, Serialize)] +pub struct ValidateOutput { + pub ok: bool, + pub config_dir: String, + pub config_file: String, + pub resource_digests: BTreeMap, + pub resources: Vec, + pub dependencies: Vec, + pub diagnostics: Vec, +} + +#[derive(Debug, Clone, Serialize)] +pub struct DesiredRevision { + #[serde(skip_serializing_if = "Option::is_none")] + pub config_digest: Option, +} + +#[derive(Debug, Clone, Serialize)] +pub struct StateObservations { + pub state_path: String, + pub state_found: bool, + #[serde(skip_serializing_if = "Option::is_none")] + pub applied_config_digest: Option, + pub resource_count: usize, +} + +#[derive(Debug, Clone, Serialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum PlanOperation { + Create, + Update, + Delete, +} + +#[derive(Debug, Clone, Serialize, PartialEq, Eq)] +pub struct PlanChange { + pub resource: String, + pub operation: PlanOperation, + #[serde(skip_serializing_if = "Option::is_none")] + pub before_digest: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub after_digest: Option, +} + +#[derive(Debug, Clone, Serialize, PartialEq, Eq)] +pub struct BlastRadius { + pub resource: String, + pub affected: Vec, +} + +#[derive(Debug, Clone, Serialize, PartialEq, Eq)] +pub struct ApprovalRequirement { + pub resource: String, + pub reason: String, +} + +#[derive(Debug, Clone, Serialize)] +pub struct PlanOutput { + pub ok: bool, + pub config_dir: String, + pub desired_revision: DesiredRevision, + pub resource_digests: BTreeMap, + pub dependencies: Vec, + pub state_observations: StateObservations, + pub changes: Vec, + pub blast_radius: Vec, + pub approvals_required: Vec, + 
pub diagnostics: Vec, +} + +#[derive(Debug, Clone)] +struct DesiredCluster { + config_dir: PathBuf, + config_digest: String, + resource_digests: BTreeMap, + resources: Vec, + dependencies: Vec, +} + +#[derive(Debug)] +struct LoadOutcome { + desired: Option, + diagnostics: Vec, + config_dir: PathBuf, + config_file: PathBuf, +} + +#[derive(Debug, Deserialize)] +#[serde(deny_unknown_fields)] +struct RawClusterConfig { + version: u32, + #[serde(default)] + metadata: Metadata, + #[serde(default)] + state: StateConfig, + #[serde(default)] + graphs: BTreeMap, + #[serde(default)] + policies: BTreeMap, +} + +#[derive(Debug, Default, Deserialize)] +#[serde(deny_unknown_fields)] +struct Metadata { + name: Option, +} + +#[derive(Debug, Default, Deserialize)] +#[serde(deny_unknown_fields)] +struct StateConfig { + backend: Option, + lock: Option, +} + +#[derive(Debug, Deserialize)] +#[serde(deny_unknown_fields)] +struct GraphConfig { + schema: PathBuf, + #[serde(default)] + queries: BTreeMap, +} + +#[derive(Debug, Deserialize)] +#[serde(deny_unknown_fields)] +struct QueryConfig { + file: PathBuf, +} + +#[derive(Debug, Deserialize)] +#[serde(deny_unknown_fields)] +struct PolicyConfig { + file: PathBuf, + applies_to: Vec, +} + +#[derive(Debug, Deserialize)] +#[serde(deny_unknown_fields)] +struct ClusterState { + version: u32, + applied_revision: AppliedRevisionState, +} + +#[derive(Debug, Deserialize)] +#[serde(deny_unknown_fields)] +struct AppliedRevisionState { + #[serde(default)] + config_digest: Option, + #[serde(default)] + resources: BTreeMap, +} + +#[derive(Debug, Deserialize)] +#[serde(deny_unknown_fields)] +struct StateResource { + digest: String, +} + +pub fn validate_config_dir(config_dir: impl AsRef) -> ValidateOutput { + let outcome = load_desired(config_dir.as_ref()); + let (resource_digests, resources, dependencies) = match outcome.desired { + Some(desired) => ( + desired.resource_digests, + desired.resources, + desired.dependencies, + ), + None => (BTreeMap::new(), 
Vec::new(), Vec::new()), + }; + let ok = !has_errors(&outcome.diagnostics); + + ValidateOutput { + ok, + config_dir: display_path(&outcome.config_dir), + config_file: display_path(&outcome.config_file), + resource_digests, + resources, + dependencies, + diagnostics: outcome.diagnostics, + } +} + +pub fn plan_config_dir(config_dir: impl AsRef) -> PlanOutput { + let outcome = load_desired(config_dir.as_ref()); + let mut diagnostics = outcome.diagnostics; + let state_path = outcome.config_dir.join(CLUSTER_STATE_FILE); + let mut observations = StateObservations { + state_path: display_path(&state_path), + state_found: false, + applied_config_digest: None, + resource_count: 0, + }; + + let Some(desired) = outcome.desired else { + return PlanOutput { + ok: false, + config_dir: display_path(&outcome.config_dir), + desired_revision: DesiredRevision { + config_digest: None, + }, + resource_digests: BTreeMap::new(), + dependencies: Vec::new(), + state_observations: observations, + changes: Vec::new(), + blast_radius: Vec::new(), + approvals_required: Vec::new(), + diagnostics, + }; + }; + + let mut prior_resources = BTreeMap::new(); + if state_path.exists() { + observations.state_found = true; + match fs::read_to_string(&state_path) { + Ok(text) => match serde_json::from_str::(&text) { + Ok(state) if state.version == 1 => { + observations.applied_config_digest = state.applied_revision.config_digest; + observations.resource_count = state.applied_revision.resources.len(); + prior_resources = state + .applied_revision + .resources + .into_iter() + .map(|(address, resource)| (address, resource.digest)) + .collect(); + } + Ok(state) => diagnostics.push(Diagnostic::error( + "unsupported_state_version", + "state.version", + format!( + "unsupported cluster state version {}; this build supports version 1", + state.version + ), + )), + Err(err) => diagnostics.push(Diagnostic::error( + "invalid_state_json", + CLUSTER_STATE_FILE, + format!("could not parse state JSON: {err}"), + )), + 
}, + Err(err) => diagnostics.push(Diagnostic::error( + "state_read_error", + CLUSTER_STATE_FILE, + format!("could not read state file: {err}"), + )), + } + } + + let changes = if has_errors(&diagnostics) { + Vec::new() + } else { + diff_resources(&prior_resources, &desired.resource_digests) + }; + let blast_radius = compute_blast_radius(&changes, &desired.dependencies); + let approvals_required = compute_approvals(&changes); + let ok = !has_errors(&diagnostics); + + PlanOutput { + ok, + config_dir: display_path(&desired.config_dir), + desired_revision: DesiredRevision { + config_digest: Some(desired.config_digest), + }, + resource_digests: desired.resource_digests, + dependencies: desired.dependencies, + state_observations: observations, + changes, + blast_radius, + approvals_required, + diagnostics, + } +} + +fn load_desired(config_dir: &Path) -> LoadOutcome { + let config_dir = config_dir.to_path_buf(); + let config_file = config_dir.join(CLUSTER_CONFIG_FILE); + let mut diagnostics = Vec::new(); + + if !config_dir.is_dir() { + diagnostics.push(Diagnostic::error( + "config_dir_not_found", + display_path(&config_dir), + "`--config` must point at a directory containing cluster.yaml", + )); + return LoadOutcome { + desired: None, + diagnostics, + config_dir, + config_file, + }; + } + + let text = match fs::read_to_string(&config_file) { + Ok(text) => text, + Err(err) => { + diagnostics.push(Diagnostic::error( + "cluster_config_read_error", + CLUSTER_CONFIG_FILE, + format!("could not read cluster.yaml: {err}"), + )); + return LoadOutcome { + desired: None, + diagnostics, + config_dir, + config_file, + }; + } + }; + + diagnostics.extend(duplicate_key_diagnostics(&text)); + diagnostics.extend(future_field_diagnostics(&text)); + if has_errors(&diagnostics) { + return LoadOutcome { + desired: None, + diagnostics, + config_dir, + config_file, + }; + } + + let raw = match serde_yaml::from_str::(&text) { + Ok(raw) => raw, + Err(err) => { + diagnostics.push(Diagnostic::error( 
+ "invalid_cluster_yaml", + CLUSTER_CONFIG_FILE, + format!("could not parse cluster.yaml: {err}"), + )); + return LoadOutcome { + desired: None, + diagnostics, + config_dir, + config_file, + }; + } + }; + + if raw.version != 1 { + diagnostics.push(Diagnostic::error( + "unsupported_cluster_config_version", + "version", + format!( + "unsupported cluster config version {}; this build supports version 1", + raw.version + ), + )); + } + if let Some(name) = raw.metadata.name.as_deref() { + if name.trim().is_empty() { + diagnostics.push(Diagnostic::error( + "empty_metadata_name", + "metadata.name", + "metadata.name must not be empty when provided", + )); + } + } + if let Some(backend) = raw.state.backend.as_deref() { + if backend != "cluster" { + diagnostics.push(Diagnostic::error( + "unsupported_state_backend", + "state.backend", + "Stage 1 supports only omitted state.backend or `cluster`", + )); + } + } + let _lock_parsed_for_forward_compat = raw.state.lock; + + let mut resources = BTreeMap::new(); + let mut dependencies = BTreeSet::new(); + let mut graph_query_digests: BTreeMap> = BTreeMap::new(); + let mut graph_schema_digests: BTreeMap = BTreeMap::new(); + + for (graph_id, graph) in &raw.graphs { + validate_id( + "graph id", + &format!("graphs.{graph_id}"), + graph_id, + &mut diagnostics, + ); + let graph_address = graph_address(graph_id); + let schema_address = schema_address(graph_id); + dependencies.insert(Dependency { + from: schema_address.clone(), + to: graph_address.clone(), + }); + + let schema_path = resolve_config_path(&config_dir, &graph.schema); + let schema_source = match fs::read_to_string(&schema_path) { + Ok(source) => { + let digest = sha256_hex(source.as_bytes()); + graph_schema_digests.insert(graph_id.clone(), digest.clone()); + resources.insert( + schema_address.clone(), + ResourceSummary { + address: schema_address.clone(), + kind: "schema".to_string(), + digest, + path: Some(display_path(&schema_path)), + }, + ); + Some(source) + } + Err(err) => 
{ + diagnostics.push(Diagnostic::error( + "schema_file_missing", + format!("graphs.{graph_id}.schema"), + format!( + "could not read schema file '{}': {err}", + schema_path.display() + ), + )); + None + } + }; + + let catalog = schema_source.and_then(|source| match parse_schema(&source) { + Ok(schema) => match build_catalog(&schema) { + Ok(catalog) => Some(catalog), + Err(err) => { + diagnostics.push(Diagnostic::error( + "schema_catalog_error", + format!("graphs.{graph_id}.schema"), + err.to_string(), + )); + None + } + }, + Err(err) => { + diagnostics.push(Diagnostic::error( + "schema_parse_error", + format!("graphs.{graph_id}.schema"), + err.to_string(), + )); + None + } + }); + + for (query_name, query) in &graph.queries { + validate_id( + "query name", + &format!("graphs.{graph_id}.queries.{query_name}"), + query_name, + &mut diagnostics, + ); + let query_address = query_address(graph_id, query_name); + dependencies.insert(Dependency { + from: query_address.clone(), + to: graph_address.clone(), + }); + dependencies.insert(Dependency { + from: query_address.clone(), + to: schema_address.clone(), + }); + + let query_path = resolve_config_path(&config_dir, &query.file); + match fs::read_to_string(&query_path) { + Ok(source) => { + let digest = sha256_hex(source.as_bytes()); + graph_query_digests + .entry(graph_id.clone()) + .or_default() + .insert(query_name.clone(), digest.clone()); + resources.insert( + query_address.clone(), + ResourceSummary { + address: query_address, + kind: "query".to_string(), + digest, + path: Some(display_path(&query_path)), + }, + ); + validate_query_source( + graph_id, + query_name, + &source, + catalog.as_ref(), + &mut diagnostics, + ); + } + Err(err) => diagnostics.push(Diagnostic::error( + "query_file_missing", + format!("graphs.{graph_id}.queries.{query_name}.file"), + format!( + "could not read query file '{}': {err}", + query_path.display() + ), + )), + } + } + } + + for graph_id in raw.graphs.keys() { + let digest = 
graph_digest( + graph_id, + graph_schema_digests.get(graph_id), + graph_query_digests.get(graph_id), + ); + resources.insert( + graph_address(graph_id), + ResourceSummary { + address: graph_address(graph_id), + kind: "graph".to_string(), + digest, + path: None, + }, + ); + } + + for (policy_name, policy) in &raw.policies { + validate_id( + "policy name", + &format!("policies.{policy_name}"), + policy_name, + &mut diagnostics, + ); + if policy.applies_to.is_empty() { + diagnostics.push(Diagnostic::error( + "policy_missing_applies_to", + format!("policies.{policy_name}.applies_to"), + "policy.applies_to must name `cluster` or at least one graph", + )); + } + + let policy_address = policy_address(policy_name); + for (idx, target) in policy.applies_to.iter().enumerate() { + match normalize_policy_target(target) { + PolicyTarget::Cluster => {} + PolicyTarget::Graph(graph_id) => { + if raw.graphs.contains_key(&graph_id) { + dependencies.insert(Dependency { + from: policy_address.clone(), + to: graph_address(&graph_id), + }); + } else { + diagnostics.push(Diagnostic::error( + "dangling_graph_reference", + format!("policies.{policy_name}.applies_to[{idx}]"), + format!( + "policy references graph `{graph_id}`, but no graph with that id is declared" + ), + )); + } + } + PolicyTarget::WrongKind(kind) => diagnostics.push(Diagnostic::error( + "wrong_kind_reference", + format!("policies.{policy_name}.applies_to[{idx}]"), + format!("policy applies_to expects graph refs or `cluster`, got `{kind}`"), + )), + } + } + + let policy_path = resolve_config_path(&config_dir, &policy.file); + match fs::read(&policy_path) { + Ok(bytes) => { + resources.insert( + policy_address.clone(), + ResourceSummary { + address: policy_address, + kind: "policy".to_string(), + digest: sha256_hex(&bytes), + path: Some(display_path(&policy_path)), + }, + ); + } + Err(err) => diagnostics.push(Diagnostic::error( + "policy_file_missing", + format!("policies.{policy_name}.file"), + format!( + "could not read 
policy file '{}': {err}", + policy_path.display() + ), + )), + } + } + + let mut resource_digests = BTreeMap::new(); + let mut resource_list = Vec::new(); + for (address, resource) in resources { + resource_digests.insert(address, resource.digest.clone()); + resource_list.push(resource); + } + let dependencies: Vec<_> = dependencies.into_iter().collect(); + let config_digest = desired_config_digest(&text, &resource_digests); + + LoadOutcome { + desired: Some(DesiredCluster { + config_dir: config_dir.clone(), + config_digest, + resource_digests, + resources: resource_list, + dependencies, + }), + diagnostics, + config_dir, + config_file, + } +} + +fn validate_query_source( + graph_id: &str, + query_name: &str, + source: &str, + catalog: Option<&omnigraph_compiler::catalog::Catalog>, + diagnostics: &mut Vec, +) { + let path = format!("graphs.{graph_id}.queries.{query_name}"); + match parse_query(source) { + Ok(query_file) => { + let Some(query_decl) = query_file.queries.iter().find(|q| q.name == query_name) else { + diagnostics.push(Diagnostic::error( + "query_key_mismatch", + path, + format!("no `query {query_name}` declaration found in the referenced .gq file"), + )); + return; + }; + if let Some(catalog) = catalog { + if let Err(err) = typecheck_query_decl(catalog, query_decl) { + diagnostics.push(Diagnostic::error( + "query_typecheck_error", + format!("graphs.{graph_id}.queries.{query_name}"), + err.to_string(), + )); + } + } else { + diagnostics.push(Diagnostic::warning( + "query_typecheck_skipped", + format!("graphs.{graph_id}.queries.{query_name}"), + "query parsed, but type-check was skipped because the graph schema is invalid", + )); + } + } + Err(err) => diagnostics.push(Diagnostic::error( + "query_parse_error", + path, + err.to_string(), + )), + } +} + +fn diff_resources( + prior: &BTreeMap, + desired: &BTreeMap, +) -> Vec { + let mut changes = Vec::new(); + for (address, after) in desired { + match prior.get(address) { + None => changes.push(PlanChange { 
+ resource: address.clone(), + operation: PlanOperation::Create, + before_digest: None, + after_digest: Some(after.clone()), + }), + Some(before) if before != after => changes.push(PlanChange { + resource: address.clone(), + operation: PlanOperation::Update, + before_digest: Some(before.clone()), + after_digest: Some(after.clone()), + }), + Some(_) => {} + } + } + for (address, before) in prior { + if !desired.contains_key(address) { + changes.push(PlanChange { + resource: address.clone(), + operation: PlanOperation::Delete, + before_digest: Some(before.clone()), + after_digest: None, + }); + } + } + changes.sort_by(|a, b| a.resource.cmp(&b.resource)); + changes +} + +fn compute_blast_radius(changes: &[PlanChange], dependencies: &[Dependency]) -> Vec { + changes + .iter() + .filter_map(|change| { + let affected: Vec<_> = dependencies + .iter() + .filter_map(|dep| (dep.to == change.resource).then_some(dep.from.clone())) + .collect(); + (!affected.is_empty()).then(|| BlastRadius { + resource: change.resource.clone(), + affected, + }) + }) + .collect() +} + +fn compute_approvals(changes: &[PlanChange]) -> Vec { + changes + .iter() + .filter_map(|change| { + if change.operation == PlanOperation::Delete + && (change.resource.starts_with("graph.") || change.resource.starts_with("schema.")) + { + Some(ApprovalRequirement { + resource: change.resource.clone(), + reason: "delete may remove deployed graph or schema definition".to_string(), + }) + } else { + None + } + }) + .collect() +} + +fn duplicate_key_diagnostics(text: &str) -> Vec { + #[derive(Debug)] + struct Frame { + indent: isize, + path: String, + keys: BTreeSet, + } + + let mut diagnostics = Vec::new(); + let mut stack = vec![Frame { + indent: -1, + path: String::new(), + keys: BTreeSet::new(), + }]; + + for (line_idx, line) in text.lines().enumerate() { + let line_without_comment = strip_comment(line); + if line_without_comment.trim().is_empty() { + continue; + } + let indent = line_without_comment + .chars() + 
/// Returns `line` with any trailing YAML comment removed.
///
/// A `#` starts a comment only when it is outside single- and double-quoted
/// text AND is either the first character of the line or preceded by a space
/// or tab. YAML requires comments to be separated from other tokens by white
/// space, so a `#` embedded in a plain scalar (`./a#b.gq`, `key#x`) is kept.
///
/// Quote tracking is line-local. Inside double quotes a backslash escapes the
/// next character; inside single quotes it does not.
fn strip_comment(line: &str) -> String {
    let mut in_single_quote = false;
    let mut in_double_quote = false;
    let mut escaped = false;
    // The start of the line counts as "preceded by whitespace".
    let mut prev_is_space = true;

    for (idx, ch) in line.char_indices() {
        if escaped {
            // This character is consumed by the preceding backslash.
            escaped = false;
            prev_is_space = false;
            continue;
        }
        match ch {
            '\\' if in_double_quote => escaped = true,
            '\'' if !in_double_quote => in_single_quote = !in_single_quote,
            '"' if !in_single_quote => in_double_quote = !in_double_quote,
            '#' if !in_single_quote && !in_double_quote && prev_is_space => {
                return line[..idx].to_string();
            }
            _ => {}
        }
        prev_is_space = ch == ' ' || ch == '\t';
    }

    line.to_string()
}
input.push('\0'); + } + } + sha256_hex(input.as_bytes()) +} + +fn desired_config_digest( + config_source: &str, + resource_digests: &BTreeMap, +) -> String { + let mut input = String::from("cluster-config\0"); + input.push_str(config_source); + input.push('\0'); + for (address, digest) in resource_digests { + input.push_str(address); + input.push('\0'); + input.push_str(digest); + input.push('\0'); + } + sha256_hex(input.as_bytes()) +} + +fn sha256_hex(bytes: &[u8]) -> String { + let digest = Sha256::digest(bytes); + let mut out = String::with_capacity(digest.len() * 2); + for byte in digest { + out.push_str(&format!("{byte:02x}")); + } + out +} + +fn has_errors(diagnostics: &[Diagnostic]) -> bool { + diagnostics + .iter() + .any(|diagnostic| diagnostic.severity == DiagnosticSeverity::Error) +} + +fn display_path(path: &Path) -> String { + path.display().to_string() +} + +#[cfg(test)] +mod tests { + use std::fs; + + use serde_json::json; + use tempfile::tempdir; + + use super::*; + + const SCHEMA: &str = r#" +node Person { + name: String @key + age: I32? 
+} +"#; + + const QUERY: &str = r#" +query find_person($name: String) { + match { $p: Person { name: $name } } + return { $p.name, $p.age } +} +"#; + + fn fixture() -> tempfile::TempDir { + let dir = tempdir().unwrap(); + fs::write(dir.path().join("people.pg"), SCHEMA).unwrap(); + fs::write(dir.path().join("people.gq"), QUERY).unwrap(); + fs::write(dir.path().join("base.policy.yaml"), "rules: []\n").unwrap(); + fs::write( + dir.path().join(CLUSTER_CONFIG_FILE), + r#" +version: 1 +metadata: + name: test +state: + backend: cluster + lock: true +graphs: + knowledge: + schema: ./people.pg + queries: + find_person: + file: ./people.gq +policies: + base: + file: ./base.policy.yaml + applies_to: [knowledge] +"#, + ) + .unwrap(); + dir + } + + #[test] + fn valid_minimal_config() { + let dir = fixture(); + let out = validate_config_dir(dir.path()); + assert!(out.ok, "{:?}", out.diagnostics); + assert!(out.resource_digests.contains_key("graph.knowledge")); + assert!(out.resource_digests.contains_key("schema.knowledge")); + assert!( + out.dependencies + .iter() + .any(|dep| dep.from == "policy.base" && dep.to == "graph.knowledge") + ); + } + + #[test] + fn unknown_field_rejection() { + let dir = fixture(); + fs::write( + dir.path().join(CLUSTER_CONFIG_FILE), + "version: 1\ngraphs: {}\nwat: true\n", + ) + .unwrap(); + let out = validate_config_dir(dir.path()); + assert!(!out.ok); + assert!(out.diagnostics[0].message.contains("unknown field")); + } + + #[test] + fn future_phase_field_rejection() { + let dir = fixture(); + fs::write( + dir.path().join(CLUSTER_CONFIG_FILE), + "version: 1\ngraphs: {}\npipelines: {}\n", + ) + .unwrap(); + let out = validate_config_dir(dir.path()); + assert!(!out.ok); + assert_eq!(out.diagnostics[0].code, "future_phase_field"); + } + + #[test] + fn duplicate_yaml_key_rejection() { + let dir = fixture(); + fs::write( + dir.path().join(CLUSTER_CONFIG_FILE), + "version: 1\ngraphs: {}\ngraphs: {}\n", + ) + .unwrap(); + let out = 
validate_config_dir(dir.path()); + assert!(!out.ok); + assert_eq!(out.diagnostics[0].code, "duplicate_yaml_key"); + } + + #[test] + fn duplicate_yaml_key_rejection_keeps_quoted_hashes() { + let diagnostics = + duplicate_key_diagnostics("\"name#display\": one\n\"name#display\": two\n"); + assert_eq!(diagnostics.len(), 1); + assert_eq!(diagnostics[0].code, "duplicate_yaml_key"); + } + + #[test] + fn missing_schema_query_and_policy_files() { + let dir = fixture(); + fs::write( + dir.path().join(CLUSTER_CONFIG_FILE), + r#" +version: 1 +graphs: + knowledge: + schema: ./missing.pg + queries: + find_person: { file: ./missing.gq } +policies: + base: + file: ./missing.policy.yaml + applies_to: [knowledge] +"#, + ) + .unwrap(); + let out = validate_config_dir(dir.path()); + assert!(!out.ok); + let codes: BTreeSet<_> = out.diagnostics.iter().map(|d| d.code.as_str()).collect(); + assert!(codes.contains("schema_file_missing")); + assert!(codes.contains("query_file_missing")); + assert!(codes.contains("policy_file_missing")); + } + + #[test] + fn wrong_kind_and_dangling_refs_fail() { + let dir = fixture(); + fs::write( + dir.path().join(CLUSTER_CONFIG_FILE), + r#" +version: 1 +graphs: + knowledge: + schema: ./people.pg +policies: + base: + file: ./base.policy.yaml + applies_to: [query.knowledge.find_person, missing] +"#, + ) + .unwrap(); + let out = validate_config_dir(dir.path()); + assert!(!out.ok); + let codes: BTreeSet<_> = out.diagnostics.iter().map(|d| d.code.as_str()).collect(); + assert!(codes.contains("wrong_kind_reference")); + assert!(codes.contains("dangling_graph_reference")); + } + + #[test] + fn query_key_mismatch_fails() { + let dir = fixture(); + fs::write( + dir.path().join(CLUSTER_CONFIG_FILE), + r#" +version: 1 +graphs: + knowledge: + schema: ./people.pg + queries: + different: { file: ./people.gq } +"#, + ) + .unwrap(); + let out = validate_config_dir(dir.path()); + assert!(!out.ok); + assert_eq!(out.diagnostics[0].code, "query_key_mismatch"); + } + + 
#[test] + fn query_typecheck_failure_fails() { + let dir = fixture(); + fs::write( + dir.path().join("people.gq"), + "query find_person() { match { $d: DoesNotExist } return { $d.name } }\n", + ) + .unwrap(); + let out = validate_config_dir(dir.path()); + assert!(!out.ok); + assert!( + out.diagnostics + .iter() + .any(|diagnostic| diagnostic.code == "query_typecheck_error") + ); + } + + #[test] + fn missing_state_plans_creates() { + let dir = fixture(); + let out = plan_config_dir(dir.path()); + assert!(out.ok, "{:?}", out.diagnostics); + assert!(!out.state_observations.state_found); + assert!( + out.changes + .iter() + .all(|c| c.operation == PlanOperation::Create) + ); + assert!(out.changes.iter().any(|c| c.resource == "graph.knowledge")); + } + + #[test] + fn existing_state_plans_update_and_delete_deterministically() { + let dir = fixture(); + let first = plan_config_dir(dir.path()); + let state_dir = dir.path().join("__cluster"); + fs::create_dir_all(&state_dir).unwrap(); + fs::write( + state_dir.join("state.json"), + serde_json::to_string_pretty(&json!({ + "version": 1, + "applied_revision": { + "config_digest": "old", + "resources": { + "graph.knowledge": { "digest": first.resource_digests["graph.knowledge"] }, + "policy.old": { "digest": "abc" }, + "schema.knowledge": { "digest": "old-schema" } + } + } + })) + .unwrap(), + ) + .unwrap(); + + let out = plan_config_dir(dir.path()); + assert!(out.ok, "{:?}", out.diagnostics); + let rendered: Vec<_> = out + .changes + .iter() + .map(|change| (change.resource.as_str(), &change.operation)) + .collect(); + assert_eq!( + rendered, + vec![ + ("policy.base", &PlanOperation::Create), + ("policy.old", &PlanOperation::Delete), + ("query.knowledge.find_person", &PlanOperation::Create), + ("schema.knowledge", &PlanOperation::Update), + ] + ); + } + + #[test] + fn external_state_backend_rejected() { + let dir = fixture(); + fs::write( + dir.path().join(CLUSTER_CONFIG_FILE), + "version: 1\nstate:\n backend: 
s3://bucket/state\ngraphs: {}\n", + ) + .unwrap(); + let out = validate_config_dir(dir.path()); + assert!(!out.ok); + assert_eq!(out.diagnostics[0].code, "unsupported_state_backend"); + } +} diff --git a/docs/dev/testing.md b/docs/dev/testing.md index 425fcee..0b5a234 100644 --- a/docs/dev/testing.md +++ b/docs/dev/testing.md @@ -8,6 +8,7 @@ This file is the always-on map of the test surface. **Consult it before every ta |---|---|---| | `omnigraph` (engine) | `crates/omnigraph/tests/` | Integration tests (21 files), fixture-driven, share `tests/helpers/mod.rs` | | `omnigraph-cli` | `crates/omnigraph-cli/tests/` | `cli.rs` (unit-ish), `system_local.rs`, `system_remote.rs`, share `tests/support/mod.rs` | +| `omnigraph-cluster` | mostly in-source `#[cfg(test)] mod tests` | Cluster config parser, local JSON state diff, read-only validate/plan | | `omnigraph-server` | `crates/omnigraph-server/tests/` | `server.rs` (HTTP-level), `openapi.rs` (OpenAPI drift / regeneration) | | `omnigraph-compiler` | mostly in-source `#[cfg(test)] mod tests` | Parser, type-checker, IR lowering, lint | diff --git a/docs/user/cli-reference.md b/docs/user/cli-reference.md index 8263919..2f27322 100644 --- a/docs/user/cli-reference.md +++ b/docs/user/cli-reference.md @@ -2,7 +2,7 @@ A reference for the `omnigraph` binary's command surface and `omnigraph.yaml` schema. For a quick-start guide, see [cli.md](cli.md). -17 top-level command families, 40+ subcommands. All commands accept either a positional `URI`, `--uri`, or a `--target ` resolved against `omnigraph.yaml`. +18 top-level command families, 40+ subcommands. Graph commands accept either a positional `URI`, `--uri`, or a `--target ` resolved against `omnigraph.yaml`; `cluster` commands instead use `--config `. 
## Top-level commands @@ -21,6 +21,7 @@ A reference for the `omnigraph` binary's command surface and `omnigraph.yaml` sc | `schema plan \| apply \| show (alias: get)` | migrations | | `lint` (alias: `check`) | offline / graph-backed query validation. Replaces `query lint` / `query check`, which are kept as deprecated argv-level shims that print a one-line warning and rewrite to `omnigraph lint` | | `queries validate \| list` | operate on the server-side stored-query registry (the `queries:` block). `validate` type-checks every stored query against the live schema offline (opens the selected graph; exits non-zero on any breakage), catching schema drift without restarting the server; `list` prints the selected registry's query names, MCP exposure, and typed params. For per-graph registries, pass `--target <name>` or set `cli.graph`; with no graph selection, `list` shows only top-level `queries:`. Distinct from `lint`, which validates a single `.gq` file | +| `cluster validate \| plan` | read-only cluster-control preview. `validate` checks a local `cluster.yaml` folder and referenced schema/query/policy files; `plan` diffs it against local JSON state at `__cluster/state.json`. No apply, lock, graph open, server change, or state write occurs in Stage 1 | | `optimize` | non-destructive Lance compaction (skips tables with `Blob` columns; `--json` reports a `skipped` field) | | `cleanup --keep N --older-than 7d --confirm` | destructive version GC | | `embed` | offline JSONL embedding pipeline | @@ -73,6 +74,20 @@ policy: file: ./policy.yaml ``` +## Cluster config preview + +```bash +omnigraph cluster validate --config ./company-brain +omnigraph cluster plan --config ./company-brain --json +``` + +`--config` is a directory containing `cluster.yaml`; it defaults to `.`. +Stage 1 accepts graphs, schemas, stored queries, and policy bundle file +references. `cluster plan` reads local JSON state from +`<config>/__cluster/state.json`; a missing file means empty state.
External +state backends, apply, locks, pipelines, UI specs, embeddings, aliases, and +bindings are reserved for later stages. See [cluster-config.md](cluster-config.md). + ## Output formats (`query` command, alias: `read`) - `json` — pretty-printed object with metadata + rows diff --git a/docs/user/cluster-config.md b/docs/user/cluster-config.md new file mode 100644 index 0000000..29d9c32 --- /dev/null +++ b/docs/user/cluster-config.md @@ -0,0 +1,95 @@ +# Cluster Config + +**Status:** Stage 1 read-only preview. + +Cluster config is the future control-plane configuration surface for a whole +OmniGraph deployment. In this stage, OmniGraph can validate a local +`cluster.yaml` folder and produce a deterministic read-only plan. It does not +apply changes, acquire locks, open graph roots, start servers, or write state. + +## Commands + +```bash +omnigraph cluster validate --config ./company-brain +omnigraph cluster plan --config ./company-brain --json +``` + +`--config` points at a directory, not a file. The directory must contain +`cluster.yaml`. When omitted, it defaults to the current directory. + +## Supported `cluster.yaml` + +Stage 1 accepts only the read-only resource subset: + +```yaml +version: 1 +metadata: + name: company-brain + +state: + backend: cluster + lock: true + +graphs: + knowledge: + schema: ./knowledge.pg + queries: + find_experts: + file: ./knowledge.gq + +policies: + base: + file: ./base.policy.yaml + applies_to: [knowledge] +``` + +`metadata.name` is a display label. `state.lock` is parsed for forward +compatibility, but no lock is acquired in this read-only stage. `state.backend` +may be omitted or set to `cluster`; external state backends are reserved for a +later stage. 
+ +## Validation + +`cluster validate` checks: + +- `cluster.yaml` syntax and supported fields +- duplicate YAML keys +- schema, query, and policy file existence +- schema parsing and catalog construction +- stored-query parsing and query-name matching +- stored-query type-checking against the desired schema +- policy `applies_to` graph references + +Fields reserved for later phases, such as `pipelines`, `embeddings`, `ui`, +`aliases`, and `bindings`, fail with a typed diagnostic instead of being +silently ignored. + +## Planning + +`cluster plan` first performs validation, then reads local JSON state from: + +```text +<config>/__cluster/state.json +``` + +If the file is missing, the state is treated as empty and every desired +resource is planned as a create. If present, the file must use this shape: + +```json +{ + "version": 1, + "applied_revision": { + "config_digest": "...", + "resources": { + "graph.knowledge": { "digest": "..." }, + "schema.knowledge": { "digest": "..." }, + "query.knowledge.find_experts": { "digest": "..." }, + "policy.base": { "digest": "..." } + } + } +} +``` + +Plan output compares desired resource digests against state resource digests +and reports `create`, `update`, and `delete` changes. The command never writes +`state.json`; apply and locking are later-stage work. diff --git a/docs/user/index.md b/docs/user/index.md index 1b93efa..6cf6ade 100644 --- a/docs/user/index.md +++ b/docs/user/index.md @@ -13,6 +13,7 @@ of MRs, internal recovery mechanics, or contributor-only invariants. | Install OmniGraph | [install.md](install.md) | | Run the CLI locally | [cli.md](cli.md) | | Look up every CLI flag and config field | [cli-reference.md](cli-reference.md) | +| Validate and plan cluster config | [cluster-config.md](cluster-config.md) | | Write schemas | [schema-language.md](schema-language.md) | | Read schema-lint diagnostic codes | [schema-lint.md](schema-lint.md) | | Write queries and mutations | [query-language.md](query-language.md) |