Add cluster state lock recovery

This commit is contained in:
aaltshuler 2026-06-09 02:12:00 +03:00
parent cb1e7bb5ea
commit 4fffddc6b7
6 changed files with 596 additions and 52 deletions

View file

@ -11,8 +11,9 @@ use omnigraph::db::{Omnigraph, ReadTarget, SnapshotId};
use omnigraph::loader::LoadMode;
use omnigraph::storage::normalize_root_uri;
use omnigraph_cluster::{
DiagnosticSeverity, PlanOutput, StateSyncOutput, StatusOutput, ValidateOutput,
import_config_dir, plan_config_dir, refresh_config_dir, status_config_dir, validate_config_dir,
DiagnosticSeverity, ForceUnlockOutput, PlanOutput, StateSyncOutput, StatusOutput,
ValidateOutput, force_unlock_config_dir, import_config_dir, plan_config_dir,
refresh_config_dir, status_config_dir, validate_config_dir,
};
use omnigraph_compiler::query::parser::parse_query;
use omnigraph_compiler::schema::parser::parse_schema;
@ -368,6 +369,17 @@ enum ClusterCommand {
#[arg(long)]
json: bool,
},
/// Remove a held local JSON state lock after operator confirmation.
ForceUnlock {
/// Exact lock id from cluster status or a state_lock_held diagnostic.
lock_id: String,
/// Cluster config directory containing cluster.yaml.
#[arg(long, default_value = ".")]
config: PathBuf,
/// Emit JSON instead of human text.
#[arg(long)]
json: bool,
},
}
/// Operations on the graph registry of a multi-graph server (MR-668).
@ -785,10 +797,7 @@ fn print_cluster_status_human(output: &StatusOutput) {
println!(" applied config: {digest}");
}
if state.locked {
match state.lock_id.as_deref() {
Some(lock_id) => println!(" lock: held ({lock_id})"),
None => println!(" lock: held"),
}
println!(" lock: held{}", cluster_lock_summary(state));
} else {
println!(" lock: not held");
}
@ -816,10 +825,7 @@ fn print_cluster_state_sync_human(output: &StateSyncOutput) {
println!(" state_cas: {cas}");
}
if state.locked {
match state.lock_id.as_deref() {
Some(lock_id) => println!(" lock: acquired ({lock_id})"),
None => println!(" lock: acquired"),
}
println!(" lock: acquired{}", cluster_lock_summary(state));
} else {
println!(" lock: not acquired");
}
@ -829,6 +835,48 @@ fn print_cluster_state_sync_human(output: &StateSyncOutput) {
print_cluster_diagnostics(&output.diagnostics);
}
/// Prints the human-readable report for a `cluster force-unlock` run.
///
/// On success the headline states whether a lock was removed. On failure a
/// failure headline is printed, followed by the lock that is still held (if
/// the observations report one). Diagnostics are printed last in every case.
fn print_cluster_force_unlock_human(output: &ForceUnlockOutput) {
    let observed = &output.state_observations;
    match (output.ok, output.lock_removed) {
        (true, true) => println!(
            "cluster force-unlock: removed lock{}",
            cluster_lock_summary(observed)
        ),
        (true, false) => println!("cluster force-unlock: no lock removed"),
        (false, _) => {
            println!("cluster force-unlock failed");
            if observed.locked {
                println!(" lock: held{}", cluster_lock_summary(observed));
            }
        }
    }
    print_cluster_diagnostics(&output.diagnostics);
}
/// Formats the observed lock metadata as a parenthesised suffix for a status line.
///
/// Returns an empty string when no lock id was observed. Otherwise the result
/// has the shape ` (id=…, operation=…, pid=…, created_at=…, age_seconds=…)`,
/// where every field after `id` appears only if it is present, in that order.
fn cluster_lock_summary(state: &omnigraph_cluster::StateObservations) -> String {
    let id = match state.lock_id.as_deref() {
        Some(id) => id,
        None => return String::new(),
    };

    // One slot per field, in display order; absent fields stay `None` and are
    // dropped by `flatten` below.
    let fields = vec![
        Some(format!("id={id}")),
        state
            .lock_operation
            .as_deref()
            .map(|operation| format!("operation={operation}")),
        state.lock_pid.map(|pid| format!("pid={pid}")),
        state
            .lock_created_at
            .as_deref()
            .map(|created_at| format!("created_at={created_at}")),
        state
            .lock_age_seconds
            .map(|age_seconds| format!("age_seconds={age_seconds}")),
    ];

    let joined = fields.into_iter().flatten().collect::<Vec<_>>().join(", ");
    format!(" ({joined})")
}
fn print_cluster_diagnostics(diagnostics: &[omnigraph_cluster::Diagnostic]) {
for diagnostic in diagnostics {
let label = match diagnostic.severity {
@ -894,6 +942,19 @@ fn finish_cluster_state_sync(output: &StateSyncOutput, json: bool) -> Result<()>
Ok(())
}
/// Emits the force-unlock result and turns a failed run into exit status 1.
///
/// The result is printed as JSON when `json` is set, otherwise as the
/// human-readable report. If `output.ok` is false, stdout is flushed and the
/// process exits with status 1 instead of returning to the caller.
fn finish_cluster_force_unlock(output: &ForceUnlockOutput, json: bool) -> Result<()> {
    if !json {
        print_cluster_force_unlock_human(output);
    } else {
        print_json(output)?;
    }

    if output.ok {
        return Ok(());
    }

    // Flush before exiting: `std::process::exit` terminates the process
    // immediately without unwinding.
    io::stdout().flush()?;
    std::process::exit(1)
}
/// Returns `true` when `uri` begins with the `http://` or `https://` prefix.
///
/// The comparison is case-sensitive and only looks at the start of the string.
fn is_remote_uri(uri: &str) -> bool {
    const REMOTE_PREFIXES: [&str; 2] = ["http://", "https://"];
    REMOTE_PREFIXES
        .iter()
        .any(|prefix| uri.starts_with(*prefix))
}
@ -3339,6 +3400,14 @@ async fn main() -> Result<()> {
let output = import_config_dir(config).await;
finish_cluster_state_sync(&output, json)?;
}
ClusterCommand::ForceUnlock {
lock_id,
config,
json,
} => {
let output = force_unlock_config_dir(config, lock_id);
finish_cluster_force_unlock(&output, json)?;
}
},
Command::Graphs { command } => match command {
GraphsCommand::List {

View file

@ -136,6 +136,18 @@ fn init_cluster_derived_graph(root: &std::path::Path) {
);
}
/// Test helper: writes `<root>/__cluster/lock.json` describing a held lock.
///
/// The `__cluster` directory is created if needed. The file contains a
/// version-1 record with the given `lock_id` and `operation`, a fixed
/// `created_at` of `1970-01-01T00:00:00Z`, and `pid` 123. Panics if the
/// directory or file cannot be written.
fn write_cluster_lock(root: &std::path::Path, lock_id: &str, operation: &str) {
    let cluster_dir = root.join("__cluster");
    let payload = format!(
        r#"{{"version":1,"lock_id":"{lock_id}","operation":"{operation}","created_at":"1970-01-01T00:00:00Z","pid":123}}"#
    );
    fs::create_dir_all(&cluster_dir).unwrap();
    fs::write(cluster_dir.join("lock.json"), payload).unwrap();
}
#[test]
fn version_command_prints_current_cli_version() {
let output = output_success(cli().arg("version"));
@ -251,6 +263,32 @@ fn cluster_status_json_reports_missing_state() {
);
}
/// `cluster status --json` reports the id, operation, pid, creation time and
/// age of a lock file written by `write_cluster_lock`.
#[test]
fn cluster_status_json_reports_lock_metadata() {
    let workspace = tempdir().unwrap();
    let root = workspace.path();
    write_cluster_config_fixture(root);
    write_cluster_lock(root, "held-lock", "refresh");

    let output = output_success(
        cli()
            .arg("cluster")
            .arg("status")
            .arg("--config")
            .arg(root)
            .arg("--json"),
    );
    let report = parse_stdout_json(&output);

    assert_eq!(report["ok"], true);
    let lock = &report["state_observations"];
    assert_eq!(lock["locked"], true);
    assert_eq!(lock["lock_id"], "held-lock");
    assert_eq!(lock["lock_operation"], "refresh");
    assert_eq!(lock["lock_pid"], 123);
    assert_eq!(lock["lock_created_at"], "1970-01-01T00:00:00Z");
    // The age depends on the current time, so only its type is checked.
    assert!(lock["lock_age_seconds"].is_number());
}
#[test]
fn cluster_status_json_reports_extended_state() {
let temp = tempdir().unwrap();
@ -351,21 +389,7 @@ fn cluster_plan_json_includes_state_cas_revision_and_lock_observation() {
fn cluster_plan_locked_state_exits_nonzero() {
let temp = tempdir().unwrap();
write_cluster_config_fixture(temp.path());
let state_dir = temp.path().join("__cluster");
fs::create_dir_all(&state_dir).unwrap();
fs::write(
state_dir.join("lock.json"),
r#"
{
"version": 1,
"lock_id": "held-lock",
"operation": "plan",
"created_at": "2026-06-08T00:00:00Z",
"pid": 123
}
"#,
)
.unwrap();
write_cluster_lock(temp.path(), "held-lock", "plan");
let output = output_failure(
cli()
@ -378,16 +402,116 @@ fn cluster_plan_locked_state_exits_nonzero() {
let json = parse_stdout_json(&output);
assert_eq!(json["ok"], false);
assert_eq!(json["state_observations"]["locked"], true);
assert_eq!(json["state_observations"]["lock_id"], "held-lock");
assert_eq!(json["state_observations"]["lock_operation"], "plan");
assert_eq!(json["state_observations"]["lock_pid"], 123);
assert_eq!(
json["state_observations"]["lock_created_at"],
"1970-01-01T00:00:00Z"
);
assert!(json["state_observations"]["lock_age_seconds"].is_number());
assert!(
json["diagnostics"]
.as_array()
.unwrap()
.iter()
.any(|diagnostic| diagnostic["code"] == "state_lock_held"),
.any(|diagnostic| diagnostic["code"] == "state_lock_held"
&& diagnostic["message"]
.as_str()
.unwrap()
.contains("force-unlock held-lock")),
"locked state should produce a useful diagnostic: {json}"
);
}
/// `cluster force-unlock <id> --json` with the matching id succeeds, echoes
/// the removed lock's metadata, and deletes `__cluster/lock.json`.
#[test]
fn cluster_force_unlock_json_removes_lock() {
    let workspace = tempdir().unwrap();
    let root = workspace.path();
    write_cluster_config_fixture(root);
    write_cluster_lock(root, "held-lock", "plan");

    let output = output_success(
        cli()
            .arg("cluster")
            .arg("force-unlock")
            .arg("held-lock")
            .arg("--config")
            .arg(root)
            .arg("--json"),
    );
    let report = parse_stdout_json(&output);

    assert_eq!(report["ok"], true);
    assert_eq!(report["lock_removed"], true);
    let lock = &report["state_observations"];
    assert_eq!(lock["lock_id"], "held-lock");
    assert_eq!(lock["lock_operation"], "plan");
    assert!(!root.join("__cluster/lock.json").exists());
}
/// `cluster force-unlock` with an id that differs from the held lock fails,
/// reports a `state_lock_id_mismatch` diagnostic, and leaves the lock file
/// in place.
#[test]
fn cluster_force_unlock_wrong_id_exits_nonzero() {
    let workspace = tempdir().unwrap();
    let root = workspace.path();
    write_cluster_config_fixture(root);
    write_cluster_lock(root, "held-lock", "plan");

    let output = output_failure(
        cli()
            .arg("cluster")
            .arg("force-unlock")
            .arg("other-lock")
            .arg("--config")
            .arg(root)
            .arg("--json"),
    );
    let report = parse_stdout_json(&output);

    assert_eq!(report["ok"], false);
    assert_eq!(report["lock_removed"], false);

    let diagnostics = report["diagnostics"].as_array().unwrap();
    let mismatch_reported = diagnostics
        .iter()
        .any(|entry| entry["code"] == "state_lock_id_mismatch");
    assert!(mismatch_reported);

    assert!(root.join("__cluster/lock.json").exists());
}
/// End-to-end recovery flow: `plan` fails while the lock is held,
/// `force-unlock` with the reported id removes it, and a second `plan`
/// then succeeds.
#[test]
fn cluster_locked_plan_then_force_unlock_then_plan_succeeds() {
    let workspace = tempdir().unwrap();
    let root = workspace.path();
    write_cluster_config_fixture(root);
    write_cluster_lock(root, "held-lock", "plan");

    // Step 1: planning is rejected while the lock file exists.
    let blocked_output = output_failure(
        cli()
            .arg("cluster")
            .arg("plan")
            .arg("--config")
            .arg(root)
            .arg("--json"),
    );
    let blocked = parse_stdout_json(&blocked_output);
    assert_eq!(blocked["ok"], false);
    assert_eq!(blocked["state_observations"]["lock_id"], "held-lock");

    // Step 2: force-unlock with the id reported above.
    let release_output = output_success(
        cli()
            .arg("cluster")
            .arg("force-unlock")
            .arg("held-lock")
            .arg("--config")
            .arg(root)
            .arg("--json"),
    );
    let released = parse_stdout_json(&release_output);
    assert_eq!(released["lock_removed"], true);

    // Step 3: the same plan invocation now succeeds.
    let retry_output = output_success(
        cli()
            .arg("cluster")
            .arg("plan")
            .arg("--config")
            .arg(root)
            .arg("--json"),
    );
    let retried = parse_stdout_json(&retry_output);
    assert_eq!(retried["ok"], true);
}
#[test]
fn cluster_import_json_bootstraps_missing_state() {
let temp = tempdir().unwrap();