Add cluster state lock recovery

This commit is contained in:
aaltshuler 2026-06-09 02:12:00 +03:00 committed by Andrew Altshuler
parent 737a0f6e45
commit 89b876c797
6 changed files with 597 additions and 51 deletions

View file

@ -11,8 +11,9 @@ use omnigraph::db::{Omnigraph, ReadTarget, SnapshotId};
use omnigraph::loader::LoadMode;
use omnigraph::storage::normalize_root_uri;
use omnigraph_cluster::{
DiagnosticSeverity, PlanOutput, StateSyncOutput, StatusOutput, ValidateOutput,
import_config_dir, plan_config_dir, refresh_config_dir, status_config_dir, validate_config_dir,
DiagnosticSeverity, ForceUnlockOutput, PlanOutput, StateSyncOutput, StatusOutput,
ValidateOutput, force_unlock_config_dir, import_config_dir, plan_config_dir,
refresh_config_dir, status_config_dir, validate_config_dir,
};
use omnigraph_compiler::query::parser::parse_query;
use omnigraph_compiler::schema::parser::parse_schema;
@ -387,6 +388,17 @@ enum ClusterCommand {
#[arg(long)]
json: bool,
},
/// Remove a held local JSON state lock after operator confirmation.
ForceUnlock {
/// Exact lock id from cluster status or a state_lock_held diagnostic.
lock_id: String,
/// Cluster config directory containing cluster.yaml.
#[arg(long, default_value = ".")]
config: PathBuf,
/// Emit JSON instead of human text.
#[arg(long)]
json: bool,
},
}
/// Operations on the graph registry of a multi-graph server (MR-668).
@ -804,10 +816,7 @@ fn print_cluster_status_human(output: &StatusOutput) {
println!(" applied config: {digest}");
}
if state.locked {
match state.lock_id.as_deref() {
Some(lock_id) => println!(" lock: held ({lock_id})"),
None => println!(" lock: held"),
}
println!(" lock: held{}", cluster_lock_summary(state));
} else {
println!(" lock: not held");
}
@ -835,10 +844,7 @@ fn print_cluster_state_sync_human(output: &StateSyncOutput) {
println!(" state_cas: {cas}");
}
if state.locked {
match state.lock_id.as_deref() {
Some(lock_id) => println!(" lock: acquired ({lock_id})"),
None => println!(" lock: acquired"),
}
println!(" lock: acquired{}", cluster_lock_summary(state));
} else {
println!(" lock: not acquired");
}
@ -848,6 +854,48 @@ fn print_cluster_state_sync_human(output: &StateSyncOutput) {
print_cluster_diagnostics(&output.diagnostics);
}
fn print_cluster_force_unlock_human(output: &ForceUnlockOutput) {
if output.ok {
if output.lock_removed {
println!(
"cluster force-unlock: removed lock{}",
cluster_lock_summary(&output.state_observations)
);
} else {
println!("cluster force-unlock: no lock removed");
}
} else {
println!("cluster force-unlock failed");
if output.state_observations.locked {
println!(
" lock: held{}",
cluster_lock_summary(&output.state_observations)
);
}
}
print_cluster_diagnostics(&output.diagnostics);
}
fn cluster_lock_summary(state: &omnigraph_cluster::StateObservations) -> String {
let Some(lock_id) = state.lock_id.as_deref() else {
return String::new();
};
let mut parts = vec![format!("id={lock_id}")];
if let Some(operation) = state.lock_operation.as_deref() {
parts.push(format!("operation={operation}"));
}
if let Some(pid) = state.lock_pid {
parts.push(format!("pid={pid}"));
}
if let Some(created_at) = state.lock_created_at.as_deref() {
parts.push(format!("created_at={created_at}"));
}
if let Some(age_seconds) = state.lock_age_seconds {
parts.push(format!("age_seconds={age_seconds}"));
}
format!(" ({})", parts.join(", "))
}
fn print_cluster_diagnostics(diagnostics: &[omnigraph_cluster::Diagnostic]) {
for diagnostic in diagnostics {
let label = match diagnostic.severity {
@ -913,6 +961,19 @@ fn finish_cluster_state_sync(output: &StateSyncOutput, json: bool) -> Result<()>
Ok(())
}
fn finish_cluster_force_unlock(output: &ForceUnlockOutput, json: bool) -> Result<()> {
if json {
print_json(output)?;
} else {
print_cluster_force_unlock_human(output);
}
if !output.ok {
io::stdout().flush()?;
std::process::exit(1);
}
Ok(())
}
fn is_remote_uri(uri: &str) -> bool {
uri.starts_with("http://") || uri.starts_with("https://")
}
@ -3443,6 +3504,14 @@ async fn main() -> Result<()> {
let output = import_config_dir(config).await;
finish_cluster_state_sync(&output, json)?;
}
ClusterCommand::ForceUnlock {
lock_id,
config,
json,
} => {
let output = force_unlock_config_dir(config, lock_id);
finish_cluster_force_unlock(&output, json)?;
}
},
Command::Graphs { command } => match command {
GraphsCommand::List {