diff --git a/Cargo.lock b/Cargo.lock index ebe5565..2b43ccf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4569,6 +4569,7 @@ name = "omnigraph-cluster" version = "0.6.1" dependencies = [ "omnigraph-compiler", + "omnigraph-engine", "serde", "serde_json", "serde_yaml", @@ -4576,6 +4577,7 @@ dependencies = [ "tempfile", "thiserror", "time", + "tokio", "ulid", ] diff --git a/crates/omnigraph-cli/src/main.rs b/crates/omnigraph-cli/src/main.rs index 4ca4a4a..dd2645b 100644 --- a/crates/omnigraph-cli/src/main.rs +++ b/crates/omnigraph-cli/src/main.rs @@ -11,8 +11,8 @@ use omnigraph::db::{Omnigraph, ReadTarget, SnapshotId}; use omnigraph::loader::LoadMode; use omnigraph::storage::normalize_root_uri; use omnigraph_cluster::{ - DiagnosticSeverity, PlanOutput, StatusOutput, ValidateOutput, plan_config_dir, - status_config_dir, validate_config_dir, + DiagnosticSeverity, PlanOutput, StateSyncOutput, StatusOutput, ValidateOutput, + import_config_dir, plan_config_dir, refresh_config_dir, status_config_dir, validate_config_dir, }; use omnigraph_compiler::query::parser::parse_query; use omnigraph_compiler::schema::parser::parse_schema; @@ -350,6 +350,24 @@ enum ClusterCommand { #[arg(long)] json: bool, }, + /// Refresh existing local JSON state from declared graph observations. + Refresh { + /// Cluster config directory containing cluster.yaml. + #[arg(long, default_value = ".")] + config: PathBuf, + /// Emit JSON instead of human text. + #[arg(long)] + json: bool, + }, + /// Import initial local JSON state from declared graph observations. + Import { + /// Cluster config directory containing cluster.yaml. + #[arg(long, default_value = ".")] + config: PathBuf, + /// Emit JSON instead of human text. + #[arg(long)] + json: bool, + }, } /// Operations on the graph registry of a multi-graph server (MR-668). @@ -783,6 +801,34 @@ fn print_cluster_status_human(output: &StatusOutput) { print_cluster_diagnostics(&output.diagnostics); } +fn print_cluster_state_sync_human(output: &StateSyncOutput) { + let operation = match output.operation { + omnigraph_cluster::StateSyncOperation::Refresh => "refresh", + omnigraph_cluster::StateSyncOperation::Import => "import", + }; + if output.ok { + let state = &output.state_observations; + println!( + "cluster {operation}: revision {}, {} resource(s)", + state.state_revision, state.resource_count + ); + if let Some(cas) = state.state_cas.as_deref() { + println!(" state_cas: {cas}"); + } + if state.locked { + match state.lock_id.as_deref() { + Some(lock_id) => println!(" lock: acquired ({lock_id})"), + None => println!(" lock: acquired"), + } + } else { + println!(" lock: not acquired"); + } + } else { + println!("cluster {operation} failed"); + } + print_cluster_diagnostics(&output.diagnostics); +} + fn print_cluster_diagnostics(diagnostics: &[omnigraph_cluster::Diagnostic]) { for diagnostic in diagnostics { let label = match diagnostic.severity { @@ -835,6 +881,19 @@ fn finish_cluster_status(output: &StatusOutput, json: bool) -> Result<()> { Ok(()) } +fn finish_cluster_state_sync(output: &StateSyncOutput, json: bool) -> Result<()> { + if json { + print_json(output)?; + } else { + print_cluster_state_sync_human(output); + } + if !output.ok { + io::stdout().flush()?; + std::process::exit(1); + } + Ok(()) +} + fn is_remote_uri(uri: &str) -> bool { uri.starts_with("http://") || uri.starts_with("https://") } @@ -3272,6 +3331,14 @@ async fn main() -> Result<()> { let output = status_config_dir(config); finish_cluster_status(&output, json)?; } + ClusterCommand::Refresh { config, json } => { + let output = refresh_config_dir(config).await; + finish_cluster_state_sync(&output, json)?; + } + ClusterCommand::Import { config, json } => { + let output = import_config_dir(config).await; + finish_cluster_state_sync(&output, json)?; + } }, Command::Graphs { command } => match command { GraphsCommand::List { diff --git a/crates/omnigraph-cli/tests/cli.rs b/crates/omnigraph-cli/tests/cli.rs index 920ceda..b30bee4 100644 --- a/crates/omnigraph-cli/tests/cli.rs +++ b/crates/omnigraph-cli/tests/cli.rs @@ -124,6 +124,18 @@ policies: .unwrap(); } +fn init_cluster_derived_graph(root: &std::path::Path) { + let graph_dir = root.join("graphs"); + fs::create_dir_all(&graph_dir).unwrap(); + output_success( + cli() + .arg("init") + .arg("--schema") + .arg(root.join("people.pg")) + .arg(graph_dir.join("knowledge.omni")), + ); +} + #[test] fn version_command_prints_current_cli_version() { let output = output_success(cli().arg("version")); @@ -376,6 +388,196 @@ fn cluster_plan_locked_state_exits_nonzero() { ); } +#[test] +fn cluster_import_json_bootstraps_missing_state() { + let temp = tempdir().unwrap(); + write_cluster_config_fixture(temp.path()); + init_cluster_derived_graph(temp.path()); + + let json = parse_stdout_json(&output_success( + cli() + .arg("cluster") + .arg("import") + .arg("--config") + .arg(temp.path()) + .arg("--json"), + )); + assert_eq!(json["ok"], true); + assert_eq!(json["operation"], "import"); + assert_eq!(json["state_observations"]["state_revision"], 1); + assert!( + json["state_observations"]["state_cas"] + .as_str() + .unwrap() + .starts_with("sha256:") + ); + assert!(json["observations"]["graph.knowledge"]["manifest_version"].is_number()); + assert_eq!( + json["resource_statuses"]["graph.knowledge"]["status"], + "applied" + ); + assert!(temp.path().join("__cluster/state.json").exists()); + assert!(!temp.path().join("__cluster/lock.json").exists()); +} + +#[test] +fn cluster_refresh_json_updates_revision_cas_and_removes_lock() { + let temp = tempdir().unwrap(); + write_cluster_config_fixture(temp.path()); + init_cluster_derived_graph(temp.path()); + let state_dir = temp.path().join("__cluster"); + fs::create_dir_all(&state_dir).unwrap(); + fs::write( + state_dir.join("state.json"), + r#" +{ + "version": 1, + "state_revision": 2, + "applied_revision": { "resources": {} } +} +"#, + ) + .unwrap(); + + let json = parse_stdout_json(&output_success( + cli() + .arg("cluster") + .arg("refresh") + .arg("--config") + .arg(temp.path()) + .arg("--json"), + )); + assert_eq!(json["ok"], true); + assert_eq!(json["operation"], "refresh"); + assert_eq!(json["state_observations"]["state_revision"], 3); + assert!( + json["state_observations"]["state_cas"] + .as_str() + .unwrap() + .starts_with("sha256:") + ); + assert!(!state_dir.join("lock.json").exists()); +} + +#[test] +fn cluster_refresh_missing_state_exits_nonzero() { + let temp = tempdir().unwrap(); + write_cluster_config_fixture(temp.path()); + + let output = output_failure( + cli() + .arg("cluster") + .arg("refresh") + .arg("--config") + .arg(temp.path()) + .arg("--json"), + ); + let json = parse_stdout_json(&output); + assert_eq!(json["ok"], false); + assert!( + json["diagnostics"] + .as_array() + .unwrap() + .iter() + .any(|diagnostic| diagnostic["code"] == "state_missing"), + "missing state should produce a useful diagnostic: {json}" + ); +} + +#[test] +fn cluster_import_existing_state_exits_nonzero() { + let temp = tempdir().unwrap(); + write_cluster_config_fixture(temp.path()); + let state_dir = temp.path().join("__cluster"); + fs::create_dir_all(&state_dir).unwrap(); + fs::write( + state_dir.join("state.json"), + r#"{"version":1,"applied_revision":{"resources":{}}}"#, + ) + .unwrap(); + + let output = output_failure( + cli() + .arg("cluster") + .arg("import") + .arg("--config") + .arg(temp.path()) + .arg("--json"), + ); + let json = parse_stdout_json(&output); + assert_eq!(json["ok"], false); + assert!( + json["diagnostics"] + .as_array() + .unwrap() + .iter() + .any(|diagnostic| diagnostic["code"] == "state_already_exists"), + "existing state should produce a useful diagnostic: {json}" + ); +} + +#[test] +fn cluster_refresh_and_import_locked_state_exit_nonzero() { + let temp = tempdir().unwrap(); + write_cluster_config_fixture(temp.path()); + let state_dir = temp.path().join("__cluster"); + fs::create_dir_all(&state_dir).unwrap(); + fs::write( + state_dir.join("state.json"), + r#"{"version":1,"applied_revision":{"resources":{}}}"#, + ) + .unwrap(); + fs::write( + state_dir.join("lock.json"), + r#"{"version":1,"lock_id":"held-lock","operation":"refresh","created_at":"2026-06-08T00:00:00Z","pid":123}"#, + ) + .unwrap(); + + let refresh = parse_stdout_json(&output_failure( + cli() + .arg("cluster") + .arg("refresh") + .arg("--config") + .arg(temp.path()) + .arg("--json"), + )); + assert_eq!(refresh["state_observations"]["lock_id"], "held-lock"); + assert!( + refresh["diagnostics"] + .as_array() + .unwrap() + .iter() + .any(|diagnostic| diagnostic["code"] == "state_lock_held") + ); + + let temp = tempdir().unwrap(); + write_cluster_config_fixture(temp.path()); + let state_dir = temp.path().join("__cluster"); + fs::create_dir_all(&state_dir).unwrap(); + fs::write( + state_dir.join("lock.json"), + r#"{"version":1,"lock_id":"held-lock","operation":"import","created_at":"2026-06-08T00:00:00Z","pid":123}"#, + ) + .unwrap(); + + let imported = parse_stdout_json(&output_failure( + cli() + .arg("cluster") + .arg("import") + .arg("--config") + .arg(temp.path()) + .arg("--json"), + )); + assert_eq!(imported["state_observations"]["lock_id"], "held-lock"); + assert!( + imported["diagnostics"] + .as_array() + .unwrap() + .iter() + .any(|diagnostic| diagnostic["code"] == "state_lock_held") + ); +} + #[test] fn cluster_validate_invalid_config_exits_nonzero() { let temp = tempdir().unwrap(); diff --git a/crates/omnigraph-cluster/Cargo.toml b/crates/omnigraph-cluster/Cargo.toml index d210b1c..749d825 100644 --- a/crates/omnigraph-cluster/Cargo.toml +++ b/crates/omnigraph-cluster/Cargo.toml @@ -9,6 +9,7 @@ homepage = "https://github.com/ModernRelay/omnigraph" documentation = "https://docs.rs/omnigraph-cluster" [dependencies] +omnigraph = { package = "omnigraph-engine", path = "../omnigraph", version = "0.6.1" } omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.6.1" } serde = { workspace = true } serde_json = { workspace = true } @@ -20,3 +21,4 @@ ulid = { workspace = true } [dev-dependencies] tempfile = { workspace = true } +tokio = { workspace = true } diff --git a/crates/omnigraph-cluster/src/lib.rs b/crates/omnigraph-cluster/src/lib.rs index 5115933..db77e14 100644 --- a/crates/omnigraph-cluster/src/lib.rs +++ b/crates/omnigraph-cluster/src/lib.rs @@ -4,17 +4,20 @@ use std::io::{ErrorKind, Write}; use std::path::{Path, PathBuf}; use std::process; +use omnigraph::db::{Omnigraph, ReadTarget}; use omnigraph_compiler::build_catalog; use omnigraph_compiler::query::parser::parse_query; use omnigraph_compiler::query::typecheck::typecheck_query_decl; use omnigraph_compiler::schema::parser::parse_schema; use serde::{Deserialize, Serialize}; +use serde_json::json; use sha2::{Digest, Sha256}; use time::OffsetDateTime; use time::format_description::well_known::Rfc3339; use ulid::Ulid; pub const CLUSTER_CONFIG_FILE: &str = "cluster.yaml"; +pub const CLUSTER_GRAPHS_DIR: &str = "graphs"; pub const CLUSTER_STATE_DIR: &str = "__cluster"; pub const CLUSTER_STATE_FILE: &str = "__cluster/state.json"; pub const CLUSTER_LOCK_FILE: &str = "__cluster/lock.json"; @@ -179,6 +182,26 @@ pub struct StatusOutput { pub state_observations: StateObservations, pub resource_digests: BTreeMap, pub resource_statuses: BTreeMap, + pub observations: BTreeMap, + pub diagnostics: Vec, +} + +#[derive(Debug, Clone, Copy, Serialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum StateSyncOperation { + Refresh, + Import, +} + +#[derive(Debug, Clone, Serialize)] +pub struct StateSyncOutput { + pub ok: bool, + pub operation: StateSyncOperation, + pub config_dir: String, + pub state_observations: StateObservations, + pub resource_digests: BTreeMap, + pub resource_statuses: BTreeMap, + pub observations: BTreeMap, pub diagnostics: Vec, } @@ -187,11 +210,18 @@ struct DesiredCluster { config_dir: PathBuf, config_digest: String, state_lock: bool, + graphs: Vec, resource_digests: BTreeMap, resources: Vec, dependencies: Vec, } +#[derive(Debug, Clone)] +struct DesiredGraph { + id: String, + schema_digest: String, +} + #[derive(Debug)] struct ParsedConfig { raw: Option, @@ -261,8 +291,10 @@ struct PolicyConfig { applies_to: Vec, } +// Stage 2A/2B accept these forward-compatible state sections so existing +// ledgers won't churn while approval/recovery semantics are staged later. #[allow(dead_code)] -#[derive(Debug, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] #[serde(deny_unknown_fields)] struct ClusterState { version: u32, @@ -279,7 +311,7 @@ struct ClusterState { observations: BTreeMap, } -#[derive(Debug, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] #[serde(deny_unknown_fields)] struct AppliedRevisionState { #[serde(default)] @@ -288,7 +320,7 @@ struct AppliedRevisionState { resources: BTreeMap, } -#[derive(Debug, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] #[serde(deny_unknown_fields)] struct StateResource { digest: String, @@ -314,6 +346,7 @@ struct LocalStateBackend { #[derive(Debug)] struct StateSnapshot { state: Option, + state_cas: Option, } #[derive(Debug)] @@ -447,6 +480,7 @@ pub fn status_config_dir(config_dir: impl AsRef) -> StatusOutput { let mut resource_digests = BTreeMap::new(); let mut resource_statuses = BTreeMap::new(); + let mut state_observation_records = BTreeMap::new(); if let Some(raw) = parsed.raw.as_ref() { let _settings = validate_cluster_header(raw, &mut diagnostics); @@ -456,6 +490,7 @@ pub fn status_config_dir(config_dir: impl AsRef) -> StatusOutput { if let Some(state) = snapshot.state { resource_digests = state_resource_digests(&state); resource_statuses = state.resource_statuses; + state_observation_records = state.observations; } else { diagnostics.push(Diagnostic::warning( "state_missing", @@ -475,6 +510,185 @@ pub fn status_config_dir(config_dir: impl AsRef) -> StatusOutput { state_observations: observations, resource_digests, resource_statuses, + observations: state_observation_records, + diagnostics, + } +} + +pub async fn refresh_config_dir(config_dir: impl AsRef) -> StateSyncOutput { + sync_config_dir(config_dir.as_ref(), StateSyncOperation::Refresh).await +} + +pub async fn import_config_dir(config_dir: impl AsRef) -> StateSyncOutput { + sync_config_dir(config_dir.as_ref(), StateSyncOperation::Import).await +} + +async fn sync_config_dir(config_dir: &Path, operation: StateSyncOperation) -> StateSyncOutput { + let outcome = load_desired(config_dir); + let mut diagnostics = outcome.diagnostics; + let backend = LocalStateBackend::new(&outcome.config_dir); + let mut observations = backend.observations(); + + let Some(desired) = outcome.desired else { + return StateSyncOutput { + ok: false, + operation, + config_dir: display_path(&outcome.config_dir), + state_observations: observations, + resource_digests: BTreeMap::new(), + resource_statuses: BTreeMap::new(), + observations: BTreeMap::new(), + diagnostics, + }; + }; + + if has_errors(&diagnostics) { + return StateSyncOutput { + ok: false, + operation, + config_dir: display_path(&desired.config_dir), + state_observations: observations, + resource_digests: desired.resource_digests, + resource_statuses: BTreeMap::new(), + observations: BTreeMap::new(), + diagnostics, + }; + } + + let operation_label = state_sync_operation_label(operation); + let _lock_guard = if desired.state_lock { + match backend.acquire_lock(operation_label, &mut observations) { + Ok(guard) => Some(guard), + Err(diagnostic) => { + diagnostics.push(diagnostic); + None + } + } + } else { + diagnostics.push(Diagnostic::warning( + "state_lock_disabled", + "state.lock", + format!( + "state.lock is false; {operation_label} wrote state without acquiring the cluster state lock" + ), + )); + None + }; + + if has_errors(&diagnostics) { + return StateSyncOutput { + ok: false, + operation, + config_dir: display_path(&desired.config_dir), + state_observations: observations, + resource_digests: desired.resource_digests, + resource_statuses: BTreeMap::new(), + observations: BTreeMap::new(), + diagnostics, + }; + } + + let snapshot = match backend.read_state(&mut observations) { + Ok(snapshot) => snapshot, + Err(diagnostic) => { + diagnostics.push(diagnostic); + return StateSyncOutput { + ok: false, + operation, + config_dir: display_path(&desired.config_dir), + state_observations: observations, + resource_digests: desired.resource_digests, + resource_statuses: BTreeMap::new(), + observations: BTreeMap::new(), + diagnostics, + }; + } + }; + + let expected_cas = snapshot.state_cas; + let mut state = match (operation, snapshot.state) { + (StateSyncOperation::Refresh, Some(state)) => state, + (StateSyncOperation::Refresh, None) => { + diagnostics.push(Diagnostic::error( + "state_missing", + CLUSTER_STATE_FILE, + "refresh requires an existing state.json; run `cluster import` to bootstrap state", + )); + return StateSyncOutput { + ok: false, + operation, + config_dir: display_path(&desired.config_dir), + state_observations: observations, + resource_digests: BTreeMap::new(), + resource_statuses: BTreeMap::new(), + observations: BTreeMap::new(), + diagnostics, + }; + } + (StateSyncOperation::Import, Some(state)) => { + diagnostics.push(Diagnostic::error( + "state_already_exists", + CLUSTER_STATE_FILE, + "import creates initial state only when state.json is missing; use `cluster refresh` for an existing state ledger", + )); + return StateSyncOutput { + ok: false, + operation, + config_dir: display_path(&desired.config_dir), + state_observations: observations, + resource_digests: state_resource_digests(&state), + resource_statuses: state.resource_statuses, + observations: state.observations, + diagnostics, + }; + } + (StateSyncOperation::Import, None) => initial_import_state(&desired), + }; + + let graph_error_count = observe_declared_graphs(&desired, &mut state).await; + if graph_error_count > 0 { + diagnostics.push(Diagnostic::error( + "graph_observation_error", + CLUSTER_GRAPHS_DIR, + format!("{graph_error_count} graph observation(s) failed"), + )); + } + + if operation == StateSyncOperation::Import && has_errors(&diagnostics) { + return StateSyncOutput { + ok: false, + operation, + config_dir: display_path(&desired.config_dir), + state_observations: observations, + resource_digests: state_resource_digests(&state), + resource_statuses: state.resource_statuses, + observations: state.observations, + diagnostics, + }; + } + + if operation == StateSyncOperation::Import { + state.state_revision = 1; + } else { + state.state_revision = state.state_revision.saturating_add(1); + } + + match backend.write_state(&state, expected_cas.as_deref(), &mut observations) { + Ok(()) => {} + Err(diagnostic) => diagnostics.push(diagnostic), + } + + let resource_digests = state_resource_digests(&state); + let ok = !has_errors(&diagnostics); + + StateSyncOutput { + ok, + operation, + config_dir: display_path(&desired.config_dir), + state_observations: observations, + resource_digests, + resource_statuses: state.resource_statuses, + observations: state.observations, diagnostics, } } @@ -574,7 +788,7 @@ fn validate_cluster_header( diagnostics.push(Diagnostic::error( "unsupported_state_backend", "state.backend", - "Stage 2A supports only omitted state.backend or `cluster`", + "Stage 2B supports only omitted state.backend or `cluster`", )); } } @@ -615,7 +829,10 @@ impl LocalStateBackend { let text = match fs::read_to_string(&self.state_path) { Ok(text) => text, Err(err) if err.kind() == ErrorKind::NotFound => { - return Ok(StateSnapshot { state: None }); + return Ok(StateSnapshot { + state: None, + state_cas: None, + }); } Err(err) => { return Err(Diagnostic::error( @@ -627,7 +844,8 @@ impl LocalStateBackend { }; observations.state_found = true; - observations.state_cas = Some(format!("sha256:{}", sha256_hex(text.as_bytes()))); + let state_cas = format!("sha256:{}", sha256_hex(text.as_bytes())); + observations.state_cas = Some(state_cas.clone()); let state = serde_json::from_str::(&text).map_err(|err| { Diagnostic::error( @@ -652,7 +870,109 @@ impl LocalStateBackend { observations.state_revision = state.state_revision; observations.resource_count = state.applied_revision.resources.len(); - Ok(StateSnapshot { state: Some(state) }) + Ok(StateSnapshot { + state: Some(state), + state_cas: Some(state_cas), + }) + } + + fn write_state( + &self, + state: &ClusterState, + expected_cas: Option<&str>, + observations: &mut StateObservations, + ) -> Result<(), Diagnostic> { + fs::create_dir_all(&self.state_dir).map_err(|err| { + Diagnostic::error( + "state_write_error", + CLUSTER_STATE_DIR, + format!("could not create cluster state directory: {err}"), + ) + })?; + + let current_cas = self.current_state_cas()?; + if current_cas.as_deref() != expected_cas { + return Err(Diagnostic::error( + "state_cas_mismatch", + CLUSTER_STATE_FILE, + "state.json changed while the command was running; re-run the command against the latest state", + )); + } + + let mut payload = serde_json::to_string_pretty(state).map_err(|err| { + Diagnostic::error( + "state_write_error", + CLUSTER_STATE_FILE, + format!("could not encode state JSON: {err}"), + ) + })?; + payload.push('\n'); + + let tmp_path = self + .state_dir + .join(format!("state.json.tmp.{}", Ulid::new())); + let mut file = OpenOptions::new() + .write(true) + .create_new(true) + .open(&tmp_path) + .map_err(|err| { + Diagnostic::error( + "state_write_error", + display_path(&tmp_path), + format!("could not create temporary state file: {err}"), + ) + })?; + file.write_all(payload.as_bytes()).map_err(|err| { + Diagnostic::error( + "state_write_error", + display_path(&tmp_path), + format!("could not write temporary state file: {err}"), + ) + })?; + file.sync_all().map_err(|err| { + Diagnostic::error( + "state_write_error", + display_path(&tmp_path), + format!("could not sync temporary state file: {err}"), + ) + })?; + drop(file); + + if let Err(err) = fs::rename(&tmp_path, &self.state_path) { + let _ = fs::remove_file(&tmp_path); + return Err(Diagnostic::error( + "state_write_error", + CLUSTER_STATE_FILE, + format!("could not replace state.json atomically: {err}"), + )); + } + + let written = fs::read_to_string(&self.state_path).map_err(|err| { + Diagnostic::error( + "state_write_error", + CLUSTER_STATE_FILE, + format!("could not read state.json after write: {err}"), + ) + })?; + observations.state_found = true; + observations.applied_config_digest = state.applied_revision.config_digest.clone(); + observations.state_revision = state.state_revision; + observations.state_cas = Some(format!("sha256:{}", sha256_hex(written.as_bytes()))); + observations.resource_count = state.applied_revision.resources.len(); + + Ok(()) + } + + fn current_state_cas(&self) -> Result, Diagnostic> { + match fs::read(&self.state_path) { + Ok(bytes) => Ok(Some(format!("sha256:{}", sha256_hex(&bytes)))), + Err(err) if err.kind() == ErrorKind::NotFound => Ok(None), + Err(err) => Err(Diagnostic::error( + "state_read_error", + CLUSTER_STATE_FILE, + format!("could not read state file for CAS check: {err}"), + )), + } } fn acquire_lock( @@ -780,6 +1100,247 @@ fn state_resource_digests(state: &ClusterState) -> BTreeMap { .collect() } +fn initial_import_state(desired: &DesiredCluster) -> ClusterState { + ClusterState { + version: 1, + state_revision: 0, + applied_revision: AppliedRevisionState { + config_digest: Some(desired.config_digest.clone()), + resources: BTreeMap::new(), + }, + resource_statuses: BTreeMap::new(), + approval_records: BTreeMap::new(), + recovery_records: BTreeMap::new(), + observations: BTreeMap::new(), + } +} + +async fn observe_declared_graphs(desired: &DesiredCluster, state: &mut ClusterState) -> usize { + let mut graph_error_count = 0; + for graph in &desired.graphs { + let graph_address = graph_address(&graph.id); + let schema_address = schema_address(&graph.id); + let graph_path = desired + .config_dir + .join(CLUSTER_GRAPHS_DIR) + .join(format!("{}.omni", graph.id)); + let graph_uri = display_path(&graph_path); + let observed_at = now_rfc3339(); + + if !graph_path.exists() { + state.applied_revision.resources.remove(&graph_address); + state.applied_revision.resources.remove(&schema_address); + state.observations.insert( + graph_address.clone(), + graph_observation_json(GraphObservationJson { + address: &graph_address, + graph_uri: &graph_uri, + observed_at: &observed_at, + exists: false, + manifest_version: None, + schema_digest: None, + desired_schema_digest: &graph.schema_digest, + schema_matches_desired: Some(false), + error: Some("derived graph root is missing"), + }), + ); + set_resource_status( + state, + &graph_address, + ResourceLifecycleStatus::Drifted, + "graph_missing", + "derived graph root is missing", + ); + set_resource_status( + state, + &schema_address, + ResourceLifecycleStatus::Drifted, + "graph_missing", + "derived graph root is missing", + ); + continue; + } + + match observe_live_graph(&graph_uri).await { + Ok(observation) => { + let schema_matches = observation.schema_digest == graph.schema_digest; + state.applied_revision.resources.insert( + schema_address.clone(), + StateResource { + digest: observation.schema_digest.clone(), + }, + ); + let query_digests = state_query_digests_for_graph(state, &graph.id); + let graph_digest_value = graph_digest( + &graph.id, + Some(&observation.schema_digest), + Some(&query_digests), + ); + state.applied_revision.resources.insert( + graph_address.clone(), + StateResource { + digest: graph_digest_value, + }, + ); + state.observations.insert( + graph_address.clone(), + graph_observation_json(GraphObservationJson { + address: &graph_address, + graph_uri: &graph_uri, + observed_at: &observed_at, + exists: true, + manifest_version: Some(observation.manifest_version), + schema_digest: Some(observation.schema_digest.as_str()), + desired_schema_digest: &graph.schema_digest, + schema_matches_desired: Some(schema_matches), + error: None, + }), + ); + if schema_matches { + set_resource_status_applied(state, &graph_address); + set_resource_status_applied(state, &schema_address); + } else { + set_resource_status( + state, + &graph_address, + ResourceLifecycleStatus::Drifted, + "schema_mismatch", + "live schema digest differs from desired schema digest", + ); + set_resource_status( + state, + &schema_address, + ResourceLifecycleStatus::Drifted, + "schema_mismatch", + "live schema digest differs from desired schema digest", + ); + } + } + Err(error) => { + graph_error_count += 1; + state.observations.insert( + graph_address.clone(), + graph_observation_json(GraphObservationJson { + address: &graph_address, + graph_uri: &graph_uri, + observed_at: &observed_at, + exists: true, + manifest_version: None, + schema_digest: None, + desired_schema_digest: &graph.schema_digest, + schema_matches_desired: None, + error: Some(error.as_str()), + }), + ); + set_resource_status( + state, + &graph_address, + ResourceLifecycleStatus::Error, + "graph_observation_error", + error.as_str(), + ); + set_resource_status( + state, + &schema_address, + ResourceLifecycleStatus::Error, + "graph_observation_error", + error.as_str(), + ); + } + } + } + graph_error_count +} + +struct LiveGraphObservation { + manifest_version: u64, + schema_digest: String, +} + +async fn observe_live_graph(graph_uri: &str) -> Result { + let db = Omnigraph::open_read_only(graph_uri) + .await + .map_err(|err| err.to_string())?; + let snapshot = db + .snapshot_of(ReadTarget::branch("main")) + .await + .map_err(|err| err.to_string())?; + let schema_source = db.schema_source(); + Ok(LiveGraphObservation { + manifest_version: snapshot.version(), + schema_digest: sha256_hex(schema_source.as_bytes()), + }) +} + +struct GraphObservationJson<'a> { + address: &'a str, + graph_uri: &'a str, + observed_at: &'a str, + exists: bool, + manifest_version: Option, + schema_digest: Option<&'a str>, + desired_schema_digest: &'a str, + schema_matches_desired: Option, + error: Option<&'a str>, +} + +fn graph_observation_json(observation: GraphObservationJson<'_>) -> serde_json::Value { + json!({ + "kind": "graph", + "address": observation.address, + "graph_uri": observation.graph_uri, + "observed_at": observation.observed_at, + "exists": observation.exists, + "manifest_version": observation.manifest_version, + "schema_digest": observation.schema_digest, + "desired_schema_digest": observation.desired_schema_digest, + "schema_matches_desired": observation.schema_matches_desired, + "error": observation.error, + }) +} + +fn state_query_digests_for_graph(state: &ClusterState, graph_id: &str) -> BTreeMap { + let prefix = format!("query.{graph_id}."); + state + .applied_revision + .resources + .iter() + .filter_map(|(address, resource)| { + address + .strip_prefix(&prefix) + .map(|name| (name.to_string(), resource.digest.clone())) + }) + .collect() +} + +fn set_resource_status_applied(state: &mut ClusterState, address: &str) { + state.resource_statuses.insert( + address.to_string(), + ResourceStatusRecord { + status: ResourceLifecycleStatus::Applied, + conditions: Vec::new(), + message: None, + }, + ); +} + +fn set_resource_status( + state: &mut ClusterState, + address: &str, + status: ResourceLifecycleStatus, + condition: &str, + message: &str, +) { + state.resource_statuses.insert( + address.to_string(), + ResourceStatusRecord { + status, + conditions: vec![condition.to_string()], + message: Some(message.to_string()), + }, + ); +} + fn load_desired(config_dir: &Path) -> LoadOutcome { let parsed = parse_cluster_config(config_dir); let config_dir = parsed.config_dir; @@ -1026,6 +1587,17 @@ fn load_desired(config_dir: &Path) -> LoadOutcome { resource_list.push(resource); } let dependencies: Vec<_> = dependencies.into_iter().collect(); + let graphs = raw + .graphs + .keys() + .map(|graph_id| DesiredGraph { + id: graph_id.clone(), + schema_digest: graph_schema_digests + .get(graph_id) + .cloned() + .unwrap_or_default(), + }) + .collect(); let config_digest = desired_config_digest(&config_text, &resource_digests); LoadOutcome { @@ -1033,6 +1605,7 @@ fn load_desired(config_dir: &Path) -> LoadOutcome { config_dir: config_dir.clone(), config_digest, state_lock: settings.state_lock, + graphs, resource_digests, resources: resource_list, dependencies, @@ -1368,13 +1941,28 @@ fn desired_config_digest( fn sha256_hex(bytes: &[u8]) -> String { let digest = Sha256::digest(bytes); + const HEX: &[u8; 16] = b"0123456789abcdef"; let mut out = String::with_capacity(digest.len() * 2); for byte in digest { - out.push_str(&format!("{byte:02x}")); + out.push(HEX[(byte >> 4) as usize] as char); + out.push(HEX[(byte & 0x0f) as usize] as char); } out } +fn now_rfc3339() -> String { + OffsetDateTime::now_utc() + .format(&Rfc3339) + .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string()) +} + +fn state_sync_operation_label(operation: StateSyncOperation) -> &'static str { + match operation { + StateSyncOperation::Refresh => "refresh", + StateSyncOperation::Import => "import", + } +} + fn has_errors(diagnostics: &[Diagnostic]) -> bool { diagnostics .iter() @@ -1388,7 +1976,9 @@ fn display_path(path: &Path) -> String { #[cfg(test)] mod tests { use std::fs; + use std::path::Path; + use omnigraph::db::Omnigraph; use serde_json::json; use tempfile::tempdir; @@ -1438,6 +2028,15 @@ policies: dir } + async fn init_derived_graph(root: &Path) { + let graph_dir = root.join(CLUSTER_GRAPHS_DIR); + fs::create_dir_all(&graph_dir).unwrap(); + let graph = graph_dir.join("knowledge.omni"); + Omnigraph::init(graph.to_string_lossy().as_ref(), SCHEMA) + .await + .unwrap(); + } + #[test] fn valid_minimal_config() { let dir = fixture(); @@ -1868,4 +2467,273 @@ graphs: .any(|diagnostic| diagnostic.code == "unsupported_state_backend") ); } + + #[tokio::test] + async fn import_missing_state_creates_state_with_graph_observation() { + let dir = fixture(); + init_derived_graph(dir.path()).await; + + let out = import_config_dir(dir.path()).await; + assert!(out.ok, "{:?}", out.diagnostics); + assert_eq!(out.state_observations.state_revision, 1); + assert!(out.state_observations.state_cas.is_some()); + assert!(out.state_observations.locked); + assert!(!dir.path().join(CLUSTER_LOCK_FILE).exists()); + assert_eq!( + out.resource_digests + .get("schema.knowledge") + .map(String::as_str), + Some(sha256_hex(SCHEMA.as_bytes()).as_str()) + ); + assert!(out.observations["graph.knowledge"]["manifest_version"].is_number()); + assert_eq!( + out.observations["graph.knowledge"]["schema_matches_desired"], + true + ); + + let state: serde_json::Value = + serde_json::from_str(&fs::read_to_string(dir.path().join(CLUSTER_STATE_FILE)).unwrap()) + .unwrap(); + assert_eq!(state["state_revision"], 1); + assert_eq!( + state["resource_statuses"]["graph.knowledge"]["status"], + "applied" + ); + } + + #[tokio::test] + async fn import_existing_state_fails() { + let dir = fixture(); + let state_dir = dir.path().join(CLUSTER_STATE_DIR); + fs::create_dir_all(&state_dir).unwrap(); + fs::write( + state_dir.join("state.json"), + r#"{"version":1,"applied_revision":{"resources":{}}}"#, + ) + .unwrap(); + + let out = import_config_dir(dir.path()).await; + assert!(!out.ok); + assert!( + out.diagnostics + .iter() + .any(|diagnostic| diagnostic.code == "state_already_exists") + ); + } + + #[tokio::test] + async fn refresh_missing_state_fails() { + let dir = fixture(); + let out = refresh_config_dir(dir.path()).await; + assert!(!out.ok); + assert!( + out.diagnostics + .iter() + .any(|diagnostic| diagnostic.code == "state_missing") + ); + } + + #[tokio::test] + async fn refresh_existing_minimal_state_increments_revision_and_updates_cas() { + let dir = fixture(); + init_derived_graph(dir.path()).await; + let state_dir = dir.path().join(CLUSTER_STATE_DIR); + fs::create_dir_all(&state_dir).unwrap(); + fs::write( + state_dir.join("state.json"), + r#"{"version":1,"applied_revision":{"config_digest":"old","resources":{"graph.knowledge":{"digest":"old"}}}}"#, + ) + .unwrap(); + + let out = refresh_config_dir(dir.path()).await; + assert!(out.ok, "{:?}", out.diagnostics); + assert_eq!(out.state_observations.state_revision, 1); + assert!(out.state_observations.state_cas.is_some()); + assert_eq!( + out.resource_statuses["graph.knowledge"].status, + ResourceLifecycleStatus::Applied + ); + assert!(!dir.path().join(CLUSTER_LOCK_FILE).exists()); + } + + #[tokio::test] + async fn refresh_records_live_schema_digest_and_manifest_version() { + let dir = fixture(); + init_derived_graph(dir.path()).await; + let state_dir = dir.path().join(CLUSTER_STATE_DIR); + fs::create_dir_all(&state_dir).unwrap(); + fs::write( + state_dir.join("state.json"), + r#"{"version":1,"state_revision":4,"applied_revision":{"resources":{}}}"#, + ) + .unwrap(); + + let out = refresh_config_dir(dir.path()).await; + assert!(out.ok, "{:?}", out.diagnostics); + assert_eq!(out.state_observations.state_revision, 5); + assert_eq!( + out.observations["graph.knowledge"]["schema_digest"], + sha256_hex(SCHEMA.as_bytes()) + ); + assert!(out.observations["graph.knowledge"]["manifest_version"].is_u64()); + } + + #[tokio::test] + async fn missing_derived_graph_root_marks_drifted_and_plans_creates() { + let dir = fixture(); + let state_dir = dir.path().join(CLUSTER_STATE_DIR); + fs::create_dir_all(&state_dir).unwrap(); + fs::write( + state_dir.join("state.json"), + r#"{"version":1,"applied_revision":{"resources":{"graph.knowledge":{"digest":"old-graph"},"schema.knowledge":{"digest":"old-schema"}}}}"#, + ) + .unwrap(); + + let out = refresh_config_dir(dir.path()).await; + assert!(out.ok, "{:?}", out.diagnostics); + assert_eq!( + out.resource_statuses["graph.knowledge"].status, + ResourceLifecycleStatus::Drifted + ); + assert!(!out.resource_digests.contains_key("graph.knowledge")); + assert_eq!(out.observations["graph.knowledge"]["exists"], false); + + let plan = plan_config_dir(dir.path()); + assert!(plan.ok, "{:?}", plan.diagnostics); + assert!(plan.changes.iter().any(|change| { + change.resource == "graph.knowledge" && change.operation == PlanOperation::Create + })); + assert!(plan.changes.iter().any(|change| { + change.resource == "schema.knowledge" && change.operation == PlanOperation::Create + })); + } + + #[tokio::test] + async fn live_schema_mismatch_marks_drifted_and_causes_plan_update() { + let dir = fixture(); + init_derived_graph(dir.path()).await; + fs::write( + dir.path().join("people.pg"), + SCHEMA.replace("age: I32?", "age: I32?\n nickname: String?"), + ) + .unwrap(); + let state_dir = dir.path().join(CLUSTER_STATE_DIR); + fs::create_dir_all(&state_dir).unwrap(); + fs::write( + state_dir.join("state.json"), + r#"{"version":1,"applied_revision":{"resources":{"graph.knowledge":{"digest":"old-graph"},"schema.knowledge":{"digest":"old-schema"}}}}"#, + ) + .unwrap(); + + let out = refresh_config_dir(dir.path()).await; + assert!(out.ok, "{:?}", out.diagnostics); + assert_eq!( + out.resource_statuses["schema.knowledge"].status, + ResourceLifecycleStatus::Drifted + ); + assert_eq!( + out.observations["graph.knowledge"]["schema_matches_desired"], + false + ); + + let plan = plan_config_dir(dir.path()); + assert!(plan.ok, "{:?}", plan.diagnostics); + assert!(plan.changes.iter().any(|change| { + change.resource == "schema.knowledge" && change.operation == PlanOperation::Update + })); + } + + #[tokio::test] + async fn existing_lock_makes_refresh_fail() { + let dir = fixture(); + let state_dir = dir.path().join(CLUSTER_STATE_DIR); + fs::create_dir_all(&state_dir).unwrap(); + fs::write( + state_dir.join("state.json"), + r#"{"version":1,"applied_revision":{"resources":{}}}"#, + ) + .unwrap(); + fs::write( + state_dir.join("lock.json"), + r#"{"version":1,"lock_id":"held-lock","operation":"refresh","created_at":"2026-06-08T00:00:00Z","pid":123}"#, + ) + .unwrap(); + + let out = refresh_config_dir(dir.path()).await; + assert!(!out.ok); + assert_eq!(out.state_observations.lock_id.as_deref(), Some("held-lock")); + assert!( + out.diagnostics + .iter() + .any(|diagnostic| diagnostic.code == "state_lock_held") + ); + } + + #[tokio::test] + async fn state_lock_false_bypasses_refresh_lock_with_warning() { + let dir = fixture(); + init_derived_graph(dir.path()).await; + fs::write( + dir.path().join(CLUSTER_CONFIG_FILE), + r#" +version: 1 +state: + backend: cluster + lock: false +graphs: + knowledge: + schema: ./people.pg +"#, + ) + .unwrap(); + let state_dir = dir.path().join(CLUSTER_STATE_DIR); + fs::create_dir_all(&state_dir).unwrap(); + fs::write( + state_dir.join("state.json"), + r#"{"version":1,"applied_revision":{"resources":{}}}"#, + ) + .unwrap(); + + let out = refresh_config_dir(dir.path()).await; + assert!(out.ok, "{:?}", out.diagnostics); + assert!(!out.state_observations.locked); + assert!( + out.diagnostics + .iter() + .any(|diagnostic| diagnostic.code == "state_lock_disabled") + ); + } + + #[tokio::test] + async fn external_state_backend_refresh_rejected() { + let dir = fixture(); + fs::write( + dir.path().join(CLUSTER_CONFIG_FILE), + "version: 1\nstate:\n backend: s3://bucket/state\ngraphs: {}\n", + ) + .unwrap(); + + let out = refresh_config_dir(dir.path()).await; + assert!(!out.ok); + assert!( + out.diagnostics + .iter() + .any(|diagnostic| diagnostic.code == "unsupported_state_backend") + ); + } + + #[tokio::test] + async fn import_graph_open_error_does_not_create_state() { + let dir = fixture(); + fs::create_dir_all(dir.path().join(CLUSTER_GRAPHS_DIR).join("knowledge.omni")).unwrap(); + + let out = import_config_dir(dir.path()).await; + assert!(!out.ok); + assert!( + out.diagnostics + .iter() + .any(|diagnostic| diagnostic.code == "graph_observation_error") + ); + assert!(!dir.path().join(CLUSTER_STATE_FILE).exists()); + } } diff --git a/docs/dev/cluster-config-specs.md b/docs/dev/cluster-config-specs.md index 8094be2..8aa63cb 100644 --- a/docs/dev/cluster-config-specs.md +++ b/docs/dev/cluster-config-specs.md @@ -5,6 +5,13 @@ **Date:** 2026-06-07 **Relationship:** generalizes today's `omnigraph.yaml` graph/query/policy configuration surface ([CLI reference](../user/cli-reference.md), [server docs](../user/server.md)) into a future cluster control plane. The distilled rules are in [cluster-axioms.md](cluster-axioms.md); detailed downstream implementation spec and blast-radius assessment in [cluster-config-implementation-spec.md](cluster-config-implementation-spec.md). This is a proposed architecture, not an implemented RFC. +> **Implementation status.** The examples below describe the full target schema. +> Stage 2B only accepts the read-only subset documented in +> [cluster-config.md](../user/cluster-config.md). Future-phase fields such as +> `env_file`, `apply`, `providers`, `pipelines`, `embeddings`, `ui`, `aliases`, +> and `bindings` are intentionally rejected with typed diagnostics until their +> reconciler semantics are implemented. + > **Revision 2026-06-07 — full commitment to the Terraform paradigm.** Three changes from the earlier draft: (1) **state is an authoritative, locked ledger in a backend** (server-hosted *or* a separate cloud store), not "a mostly-rebuildable projection"; (2) `plan` is framed as the **CLI diff between local config and state**; (3) **ETL pipelines** (external data sources) are a first-class config asset — a second seam, alongside schema, where a definition triggers a data-plane effect. The full set of config assets (incl. **aliases**, **embeddings**) is enumerated below. --- diff --git a/docs/dev/testing.md b/docs/dev/testing.md index 1035d84..e732bd7 100644 --- a/docs/dev/testing.md +++ b/docs/dev/testing.md @@ -8,7 +8,7 @@ This file is the always-on map of the test surface. **Consult it before every ta |---|---|---| | `omnigraph` (engine) | `crates/omnigraph/tests/` | Integration tests (21 files), fixture-driven, share `tests/helpers/mod.rs` | | `omnigraph-cli` | `crates/omnigraph-cli/tests/` | `cli.rs` (unit-ish), `system_local.rs`, `system_remote.rs`, share `tests/support/mod.rs` | -| `omnigraph-cluster` | mostly in-source `#[cfg(test)] mod tests` | Cluster config parser, local JSON state diff, state CAS/lock handling, read-only validate/plan/status | +| `omnigraph-cluster` | mostly in-source `#[cfg(test)] mod tests` | Cluster config parser, local JSON state diff, state CAS/lock handling, read-only validate/plan/status plus explicit refresh/import graph observations | | `omnigraph-server` | `crates/omnigraph-server/tests/` | `server.rs` (HTTP-level), `openapi.rs` (OpenAPI drift / regeneration) | | `omnigraph-compiler` | mostly in-source `#[cfg(test)] mod tests` | Parser, type-checker, IR lowering, lint | diff --git a/docs/user/cli-reference.md b/docs/user/cli-reference.md index 92ad303..68efce7 100644 --- a/docs/user/cli-reference.md +++ b/docs/user/cli-reference.md @@ -21,7 +21,7 @@ A reference for the `omnigraph` binary's command surface and `omnigraph.yaml` sc | `schema plan \| apply \| show (alias: get)` | migrations | | `lint` (alias: `check`) | offline / graph-backed query validation. Replaces `query lint` / `query check`, which are kept as deprecated argv-level shims that print a one-line warning and rewrite to `omnigraph lint` | | `queries validate \| list` | operate on the server-side stored-query registry (the `queries:` block). `validate` type-checks every stored query against the live schema offline (opens the selected graph; exits non-zero on any breakage), catching schema drift without restarting the server; `list` prints the selected registry's query names, MCP exposure, and typed params. For per-graph registries, pass `--target ` or set `cli.graph`; with no graph selection, `list` shows only top-level `queries:`. Distinct from `lint`, which validates a single `.gq` file | -| `cluster validate \| plan \| status` | read-only cluster-control preview. `validate` checks a local `cluster.yaml` folder and referenced schema/query/policy files; `plan` diffs it against local JSON state at `__cluster/state.json` while briefly holding `__cluster/lock.json`; `status` reads the state ledger. No apply, graph open, live drift scan, server change, or `state.json` mutation occurs in Stage 2A | +| `cluster validate \| plan \| status \| refresh \| import` | cluster-control preview. `validate` checks a local `cluster.yaml` folder and referenced schema/query/policy files; `plan` diffs it against local JSON state at `__cluster/state.json`; `status` reads the state ledger; `refresh`/`import` explicitly update local JSON state from read-only graph observations. No apply, graph-resource mutation, server change, or `plan --refresh` occurs in Stage 2B | | `optimize` | non-destructive Lance compaction (skips tables with `Blob` columns; `--json` reports a `skipped` field) | | `cleanup --keep N --older-than 7d --confirm` | destructive version GC | | `embed` | offline JSONL embedding pipeline | @@ -80,16 +80,21 @@ policy: omnigraph cluster validate --config ./company-brain omnigraph cluster plan --config ./company-brain --json omnigraph cluster status --config ./company-brain --json +omnigraph cluster refresh --config ./company-brain --json +omnigraph cluster import --config ./company-brain --json ``` `--config` is a directory containing `cluster.yaml`; it defaults to `.`. -Stage 2A accepts graphs, schemas, stored queries, and policy bundle file +Stage 2B accepts graphs, schemas, stored queries, and policy bundle file references. `cluster plan` reads local JSON state from -`/__cluster/state.json`; a missing file means empty state. Plan -acquires `__cluster/lock.json` by default and releases it before returning. -`cluster status` reads state only and reports any existing lock. External state -backends, apply, refresh/import, pipelines, UI specs, embeddings, aliases, and -bindings are reserved for later stages. See [cluster-config.md](cluster-config.md). +`/__cluster/state.json`; a missing file means empty state. Plan, +refresh, and import acquire `__cluster/lock.json` by default and release it +before returning. `cluster status` reads state only and reports any existing +lock. `refresh` requires an existing `state.json`; `import` creates one only +when it is missing. Both observe declared graphs read-only at +`/graphs/.omni`. External state backends, apply, +`plan --refresh`, pipelines, UI specs, embeddings, aliases, and bindings are +reserved for later stages. See [cluster-config.md](cluster-config.md). ## Output formats (`query` command, alias: `read`) diff --git a/docs/user/cluster-config.md b/docs/user/cluster-config.md index 9fdbf55..a464a74 100644 --- a/docs/user/cluster-config.md +++ b/docs/user/cluster-config.md @@ -1,12 +1,13 @@ # Cluster Config -**Status:** Stage 2A read-only preview. +**Status:** Stage 2B state-observation preview. Cluster config is the future control-plane configuration surface for a whole OmniGraph deployment. In this stage, OmniGraph can validate a local -`cluster.yaml` folder, produce a deterministic read-only plan, and inspect the -local JSON state ledger. It does not apply changes, open graph roots, scan live -cluster state, start servers, or write graph resources. +`cluster.yaml` folder, produce a deterministic read-only plan, inspect the +local JSON state ledger, and explicitly refresh/import graph observations into +that ledger. It does not apply desired changes, start servers, or write graph +resources. ## Commands @@ -14,6 +15,8 @@ cluster state, start servers, or write graph resources. omnigraph cluster validate --config ./company-brain omnigraph cluster plan --config ./company-brain --json omnigraph cluster status --config ./company-brain --json +omnigraph cluster refresh --config ./company-brain --json +omnigraph cluster import --config ./company-brain --json ``` `--config` points at a directory, not a file. The directory must contain @@ -21,7 +24,7 @@ omnigraph cluster status --config ./company-brain --json ## Supported `cluster.yaml` -Stage 2A accepts only the read-only resource subset: +Stage 2B accepts only the read-only resource subset: ```yaml version: 1 @@ -47,10 +50,10 @@ policies: `metadata.name` is a display label. `state.backend` may be omitted or set to `cluster`; external state backends are reserved for a later stage. `state.lock` -defaults to `true`. When enabled, `cluster plan` briefly acquires -`/__cluster/lock.json` while it reads state, then removes it before -returning. `cluster status` never acquires the lock; it only reports whether one -is present. +defaults to `true`. When enabled, `cluster plan`, `cluster refresh`, and +`cluster import` briefly acquire `/__cluster/lock.json`, then remove +it before returning. `cluster status` never acquires the lock; it only reports +whether one is present. ## Validation @@ -113,8 +116,10 @@ Missing `state_revision` is treated as `0`. Resource status values are Plan output compares desired resource digests against state resource digests and reports `create`, `update`, and `delete` changes. It also reports the state CAS (`sha256:`), state revision, and lock id used for the read. The -command never writes `state.json`; apply, refresh, import, and live drift scans -are later-stage work. +command never writes `state.json` and does not scan live graphs. Use explicit +`cluster refresh` / `cluster import` when the state ledger should be updated +from live observations. Apply and live drift scans during plan are later-stage +work. ## Status @@ -122,3 +127,24 @@ are later-stage work. ledger says is deployed. It does not validate referenced schema/query/policy files and does not inspect live graphs. Missing `state.json` succeeds with a warning; invalid state JSON or an unsupported state version fails. + +## Refresh And Import + +`cluster refresh` updates an existing `state.json` from actual observations. +`cluster import` creates the first `state.json` when the ledger is missing. +Both commands open declared graphs read-only at: + +```text +/graphs/.omni +``` + +They observe only branch `main`, recording graph existence, manifest version, +live schema digest, desired schema digest, and schema-match status under +`observations["graph."]`. Missing graph roots are recorded as drift and +remove the graph/schema digests from state so a later `plan` proposes creates. +Invalid graph roots are recorded as errors; `refresh` persists the error +observation and exits non-zero, while `import` exits non-zero without creating +initial state. + +Refresh/import do not observe query or policy resources yet. Existing query and +policy state digests are preserved on refresh and are not invented on import.