diff --git a/Cargo.lock b/Cargo.lock index 2ee6b7d..ebe5565 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4575,6 +4575,8 @@ dependencies = [ "sha2", "tempfile", "thiserror", + "time", + "ulid", ] [[package]] diff --git a/crates/omnigraph-cli/src/main.rs b/crates/omnigraph-cli/src/main.rs index 23f1569..4ca4a4a 100644 --- a/crates/omnigraph-cli/src/main.rs +++ b/crates/omnigraph-cli/src/main.rs @@ -11,7 +11,8 @@ use omnigraph::db::{Omnigraph, ReadTarget, SnapshotId}; use omnigraph::loader::LoadMode; use omnigraph::storage::normalize_root_uri; use omnigraph_cluster::{ - DiagnosticSeverity, PlanOutput, ValidateOutput, plan_config_dir, validate_config_dir, + DiagnosticSeverity, PlanOutput, StatusOutput, ValidateOutput, plan_config_dir, + status_config_dir, validate_config_dir, }; use omnigraph_compiler::query::parser::parse_query; use omnigraph_compiler::schema::parser::parse_schema; @@ -340,6 +341,15 @@ enum ClusterCommand { #[arg(long)] json: bool, }, + /// Read the local JSON state ledger without scanning live graph resources. + Status { + /// Cluster config directory containing cluster.yaml. + #[arg(long, default_value = ".")] + config: PathBuf, + /// Emit JSON instead of human text. + #[arg(long)] + json: bool, + }, } /// Operations on the graph registry of a multi-graph server (MR-668). 
@@ -745,6 +755,34 @@ fn print_cluster_plan_human(output: &PlanOutput) { print_cluster_diagnostics(&output.diagnostics); } +fn print_cluster_status_human(output: &StatusOutput) { + if output.ok { + let state = &output.state_observations; + if state.state_found { + println!( + "cluster state: revision {}, {} resource(s)", + state.state_revision, state.resource_count + ); + if let Some(digest) = state.applied_config_digest.as_deref() { + println!(" applied config: {digest}"); + } + if state.locked { + match state.lock_id.as_deref() { + Some(lock_id) => println!(" lock: held ({lock_id})"), + None => println!(" lock: held"), + } + } else { + println!(" lock: not held"); + } + } else { + println!("cluster state missing"); + } + } else { + println!("cluster status failed"); + } + print_cluster_diagnostics(&output.diagnostics); +} + fn print_cluster_diagnostics(diagnostics: &[omnigraph_cluster::Diagnostic]) { for diagnostic in diagnostics { let label = match diagnostic.severity { @@ -784,6 +822,19 @@ fn finish_cluster_plan(output: &PlanOutput, json: bool) -> Result<()> { Ok(()) } +fn finish_cluster_status(output: &StatusOutput, json: bool) -> Result<()> { + if json { + print_json(output)?; + } else { + print_cluster_status_human(output); + } + if !output.ok { + io::stdout().flush()?; + std::process::exit(1); + } + Ok(()) +} + fn is_remote_uri(uri: &str) -> bool { uri.starts_with("http://") || uri.starts_with("https://") } @@ -3217,6 +3268,10 @@ async fn main() -> Result<()> { let output = plan_config_dir(config); finish_cluster_plan(&output, json)?; } + ClusterCommand::Status { config, json } => { + let output = status_config_dir(config); + finish_cluster_status(&output, json)?; + } }, Command::Graphs { command } => match command { GraphsCommand::List { diff --git a/crates/omnigraph-cli/tests/cli.rs b/crates/omnigraph-cli/tests/cli.rs index 156dd6e..920ceda 100644 --- a/crates/omnigraph-cli/tests/cli.rs +++ b/crates/omnigraph-cli/tests/cli.rs @@ -214,6 +214,168 @@ fn 
cluster_plan_json_reads_inferred_local_state() { ); } +#[test] +fn cluster_status_json_reports_missing_state() { + let temp = tempdir().unwrap(); + write_cluster_config_fixture(temp.path()); + + let json = parse_stdout_json(&output_success( + cli() + .arg("cluster") + .arg("status") + .arg("--config") + .arg(temp.path()) + .arg("--json"), + )); + assert_eq!(json["ok"], true); + assert_eq!(json["state_observations"]["state_found"], false); + assert!( + json["diagnostics"] + .as_array() + .unwrap() + .iter() + .any(|diagnostic| diagnostic["code"] == "state_missing"), + "missing state should be a warning diagnostic: {json}" + ); +} + +#[test] +fn cluster_status_json_reports_extended_state() { + let temp = tempdir().unwrap(); + write_cluster_config_fixture(temp.path()); + let state_dir = temp.path().join("__cluster"); + fs::create_dir_all(&state_dir).unwrap(); + fs::write( + state_dir.join("state.json"), + r#" +{ + "version": 1, + "state_revision": 5, + "applied_revision": { + "config_digest": "applied", + "resources": { + "graph.knowledge": { "digest": "graph-digest" } + } + }, + "resource_statuses": { + "graph.knowledge": { "status": "applied", "conditions": ["healthy"] } + }, + "approval_records": {}, + "recovery_records": {}, + "observations": {} +} +"#, + ) + .unwrap(); + + let json = parse_stdout_json(&output_success( + cli() + .arg("cluster") + .arg("status") + .arg("--config") + .arg(temp.path()) + .arg("--json"), + )); + assert_eq!(json["ok"], true); + assert_eq!(json["state_observations"]["state_revision"], 5); + assert!( + json["state_observations"]["state_cas"] + .as_str() + .unwrap() + .starts_with("sha256:") + ); + assert_eq!(json["resource_digests"]["graph.knowledge"], "graph-digest"); + assert_eq!( + json["resource_statuses"]["graph.knowledge"]["status"], + "applied" + ); +} + +#[test] +fn cluster_plan_json_includes_state_cas_revision_and_lock_observation() { + let temp = tempdir().unwrap(); + write_cluster_config_fixture(temp.path()); + let state_dir = 
temp.path().join("__cluster"); + fs::create_dir_all(&state_dir).unwrap(); + fs::write( + state_dir.join("state.json"), + r#" +{ + "version": 1, + "state_revision": 9, + "applied_revision": { + "config_digest": "old", + "resources": { + "graph.knowledge": { "digest": "old-graph" } + } + } +} +"#, + ) + .unwrap(); + + let json = parse_stdout_json(&output_success( + cli() + .arg("cluster") + .arg("plan") + .arg("--config") + .arg(temp.path()) + .arg("--json"), + )); + assert_eq!(json["ok"], true); + assert_eq!(json["state_observations"]["state_revision"], 9); + assert!( + json["state_observations"]["state_cas"] + .as_str() + .unwrap() + .starts_with("sha256:") + ); + assert_eq!(json["state_observations"]["locked"], true); + assert!(json["state_observations"]["lock_id"].is_string()); + assert!(!state_dir.join("lock.json").exists()); +} + +#[test] +fn cluster_plan_locked_state_exits_nonzero() { + let temp = tempdir().unwrap(); + write_cluster_config_fixture(temp.path()); + let state_dir = temp.path().join("__cluster"); + fs::create_dir_all(&state_dir).unwrap(); + fs::write( + state_dir.join("lock.json"), + r#" +{ + "version": 1, + "lock_id": "held-lock", + "operation": "plan", + "created_at": "2026-06-08T00:00:00Z", + "pid": 123 +} +"#, + ) + .unwrap(); + + let output = output_failure( + cli() + .arg("cluster") + .arg("plan") + .arg("--config") + .arg(temp.path()) + .arg("--json"), + ); + let json = parse_stdout_json(&output); + assert_eq!(json["ok"], false); + assert_eq!(json["state_observations"]["locked"], true); + assert!( + json["diagnostics"] + .as_array() + .unwrap() + .iter() + .any(|diagnostic| diagnostic["code"] == "state_lock_held"), + "locked state should produce a useful diagnostic: {json}" + ); +} + #[test] fn cluster_validate_invalid_config_exits_nonzero() { let temp = tempdir().unwrap(); diff --git a/crates/omnigraph-cluster/Cargo.toml b/crates/omnigraph-cluster/Cargo.toml index 60e7785..d210b1c 100644 --- a/crates/omnigraph-cluster/Cargo.toml +++ 
b/crates/omnigraph-cluster/Cargo.toml @@ -15,6 +15,8 @@ serde_json = { workspace = true } serde_yaml = { workspace = true } sha2 = { workspace = true } thiserror = { workspace = true } +time = { workspace = true } +ulid = { workspace = true } [dev-dependencies] tempfile = { workspace = true } diff --git a/crates/omnigraph-cluster/src/lib.rs b/crates/omnigraph-cluster/src/lib.rs index 861ae22..5115933 100644 --- a/crates/omnigraph-cluster/src/lib.rs +++ b/crates/omnigraph-cluster/src/lib.rs @@ -1,6 +1,8 @@ use std::collections::{BTreeMap, BTreeSet}; -use std::fs; +use std::fs::{self, OpenOptions}; +use std::io::{ErrorKind, Write}; use std::path::{Path, PathBuf}; +use std::process; use omnigraph_compiler::build_catalog; use omnigraph_compiler::query::parser::parse_query; @@ -8,11 +10,16 @@ use omnigraph_compiler::query::typecheck::typecheck_query_decl; use omnigraph_compiler::schema::parser::parse_schema; use serde::{Deserialize, Serialize}; use sha2::{Digest, Sha256}; +use time::OffsetDateTime; +use time::format_description::well_known::Rfc3339; +use ulid::Ulid; pub const CLUSTER_CONFIG_FILE: &str = "cluster.yaml"; +pub const CLUSTER_STATE_DIR: &str = "__cluster"; pub const CLUSTER_STATE_FILE: &str = "__cluster/state.json"; +pub const CLUSTER_LOCK_FILE: &str = "__cluster/lock.json"; -#[derive(Debug, Clone, Serialize, PartialEq, Eq)] +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] #[serde(rename_all = "snake_case")] pub enum DiagnosticSeverity { Error, @@ -86,10 +93,39 @@ pub struct DesiredRevision { #[derive(Debug, Clone, Serialize)] pub struct StateObservations { pub state_path: String, + pub lock_path: String, pub state_found: bool, #[serde(skip_serializing_if = "Option::is_none")] pub applied_config_digest: Option, + pub state_revision: u64, + #[serde(skip_serializing_if = "Option::is_none")] + pub state_cas: Option, pub resource_count: usize, + pub locked: bool, + #[serde(skip_serializing_if = "Option::is_none")] + pub lock_id: Option, +} + 
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum ResourceLifecycleStatus { + Pending, + Planned, + Applying, + Applied, + Drifted, + Blocked, + Error, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(deny_unknown_fields)] +pub struct ResourceStatusRecord { + pub status: ResourceLifecycleStatus, + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub conditions: Vec, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub message: Option, } #[derive(Debug, Clone, Serialize, PartialEq, Eq)] @@ -136,15 +172,39 @@ pub struct PlanOutput { pub diagnostics: Vec, } +#[derive(Debug, Clone, Serialize)] +pub struct StatusOutput { + pub ok: bool, + pub config_dir: String, + pub state_observations: StateObservations, + pub resource_digests: BTreeMap, + pub resource_statuses: BTreeMap, + pub diagnostics: Vec, +} + #[derive(Debug, Clone)] struct DesiredCluster { config_dir: PathBuf, config_digest: String, + state_lock: bool, resource_digests: BTreeMap, resources: Vec, dependencies: Vec, } +#[derive(Debug)] +struct ParsedConfig { + raw: Option, + diagnostics: Vec, + config_dir: PathBuf, + config_file: PathBuf, +} + +#[derive(Debug, Clone, Copy)] +struct ClusterSettings { + state_lock: bool, +} + #[derive(Debug)] struct LoadOutcome { desired: Option, @@ -201,11 +261,22 @@ struct PolicyConfig { applies_to: Vec, } +#[allow(dead_code)] #[derive(Debug, Deserialize)] #[serde(deny_unknown_fields)] struct ClusterState { version: u32, + #[serde(default)] + state_revision: u64, applied_revision: AppliedRevisionState, + #[serde(default)] + resource_statuses: BTreeMap, + #[serde(default)] + approval_records: BTreeMap, + #[serde(default)] + recovery_records: BTreeMap, + #[serde(default)] + observations: BTreeMap, } #[derive(Debug, Deserialize)] @@ -223,6 +294,33 @@ struct StateResource { digest: String, } +#[derive(Debug, Serialize, Deserialize)] +#[serde(deny_unknown_fields)] +struct 
StateLockFile { + version: u32, + lock_id: String, + operation: String, + created_at: String, + pid: u32, +} + +#[derive(Debug)] +struct LocalStateBackend { + state_dir: PathBuf, + state_path: PathBuf, + lock_path: PathBuf, +} + +#[derive(Debug)] +struct StateSnapshot { + state: Option, +} + +#[derive(Debug)] +struct StateLockGuard { + path: PathBuf, +} + pub fn validate_config_dir(config_dir: impl AsRef) -> ValidateOutput { let outcome = load_desired(config_dir.as_ref()); let (resource_digests, resources, dependencies) = match outcome.desired { @@ -249,13 +347,8 @@ pub fn validate_config_dir(config_dir: impl AsRef) -> ValidateOutput { pub fn plan_config_dir(config_dir: impl AsRef) -> PlanOutput { let outcome = load_desired(config_dir.as_ref()); let mut diagnostics = outcome.diagnostics; - let state_path = outcome.config_dir.join(CLUSTER_STATE_FILE); - let mut observations = StateObservations { - state_path: display_path(&state_path), - state_found: false, - applied_config_digest: None, - resource_count: 0, - }; + let backend = LocalStateBackend::new(&outcome.config_dir); + let mut observations = backend.observations(); let Some(desired) = outcome.desired else { return PlanOutput { @@ -274,40 +367,49 @@ pub fn plan_config_dir(config_dir: impl AsRef) -> PlanOutput { }; }; - let mut prior_resources = BTreeMap::new(); - if state_path.exists() { - observations.state_found = true; - match fs::read_to_string(&state_path) { - Ok(text) => match serde_json::from_str::(&text) { - Ok(state) if state.version == 1 => { - observations.applied_config_digest = state.applied_revision.config_digest; - observations.resource_count = state.applied_revision.resources.len(); - prior_resources = state - .applied_revision - .resources - .into_iter() - .map(|(address, resource)| (address, resource.digest)) - .collect(); - } - Ok(state) => diagnostics.push(Diagnostic::error( - "unsupported_state_version", - "state.version", - format!( - "unsupported cluster state version {}; this build 
supports version 1", - state.version - ), - )), - Err(err) => diagnostics.push(Diagnostic::error( - "invalid_state_json", - CLUSTER_STATE_FILE, - format!("could not parse state JSON: {err}"), - )), + if has_errors(&diagnostics) { + return PlanOutput { + ok: false, + config_dir: display_path(&desired.config_dir), + desired_revision: DesiredRevision { + config_digest: Some(desired.config_digest), }, - Err(err) => diagnostics.push(Diagnostic::error( - "state_read_error", - CLUSTER_STATE_FILE, - format!("could not read state file: {err}"), - )), + resource_digests: desired.resource_digests, + dependencies: desired.dependencies, + state_observations: observations, + changes: Vec::new(), + blast_radius: Vec::new(), + approvals_required: Vec::new(), + diagnostics, + }; + } + + let _lock_guard = if desired.state_lock { + match backend.acquire_lock("plan", &mut observations) { + Ok(guard) => Some(guard), + Err(diagnostic) => { + diagnostics.push(diagnostic); + None + } + } + } else { + diagnostics.push(Diagnostic::warning( + "state_lock_disabled", + "state.lock", + "state.lock is false; plan read state without acquiring the cluster state lock", + )); + None + }; + + let mut prior_resources = BTreeMap::new(); + if !has_errors(&diagnostics) { + match backend.read_state(&mut observations) { + Ok(snapshot) => { + if let Some(state) = snapshot.state { + prior_resources = state_resource_digests(&state); + } + } + Err(diagnostic) => diagnostics.push(diagnostic), } } @@ -336,7 +438,48 @@ pub fn plan_config_dir(config_dir: impl AsRef) -> PlanOutput { } } -fn load_desired(config_dir: &Path) -> LoadOutcome { +pub fn status_config_dir(config_dir: impl AsRef) -> StatusOutput { + let parsed = parse_cluster_config(config_dir.as_ref()); + let mut diagnostics = parsed.diagnostics; + let backend = LocalStateBackend::new(&parsed.config_dir); + let mut observations = backend.observations(); + backend.observe_lock(&mut observations, &mut diagnostics); + + let mut resource_digests = 
BTreeMap::new(); + let mut resource_statuses = BTreeMap::new(); + + if let Some(raw) = parsed.raw.as_ref() { + let _settings = validate_cluster_header(raw, &mut diagnostics); + if !has_errors(&diagnostics) { + match backend.read_state(&mut observations) { + Ok(snapshot) => { + if let Some(state) = snapshot.state { + resource_digests = state_resource_digests(&state); + resource_statuses = state.resource_statuses; + } else { + diagnostics.push(Diagnostic::warning( + "state_missing", + CLUSTER_STATE_FILE, + "state.json is missing; no applied cluster revision has been recorded", + )); + } + } + Err(diagnostic) => diagnostics.push(diagnostic), + } + } + } + + StatusOutput { + ok: !has_errors(&diagnostics), + config_dir: display_path(&parsed.config_dir), + state_observations: observations, + resource_digests, + resource_statuses, + diagnostics, + } +} + +fn parse_cluster_config(config_dir: &Path) -> ParsedConfig { let config_dir = config_dir.to_path_buf(); let config_file = config_dir.join(CLUSTER_CONFIG_FILE); let mut diagnostics = Vec::new(); @@ -347,8 +490,8 @@ fn load_desired(config_dir: &Path) -> LoadOutcome { display_path(&config_dir), "`--config` must point at a directory containing cluster.yaml", )); - return LoadOutcome { - desired: None, + return ParsedConfig { + raw: None, diagnostics, config_dir, config_file, @@ -363,8 +506,8 @@ fn load_desired(config_dir: &Path) -> LoadOutcome { CLUSTER_CONFIG_FILE, format!("could not read cluster.yaml: {err}"), )); - return LoadOutcome { - desired: None, + return ParsedConfig { + raw: None, diagnostics, config_dir, config_file, @@ -375,8 +518,8 @@ fn load_desired(config_dir: &Path) -> LoadOutcome { diagnostics.extend(duplicate_key_diagnostics(&text)); diagnostics.extend(future_field_diagnostics(&text)); if has_errors(&diagnostics) { - return LoadOutcome { - desired: None, + return ParsedConfig { + raw: None, diagnostics, config_dir, config_file, @@ -384,22 +527,29 @@ fn load_desired(config_dir: &Path) -> LoadOutcome { } let 
raw = match serde_yaml::from_str::(&text) { - Ok(raw) => raw, + Ok(raw) => Some(raw), Err(err) => { diagnostics.push(Diagnostic::error( "invalid_cluster_yaml", CLUSTER_CONFIG_FILE, format!("could not parse cluster.yaml: {err}"), )); - return LoadOutcome { - desired: None, - diagnostics, - config_dir, - config_file, - }; + None } }; + ParsedConfig { + raw, + diagnostics, + config_dir, + config_file, + } +} + +fn validate_cluster_header( + raw: &RawClusterConfig, + diagnostics: &mut Vec, +) -> ClusterSettings { if raw.version != 1 { diagnostics.push(Diagnostic::error( "unsupported_cluster_config_version", @@ -424,11 +574,242 @@ fn load_desired(config_dir: &Path) -> LoadOutcome { diagnostics.push(Diagnostic::error( "unsupported_state_backend", "state.backend", - "Stage 1 supports only omitted state.backend or `cluster`", + "Stage 2A supports only omitted state.backend or `cluster`", )); } } - let _lock_parsed_for_forward_compat = raw.state.lock; + + ClusterSettings { + state_lock: raw.state.lock.unwrap_or(true), + } +} + +impl LocalStateBackend { + fn new(config_dir: &Path) -> Self { + let state_dir = config_dir.join(CLUSTER_STATE_DIR); + Self { + state_path: config_dir.join(CLUSTER_STATE_FILE), + lock_path: config_dir.join(CLUSTER_LOCK_FILE), + state_dir, + } + } + + fn observations(&self) -> StateObservations { + StateObservations { + state_path: display_path(&self.state_path), + lock_path: display_path(&self.lock_path), + state_found: false, + applied_config_digest: None, + state_revision: 0, + state_cas: None, + resource_count: 0, + locked: false, + lock_id: None, + } + } + + fn read_state( + &self, + observations: &mut StateObservations, + ) -> Result { + let text = match fs::read_to_string(&self.state_path) { + Ok(text) => text, + Err(err) if err.kind() == ErrorKind::NotFound => { + return Ok(StateSnapshot { state: None }); + } + Err(err) => { + return Err(Diagnostic::error( + "state_read_error", + CLUSTER_STATE_FILE, + format!("could not read state file: 
{err}"), + )); + } + }; + + observations.state_found = true; + observations.state_cas = Some(format!("sha256:{}", sha256_hex(text.as_bytes()))); + + let state = serde_json::from_str::(&text).map_err(|err| { + Diagnostic::error( + "invalid_state_json", + CLUSTER_STATE_FILE, + format!("could not parse state JSON: {err}"), + ) + })?; + + if state.version != 1 { + return Err(Diagnostic::error( + "unsupported_state_version", + "state.version", + format!( + "unsupported cluster state version {}; this build supports version 1", + state.version + ), + )); + } + + observations.applied_config_digest = state.applied_revision.config_digest.clone(); + observations.state_revision = state.state_revision; + observations.resource_count = state.applied_revision.resources.len(); + + Ok(StateSnapshot { state: Some(state) }) + } + + fn acquire_lock( + &self, + operation: &str, + observations: &mut StateObservations, + ) -> Result { + fs::create_dir_all(&self.state_dir).map_err(|err| { + Diagnostic::error( + "state_lock_error", + CLUSTER_STATE_DIR, + format!("could not create cluster state directory: {err}"), + ) + })?; + + let lock_id = Ulid::new().to_string(); + let lock = StateLockFile { + version: 1, + lock_id: lock_id.clone(), + operation: operation.to_string(), + created_at: OffsetDateTime::now_utc() + .format(&Rfc3339) + .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string()), + pid: process::id(), + }; + let payload = serde_json::to_string_pretty(&lock).map_err(|err| { + Diagnostic::error( + "state_lock_error", + CLUSTER_LOCK_FILE, + format!("could not encode state lock: {err}"), + ) + })?; + + match OpenOptions::new() + .write(true) + .create_new(true) + .open(&self.lock_path) + { + Ok(mut file) => { + file.write_all(payload.as_bytes()).map_err(|err| { + Diagnostic::error( + "state_lock_error", + CLUSTER_LOCK_FILE, + format!("could not write state lock: {err}"), + ) + })?; + observations.locked = true; + observations.lock_id = Some(lock_id.clone()); + Ok(StateLockGuard { + 
path: self.lock_path.clone(), + }) + } + Err(err) if err.kind() == ErrorKind::AlreadyExists => { + self.observe_lock_id(observations); + Err(Diagnostic::error( + "state_lock_held", + CLUSTER_LOCK_FILE, + "cluster state lock already exists; remove it only after confirming no cluster operation is active", + )) + } + Err(err) => Err(Diagnostic::error( + "state_lock_error", + CLUSTER_LOCK_FILE, + format!("could not acquire state lock: {err}"), + )), + } + } + + fn observe_lock( + &self, + observations: &mut StateObservations, + diagnostics: &mut Vec, + ) { + if self.lock_path.exists() { + observations.locked = true; + match fs::read_to_string(&self.lock_path) { + Ok(text) => match serde_json::from_str::(&text) { + Ok(lock) if lock.version == 1 => { + observations.lock_id = Some(lock.lock_id); + } + Ok(lock) => diagnostics.push(Diagnostic::warning( + "unsupported_state_lock_version", + CLUSTER_LOCK_FILE, + format!("unsupported cluster state lock version {}", lock.version), + )), + Err(err) => diagnostics.push(Diagnostic::warning( + "invalid_state_lock", + CLUSTER_LOCK_FILE, + format!("could not parse state lock: {err}"), + )), + }, + Err(err) => diagnostics.push(Diagnostic::warning( + "state_lock_read_error", + CLUSTER_LOCK_FILE, + format!("could not read state lock: {err}"), + )), + } + } + } + + fn observe_lock_id(&self, observations: &mut StateObservations) { + observations.locked = true; + if let Ok(text) = fs::read_to_string(&self.lock_path) { + if let Ok(lock) = serde_json::from_str::(&text) { + if lock.version == 1 { + observations.lock_id = Some(lock.lock_id); + } + } + } + } +} + +impl Drop for StateLockGuard { + fn drop(&mut self) { + let _ = fs::remove_file(&self.path); + } +} + +fn state_resource_digests(state: &ClusterState) -> BTreeMap { + state + .applied_revision + .resources + .iter() + .map(|(address, resource)| (address.clone(), resource.digest.clone())) + .collect() +} + +fn load_desired(config_dir: &Path) -> LoadOutcome { + let parsed = 
parse_cluster_config(config_dir); + let config_dir = parsed.config_dir; + let config_file = parsed.config_file; + let mut diagnostics = parsed.diagnostics; + let Some(raw) = parsed.raw else { + return LoadOutcome { + desired: None, + diagnostics, + config_dir, + config_file, + }; + }; + let settings = validate_cluster_header(&raw, &mut diagnostics); + let config_text = match fs::read_to_string(&config_file) { + Ok(text) => text, + Err(err) => { + diagnostics.push(Diagnostic::error( + "cluster_config_read_error", + CLUSTER_CONFIG_FILE, + format!("could not re-read cluster.yaml: {err}"), + )); + return LoadOutcome { + desired: None, + diagnostics, + config_dir, + config_file, + }; + } + }; let mut resources = BTreeMap::new(); let mut dependencies = BTreeSet::new(); @@ -645,12 +1026,13 @@ fn load_desired(config_dir: &Path) -> LoadOutcome { resource_list.push(resource); } let dependencies: Vec<_> = dependencies.into_iter().collect(); - let config_digest = desired_config_digest(&text, &resource_digests); + let config_digest = desired_config_digest(&config_text, &resource_digests); LoadOutcome { desired: Some(DesiredCluster { config_dir: config_dir.clone(), config_digest, + state_lock: settings.state_lock, resource_digests, resources: resource_list, dependencies, @@ -1217,6 +1599,7 @@ graphs: .all(|c| c.operation == PlanOperation::Create) ); assert!(out.changes.iter().any(|c| c.resource == "graph.knowledge")); + assert!(!dir.path().join(CLUSTER_LOCK_FILE).exists()); } #[test] @@ -1260,6 +1643,202 @@ graphs: ); } + #[test] + fn old_minimal_state_json_still_plans_with_default_revision() { + let dir = fixture(); + let state_dir = dir.path().join(CLUSTER_STATE_DIR); + fs::create_dir_all(&state_dir).unwrap(); + fs::write( + state_dir.join("state.json"), + r#"{ + "version": 1, + "applied_revision": { + "config_digest": "old", + "resources": { + "graph.knowledge": { "digest": "old-graph" } + } + } +}"#, + ) + .unwrap(); + + let out = plan_config_dir(dir.path()); + 
assert!(out.ok, "{:?}", out.diagnostics); + assert_eq!(out.state_observations.state_revision, 0); + assert!(out.state_observations.state_cas.is_some()); + assert!(out.changes.iter().any(|change| { + change.resource == "graph.knowledge" && change.operation == PlanOperation::Update + })); + } + + #[test] + fn extended_state_json_status_surfaces_statuses() { + let dir = fixture(); + let state_dir = dir.path().join(CLUSTER_STATE_DIR); + fs::create_dir_all(&state_dir).unwrap(); + let state = r#"{ + "version": 1, + "state_revision": 42, + "applied_revision": { + "config_digest": "applied-config", + "resources": { + "graph.knowledge": { "digest": "graph-digest" } + } + }, + "resource_statuses": { + "graph.knowledge": { + "status": "applied", + "conditions": ["healthy"], + "message": "ready" + } + }, + "approval_records": {}, + "recovery_records": {}, + "observations": { + "graph.knowledge": { "manifest_version": 12 } + } +}"#; + fs::write(state_dir.join("state.json"), state).unwrap(); + + let out = status_config_dir(dir.path()); + assert!(out.ok, "{:?}", out.diagnostics); + assert!(out.state_observations.state_found); + assert_eq!(out.state_observations.state_revision, 42); + assert_eq!( + out.state_observations.state_cas.as_deref(), + Some(format!("sha256:{}", sha256_hex(state.as_bytes())).as_str()) + ); + assert_eq!( + out.resource_digests + .get("graph.knowledge") + .map(String::as_str), + Some("graph-digest") + ); + assert_eq!( + out.resource_statuses["graph.knowledge"].status, + ResourceLifecycleStatus::Applied + ); + } + + #[test] + fn missing_state_status_succeeds_with_warning() { + let dir = fixture(); + let out = status_config_dir(dir.path()); + assert!(out.ok, "{:?}", out.diagnostics); + assert!(!out.state_observations.state_found); + assert_eq!(out.state_observations.state_revision, 0); + assert!( + out.diagnostics + .iter() + .any(|diagnostic| diagnostic.code == "state_missing") + ); + } + + #[test] + fn invalid_state_status_fails() { + let dir = fixture(); + 
let state_dir = dir.path().join(CLUSTER_STATE_DIR); + fs::create_dir_all(&state_dir).unwrap(); + fs::write(state_dir.join("state.json"), "{").unwrap(); + + let out = status_config_dir(dir.path()); + assert!(!out.ok); + assert!(out.state_observations.state_found); + assert!( + out.diagnostics + .iter() + .any(|diagnostic| diagnostic.code == "invalid_state_json") + ); + } + + #[test] + fn plan_reports_state_cas_revision_and_removes_lock() { + let dir = fixture(); + let state_dir = dir.path().join(CLUSTER_STATE_DIR); + fs::create_dir_all(&state_dir).unwrap(); + let state = r#"{ + "version": 1, + "state_revision": 7, + "applied_revision": { + "config_digest": "old", + "resources": { + "graph.knowledge": { "digest": "old-graph" } + } + } +}"#; + fs::write(state_dir.join("state.json"), state).unwrap(); + + let out = plan_config_dir(dir.path()); + assert!(out.ok, "{:?}", out.diagnostics); + assert_eq!(out.state_observations.state_revision, 7); + assert_eq!( + out.state_observations.state_cas.as_deref(), + Some(format!("sha256:{}", sha256_hex(state.as_bytes())).as_str()) + ); + assert!(out.state_observations.locked); + assert!(out.state_observations.lock_id.is_some()); + assert!( + !dir.path().join(CLUSTER_LOCK_FILE).exists(), + "plan must release lock before returning" + ); + } + + #[test] + fn existing_lock_makes_plan_fail() { + let dir = fixture(); + let state_dir = dir.path().join(CLUSTER_STATE_DIR); + fs::create_dir_all(&state_dir).unwrap(); + fs::write( + state_dir.join("lock.json"), + r#"{ + "version": 1, + "lock_id": "held-lock", + "operation": "plan", + "created_at": "2026-06-08T00:00:00Z", + "pid": 123 +}"#, + ) + .unwrap(); + + let out = plan_config_dir(dir.path()); + assert!(!out.ok); + assert!(out.state_observations.locked); + assert_eq!(out.state_observations.lock_id.as_deref(), Some("held-lock")); + assert!( + out.diagnostics + .iter() + .any(|diagnostic| diagnostic.code == "state_lock_held") + ); + } + + #[test] + fn 
state_lock_false_bypasses_lock_with_warning() { + let dir = fixture(); + fs::write( + dir.path().join(CLUSTER_CONFIG_FILE), + r#" +version: 1 +state: + backend: cluster + lock: false +graphs: + knowledge: + schema: ./people.pg +"#, + ) + .unwrap(); + + let out = plan_config_dir(dir.path()); + assert!(out.ok, "{:?}", out.diagnostics); + assert!(!out.state_observations.locked); + assert!( + out.diagnostics + .iter() + .any(|diagnostic| diagnostic.code == "state_lock_disabled") + ); + assert!(!dir.path().join(CLUSTER_LOCK_FILE).exists()); + } + #[test] fn external_state_backend_rejected() { let dir = fixture(); @@ -1272,4 +1851,21 @@ graphs: assert!(!out.ok); assert_eq!(out.diagnostics[0].code, "unsupported_state_backend"); } + + #[test] + fn external_state_backend_plan_rejected() { + let dir = fixture(); + fs::write( + dir.path().join(CLUSTER_CONFIG_FILE), + "version: 1\nstate:\n backend: s3://bucket/state\ngraphs: {}\n", + ) + .unwrap(); + let out = plan_config_dir(dir.path()); + assert!(!out.ok); + assert!( + out.diagnostics + .iter() + .any(|diagnostic| diagnostic.code == "unsupported_state_backend") + ); + } } diff --git a/docs/dev/testing.md b/docs/dev/testing.md index 0b5a234..1035d84 100644 --- a/docs/dev/testing.md +++ b/docs/dev/testing.md @@ -8,7 +8,7 @@ This file is the always-on map of the test surface. 
**Consult it before every ta |---|---|---| | `omnigraph` (engine) | `crates/omnigraph/tests/` | Integration tests (21 files), fixture-driven, share `tests/helpers/mod.rs` | | `omnigraph-cli` | `crates/omnigraph-cli/tests/` | `cli.rs` (unit-ish), `system_local.rs`, `system_remote.rs`, share `tests/support/mod.rs` | -| `omnigraph-cluster` | mostly in-source `#[cfg(test)] mod tests` | Cluster config parser, local JSON state diff, read-only validate/plan | +| `omnigraph-cluster` | mostly in-source `#[cfg(test)] mod tests` | Cluster config parser, local JSON state diff, state CAS/lock handling, read-only validate/plan/status | | `omnigraph-server` | `crates/omnigraph-server/tests/` | `server.rs` (HTTP-level), `openapi.rs` (OpenAPI drift / regeneration) | | `omnigraph-compiler` | mostly in-source `#[cfg(test)] mod tests` | Parser, type-checker, IR lowering, lint | diff --git a/docs/user/cli-reference.md b/docs/user/cli-reference.md index 2f27322..92ad303 100644 --- a/docs/user/cli-reference.md +++ b/docs/user/cli-reference.md @@ -21,7 +21,7 @@ A reference for the `omnigraph` binary's command surface and `omnigraph.yaml` sc | `schema plan \| apply \| show (alias: get)` | migrations | | `lint` (alias: `check`) | offline / graph-backed query validation. Replaces `query lint` / `query check`, which are kept as deprecated argv-level shims that print a one-line warning and rewrite to `omnigraph lint` | | `queries validate \| list` | operate on the server-side stored-query registry (the `queries:` block). `validate` type-checks every stored query against the live schema offline (opens the selected graph; exits non-zero on any breakage), catching schema drift without restarting the server; `list` prints the selected registry's query names, MCP exposure, and typed params. For per-graph registries, pass `--target ` or set `cli.graph`; with no graph selection, `list` shows only top-level `queries:`. 
Distinct from `lint`, which validates a single `.gq` file | -| `cluster validate \| plan` | read-only cluster-control preview. `validate` checks a local `cluster.yaml` folder and referenced schema/query/policy files; `plan` diffs it against local JSON state at `__cluster/state.json`. No apply, lock, graph open, server change, or state write occurs in Stage 1 | +| `cluster validate \| plan \| status` | read-only cluster-control preview. `validate` checks a local `cluster.yaml` folder and referenced schema/query/policy files; `plan` diffs it against local JSON state at `__cluster/state.json` while briefly holding `__cluster/lock.json`; `status` reads the state ledger. No apply, graph open, live drift scan, server change, or `state.json` mutation occurs in Stage 2A | | `optimize` | non-destructive Lance compaction (skips tables with `Blob` columns; `--json` reports a `skipped` field) | | `cleanup --keep N --older-than 7d --confirm` | destructive version GC | | `embed` | offline JSONL embedding pipeline | @@ -79,13 +79,16 @@ policy: ```bash omnigraph cluster validate --config ./company-brain omnigraph cluster plan --config ./company-brain --json +omnigraph cluster status --config ./company-brain --json ``` `--config` is a directory containing `cluster.yaml`; it defaults to `.`. -Stage 1 accepts graphs, schemas, stored queries, and policy bundle file +Stage 2A accepts graphs, schemas, stored queries, and policy bundle file references. `cluster plan` reads local JSON state from -`<config>/__cluster/state.json`; a missing file means empty state. External -state backends, apply, locks, pipelines, UI specs, embeddings, aliases, and +`<config>/__cluster/state.json`; a missing file means empty state. Plan +acquires `__cluster/lock.json` by default and releases it before returning. +`cluster status` reads state only and reports any existing lock. External state +backends, apply, refresh/import, pipelines, UI specs, embeddings, aliases, and bindings are reserved for later stages.
See [cluster-config.md](cluster-config.md). ## Output formats (`query` command, alias: `read`) diff --git a/docs/user/cluster-config.md b/docs/user/cluster-config.md index 29d9c32..9fdbf55 100644 --- a/docs/user/cluster-config.md +++ b/docs/user/cluster-config.md @@ -1,17 +1,19 @@ # Cluster Config -**Status:** Stage 1 read-only preview. +**Status:** Stage 2A read-only preview. Cluster config is the future control-plane configuration surface for a whole OmniGraph deployment. In this stage, OmniGraph can validate a local -`cluster.yaml` folder and produce a deterministic read-only plan. It does not -apply changes, acquire locks, open graph roots, start servers, or write state. +`cluster.yaml` folder, produce a deterministic read-only plan, and inspect the +local JSON state ledger. It does not apply changes, open graph roots, scan live +cluster state, start servers, or write graph resources. ## Commands ```bash omnigraph cluster validate --config ./company-brain omnigraph cluster plan --config ./company-brain --json +omnigraph cluster status --config ./company-brain --json ``` `--config` points at a directory, not a file. The directory must contain @@ -19,7 +21,7 @@ omnigraph cluster plan --config ./company-brain --json ## Supported `cluster.yaml` -Stage 1 accepts only the read-only resource subset: +Stage 2A accepts only the read-only resource subset: ```yaml version: 1 @@ -43,10 +45,12 @@ policies: applies_to: [knowledge] ``` -`metadata.name` is a display label. `state.lock` is parsed for forward -compatibility, but no lock is acquired in this read-only stage. `state.backend` -may be omitted or set to `cluster`; external state backends are reserved for a -later stage. +`metadata.name` is a display label. `state.backend` may be omitted or set to +`cluster`; external state backends are reserved for a later stage. `state.lock` +defaults to `true`. 
When enabled, `cluster plan` briefly acquires +`<config>/__cluster/lock.json` while it reads state, then removes it before +returning. `cluster status` never acquires the lock; it only reports whether one +is present. ## Validation @@ -78,6 +82,7 @@ resource is planned as a create. If present, the file must use this shape: ```json { "version": 1, + "state_revision": 0, "applied_revision": { "config_digest": "...", "resources": { @@ -86,10 +91,34 @@ resource is planned as a create. If present, the file must use this shape: "query.knowledge.find_experts": { "digest": "..." }, "policy.base": { "digest": "..." } } - } + }, + "resource_statuses": { + "graph.knowledge": { + "status": "applied", + "conditions": [], + "message": "optional status detail" + } + }, + "approval_records": {}, + "recovery_records": {}, + "observations": {} } ``` +`state_revision`, `resource_statuses`, `approval_records`, `recovery_records`, +and `observations` are optional so older Stage 1 state fixtures keep working. +Missing `state_revision` is treated as `0`. Resource status values are +`pending`, `planned`, `applying`, `applied`, `drifted`, `blocked`, or `error`. + Plan output compares desired resource digests against state resource digests -and reports `create`, `update`, and `delete` changes. The command never writes -`state.json`; apply and locking are later-stage work. +and reports `create`, `update`, and `delete` changes. It also reports the state +CAS (`sha256:<hex>`), state revision, and lock id used for the read. The +command never writes `state.json`; apply, refresh, import, and live drift scans +are later-stage work. + +## Status + +`cluster status` reads the same local JSON state ledger and prints what the +ledger says is deployed. It does not validate referenced schema/query/policy +files and does not inspect live graphs. Missing `state.json` succeeds with a +warning; invalid state JSON or an unsupported state version fails.