From dc0a1fc5a5720d44f78a59f3581f2408d8c79705 Mon Sep 17 00:00:00 2001 From: aaltshuler Date: Thu, 11 Jun 2026 05:37:20 +0300 Subject: [PATCH] refactor(cluster): move declared-config loading to config.rs Verbatim move of cluster.yaml parsing, query discovery, source digesting, header/id validation, path resolution, and live-graph observation. Two helpers that the cut swept along were relocated to their right homes (state-status helpers back to lib.rs, lock-file helpers to store.rs). 95 tests green. Co-Authored-By: Claude Fable 5 --- crates/omnigraph-cluster/src/config.rs | 881 ++++++++++++++++++++++ crates/omnigraph-cluster/src/lib.rs | 971 +------------------------ crates/omnigraph-cluster/src/store.rs | 27 + 3 files changed, 947 insertions(+), 932 deletions(-) create mode 100644 crates/omnigraph-cluster/src/config.rs diff --git a/crates/omnigraph-cluster/src/config.rs b/crates/omnigraph-cluster/src/config.rs new file mode 100644 index 0000000..ecdc71c --- /dev/null +++ b/crates/omnigraph-cluster/src/config.rs @@ -0,0 +1,881 @@ +//! Declared-configuration loading: cluster.yaml parsing, query +//! discovery, source digesting, validation (moved verbatim from lib.rs +//! in the modularization). Reads the operator's WORKING TREE — stored +//! state never lives here (see store.rs). + +use super::*; + +/// How a graph declares its stored queries. Terraform-style: the `.gq` +/// files ARE the declaration — point at them (or a directory) and every +/// `query ` they contain is discovered. The explicit name->file map +/// remains for fine-grained control. +#[derive(Debug, Serialize, Deserialize)] +#[serde(untagged)] +pub(crate) enum QueriesDecl { + /// `queries: ./queries/` — a directory (top-level `*.gq`, sorted) or a + /// single `.gq` file; every declaration inside is registered. + Discover(PathBuf), + /// `queries: [./queries/, ./extra.gq]` — several directories/files. + DiscoverMany(Vec), + /// `queries: { name: { file: ... } }` — explicit registry. + Explicit(BTreeMap), +} + +impl Default for QueriesDecl { + fn default() -> Self { + QueriesDecl::Explicit(BTreeMap::new()) + } +} + +/// Expand a graph's query declaration into the canonical name->file map. +/// Discovery reads and parses each `.gq`; unreadable or unparseable files +/// and duplicate query names are loud validation errors — a declaration the +/// tool cannot enumerate is broken, not partially usable. +pub(crate) fn resolve_query_decls( + config_dir: &Path, + graph_id: &str, + decl: &QueriesDecl, + diagnostics: &mut Vec, +) -> (BTreeMap, BTreeMap) { + let paths: Vec = match decl { + QueriesDecl::Explicit(map) => { + return ( + map.iter() + .map(|(name, config)| { + (name.clone(), QueryConfig { file: config.file.clone() }) + }) + .collect(), + BTreeMap::new(), + ); + } + QueriesDecl::Discover(path) => vec![path.clone()], + QueriesDecl::DiscoverMany(paths) => paths.clone(), + }; + + let mut files: Vec<(PathBuf, PathBuf)> = Vec::new(); // (declared-relative, resolved) + for declared in &paths { + let resolved = resolve_config_path(config_dir, declared); + if resolved.is_dir() { + let mut entries: Vec = match fs::read_dir(&resolved) { + Ok(read) => read + .flatten() + .map(|entry| entry.path()) + .filter(|path| path.extension().is_some_and(|ext| ext == "gq")) + .collect(), + Err(err) => { + diagnostics.push(Diagnostic::error( + "query_dir_unreadable", + format!("graphs.{graph_id}.queries"), + format!("could not list query directory '{}': {err}", resolved.display()), + )); + continue; + } + }; + entries.sort(); + if entries.is_empty() { + diagnostics.push(Diagnostic::warning( + "query_dir_empty", + format!("graphs.{graph_id}.queries"), + format!("query directory '{}' contains no .gq files", resolved.display()), + )); + } + for path in entries { + let relative = declared.join(path.file_name().expect("dir entries have names")); + files.push((relative, path)); + } + } else { + files.push((declared.clone(), resolved)); + } + } + + let mut registry: BTreeMap = BTreeMap::new(); + let mut origin: BTreeMap = BTreeMap::new(); + // Content read once at discovery and handed to the caller — the per-query + // digest/typecheck pass reuses it instead of re-reading (no N+1 reads, no + // window for the file to change between enumeration and validation). + let mut contents: BTreeMap = BTreeMap::new(); + for (declared, resolved) in files { + let source = match fs::read_to_string(&resolved) { + Ok(source) => source, + Err(err) => { + diagnostics.push(Diagnostic::error( + "query_file_missing", + format!("graphs.{graph_id}.queries"), + format!("could not read query file '{}': {err}", resolved.display()), + )); + continue; + } + }; + let parsed = match parse_query(&source) { + Ok(parsed) => parsed, + Err(err) => { + diagnostics.push(Diagnostic::error( + "query_parse_error", + format!("graphs.{graph_id}.queries"), + format!("'{}' does not parse: {err}", resolved.display()), + )); + continue; + } + }; + for query_decl in &parsed.queries { + let name = query_decl.name.clone(); + if let Some(previous) = origin.get(&name) { + diagnostics.push(Diagnostic::error( + "duplicate_query_name", + format!("graphs.{graph_id}.queries.{name}"), + format!( + "query '{name}' is declared in both '{}' and '{}'", + previous.display(), + declared.display() + ), + )); + continue; + } + origin.insert(name.clone(), declared.clone()); + registry.insert(name, QueryConfig { file: declared.clone() }); + } + contents.insert(declared, source); + } + (registry, contents) +} + +pub(crate) fn parse_cluster_config(config_dir: &Path) -> ParsedConfig { + let config_dir = config_dir.to_path_buf(); + let config_file = config_dir.join(CLUSTER_CONFIG_FILE); + let mut diagnostics = Vec::new(); + + if !config_dir.is_dir() { + diagnostics.push(Diagnostic::error( + "config_dir_not_found", + display_path(&config_dir), + "`--config` must point at a directory containing cluster.yaml", + )); + return ParsedConfig { + raw: None, + diagnostics, + config_dir, + config_file, + }; + } + + let text = match fs::read_to_string(&config_file) { + Ok(text) => text, + Err(err) => { + diagnostics.push(Diagnostic::error( + "cluster_config_read_error", + CLUSTER_CONFIG_FILE, + format!("could not read cluster.yaml: {err}"), + )); + return ParsedConfig { + raw: None, + diagnostics, + config_dir, + config_file, + }; + } + }; + + diagnostics.extend(duplicate_key_diagnostics(&text)); + diagnostics.extend(future_field_diagnostics(&text)); + if has_errors(&diagnostics) { + return ParsedConfig { + raw: None, + diagnostics, + config_dir, + config_file, + }; + } + + let raw = match serde_yaml::from_str::(&text) { + Ok(raw) => Some(raw), + Err(err) => { + diagnostics.push(Diagnostic::error( + "invalid_cluster_yaml", + CLUSTER_CONFIG_FILE, + format!("could not parse cluster.yaml: {err}"), + )); + None + } + }; + + ParsedConfig { + raw, + diagnostics, + config_dir, + config_file, + } +} + +pub(crate) fn validate_cluster_header( + raw: &RawClusterConfig, + diagnostics: &mut Vec, +) -> ClusterSettings { + if raw.version != 1 { + diagnostics.push(Diagnostic::error( + "unsupported_cluster_config_version", + "version", + format!( + "unsupported cluster config version {}; this build supports version 1", + raw.version + ), + )); + } + if let Some(name) = raw.metadata.name.as_deref() { + if name.trim().is_empty() { + diagnostics.push(Diagnostic::error( + "empty_metadata_name", + "metadata.name", + "metadata.name must not be empty when provided", + )); + } + } + if let Some(backend) = raw.state.backend.as_deref() { + if backend != "cluster" { + diagnostics.push(Diagnostic::error( + "unsupported_state_backend", + "state.backend", + "Stage 2C supports only omitted state.backend or `cluster`", + )); + } + } + + ClusterSettings { + state_lock: raw.state.lock.unwrap_or(true), + } +} + + + +pub(crate) fn state_resource_digests(state: &ClusterState) -> BTreeMap { + state + .applied_revision + .resources + .iter() + .map(|(address, resource)| (address.clone(), resource.digest.clone())) + .collect() +} + +pub(crate) fn initial_import_state(desired: &DesiredCluster) -> ClusterState { + ClusterState { + version: 1, + state_revision: 0, + applied_revision: AppliedRevisionState { + config_digest: Some(desired.config_digest.clone()), + resources: BTreeMap::new(), + }, + resource_statuses: BTreeMap::new(), + approval_records: BTreeMap::new(), + recovery_records: BTreeMap::new(), + observations: BTreeMap::new(), + } +} + + +pub(crate) async fn observe_declared_graphs(desired: &DesiredCluster, state: &mut ClusterState) -> usize { + let mut graph_error_count = 0; + for graph in &desired.graphs { + let graph_address = graph_address(&graph.id); + let schema_address = schema_address(&graph.id); + let graph_path = desired + .config_dir + .join(CLUSTER_GRAPHS_DIR) + .join(format!("{}.omni", graph.id)); + let graph_uri = display_path(&graph_path); + let observed_at = now_rfc3339(); + + if !graph_path.exists() { + state.applied_revision.resources.remove(&graph_address); + state.applied_revision.resources.remove(&schema_address); + state.observations.insert( + graph_address.clone(), + graph_observation_json(GraphObservationJson { + address: &graph_address, + graph_uri: &graph_uri, + observed_at: &observed_at, + exists: false, + manifest_version: None, + schema_digest: None, + desired_schema_digest: &graph.schema_digest, + schema_matches_desired: Some(false), + error: Some("derived graph root is missing"), + }), + ); + set_resource_status( + state, + &graph_address, + ResourceLifecycleStatus::Drifted, + "graph_missing", + "derived graph root is missing", + ); + set_resource_status( + state, + &schema_address, + ResourceLifecycleStatus::Drifted, + "graph_missing", + "derived graph root is missing", + ); + continue; + } + + match observe_live_graph(&graph_uri).await { + Ok(observation) => { + let schema_matches = observation.schema_digest == graph.schema_digest; + state.applied_revision.resources.insert( + schema_address.clone(), + StateResource { + digest: observation.schema_digest.clone(), + applies_to: None, + }, + ); + let query_digests = state_query_digests_for_graph(state, &graph.id); + let graph_digest_value = graph_digest( + &graph.id, + Some(&observation.schema_digest), + Some(&query_digests), + ); + state.applied_revision.resources.insert( + graph_address.clone(), + StateResource { + digest: graph_digest_value, + applies_to: None, + }, + ); + state.observations.insert( + graph_address.clone(), + graph_observation_json(GraphObservationJson { + address: &graph_address, + graph_uri: &graph_uri, + observed_at: &observed_at, + exists: true, + manifest_version: Some(observation.manifest_version), + schema_digest: Some(observation.schema_digest.as_str()), + desired_schema_digest: &graph.schema_digest, + schema_matches_desired: Some(schema_matches), + error: None, + }), + ); + if schema_matches { + set_resource_status_applied(state, &graph_address); + set_resource_status_applied(state, &schema_address); + } else { + set_resource_status( + state, + &graph_address, + ResourceLifecycleStatus::Drifted, + "schema_mismatch", + "live schema digest differs from desired schema digest", + ); + set_resource_status( + state, + &schema_address, + ResourceLifecycleStatus::Drifted, + "schema_mismatch", + "live schema digest differs from desired schema digest", + ); + } + } + Err(error) => { + graph_error_count += 1; + state.observations.insert( + graph_address.clone(), + graph_observation_json(GraphObservationJson { + address: &graph_address, + graph_uri: &graph_uri, + observed_at: &observed_at, + exists: true, + manifest_version: None, + schema_digest: None, + desired_schema_digest: &graph.schema_digest, + schema_matches_desired: None, + error: Some(error.as_str()), + }), + ); + set_resource_status( + state, + &graph_address, + ResourceLifecycleStatus::Error, + "graph_observation_error", + error.as_str(), + ); + set_resource_status( + state, + &schema_address, + ResourceLifecycleStatus::Error, + "graph_observation_error", + error.as_str(), + ); + } + } + } + graph_error_count +} + +/// RFC-004 §D7: the data-aware preview — the engine's migration plan for a +/// desired schema against the live graph, computed read-only (no lock). +pub(crate) async fn preview_schema_migration( + graph_uri: &str, + schema_path: &str, +) -> Result { + let source = fs::read_to_string(schema_path).map_err(|err| err.to_string())?; + let db = Omnigraph::open_read_only(graph_uri) + .await + .map_err(|err| err.to_string())?; + let preview = db + .preview_schema_apply_with_options(&source, SchemaApplyOptions::default()) + .await + .map_err(|err| err.to_string())?; + Ok(preview.plan) +} + +struct LiveGraphObservation { + manifest_version: u64, + schema_digest: String, +} + +pub(crate) async fn observe_live_graph(graph_uri: &str) -> Result { + let db = Omnigraph::open_read_only(graph_uri) + .await + .map_err(|err| err.to_string())?; + let snapshot = db + .snapshot_of(ReadTarget::branch("main")) + .await + .map_err(|err| err.to_string())?; + let schema_source = db.schema_source(); + Ok(LiveGraphObservation { + manifest_version: snapshot.version(), + schema_digest: sha256_hex(schema_source.as_bytes()), + }) +} + +struct GraphObservationJson<'a> { + address: &'a str, + graph_uri: &'a str, + observed_at: &'a str, + exists: bool, + manifest_version: Option, + schema_digest: Option<&'a str>, + desired_schema_digest: &'a str, + schema_matches_desired: Option, + error: Option<&'a str>, +} + +pub(crate) fn graph_observation_json(observation: GraphObservationJson<'_>) -> serde_json::Value { + json!({ + "kind": "graph", + "address": observation.address, + "graph_uri": observation.graph_uri, + "observed_at": observation.observed_at, + "exists": observation.exists, + "manifest_version": observation.manifest_version, + "schema_digest": observation.schema_digest, + "desired_schema_digest": observation.desired_schema_digest, + "schema_matches_desired": observation.schema_matches_desired, + "error": observation.error, + }) +} + + +pub(crate) fn load_desired(config_dir: &Path) -> LoadOutcome { + let parsed = parse_cluster_config(config_dir); + let config_dir = parsed.config_dir; + let config_file = parsed.config_file; + let mut diagnostics = parsed.diagnostics; + let Some(raw) = parsed.raw else { + return LoadOutcome { + desired: None, + diagnostics, + config_dir, + config_file, + }; + }; + let settings = validate_cluster_header(&raw, &mut diagnostics); + + let mut resources = BTreeMap::new(); + let mut dependencies = BTreeSet::new(); + let mut graph_query_digests: BTreeMap> = BTreeMap::new(); + let mut graph_schema_digests: BTreeMap = BTreeMap::new(); + + for (graph_id, graph) in &raw.graphs { + validate_id( + "graph id", + &format!("graphs.{graph_id}"), + graph_id, + &mut diagnostics, + ); + let graph_address = graph_address(graph_id); + let schema_address = schema_address(graph_id); + dependencies.insert(Dependency { + from: schema_address.clone(), + to: graph_address.clone(), + }); + + let schema_path = resolve_config_path(&config_dir, &graph.schema); + let schema_source = match fs::read_to_string(&schema_path) { + Ok(source) => { + let digest = sha256_hex(source.as_bytes()); + graph_schema_digests.insert(graph_id.clone(), digest.clone()); + resources.insert( + schema_address.clone(), + ResourceSummary { + address: schema_address.clone(), + kind: "schema".to_string(), + digest, + path: Some(display_path(&schema_path)), + }, + ); + Some(source) + } + Err(err) => { + diagnostics.push(Diagnostic::error( + "schema_file_missing", + format!("graphs.{graph_id}.schema"), + format!( + "could not read schema file '{}': {err}", + schema_path.display() + ), + )); + None + } + }; + + let catalog = schema_source.and_then(|source| match parse_schema(&source) { + Ok(schema) => match build_catalog(&schema) { + Ok(catalog) => Some(catalog), + Err(err) => { + diagnostics.push(Diagnostic::error( + "schema_catalog_error", + format!("graphs.{graph_id}.schema"), + err.to_string(), + )); + None + } + }, + Err(err) => { + diagnostics.push(Diagnostic::error( + "schema_parse_error", + format!("graphs.{graph_id}.schema"), + err.to_string(), + )); + None + } + }); + + let (graph_queries, query_contents) = + resolve_query_decls(&config_dir, graph_id, &graph.queries, &mut diagnostics); + for (query_name, query) in &graph_queries { + validate_id( + "query name", + &format!("graphs.{graph_id}.queries.{query_name}"), + query_name, + &mut diagnostics, + ); + let query_address = query_address(graph_id, query_name); + dependencies.insert(Dependency { + from: query_address.clone(), + to: graph_address.clone(), + }); + dependencies.insert(Dependency { + from: query_address.clone(), + to: schema_address.clone(), + }); + + let query_path = resolve_config_path(&config_dir, &query.file); + let source = match query_contents.get(&query.file) { + Some(cached) => Ok(cached.clone()), + None => fs::read_to_string(&query_path), + }; + match source { + Ok(source) => { + let digest = sha256_hex(source.as_bytes()); + graph_query_digests + .entry(graph_id.clone()) + .or_default() + .insert(query_name.clone(), digest.clone()); + resources.insert( + query_address.clone(), + ResourceSummary { + address: query_address, + kind: "query".to_string(), + digest, + path: Some(display_path(&query_path)), + }, + ); + validate_query_source( + graph_id, + query_name, + &source, + catalog.as_ref(), + &mut diagnostics, + ); + } + Err(err) => diagnostics.push(Diagnostic::error( + "query_file_missing", + format!("graphs.{graph_id}.queries.{query_name}.file"), + format!( + "could not read query file '{}': {err}", + query_path.display() + ), + )), + } + } + } + + for graph_id in raw.graphs.keys() { + let digest = graph_digest( + graph_id, + graph_schema_digests.get(graph_id), + graph_query_digests.get(graph_id), + ); + resources.insert( + graph_address(graph_id), + ResourceSummary { + address: graph_address(graph_id), + kind: "graph".to_string(), + digest, + path: None, + }, + ); + } + + let mut policy_bindings: BTreeMap> = BTreeMap::new(); + for (policy_name, policy) in &raw.policies { + validate_id( + "policy name", + &format!("policies.{policy_name}"), + policy_name, + &mut diagnostics, + ); + if policy.applies_to.is_empty() { + diagnostics.push(Diagnostic::error( + "policy_missing_applies_to", + format!("policies.{policy_name}.applies_to"), + "policy.applies_to must name `cluster` or at least one graph", + )); + } + + let policy_address = policy_address(policy_name); + let mut normalized_bindings: Vec = Vec::new(); + for (idx, target) in policy.applies_to.iter().enumerate() { + match normalize_policy_target(target) { + PolicyTarget::Cluster => { + normalized_bindings.push("cluster".to_string()); + } + PolicyTarget::Graph(graph_id) => { + normalized_bindings.push(graph_address(&graph_id)); + if raw.graphs.contains_key(&graph_id) { + dependencies.insert(Dependency { + from: policy_address.clone(), + to: graph_address(&graph_id), + }); + } else { + diagnostics.push(Diagnostic::error( + "dangling_graph_reference", + format!("policies.{policy_name}.applies_to[{idx}]"), + format!( + "policy references graph `{graph_id}`, but no graph with that id is declared" + ), + )); + } + } + PolicyTarget::WrongKind(kind) => diagnostics.push(Diagnostic::error( + "wrong_kind_reference", + format!("policies.{policy_name}.applies_to[{idx}]"), + format!("policy applies_to expects graph refs or `cluster`, got `{kind}`"), + )), + } + } + + normalized_bindings.sort(); + normalized_bindings.dedup(); + policy_bindings.insert(policy_address.clone(), normalized_bindings); + + let policy_path = resolve_config_path(&config_dir, &policy.file); + match fs::read(&policy_path) { + Ok(bytes) => { + resources.insert( + policy_address.clone(), + ResourceSummary { + address: policy_address, + kind: "policy".to_string(), + digest: sha256_hex(&bytes), + path: Some(display_path(&policy_path)), + }, + ); + } + Err(err) => diagnostics.push(Diagnostic::error( + "policy_file_missing", + format!("policies.{policy_name}.file"), + format!( + "could not read policy file '{}': {err}", + policy_path.display() + ), + )), + } + } + + let mut resource_digests = BTreeMap::new(); + let mut resource_list = Vec::new(); + for (address, resource) in resources { + resource_digests.insert(address, resource.digest.clone()); + resource_list.push(resource); + } + let dependencies: Vec<_> = dependencies.into_iter().collect(); + let graphs = raw + .graphs + .keys() + .map(|graph_id| DesiredGraph { + id: graph_id.clone(), + schema_digest: graph_schema_digests + .get(graph_id) + .cloned() + .unwrap_or_default(), + }) + .collect(); + let config_digest = desired_config_digest(&raw, &resource_digests); + + LoadOutcome { + desired: Some(DesiredCluster { + config_dir: config_dir.clone(), + config_digest, + state_lock: settings.state_lock, + graphs, + resource_digests, + resources: resource_list, + dependencies, + policy_bindings, + }), + diagnostics, + config_dir, + config_file, + } +} + +pub(crate) fn validate_query_source( + graph_id: &str, + query_name: &str, + source: &str, + catalog: Option<&omnigraph_compiler::catalog::Catalog>, + diagnostics: &mut Vec, +) { + let path = format!("graphs.{graph_id}.queries.{query_name}"); + match parse_query(source) { + Ok(query_file) => { + let Some(query_decl) = query_file.queries.iter().find(|q| q.name == query_name) else { + diagnostics.push(Diagnostic::error( + "query_key_mismatch", + path, + format!("no `query {query_name}` declaration found in the referenced .gq file"), + )); + return; + }; + if let Some(catalog) = catalog { + if let Err(err) = typecheck_query_decl(catalog, query_decl) { + diagnostics.push(Diagnostic::error( + "query_typecheck_error", + format!("graphs.{graph_id}.queries.{query_name}"), + err.to_string(), + )); + } + } else { + diagnostics.push(Diagnostic::warning( + "query_typecheck_skipped", + format!("graphs.{graph_id}.queries.{query_name}"), + "query parsed, but type-check was skipped because the graph schema is invalid", + )); + } + } + Err(err) => diagnostics.push(Diagnostic::error( + "query_parse_error", + path, + err.to_string(), + )), + } +} + +pub(crate) fn future_field_diagnostics(text: &str) -> Vec { + let Ok(value) = serde_yaml::from_str::(text) else { + return Vec::new(); + }; + let Some(mapping) = value.as_mapping() else { + return Vec::new(); + }; + let future_fields = [ + "apply", + "env_file", + "providers", + "pipelines", + "embeddings", + "ui", + "aliases", + "bindings", + ]; + mapping + .keys() + .filter_map(|key| key.as_str()) + .filter(|key| future_fields.contains(key)) + .map(|key| { + Diagnostic::error( + "future_phase_field", + key, + format!("`{key}` is reserved for a later cluster-control phase"), + ) + }) + .collect() +} + +pub(crate) fn validate_id(kind: &str, path: &str, value: &str, diagnostics: &mut Vec) { + let mut chars = value.chars(); + let valid = chars + .next() + .is_some_and(|ch| ch.is_ascii_alphabetic() || ch == '_') + && chars.all(|ch| ch.is_ascii_alphanumeric() || ch == '_' || ch == '-'); + if !valid { + diagnostics.push(Diagnostic::error( + "invalid_resource_id", + path, + format!("{kind} `{value}` must start with a letter or `_` and contain only ASCII letters, digits, `_`, or `-`"), + )); + } +} + +enum PolicyTarget { + Cluster, + Graph(String), + WrongKind(String), +} + +pub(crate) fn normalize_policy_target(value: &str) -> PolicyTarget { + if value == "cluster" { + PolicyTarget::Cluster + } else if let Some(graph_id) = value.strip_prefix("graph.") { + PolicyTarget::Graph(graph_id.to_string()) + } else if value.contains('.') { + PolicyTarget::WrongKind(value.to_string()) + } else { + PolicyTarget::Graph(value.to_string()) + } +} + +pub(crate) fn graph_address(graph_id: &str) -> String { + format!("graph.{graph_id}") +} + +pub(crate) fn schema_address(graph_id: &str) -> String { + format!("schema.{graph_id}") +} + +pub(crate) fn query_address(graph_id: &str, query_name: &str) -> String { + format!("query.{graph_id}.{query_name}") +} + +pub(crate) fn policy_address(policy_name: &str) -> String { + format!("policy.{policy_name}") +} + +pub(crate) fn resolve_config_path(config_dir: &Path, path: &Path) -> PathBuf { + if path.is_absolute() { + path.to_path_buf() + } else { + config_dir.join(path) + } +} diff --git a/crates/omnigraph-cluster/src/lib.rs b/crates/omnigraph-cluster/src/lib.rs index c54d245..8b41fdf 100644 --- a/crates/omnigraph-cluster/src/lib.rs +++ b/crates/omnigraph-cluster/src/lib.rs @@ -19,6 +19,7 @@ use ulid::Ulid; pub mod failpoints; +mod config; mod diff; mod serve; mod sweep; @@ -26,6 +27,7 @@ mod store; use store::{LocalStateBackend, StateLockGuard, StateSnapshot}; pub use serve::{ServingGraph, ServingPolicy, ServingQuery, ServingSnapshot, read_serving_snapshot}; use serve::read_verified_payload; +use config::{QueriesDecl, observe_declared_graphs, validate_cluster_header, future_field_diagnostics, initial_import_state, observe_live_graph, preview_schema_migration, state_resource_digests, graph_address, policy_address, query_address, schema_address, load_desired, normalize_policy_target, parse_cluster_config, resolve_config_path, resolve_query_decls, validate_id, validate_query_source}; use diff::{FailedGraphOrigin, ResourceKind, append_policy_binding_changes, approved_resources, classify_changes, compute_approvals, compute_blast_radius, demote_dependents_of_failed_graphs, diff_resources, resource_kind}; use sweep::{mark_approvals_consumed, record_approval_consumed, sweep_recovery_sidecars, tombstone_graph_subtree, warn_pending_recovery_sidecars}; @@ -430,138 +432,6 @@ struct GraphConfig { /// How a graph declares its stored queries. Terraform-style: the `.gq` /// files ARE the declaration — point at them (or a directory) and every -/// `query ` they contain is discovered. The explicit name->file map -/// remains for fine-grained control. -#[derive(Debug, Serialize, Deserialize)] -#[serde(untagged)] -enum QueriesDecl { - /// `queries: ./queries/` — a directory (top-level `*.gq`, sorted) or a - /// single `.gq` file; every declaration inside is registered. - Discover(PathBuf), - /// `queries: [./queries/, ./extra.gq]` — several directories/files. - DiscoverMany(Vec), - /// `queries: { name: { file: ... } }` — explicit registry. - Explicit(BTreeMap), -} - -impl Default for QueriesDecl { - fn default() -> Self { - QueriesDecl::Explicit(BTreeMap::new()) - } -} - -/// Expand a graph's query declaration into the canonical name->file map. -/// Discovery reads and parses each `.gq`; unreadable or unparseable files -/// and duplicate query names are loud validation errors — a declaration the -/// tool cannot enumerate is broken, not partially usable. -fn resolve_query_decls( - config_dir: &Path, - graph_id: &str, - decl: &QueriesDecl, - diagnostics: &mut Vec, -) -> (BTreeMap, BTreeMap) { - let paths: Vec = match decl { - QueriesDecl::Explicit(map) => { - return ( - map.iter() - .map(|(name, config)| { - (name.clone(), QueryConfig { file: config.file.clone() }) - }) - .collect(), - BTreeMap::new(), - ); - } - QueriesDecl::Discover(path) => vec![path.clone()], - QueriesDecl::DiscoverMany(paths) => paths.clone(), - }; - - let mut files: Vec<(PathBuf, PathBuf)> = Vec::new(); // (declared-relative, resolved) - for declared in &paths { - let resolved = resolve_config_path(config_dir, declared); - if resolved.is_dir() { - let mut entries: Vec = match fs::read_dir(&resolved) { - Ok(read) => read - .flatten() - .map(|entry| entry.path()) - .filter(|path| path.extension().is_some_and(|ext| ext == "gq")) - .collect(), - Err(err) => { - diagnostics.push(Diagnostic::error( - "query_dir_unreadable", - format!("graphs.{graph_id}.queries"), - format!("could not list query directory '{}': {err}", resolved.display()), - )); - continue; - } - }; - entries.sort(); - if entries.is_empty() { - diagnostics.push(Diagnostic::warning( - "query_dir_empty", - format!("graphs.{graph_id}.queries"), - format!("query directory '{}' contains no .gq files", resolved.display()), - )); - } - for path in entries { - let relative = declared.join(path.file_name().expect("dir entries have names")); - files.push((relative, path)); - } - } else { - files.push((declared.clone(), resolved)); - } - } - - let mut registry: BTreeMap = BTreeMap::new(); - let mut origin: BTreeMap = BTreeMap::new(); - // Content read once at discovery and handed to the caller — the per-query - // digest/typecheck pass reuses it instead of re-reading (no N+1 reads, no - // window for the file to change between enumeration and validation). - let mut contents: BTreeMap = BTreeMap::new(); - for (declared, resolved) in files { - let source = match fs::read_to_string(&resolved) { - Ok(source) => source, - Err(err) => { - diagnostics.push(Diagnostic::error( - "query_file_missing", - format!("graphs.{graph_id}.queries"), - format!("could not read query file '{}': {err}", resolved.display()), - )); - continue; - } - }; - let parsed = match parse_query(&source) { - Ok(parsed) => parsed, - Err(err) => { - diagnostics.push(Diagnostic::error( - "query_parse_error", - format!("graphs.{graph_id}.queries"), - format!("'{}' does not parse: {err}", resolved.display()), - )); - continue; - } - }; - for query_decl in &parsed.queries { - let name = query_decl.name.clone(); - if let Some(previous) = origin.get(&name) { - diagnostics.push(Diagnostic::error( - "duplicate_query_name", - format!("graphs.{graph_id}.queries.{name}"), - format!( - "query '{name}' is declared in both '{}' and '{}'", - previous.display(), - declared.display() - ), - )); - continue; - } - origin.insert(name.clone(), declared.clone()); - registry.insert(name, QueryConfig { file: declared.clone() }); - } - contents.insert(declared, source); - } - (registry, contents) -} - #[derive(Debug, Serialize, Deserialize)] #[serde(deny_unknown_fields)] struct QueryConfig { @@ -2138,725 +2008,6 @@ async fn sync_config_dir(config_dir: &Path, operation: StateSyncOperation) -> St } } -fn parse_cluster_config(config_dir: &Path) -> ParsedConfig { - let config_dir = config_dir.to_path_buf(); - let config_file = config_dir.join(CLUSTER_CONFIG_FILE); - let mut diagnostics = Vec::new(); - - if !config_dir.is_dir() { - diagnostics.push(Diagnostic::error( - "config_dir_not_found", - display_path(&config_dir), - "`--config` must point at a directory containing cluster.yaml", - )); - return ParsedConfig { - raw: None, - diagnostics, - config_dir, - config_file, - }; - } - - let text = match fs::read_to_string(&config_file) { - Ok(text) => text, - Err(err) => { - diagnostics.push(Diagnostic::error( - "cluster_config_read_error", - CLUSTER_CONFIG_FILE, - format!("could not read cluster.yaml: {err}"), - )); - return ParsedConfig { - raw: None, - diagnostics, - config_dir, - config_file, - }; - } - }; - - diagnostics.extend(duplicate_key_diagnostics(&text)); - diagnostics.extend(future_field_diagnostics(&text)); - if has_errors(&diagnostics) { - return ParsedConfig { - raw: None, - diagnostics, - config_dir, - config_file, - }; - } - - let raw = match serde_yaml::from_str::(&text) { - Ok(raw) => Some(raw), - Err(err) => { - diagnostics.push(Diagnostic::error( - "invalid_cluster_yaml", - CLUSTER_CONFIG_FILE, - format!("could not parse cluster.yaml: {err}"), - )); - None - } - }; - - ParsedConfig { - raw, - diagnostics, - config_dir, - config_file, - } -} - -fn validate_cluster_header( - raw: &RawClusterConfig, - diagnostics: &mut Vec, -) -> ClusterSettings { - if raw.version != 1 { - diagnostics.push(Diagnostic::error( - "unsupported_cluster_config_version", - "version", - format!( - "unsupported cluster config version {}; this build supports version 1", - raw.version - ), - )); - } - if let Some(name) = raw.metadata.name.as_deref() { - if name.trim().is_empty() { - diagnostics.push(Diagnostic::error( - "empty_metadata_name", - "metadata.name", - "metadata.name must not be empty when provided", - )); - } - } - if let Some(backend) = raw.state.backend.as_deref() { - if backend != "cluster" { - diagnostics.push(Diagnostic::error( - "unsupported_state_backend", - "state.backend", - "Stage 2C supports only omitted state.backend or `cluster`", - )); - } - } - - ClusterSettings { - state_lock: raw.state.lock.unwrap_or(true), - } -} - - -fn parse_lock_file_for_unlock(text: &str) -> Result { - let lock = serde_json::from_str::(text).map_err(|err| { - Diagnostic::error( - "invalid_state_lock", - CLUSTER_LOCK_FILE, - format!("could not parse state lock: {err}"), - ) - })?; - if lock.version != 1 { - return Err(Diagnostic::error( - "unsupported_state_lock_version", - CLUSTER_LOCK_FILE, - format!("unsupported cluster state lock version {}", lock.version), - )); - } - Ok(lock) -} - -fn state_lock_held_message(observations: &StateObservations) -> String { - match observations.lock_id.as_deref() { - Some(lock_id) => format!( - "cluster state lock already exists (lock id {lock_id}); run `omnigraph cluster force-unlock {lock_id}` only after confirming no cluster operation is active" - ), - None => "cluster state lock already exists; remove it only after confirming no cluster operation is active".to_string(), - } -} - -fn state_resource_digests(state: &ClusterState) -> BTreeMap { - state - .applied_revision - .resources - .iter() - .map(|(address, resource)| (address.clone(), resource.digest.clone())) - .collect() -} - -fn initial_import_state(desired: &DesiredCluster) -> ClusterState { - ClusterState { - version: 1, - state_revision: 0, - applied_revision: AppliedRevisionState { - config_digest: Some(desired.config_digest.clone()), - resources: BTreeMap::new(), - }, - resource_statuses: BTreeMap::new(), - approval_records: BTreeMap::new(), - recovery_records: BTreeMap::new(), - observations: BTreeMap::new(), - } -} - - -async fn observe_declared_graphs(desired: &DesiredCluster, state: &mut ClusterState) -> usize { - let mut graph_error_count = 0; - for graph in &desired.graphs { - let graph_address = graph_address(&graph.id); - let schema_address = schema_address(&graph.id); - let graph_path = desired - .config_dir - .join(CLUSTER_GRAPHS_DIR) - .join(format!("{}.omni", graph.id)); - let graph_uri = display_path(&graph_path); - let observed_at = now_rfc3339(); - - if !graph_path.exists() { - state.applied_revision.resources.remove(&graph_address); - state.applied_revision.resources.remove(&schema_address); - state.observations.insert( - graph_address.clone(), - graph_observation_json(GraphObservationJson { - address: &graph_address, - graph_uri: &graph_uri, - observed_at: &observed_at, - exists: false, - manifest_version: None, - schema_digest: None, - desired_schema_digest: &graph.schema_digest, - schema_matches_desired: Some(false), - error: Some("derived graph root is missing"), - }), - ); - set_resource_status( - state, - &graph_address, - ResourceLifecycleStatus::Drifted, - "graph_missing", - "derived graph root is missing", - ); - set_resource_status( - state, - &schema_address, - ResourceLifecycleStatus::Drifted, - "graph_missing", - "derived graph root is missing", - ); - continue; - } - - match observe_live_graph(&graph_uri).await { - Ok(observation) => { - let schema_matches = observation.schema_digest == graph.schema_digest; - state.applied_revision.resources.insert( - schema_address.clone(), - StateResource { - digest: observation.schema_digest.clone(), - applies_to: None, - }, - ); - let query_digests = state_query_digests_for_graph(state, &graph.id); - let graph_digest_value = graph_digest( - &graph.id, - Some(&observation.schema_digest), - Some(&query_digests), - ); - state.applied_revision.resources.insert( - graph_address.clone(), - StateResource { - digest: graph_digest_value, - applies_to: None, - }, - ); - state.observations.insert( - graph_address.clone(), - graph_observation_json(GraphObservationJson { - address: &graph_address, - graph_uri: &graph_uri, - observed_at: &observed_at, - exists: true, - manifest_version: Some(observation.manifest_version), - schema_digest: Some(observation.schema_digest.as_str()), - desired_schema_digest: &graph.schema_digest, - schema_matches_desired: Some(schema_matches), - error: None, - }), - ); - if schema_matches { - set_resource_status_applied(state, &graph_address); - set_resource_status_applied(state, &schema_address); - } else { - set_resource_status( - state, - &graph_address, - ResourceLifecycleStatus::Drifted, - "schema_mismatch", - "live schema digest differs from desired schema digest", - ); - set_resource_status( - state, - &schema_address, - ResourceLifecycleStatus::Drifted, - "schema_mismatch", - "live schema digest differs from desired schema digest", - ); - } - } - Err(error) => { - graph_error_count += 1; - state.observations.insert( - graph_address.clone(), - graph_observation_json(GraphObservationJson { - address: &graph_address, - graph_uri: &graph_uri, - observed_at: &observed_at, - exists: true, - manifest_version: None, - schema_digest: None, - desired_schema_digest: &graph.schema_digest, - schema_matches_desired: None, - error: Some(error.as_str()), - }), - ); - set_resource_status( - state, - &graph_address, - ResourceLifecycleStatus::Error, - "graph_observation_error", - error.as_str(), - ); - set_resource_status( - state, - &schema_address, - ResourceLifecycleStatus::Error, - "graph_observation_error", - error.as_str(), - ); - } - } - } - graph_error_count -} - -/// RFC-004 §D7: the data-aware preview — the engine's migration plan for a -/// desired schema against the live graph, computed read-only (no lock). -async fn preview_schema_migration( - graph_uri: &str, - schema_path: &str, -) -> Result { - let source = fs::read_to_string(schema_path).map_err(|err| err.to_string())?; - let db = Omnigraph::open_read_only(graph_uri) - .await - .map_err(|err| err.to_string())?; - let preview = db - .preview_schema_apply_with_options(&source, SchemaApplyOptions::default()) - .await - .map_err(|err| err.to_string())?; - Ok(preview.plan) -} - -struct LiveGraphObservation { - manifest_version: u64, - schema_digest: String, -} - -async fn observe_live_graph(graph_uri: &str) -> Result { - let db = Omnigraph::open_read_only(graph_uri) - .await - .map_err(|err| err.to_string())?; - let snapshot = db - .snapshot_of(ReadTarget::branch("main")) - .await - .map_err(|err| err.to_string())?; - let schema_source = db.schema_source(); - Ok(LiveGraphObservation { - manifest_version: snapshot.version(), - schema_digest: sha256_hex(schema_source.as_bytes()), - }) -} - -struct GraphObservationJson<'a> { - address: &'a str, - graph_uri: &'a str, - observed_at: &'a str, - exists: bool, - manifest_version: Option, - schema_digest: Option<&'a str>, - desired_schema_digest: &'a str, - schema_matches_desired: Option, - error: Option<&'a str>, -} - -fn graph_observation_json(observation: GraphObservationJson<'_>) -> serde_json::Value { - json!({ - "kind": "graph", - "address": observation.address, - "graph_uri": observation.graph_uri, - "observed_at": observation.observed_at, - "exists": observation.exists, - "manifest_version": observation.manifest_version, - "schema_digest": observation.schema_digest, - "desired_schema_digest": observation.desired_schema_digest, - "schema_matches_desired": observation.schema_matches_desired, - "error": observation.error, - }) -} - -fn state_query_digests_for_graph(state: &ClusterState, graph_id: &str) -> BTreeMap { - let prefix = format!("query.{graph_id}."); - state - .applied_revision - .resources - .iter() - .filter_map(|(address, resource)| { - address - .strip_prefix(&prefix) - .map(|name| (name.to_string(), resource.digest.clone())) - }) - .collect() -} - -fn set_resource_status_applied(state: &mut ClusterState, address: &str) { - state.resource_statuses.insert( - address.to_string(), - ResourceStatusRecord { - status: ResourceLifecycleStatus::Applied, - conditions: Vec::new(), - message: None, - }, - ); -} - -fn set_resource_status( - state: &mut ClusterState, - address: &str, - status: ResourceLifecycleStatus, - condition: &str, - message: &str, -) { - state.resource_statuses.insert( - address.to_string(), - ResourceStatusRecord { - status, - conditions: vec![condition.to_string()], - message: Some(message.to_string()), - }, - ); -} - -fn load_desired(config_dir: &Path) -> LoadOutcome { - let parsed = parse_cluster_config(config_dir); - let config_dir = parsed.config_dir; - let config_file = parsed.config_file; - let mut diagnostics = parsed.diagnostics; - let Some(raw) = parsed.raw else { - return LoadOutcome { - desired: None, - diagnostics, - config_dir, - config_file, - }; - }; - let settings = validate_cluster_header(&raw, &mut diagnostics); - - let mut resources = BTreeMap::new(); - let mut dependencies = BTreeSet::new(); - let mut graph_query_digests: BTreeMap> = BTreeMap::new(); - let mut graph_schema_digests: BTreeMap = BTreeMap::new(); - - for (graph_id, graph) in &raw.graphs { - validate_id( - "graph id", - &format!("graphs.{graph_id}"), - graph_id, - &mut diagnostics, - ); - let graph_address = graph_address(graph_id); - let schema_address = schema_address(graph_id); - dependencies.insert(Dependency { - from: schema_address.clone(), - to: graph_address.clone(), - }); - - let schema_path = resolve_config_path(&config_dir, &graph.schema); - let schema_source = match fs::read_to_string(&schema_path) { - Ok(source) => { - let digest = sha256_hex(source.as_bytes()); - graph_schema_digests.insert(graph_id.clone(), digest.clone()); - resources.insert( - schema_address.clone(), - ResourceSummary { - address: schema_address.clone(), - kind: "schema".to_string(), - digest, - path: Some(display_path(&schema_path)), - }, - ); - Some(source) - } - Err(err) => { - diagnostics.push(Diagnostic::error( - "schema_file_missing", - format!("graphs.{graph_id}.schema"), - format!( - "could not read schema file '{}': {err}", - schema_path.display() - ), - )); - None - } - }; - - let catalog = schema_source.and_then(|source| match parse_schema(&source) { - Ok(schema) => match build_catalog(&schema) { - Ok(catalog) => Some(catalog), - Err(err) => { - diagnostics.push(Diagnostic::error( - "schema_catalog_error", - format!("graphs.{graph_id}.schema"), - err.to_string(), - )); - None - } - }, - Err(err) => { - diagnostics.push(Diagnostic::error( - "schema_parse_error", - format!("graphs.{graph_id}.schema"), - err.to_string(), - )); - None - } - }); - - let (graph_queries, query_contents) = - resolve_query_decls(&config_dir, graph_id, &graph.queries, &mut diagnostics); - for (query_name, query) in &graph_queries { - validate_id( - "query name", - &format!("graphs.{graph_id}.queries.{query_name}"), - query_name, - &mut diagnostics, - ); - let query_address = query_address(graph_id, query_name); - dependencies.insert(Dependency { - from: query_address.clone(), - to: graph_address.clone(), - }); - dependencies.insert(Dependency { - from: query_address.clone(), - to: schema_address.clone(), - }); - - let query_path = resolve_config_path(&config_dir, &query.file); - let source = match query_contents.get(&query.file) { - Some(cached) => Ok(cached.clone()), - None => fs::read_to_string(&query_path), - }; - match source { - Ok(source) => { - let digest = sha256_hex(source.as_bytes()); - graph_query_digests - .entry(graph_id.clone()) - .or_default() - .insert(query_name.clone(), digest.clone()); - resources.insert( - query_address.clone(), - ResourceSummary { - address: query_address, - kind: "query".to_string(), - digest, - path: Some(display_path(&query_path)), - }, - ); - validate_query_source( - graph_id, - query_name, - &source, - catalog.as_ref(), - &mut diagnostics, - ); - } - Err(err) => diagnostics.push(Diagnostic::error( - "query_file_missing", - format!("graphs.{graph_id}.queries.{query_name}.file"), - format!( - "could not read query file '{}': {err}", - query_path.display() - ), - )), - } - } - } - - for graph_id in raw.graphs.keys() { - let digest = graph_digest( - graph_id, - graph_schema_digests.get(graph_id), - graph_query_digests.get(graph_id), - ); - resources.insert( - graph_address(graph_id), - ResourceSummary { - address: graph_address(graph_id), - kind: "graph".to_string(), - digest, - path: None, - }, - ); - } - - let mut policy_bindings: BTreeMap> = BTreeMap::new(); - for (policy_name, policy) in &raw.policies { - validate_id( - "policy name", - &format!("policies.{policy_name}"), - policy_name, - &mut diagnostics, - ); - if policy.applies_to.is_empty() { - diagnostics.push(Diagnostic::error( - "policy_missing_applies_to", - format!("policies.{policy_name}.applies_to"), - "policy.applies_to must name `cluster` or at least one graph", - )); - } - - let policy_address = policy_address(policy_name); - let mut normalized_bindings: Vec = Vec::new(); - for (idx, target) in policy.applies_to.iter().enumerate() { - match normalize_policy_target(target) { - PolicyTarget::Cluster => { - normalized_bindings.push("cluster".to_string()); - } - PolicyTarget::Graph(graph_id) => { - normalized_bindings.push(graph_address(&graph_id)); - if raw.graphs.contains_key(&graph_id) { - dependencies.insert(Dependency { - from: policy_address.clone(), - to: graph_address(&graph_id), - }); - } else { - diagnostics.push(Diagnostic::error( - "dangling_graph_reference", - format!("policies.{policy_name}.applies_to[{idx}]"), - format!( - "policy references graph `{graph_id}`, but no graph with that id is declared" - ), - )); - } - } - PolicyTarget::WrongKind(kind) => diagnostics.push(Diagnostic::error( - "wrong_kind_reference", - format!("policies.{policy_name}.applies_to[{idx}]"), - format!("policy applies_to expects graph refs or `cluster`, got `{kind}`"), - )), - } - } - - normalized_bindings.sort(); - normalized_bindings.dedup(); - policy_bindings.insert(policy_address.clone(), normalized_bindings); - - let policy_path = resolve_config_path(&config_dir, &policy.file); - match fs::read(&policy_path) { - Ok(bytes) => { - resources.insert( - policy_address.clone(), - ResourceSummary { - address: policy_address, - kind: "policy".to_string(), - digest: sha256_hex(&bytes), - path: Some(display_path(&policy_path)), - }, - ); - } - Err(err) => diagnostics.push(Diagnostic::error( - "policy_file_missing", - format!("policies.{policy_name}.file"), - format!( - "could not read policy file '{}': {err}", - policy_path.display() - ), - )), - } - } - - let mut resource_digests = BTreeMap::new(); - let mut resource_list = Vec::new(); - for (address, resource) in resources { - resource_digests.insert(address, resource.digest.clone()); - resource_list.push(resource); - } - let dependencies: Vec<_> = dependencies.into_iter().collect(); - let graphs = raw - .graphs - .keys() - .map(|graph_id| DesiredGraph { - id: graph_id.clone(), - schema_digest: graph_schema_digests - .get(graph_id) - .cloned() - .unwrap_or_default(), - }) - .collect(); - let config_digest = desired_config_digest(&raw, &resource_digests); - - LoadOutcome { - desired: Some(DesiredCluster { - config_dir: config_dir.clone(), - config_digest, - state_lock: settings.state_lock, - graphs, - resource_digests, - resources: resource_list, - dependencies, - policy_bindings, - }), - diagnostics, - config_dir, - config_file, - } -} - -fn validate_query_source( - graph_id: &str, - query_name: &str, - source: &str, - catalog: Option<&omnigraph_compiler::catalog::Catalog>, - diagnostics: &mut Vec, -) { - let path = format!("graphs.{graph_id}.queries.{query_name}"); - match parse_query(source) { - Ok(query_file) => { - let Some(query_decl) = query_file.queries.iter().find(|q| q.name == query_name) else { - diagnostics.push(Diagnostic::error( - "query_key_mismatch", - path, - format!("no `query {query_name}` declaration found in the referenced .gq file"), - )); - return; - }; - if let Some(catalog) = catalog { - if let Err(err) = typecheck_query_decl(catalog, query_decl) { - diagnostics.push(Diagnostic::error( - "query_typecheck_error", - format!("graphs.{graph_id}.queries.{query_name}"), - err.to_string(), - )); - } - } else { - diagnostics.push(Diagnostic::warning( - "query_typecheck_skipped", - format!("graphs.{graph_id}.queries.{query_name}"), - "query parsed, but type-check was skipped because the graph schema is invalid", - )); - } - } - Err(err) => diagnostics.push(Diagnostic::error( - "query_parse_error", - path, - err.to_string(), - )), - } -} /// Content-addressed catalog path for an applied resource payload. Extensions @@ -3117,36 +2268,6 @@ fn duplicate_key_diagnostics(text: &str) -> Vec { diagnostics } -fn future_field_diagnostics(text: &str) -> Vec { - let Ok(value) = serde_yaml::from_str::(text) else { - return Vec::new(); - }; - let Some(mapping) = value.as_mapping() else { - return Vec::new(); - }; - let future_fields = [ - "apply", - "env_file", - "providers", - "pipelines", - "embeddings", - "ui", - "aliases", - "bindings", - ]; - mapping - .keys() - .filter_map(|key| key.as_str()) - .filter(|key| future_fields.contains(key)) - .map(|key| { - Diagnostic::error( - "future_phase_field", - key, - format!("`{key}` is reserved for a later cluster-control phase"), - ) - }) - .collect() -} fn strip_comment(line: &str) -> String { let mut in_single_quote = false; @@ -3170,61 +2291,47 @@ fn strip_comment(line: &str) -> String { line.to_string() } -fn validate_id(kind: &str, path: &str, value: &str, diagnostics: &mut Vec) { - let mut chars = value.chars(); - let valid = chars - .next() - .is_some_and(|ch| ch.is_ascii_alphabetic() || ch == '_') - && chars.all(|ch| ch.is_ascii_alphanumeric() || ch == '_' || ch == '-'); - if !valid { - diagnostics.push(Diagnostic::error( - "invalid_resource_id", - path, - format!("{kind} `{value}` must start with a letter or `_` and contain only ASCII letters, digits, `_`, or `-`"), - )); - } + +fn state_query_digests_for_graph(state: &ClusterState, graph_id: &str) -> BTreeMap { + let prefix = format!("query.{graph_id}."); + state + .applied_revision + .resources + .iter() + .filter_map(|(address, resource)| { + address + .strip_prefix(&prefix) + .map(|name| (name.to_string(), resource.digest.clone())) + }) + .collect() } -enum PolicyTarget { - Cluster, - Graph(String), - WrongKind(String), +fn set_resource_status_applied(state: &mut ClusterState, address: &str) { + state.resource_statuses.insert( + address.to_string(), + ResourceStatusRecord { + status: ResourceLifecycleStatus::Applied, + conditions: Vec::new(), + message: None, + }, + ); } -fn normalize_policy_target(value: &str) -> PolicyTarget { - if value == "cluster" { - PolicyTarget::Cluster - } else if let Some(graph_id) = value.strip_prefix("graph.") { - PolicyTarget::Graph(graph_id.to_string()) - } else if value.contains('.') { - PolicyTarget::WrongKind(value.to_string()) - } else { - PolicyTarget::Graph(value.to_string()) - } -} - -fn graph_address(graph_id: &str) -> String { - format!("graph.{graph_id}") -} - -fn schema_address(graph_id: &str) -> String { - format!("schema.{graph_id}") -} - -fn query_address(graph_id: &str, query_name: &str) -> String { - format!("query.{graph_id}.{query_name}") -} - -fn policy_address(policy_name: &str) -> String { - format!("policy.{policy_name}") -} - -fn resolve_config_path(config_dir: &Path, path: &Path) -> PathBuf { - if path.is_absolute() { - path.to_path_buf() - } else { - config_dir.join(path) - } +fn set_resource_status( + state: &mut ClusterState, + address: &str, + status: ResourceLifecycleStatus, + condition: &str, + message: &str, +) { + state.resource_statuses.insert( + address.to_string(), + ResourceStatusRecord { + status, + conditions: vec![condition.to_string()], + message: Some(message.to_string()), + }, + ); } fn graph_digest( diff --git a/crates/omnigraph-cluster/src/store.rs b/crates/omnigraph-cluster/src/store.rs index f378660..8a95661 100644 --- a/crates/omnigraph-cluster/src/store.rs +++ b/crates/omnigraph-cluster/src/store.rs @@ -559,3 +559,30 @@ impl Drop for StateLockGuard { let _ = fs::remove_file(&self.path); } } + +pub(crate) fn parse_lock_file_for_unlock(text: &str) -> Result { + let lock = serde_json::from_str::(text).map_err(|err| { + Diagnostic::error( + "invalid_state_lock", + CLUSTER_LOCK_FILE, + format!("could not parse state lock: {err}"), + ) + })?; + if lock.version != 1 { + return Err(Diagnostic::error( + "unsupported_state_lock_version", + CLUSTER_LOCK_FILE, + format!("unsupported cluster state lock version {}", lock.version), + )); + } + Ok(lock) +} + +pub(crate) fn state_lock_held_message(observations: &StateObservations) -> String { + match observations.lock_id.as_deref() { + Some(lock_id) => format!( + "cluster state lock already exists (lock id {lock_id}); run `omnigraph cluster force-unlock {lock_id}` only after confirming no cluster operation is active" + ), + None => "cluster state lock already exists; remove it only after confirming no cluster operation is active".to_string(), + } +}