From 89b876c797277a7d86b8bc6375be0b379d379597 Mon Sep 17 00:00:00 2001 From: aaltshuler Date: Tue, 9 Jun 2026 02:12:00 +0300 Subject: [PATCH] Add cluster state lock recovery --- crates/omnigraph-cli/src/main.rs | 89 ++++++- crates/omnigraph-cli/tests/cli.rs | 155 +++++++++++-- crates/omnigraph-cluster/src/lib.rs | 346 +++++++++++++++++++++++++++- docs/dev/testing.md | 2 +- docs/user/cli-reference.md | 15 +- docs/user/cluster-config.md | 41 +++- 6 files changed, 597 insertions(+), 51 deletions(-) diff --git a/crates/omnigraph-cli/src/main.rs b/crates/omnigraph-cli/src/main.rs index 9c16722..971ffff 100644 --- a/crates/omnigraph-cli/src/main.rs +++ b/crates/omnigraph-cli/src/main.rs @@ -11,8 +11,9 @@ use omnigraph::db::{Omnigraph, ReadTarget, SnapshotId}; use omnigraph::loader::LoadMode; use omnigraph::storage::normalize_root_uri; use omnigraph_cluster::{ - DiagnosticSeverity, PlanOutput, StateSyncOutput, StatusOutput, ValidateOutput, - import_config_dir, plan_config_dir, refresh_config_dir, status_config_dir, validate_config_dir, + DiagnosticSeverity, ForceUnlockOutput, PlanOutput, StateSyncOutput, StatusOutput, + ValidateOutput, force_unlock_config_dir, import_config_dir, plan_config_dir, + refresh_config_dir, status_config_dir, validate_config_dir, }; use omnigraph_compiler::query::parser::parse_query; use omnigraph_compiler::schema::parser::parse_schema; @@ -387,6 +388,17 @@ enum ClusterCommand { #[arg(long)] json: bool, }, + /// Remove a held local JSON state lock after operator confirmation. + ForceUnlock { + /// Exact lock id from cluster status or a state_lock_held diagnostic. + lock_id: String, + /// Cluster config directory containing cluster.yaml. + #[arg(long, default_value = ".")] + config: PathBuf, + /// Emit JSON instead of human text. + #[arg(long)] + json: bool, + }, } /// Operations on the graph registry of a multi-graph server (MR-668). @@ -804,10 +816,7 @@ fn print_cluster_status_human(output: &StatusOutput) { println!(" applied config: {digest}"); } if state.locked { - match state.lock_id.as_deref() { - Some(lock_id) => println!(" lock: held ({lock_id})"), - None => println!(" lock: held"), - } + println!(" lock: held{}", cluster_lock_summary(state)); } else { println!(" lock: not held"); } @@ -835,10 +844,7 @@ fn print_cluster_state_sync_human(output: &StateSyncOutput) { println!(" state_cas: {cas}"); } if state.locked { - match state.lock_id.as_deref() { - Some(lock_id) => println!(" lock: acquired ({lock_id})"), - None => println!(" lock: acquired"), - } + println!(" lock: acquired{}", cluster_lock_summary(state)); } else { println!(" lock: not acquired"); } @@ -848,6 +854,48 @@ fn print_cluster_state_sync_human(output: &StateSyncOutput) { print_cluster_diagnostics(&output.diagnostics); } +fn print_cluster_force_unlock_human(output: &ForceUnlockOutput) { + if output.ok { + if output.lock_removed { + println!( + "cluster force-unlock: removed lock{}", + cluster_lock_summary(&output.state_observations) + ); + } else { + println!("cluster force-unlock: no lock removed"); + } + } else { + println!("cluster force-unlock failed"); + if output.state_observations.locked { + println!( + " lock: held{}", + cluster_lock_summary(&output.state_observations) + ); + } + } + print_cluster_diagnostics(&output.diagnostics); +} + +fn cluster_lock_summary(state: &omnigraph_cluster::StateObservations) -> String { + let Some(lock_id) = state.lock_id.as_deref() else { + return String::new(); + }; + let mut parts = vec![format!("id={lock_id}")]; + if let Some(operation) = state.lock_operation.as_deref() { + parts.push(format!("operation={operation}")); + } + if let Some(pid) = state.lock_pid { + parts.push(format!("pid={pid}")); + } + if let Some(created_at) = state.lock_created_at.as_deref() { + parts.push(format!("created_at={created_at}")); + } + if let Some(age_seconds) = state.lock_age_seconds { + parts.push(format!("age_seconds={age_seconds}")); + } + format!(" ({})", parts.join(", ")) +} + fn print_cluster_diagnostics(diagnostics: &[omnigraph_cluster::Diagnostic]) { for diagnostic in diagnostics { let label = match diagnostic.severity { @@ -913,6 +961,19 @@ fn finish_cluster_state_sync(output: &StateSyncOutput, json: bool) -> Result<()> Ok(()) } +fn finish_cluster_force_unlock(output: &ForceUnlockOutput, json: bool) -> Result<()> { + if json { + print_json(output)?; + } else { + print_cluster_force_unlock_human(output); + } + if !output.ok { + io::stdout().flush()?; + std::process::exit(1); + } + Ok(()) +} + fn is_remote_uri(uri: &str) -> bool { uri.starts_with("http://") || uri.starts_with("https://") } @@ -3443,6 +3504,14 @@ async fn main() -> Result<()> { let output = import_config_dir(config).await; finish_cluster_state_sync(&output, json)?; } + ClusterCommand::ForceUnlock { + lock_id, + config, + json, + } => { + let output = force_unlock_config_dir(config, lock_id); + finish_cluster_force_unlock(&output, json)?; + } }, Command::Graphs { command } => match command { GraphsCommand::List { diff --git a/crates/omnigraph-cli/tests/cli.rs b/crates/omnigraph-cli/tests/cli.rs index 504f0ef..1dd26a7 100644 --- a/crates/omnigraph-cli/tests/cli.rs +++ b/crates/omnigraph-cli/tests/cli.rs @@ -156,6 +156,18 @@ fn init_cluster_derived_graph(root: &std::path::Path) { ); } +fn write_cluster_lock(root: &std::path::Path, lock_id: &str, operation: &str) { + let state_dir = root.join("__cluster"); + fs::create_dir_all(&state_dir).unwrap(); + fs::write( + state_dir.join("lock.json"), + format!( + r#"{{"version":1,"lock_id":"{lock_id}","operation":"{operation}","created_at":"1970-01-01T00:00:00Z","pid":123}}"# + ), + ) + .unwrap(); +} + #[test] fn version_command_prints_current_cli_version() { let output = output_success(cli().arg("version")); @@ -271,6 +283,32 @@ fn cluster_status_json_reports_missing_state() { ); } +#[test] +fn cluster_status_json_reports_lock_metadata() { + let temp = tempdir().unwrap(); + write_cluster_config_fixture(temp.path()); + write_cluster_lock(temp.path(), "held-lock", "refresh"); + + let json = parse_stdout_json(&output_success( + cli() + .arg("cluster") + .arg("status") + .arg("--config") + .arg(temp.path()) + .arg("--json"), + )); + assert_eq!(json["ok"], true); + assert_eq!(json["state_observations"]["locked"], true); + assert_eq!(json["state_observations"]["lock_id"], "held-lock"); + assert_eq!(json["state_observations"]["lock_operation"], "refresh"); + assert_eq!(json["state_observations"]["lock_pid"], 123); + assert_eq!( + json["state_observations"]["lock_created_at"], + "1970-01-01T00:00:00Z" + ); + assert!(json["state_observations"]["lock_age_seconds"].is_number()); +} + #[test] fn cluster_status_json_reports_extended_state() { let temp = tempdir().unwrap(); @@ -372,21 +410,7 @@ fn cluster_plan_json_includes_state_cas_revision_and_lock_observation() { fn cluster_plan_locked_state_exits_nonzero() { let temp = tempdir().unwrap(); write_cluster_config_fixture(temp.path()); - let state_dir = temp.path().join("__cluster"); - fs::create_dir_all(&state_dir).unwrap(); - fs::write( - state_dir.join("lock.json"), - r#" -{ - "version": 1, - "lock_id": "held-lock", - "operation": "plan", - "created_at": "2026-06-08T00:00:00Z", - "pid": 123 -} -"#, - ) - .unwrap(); + write_cluster_lock(temp.path(), "held-lock", "plan"); let output = output_failure( cli() @@ -401,16 +425,115 @@ fn cluster_plan_locked_state_exits_nonzero() { assert_eq!(json["state_observations"]["locked"], true); assert_eq!(json["state_observations"]["lock_acquired"], false); assert_eq!(json["state_observations"]["lock_id"], "held-lock"); + assert_eq!(json["state_observations"]["lock_operation"], "plan"); + assert_eq!(json["state_observations"]["lock_pid"], 123); + assert_eq!( + json["state_observations"]["lock_created_at"], + "1970-01-01T00:00:00Z" + ); + assert!(json["state_observations"]["lock_age_seconds"].is_number()); assert!( json["diagnostics"] .as_array() .unwrap() .iter() - .any(|diagnostic| diagnostic["code"] == "state_lock_held"), + .any(|diagnostic| diagnostic["code"] == "state_lock_held" + && diagnostic["message"] + .as_str() + .unwrap() + .contains("force-unlock held-lock")), "locked state should produce a useful diagnostic: {json}" ); } +#[test] +fn cluster_force_unlock_json_removes_lock() { + let temp = tempdir().unwrap(); + write_cluster_config_fixture(temp.path()); + write_cluster_lock(temp.path(), "held-lock", "plan"); + + let json = parse_stdout_json(&output_success( + cli() + .arg("cluster") + .arg("force-unlock") + .arg("held-lock") + .arg("--config") + .arg(temp.path()) + .arg("--json"), + )); + assert_eq!(json["ok"], true); + assert_eq!(json["lock_removed"], true); + assert_eq!(json["state_observations"]["lock_id"], "held-lock"); + assert_eq!(json["state_observations"]["lock_operation"], "plan"); + assert!(!temp.path().join("__cluster/lock.json").exists()); +} + +#[test] +fn cluster_force_unlock_wrong_id_exits_nonzero() { + let temp = tempdir().unwrap(); + write_cluster_config_fixture(temp.path()); + write_cluster_lock(temp.path(), "held-lock", "plan"); + + let json = parse_stdout_json(&output_failure( + cli() + .arg("cluster") + .arg("force-unlock") + .arg("other-lock") + .arg("--config") + .arg(temp.path()) + .arg("--json"), + )); + assert_eq!(json["ok"], false); + assert_eq!(json["lock_removed"], false); + assert!( + json["diagnostics"] + .as_array() + .unwrap() + .iter() + .any(|diagnostic| diagnostic["code"] == "state_lock_id_mismatch") + ); + assert!(temp.path().join("__cluster/lock.json").exists()); +} + +#[test] +fn cluster_locked_plan_then_force_unlock_then_plan_succeeds() { + let temp = tempdir().unwrap(); + write_cluster_config_fixture(temp.path()); + write_cluster_lock(temp.path(), "held-lock", "plan"); + + let locked = parse_stdout_json(&output_failure( + cli() + .arg("cluster") + .arg("plan") + .arg("--config") + .arg(temp.path()) + .arg("--json"), + )); + assert_eq!(locked["ok"], false); + assert_eq!(locked["state_observations"]["lock_id"], "held-lock"); + + let unlocked = parse_stdout_json(&output_success( + cli() + .arg("cluster") + .arg("force-unlock") + .arg("held-lock") + .arg("--config") + .arg(temp.path()) + .arg("--json"), + )); + assert_eq!(unlocked["lock_removed"], true); + + let planned = parse_stdout_json(&output_success( + cli() + .arg("cluster") + .arg("plan") + .arg("--config") + .arg(temp.path()) + .arg("--json"), + )); + assert_eq!(planned["ok"], true); +} + #[test] fn cluster_import_json_bootstraps_missing_state() { let temp = tempdir().unwrap(); diff --git a/crates/omnigraph-cluster/src/lib.rs b/crates/omnigraph-cluster/src/lib.rs index 9a6ea78..7ae824c 100644 --- a/crates/omnigraph-cluster/src/lib.rs +++ b/crates/omnigraph-cluster/src/lib.rs @@ -110,6 +110,25 @@ pub struct StateObservations { pub lock_acquired: bool, #[serde(skip_serializing_if = "Option::is_none")] pub acquired_lock_id: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub lock_operation: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub lock_created_at: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub lock_pid: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub lock_age_seconds: Option, +} + +impl StateObservations { + fn observe_lock_metadata(&mut self, lock: &StateLockFile) { + self.locked = true; + self.lock_id = Some(lock.lock_id.clone()); + self.lock_operation = Some(lock.operation.clone()); + self.lock_created_at = Some(lock.created_at.clone()); + self.lock_pid = Some(lock.pid); + self.lock_age_seconds = lock_age_seconds(&lock.created_at); + } } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] @@ -208,6 +227,15 @@ pub struct StateSyncOutput { pub diagnostics: Vec, } +#[derive(Debug, Clone, Serialize)] +pub struct ForceUnlockOutput { + pub ok: bool, + pub config_dir: String, + pub state_observations: StateObservations, + pub lock_removed: bool, + pub diagnostics: Vec, +} + #[derive(Debug, Clone)] struct DesiredCluster { config_dir: PathBuf, @@ -518,6 +546,35 @@ pub fn status_config_dir(config_dir: impl AsRef) -> StatusOutput { } } +pub fn force_unlock_config_dir( + config_dir: impl AsRef, + lock_id: impl AsRef, +) -> ForceUnlockOutput { + let parsed = parse_cluster_config(config_dir.as_ref()); + let mut diagnostics = parsed.diagnostics; + let backend = LocalStateBackend::new(&parsed.config_dir); + let mut observations = backend.observations(); + let mut lock_removed = false; + + if let Some(raw) = parsed.raw.as_ref() { + let _settings = validate_cluster_header(raw, &mut diagnostics); + if !has_errors(&diagnostics) { + match backend.force_unlock(lock_id.as_ref(), &mut observations) { + Ok(()) => lock_removed = true, + Err(diagnostic) => diagnostics.push(diagnostic), + } + } + } + + ForceUnlockOutput { + ok: !has_errors(&diagnostics), + config_dir: display_path(&parsed.config_dir), + state_observations: observations, + lock_removed, + diagnostics, + } +} + pub async fn refresh_config_dir(config_dir: impl AsRef) -> StateSyncOutput { sync_config_dir(config_dir.as_ref(), StateSyncOperation::Refresh).await } @@ -791,7 +848,7 @@ fn validate_cluster_header( diagnostics.push(Diagnostic::error( "unsupported_state_backend", "state.backend", - "Stage 2B supports only omitted state.backend or `cluster`", + "Stage 2C supports only omitted state.backend or `cluster`", )); } } @@ -824,6 +881,10 @@ impl LocalStateBackend { lock_id: None, lock_acquired: false, acquired_lock_id: None, + lock_operation: None, + lock_created_at: None, + lock_pid: None, + lock_age_seconds: None, } } @@ -1035,11 +1096,11 @@ impl LocalStateBackend { }) } Err(err) if err.kind() == ErrorKind::AlreadyExists => { - self.observe_lock_id(observations); + self.observe_lock_metadata_lossy(observations); Err(Diagnostic::error( "state_lock_held", CLUSTER_LOCK_FILE, - "cluster state lock already exists; remove it only after confirming no cluster operation is active", + state_lock_held_message(observations), )) } Err(err) => Err(Diagnostic::error( @@ -1050,6 +1111,52 @@ impl LocalStateBackend { } } + fn force_unlock( + &self, + requested_lock_id: &str, + observations: &mut StateObservations, + ) -> Result<(), Diagnostic> { + let text = match fs::read_to_string(&self.lock_path) { + Ok(text) => text, + Err(err) if err.kind() == ErrorKind::NotFound => { + return Err(Diagnostic::error( + "state_lock_missing", + CLUSTER_LOCK_FILE, + "cluster state lock is not present; nothing was unlocked", + )); + } + Err(err) => { + return Err(Diagnostic::error( + "state_lock_read_error", + CLUSTER_LOCK_FILE, + format!("could not read state lock: {err}"), + )); + } + }; + observations.locked = true; + let lock = parse_lock_file_for_unlock(&text)?; + observations.observe_lock_metadata(&lock); + + if lock.lock_id != requested_lock_id { + return Err(Diagnostic::error( + "state_lock_id_mismatch", + CLUSTER_LOCK_FILE, + format!( + "cluster state lock id is {}; refusing to unlock with requested id {requested_lock_id}", + lock.lock_id + ), + )); + } + + fs::remove_file(&self.lock_path).map_err(|err| { + Diagnostic::error( + "state_unlock_error", + CLUSTER_LOCK_FILE, + format!("could not remove state lock: {err}"), + ) + }) + } + fn observe_lock( &self, observations: &mut StateObservations, @@ -1060,7 +1167,7 @@ impl LocalStateBackend { match fs::read_to_string(&self.lock_path) { Ok(text) => match serde_json::from_str::(&text) { Ok(lock) if lock.version == 1 => { - observations.lock_id = Some(lock.lock_id); + observations.observe_lock_metadata(&lock); } Ok(lock) => diagnostics.push(Diagnostic::warning( "unsupported_state_lock_version", @@ -1082,12 +1189,12 @@ impl LocalStateBackend { } } - fn observe_lock_id(&self, observations: &mut StateObservations) { + fn observe_lock_metadata_lossy(&self, observations: &mut StateObservations) { observations.locked = true; if let Ok(text) = fs::read_to_string(&self.lock_path) { if let Ok(lock) = serde_json::from_str::(&text) { if lock.version == 1 { - observations.lock_id = Some(lock.lock_id); + observations.observe_lock_metadata(&lock); } } } @@ -1100,6 +1207,33 @@ impl Drop for StateLockGuard { } } +fn parse_lock_file_for_unlock(text: &str) -> Result { + let lock = serde_json::from_str::(text).map_err(|err| { + Diagnostic::error( + "invalid_state_lock", + CLUSTER_LOCK_FILE, + format!("could not parse state lock: {err}"), + ) + })?; + if lock.version != 1 { + return Err(Diagnostic::error( + "unsupported_state_lock_version", + CLUSTER_LOCK_FILE, + format!("unsupported cluster state lock version {}", lock.version), + )); + } + Ok(lock) +} + +fn state_lock_held_message(observations: &StateObservations) -> String { + match observations.lock_id.as_deref() { + Some(lock_id) => format!( + "cluster state lock already exists (lock id {lock_id}); run `omnigraph cluster force-unlock {lock_id}` only after confirming no cluster operation is active" + ), + None => "cluster state lock already exists; remove it only after confirming no cluster operation is active".to_string(), + } +} + fn state_resource_digests(state: &ClusterState) -> BTreeMap { state .applied_revision @@ -1953,6 +2087,15 @@ fn now_rfc3339() -> String { .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string()) } +fn lock_age_seconds(created_at: &str) -> Option { + let created_at = OffsetDateTime::parse(created_at, &Rfc3339).ok()?; + Some( + (OffsetDateTime::now_utc() - created_at) + .whole_seconds() + .max(0) as u64, + ) +} + fn state_sync_operation_label(operation: StateSyncOperation) -> &'static str { match operation { StateSyncOperation::Refresh => "refresh", @@ -2034,6 +2177,23 @@ policies: .unwrap(); } + fn write_lock_file(config_dir: &Path, lock_id: &str, operation: &str) { + let state_dir = config_dir.join(CLUSTER_STATE_DIR); + fs::create_dir_all(&state_dir).unwrap(); + fs::write( + state_dir.join("lock.json"), + json!({ + "version": 1, + "lock_id": lock_id, + "operation": operation, + "created_at": "1970-01-01T00:00:00Z", + "pid": 123 + }) + .to_string(), + ) + .unwrap(); + } + #[test] fn valid_minimal_config() { let dir = fixture(); @@ -2383,6 +2543,164 @@ policies: ); } + #[test] + fn status_surfaces_full_lock_metadata() { + let dir = fixture(); + write_lock_file(dir.path(), "held-lock", "refresh"); + + let out = status_config_dir(dir.path()); + assert!(out.ok, "{:?}", out.diagnostics); + assert!(out.state_observations.locked); + assert_eq!(out.state_observations.lock_id.as_deref(), Some("held-lock")); + assert_eq!( + out.state_observations.lock_operation.as_deref(), + Some("refresh") + ); + assert_eq!( + out.state_observations.lock_created_at.as_deref(), + Some("1970-01-01T00:00:00Z") + ); + assert_eq!(out.state_observations.lock_pid, Some(123)); + assert!(out.state_observations.lock_age_seconds.is_some()); + } + + #[test] + fn force_unlock_matching_id_removes_lock() { + let dir = fixture(); + write_lock_file(dir.path(), "held-lock", "plan"); + + let out = force_unlock_config_dir(dir.path(), "held-lock"); + assert!(out.ok, "{:?}", out.diagnostics); + assert!(out.lock_removed); + assert_eq!(out.state_observations.lock_id.as_deref(), Some("held-lock")); + assert_eq!( + out.state_observations.lock_operation.as_deref(), + Some("plan") + ); + assert!(!dir.path().join(CLUSTER_LOCK_FILE).exists()); + } + + #[test] + fn force_unlock_wrong_id_fails_and_preserves_lock() { + let dir = fixture(); + write_lock_file(dir.path(), "held-lock", "plan"); + + let out = force_unlock_config_dir(dir.path(), "other-lock"); + assert!(!out.ok); + assert!(!out.lock_removed); + assert_eq!(out.state_observations.lock_id.as_deref(), Some("held-lock")); + assert!( + out.diagnostics + .iter() + .any(|diagnostic| diagnostic.code == "state_lock_id_mismatch") + ); + assert!(dir.path().join(CLUSTER_LOCK_FILE).exists()); + } + + #[test] + fn force_unlock_missing_lock_fails() { + let dir = fixture(); + + let out = force_unlock_config_dir(dir.path(), "held-lock"); + assert!(!out.ok); + assert!(!out.lock_removed); + assert!(!out.state_observations.locked); + assert!( + out.diagnostics + .iter() + .any(|diagnostic| diagnostic.code == "state_lock_missing") + ); + } + + #[test] + fn force_unlock_invalid_lock_json_fails_and_preserves_lock() { + let dir = fixture(); + let state_dir = dir.path().join(CLUSTER_STATE_DIR); + fs::create_dir_all(&state_dir).unwrap(); + fs::write(state_dir.join("lock.json"), "{").unwrap(); + + let out = force_unlock_config_dir(dir.path(), "held-lock"); + assert!(!out.ok); + assert!(!out.lock_removed); + assert!( + out.diagnostics + .iter() + .any(|diagnostic| diagnostic.code == "invalid_state_lock") + ); + assert!(dir.path().join(CLUSTER_LOCK_FILE).exists()); + } + + #[test] + fn force_unlock_unsupported_lock_version_fails_and_preserves_lock() { + let dir = fixture(); + let state_dir = dir.path().join(CLUSTER_STATE_DIR); + fs::create_dir_all(&state_dir).unwrap(); + fs::write( + state_dir.join("lock.json"), + r#"{"version":2,"lock_id":"held-lock","operation":"plan","created_at":"1970-01-01T00:00:00Z","pid":123}"#, + ) + .unwrap(); + + let out = force_unlock_config_dir(dir.path(), "held-lock"); + assert!(!out.ok); + assert!(!out.lock_removed); + assert!( + out.diagnostics + .iter() + .any(|diagnostic| diagnostic.code == "unsupported_state_lock_version") + ); + assert!(dir.path().join(CLUSTER_LOCK_FILE).exists()); + } + + #[test] + fn force_unlock_external_state_backend_rejected() { + let dir = fixture(); + write_lock_file(dir.path(), "held-lock", "plan"); + fs::write( + dir.path().join(CLUSTER_CONFIG_FILE), + r#" +version: 1 +state: + backend: s3://state-bucket/cluster +graphs: + knowledge: + schema: ./people.pg +"#, + ) + .unwrap(); + + let out = force_unlock_config_dir(dir.path(), "held-lock"); + assert!(!out.ok); + assert!(!out.lock_removed); + assert!( + out.diagnostics + .iter() + .any(|diagnostic| diagnostic.code == "unsupported_state_backend") + ); + assert!(dir.path().join(CLUSTER_LOCK_FILE).exists()); + } + + #[test] + fn plan_succeeds_after_force_unlock() { + let dir = fixture(); + write_lock_file(dir.path(), "held-lock", "plan"); + + let locked = plan_config_dir(dir.path()); + assert!(!locked.ok); + assert!( + locked + .diagnostics + .iter() + .any(|diagnostic| diagnostic.code == "state_lock_held") + ); + + let unlocked = force_unlock_config_dir(dir.path(), "held-lock"); + assert!(unlocked.ok, "{:?}", unlocked.diagnostics); + + let out = plan_config_dir(dir.path()); + assert!(out.ok, "{:?}", out.diagnostics); + } + #[test] fn plan_reports_state_cas_revision_and_removes_lock() { let dir = fixture(); @@ -2440,11 +2758,19 @@ policies: assert_eq!(out.state_observations.lock_id.as_deref(), Some("held-lock")); assert!(!out.state_observations.lock_acquired); assert!(out.state_observations.acquired_lock_id.is_none()); + assert_eq!( + out.state_observations.lock_operation.as_deref(), + Some("plan") + ); assert!( out.diagnostics .iter() .any(|diagnostic| diagnostic.code == "state_lock_held") ); + assert!(out.diagnostics.iter().any(|diagnostic| { + diagnostic.code == "state_lock_held" + && diagnostic.message.contains("force-unlock held-lock") + })); } #[test] @@ -2706,11 +3032,19 @@ graphs: assert!(out.state_observations.locked); assert_eq!(out.state_observations.lock_id.as_deref(), Some("held-lock")); assert!(!out.state_observations.lock_acquired); + assert_eq!( + out.state_observations.lock_operation.as_deref(), + Some("refresh") + ); assert!( out.diagnostics .iter() .any(|diagnostic| diagnostic.code == "state_lock_held") ); + assert!(out.diagnostics.iter().any(|diagnostic| { + diagnostic.code == "state_lock_held" + && diagnostic.message.contains("force-unlock held-lock") + })); } #[tokio::test] diff --git a/docs/dev/testing.md b/docs/dev/testing.md index 214dbf0..3c5ee32 100644 --- a/docs/dev/testing.md +++ b/docs/dev/testing.md @@ -8,7 +8,7 @@ This file is the always-on map of the test surface. **Consult it before every ta |---|---|---| | `omnigraph` (engine) | `crates/omnigraph/tests/` | Integration tests (21 files), fixture-driven, share `tests/helpers/mod.rs` | | `omnigraph-cli` | `crates/omnigraph-cli/tests/` | `cli.rs` (unit-ish), `system_local.rs`, `system_remote.rs`, share `tests/support/mod.rs` | -| `omnigraph-cluster` | mostly in-source `#[cfg(test)] mod tests` | Cluster config parser, local JSON state diff, state CAS/lock handling, read-only validate/plan/status plus explicit refresh/import graph observations | +| `omnigraph-cluster` | mostly in-source `#[cfg(test)] mod tests` | Cluster config parser, local JSON state diff, state CAS/lock handling/recovery, read-only validate/plan/status plus explicit refresh/import graph observations | | `omnigraph-server` | `crates/omnigraph-server/tests/` | `server.rs` (HTTP-level), `openapi.rs` (OpenAPI drift / regeneration) | | `omnigraph-compiler` | mostly in-source `#[cfg(test)] mod tests` | Parser, type-checker, IR lowering, lint | diff --git a/docs/user/cli-reference.md b/docs/user/cli-reference.md index ae47a4b..70ac6f4 100644 --- a/docs/user/cli-reference.md +++ b/docs/user/cli-reference.md @@ -19,7 +19,7 @@ Top-level command families and subcommands. Graph-targeting commands accept eith | `commit list \| show` | inspect commit graph | | `schema plan \| apply \| show (alias: get)` | migrations | | `lint` (alias: `check`) | offline / graph-backed query validation. Replaces `query lint` / `query check`, which are kept as deprecated argv-level shims that print a one-line warning and rewrite to `omnigraph lint` | -| `cluster validate \| plan \| status \| refresh \| import` | cluster-control preview. `validate` checks a local `cluster.yaml` folder and referenced schema/query/policy files; `plan` diffs it against local JSON state at `__cluster/state.json`; `status` reads the state ledger; `refresh`/`import` explicitly update local JSON state from read-only graph observations. No apply, graph-resource mutation, server change, or `plan --refresh` occurs in Stage 2B | +| `cluster validate \| plan \| status \| refresh \| import \| force-unlock` | cluster-control preview. `validate` checks a local `cluster.yaml` folder and referenced schema/query/policy files; `plan` diffs it against local JSON state at `__cluster/state.json`; `status` reads the state ledger; `refresh`/`import` explicitly update local JSON state from read-only graph observations; `force-unlock ` manually removes a held local state lock by exact id. No apply, graph-resource mutation, server change, automatic stale-lock breaking, or `plan --refresh` occurs in Stage 2C | | `optimize` | non-destructive Lance compaction (skips tables with `Blob` columns or uncovered drift; `--json` reports `skipped`) | | `repair [--confirm] [--force]` | preview or explicitly publish uncovered manifest/head drift. `--confirm` heals verified maintenance drift and exits non-zero if suspicious/unverifiable drift is refused; `--force --confirm` publishes suspicious/unverifiable drift after operator review | | `cleanup --keep N --older-than 7d --confirm` | destructive version GC | @@ -81,19 +81,22 @@ omnigraph cluster plan --config ./company-brain --json omnigraph cluster status --config ./company-brain --json omnigraph cluster refresh --config ./company-brain --json omnigraph cluster import --config ./company-brain --json +omnigraph cluster force-unlock --config ./company-brain --json ``` `--config` is a directory containing `cluster.yaml`; it defaults to `.`. -Stage 2B accepts graphs, schemas, stored queries, and policy bundle file +Stage 2C accepts graphs, schemas, stored queries, and policy bundle file references. `cluster plan` reads local JSON state from `/__cluster/state.json`; a missing file means empty state. Plan, refresh, and import acquire `__cluster/lock.json` by default and release it before returning. `cluster status` reads state only and reports any existing -lock. `refresh` requires an existing `state.json`; `import` creates one only -when it is missing. Both observe declared graphs read-only at +lock metadata. `force-unlock` removes a lock only when the supplied id exactly +matches the lock file. `refresh` requires an existing `state.json`; `import` +creates one only when it is missing. Both observe declared graphs read-only at `/graphs/.omni`. External state backends, apply, -`plan --refresh`, pipelines, UI specs, embeddings, aliases, and bindings are -reserved for later stages. See [cluster-config.md](cluster-config.md). +automatic stale-lock breaking, `plan --refresh`, pipelines, UI specs, +embeddings, aliases, and bindings are reserved for later stages. See +[cluster-config.md](cluster-config.md). ## Output formats (`query` command, alias: `read`) diff --git a/docs/user/cluster-config.md b/docs/user/cluster-config.md index 77954bd..24718b1 100644 --- a/docs/user/cluster-config.md +++ b/docs/user/cluster-config.md @@ -1,13 +1,13 @@ # Cluster Config -**Status:** Stage 2B state-observation preview. +**Status:** Stage 2C state-lock recovery preview. Cluster config is the future control-plane configuration surface for a whole OmniGraph deployment. In this stage, OmniGraph can validate a local `cluster.yaml` folder, produce a deterministic read-only plan, inspect the local JSON state ledger, and explicitly refresh/import graph observations into -that ledger. It does not apply desired changes, start servers, or write graph -resources. +that ledger. It can also manually remove a held local state lock by exact lock +id. It does not apply desired changes, start servers, or write graph resources. ## Commands @@ -17,6 +17,7 @@ omnigraph cluster plan --config ./company-brain --json omnigraph cluster status --config ./company-brain --json omnigraph cluster refresh --config ./company-brain --json omnigraph cluster import --config ./company-brain --json +omnigraph cluster force-unlock --config ./company-brain --json ``` `--config` points at a directory, not a file. The directory must contain @@ -24,7 +25,7 @@ omnigraph cluster import --config ./company-brain --json ## Supported `cluster.yaml` -Stage 2B accepts only the read-only resource subset: +Stage 2C accepts only the read-only resource subset: ```yaml version: 1 @@ -53,7 +54,9 @@ policies: defaults to `true`. When enabled, `cluster plan`, `cluster refresh`, and `cluster import` briefly acquire `/__cluster/lock.json`, then remove it before returning. `cluster status` never acquires the lock; it only reports -whether one is present. +whether one is present. `cluster force-unlock` is the only lock-removal command; +it requires the exact lock id and should be run only after confirming no cluster +operation is active. ## Validation @@ -116,19 +119,22 @@ Missing `state_revision` is treated as `0`. Resource status values are Plan output compares desired resource digests against state resource digests and reports `create`, `update`, and `delete` changes. It also reports the state CAS (`sha256:`) and state revision. `state_observations.locked` means an -existing lock file was observed; a successful `plan` instead reports -`lock_acquired: true` and an `acquired_lock_id`, then releases the lock before -returning. The command never writes `state.json` and does not scan live graphs. -Use explicit `cluster refresh` / `cluster import` when the state ledger should -be updated from live observations. Apply and live drift scans during plan are -later-stage work. +existing lock file was observed, along with its metadata (`lock_id`, +`lock_operation`, `lock_created_at`, `lock_pid`, `lock_age_seconds`); a +successful `plan` instead reports `lock_acquired: true` and an +`acquired_lock_id`, then releases the lock before returning. The command never +writes `state.json` and does not scan live graphs. Use explicit +`cluster refresh` / `cluster import` when the state ledger should be updated +from live observations. Apply and live drift scans during plan are later-stage +work. ## Status `cluster status` reads the same local JSON state ledger and prints what the ledger says is deployed. It does not validate referenced schema/query/policy files and does not inspect live graphs. Missing `state.json` succeeds with a -warning; invalid state JSON or an unsupported state version fails. +warning; invalid state JSON or an unsupported state version fails. If a lock is +present, status reports its id, operation, creation time, pid, and age. ## Refresh And Import @@ -150,3 +156,14 @@ initial state. Refresh/import do not observe query or policy resources yet. Existing query and policy state digests are preserved on refresh and are not invented on import. + +## Force Unlock + +`cluster force-unlock ` removes `/__cluster/lock.json` only +when the file exists, is valid version-1 lock JSON, and its `lock_id` exactly +matches the argument. A wrong id, missing lock, invalid lock JSON, or unsupported +lock version exits non-zero and leaves the file untouched. + +This is manual recovery for abandoned local locks. OmniGraph does not perform +PID-liveness checks, TTL expiry, stale-lock breaking, or automatic unlock in +Stage 2C.