mirror of
https://github.com/ModernRelay/omnigraph.git
synced 2026-06-15 01:55:13 +02:00
Implement cluster refresh and import
This commit is contained in:
parent
a7956ea5a9
commit
cb1e7bb5ea
9 changed files with 1208 additions and 29 deletions
|
|
@ -11,8 +11,8 @@ use omnigraph::db::{Omnigraph, ReadTarget, SnapshotId};
|
|||
use omnigraph::loader::LoadMode;
|
||||
use omnigraph::storage::normalize_root_uri;
|
||||
use omnigraph_cluster::{
|
||||
DiagnosticSeverity, PlanOutput, StatusOutput, ValidateOutput, plan_config_dir,
|
||||
status_config_dir, validate_config_dir,
|
||||
DiagnosticSeverity, PlanOutput, StateSyncOutput, StatusOutput, ValidateOutput,
|
||||
import_config_dir, plan_config_dir, refresh_config_dir, status_config_dir, validate_config_dir,
|
||||
};
|
||||
use omnigraph_compiler::query::parser::parse_query;
|
||||
use omnigraph_compiler::schema::parser::parse_schema;
|
||||
|
|
@ -350,6 +350,24 @@ enum ClusterCommand {
|
|||
#[arg(long)]
|
||||
json: bool,
|
||||
},
|
||||
/// Refresh existing local JSON state from declared graph observations.
|
||||
Refresh {
|
||||
/// Cluster config directory containing cluster.yaml.
|
||||
#[arg(long, default_value = ".")]
|
||||
config: PathBuf,
|
||||
/// Emit JSON instead of human text.
|
||||
#[arg(long)]
|
||||
json: bool,
|
||||
},
|
||||
/// Import initial local JSON state from declared graph observations.
|
||||
Import {
|
||||
/// Cluster config directory containing cluster.yaml.
|
||||
#[arg(long, default_value = ".")]
|
||||
config: PathBuf,
|
||||
/// Emit JSON instead of human text.
|
||||
#[arg(long)]
|
||||
json: bool,
|
||||
},
|
||||
}
|
||||
|
||||
/// Operations on the graph registry of a multi-graph server (MR-668).
|
||||
|
|
@ -783,6 +801,34 @@ fn print_cluster_status_human(output: &StatusOutput) {
|
|||
print_cluster_diagnostics(&output.diagnostics);
|
||||
}
|
||||
|
||||
fn print_cluster_state_sync_human(output: &StateSyncOutput) {
|
||||
let operation = match output.operation {
|
||||
omnigraph_cluster::StateSyncOperation::Refresh => "refresh",
|
||||
omnigraph_cluster::StateSyncOperation::Import => "import",
|
||||
};
|
||||
if output.ok {
|
||||
let state = &output.state_observations;
|
||||
println!(
|
||||
"cluster {operation}: revision {}, {} resource(s)",
|
||||
state.state_revision, state.resource_count
|
||||
);
|
||||
if let Some(cas) = state.state_cas.as_deref() {
|
||||
println!(" state_cas: {cas}");
|
||||
}
|
||||
if state.locked {
|
||||
match state.lock_id.as_deref() {
|
||||
Some(lock_id) => println!(" lock: acquired ({lock_id})"),
|
||||
None => println!(" lock: acquired"),
|
||||
}
|
||||
} else {
|
||||
println!(" lock: not acquired");
|
||||
}
|
||||
} else {
|
||||
println!("cluster {operation} failed");
|
||||
}
|
||||
print_cluster_diagnostics(&output.diagnostics);
|
||||
}
|
||||
|
||||
fn print_cluster_diagnostics(diagnostics: &[omnigraph_cluster::Diagnostic]) {
|
||||
for diagnostic in diagnostics {
|
||||
let label = match diagnostic.severity {
|
||||
|
|
@ -835,6 +881,19 @@ fn finish_cluster_status(output: &StatusOutput, json: bool) -> Result<()> {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
fn finish_cluster_state_sync(output: &StateSyncOutput, json: bool) -> Result<()> {
|
||||
if json {
|
||||
print_json(output)?;
|
||||
} else {
|
||||
print_cluster_state_sync_human(output);
|
||||
}
|
||||
if !output.ok {
|
||||
io::stdout().flush()?;
|
||||
std::process::exit(1);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn is_remote_uri(uri: &str) -> bool {
|
||||
uri.starts_with("http://") || uri.starts_with("https://")
|
||||
}
|
||||
|
|
@ -3272,6 +3331,14 @@ async fn main() -> Result<()> {
|
|||
let output = status_config_dir(config);
|
||||
finish_cluster_status(&output, json)?;
|
||||
}
|
||||
ClusterCommand::Refresh { config, json } => {
|
||||
let output = refresh_config_dir(config).await;
|
||||
finish_cluster_state_sync(&output, json)?;
|
||||
}
|
||||
ClusterCommand::Import { config, json } => {
|
||||
let output = import_config_dir(config).await;
|
||||
finish_cluster_state_sync(&output, json)?;
|
||||
}
|
||||
},
|
||||
Command::Graphs { command } => match command {
|
||||
GraphsCommand::List {
|
||||
|
|
|
|||
|
|
@ -124,6 +124,18 @@ policies:
|
|||
.unwrap();
|
||||
}
|
||||
|
||||
fn init_cluster_derived_graph(root: &std::path::Path) {
|
||||
let graph_dir = root.join("graphs");
|
||||
fs::create_dir_all(&graph_dir).unwrap();
|
||||
output_success(
|
||||
cli()
|
||||
.arg("init")
|
||||
.arg("--schema")
|
||||
.arg(root.join("people.pg"))
|
||||
.arg(graph_dir.join("knowledge.omni")),
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn version_command_prints_current_cli_version() {
|
||||
let output = output_success(cli().arg("version"));
|
||||
|
|
@ -376,6 +388,196 @@ fn cluster_plan_locked_state_exits_nonzero() {
|
|||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cluster_import_json_bootstraps_missing_state() {
|
||||
let temp = tempdir().unwrap();
|
||||
write_cluster_config_fixture(temp.path());
|
||||
init_cluster_derived_graph(temp.path());
|
||||
|
||||
let json = parse_stdout_json(&output_success(
|
||||
cli()
|
||||
.arg("cluster")
|
||||
.arg("import")
|
||||
.arg("--config")
|
||||
.arg(temp.path())
|
||||
.arg("--json"),
|
||||
));
|
||||
assert_eq!(json["ok"], true);
|
||||
assert_eq!(json["operation"], "import");
|
||||
assert_eq!(json["state_observations"]["state_revision"], 1);
|
||||
assert!(
|
||||
json["state_observations"]["state_cas"]
|
||||
.as_str()
|
||||
.unwrap()
|
||||
.starts_with("sha256:")
|
||||
);
|
||||
assert!(json["observations"]["graph.knowledge"]["manifest_version"].is_number());
|
||||
assert_eq!(
|
||||
json["resource_statuses"]["graph.knowledge"]["status"],
|
||||
"applied"
|
||||
);
|
||||
assert!(temp.path().join("__cluster/state.json").exists());
|
||||
assert!(!temp.path().join("__cluster/lock.json").exists());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cluster_refresh_json_updates_revision_cas_and_removes_lock() {
|
||||
let temp = tempdir().unwrap();
|
||||
write_cluster_config_fixture(temp.path());
|
||||
init_cluster_derived_graph(temp.path());
|
||||
let state_dir = temp.path().join("__cluster");
|
||||
fs::create_dir_all(&state_dir).unwrap();
|
||||
fs::write(
|
||||
state_dir.join("state.json"),
|
||||
r#"
|
||||
{
|
||||
"version": 1,
|
||||
"state_revision": 2,
|
||||
"applied_revision": { "resources": {} }
|
||||
}
|
||||
"#,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let json = parse_stdout_json(&output_success(
|
||||
cli()
|
||||
.arg("cluster")
|
||||
.arg("refresh")
|
||||
.arg("--config")
|
||||
.arg(temp.path())
|
||||
.arg("--json"),
|
||||
));
|
||||
assert_eq!(json["ok"], true);
|
||||
assert_eq!(json["operation"], "refresh");
|
||||
assert_eq!(json["state_observations"]["state_revision"], 3);
|
||||
assert!(
|
||||
json["state_observations"]["state_cas"]
|
||||
.as_str()
|
||||
.unwrap()
|
||||
.starts_with("sha256:")
|
||||
);
|
||||
assert!(!state_dir.join("lock.json").exists());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cluster_refresh_missing_state_exits_nonzero() {
|
||||
let temp = tempdir().unwrap();
|
||||
write_cluster_config_fixture(temp.path());
|
||||
|
||||
let output = output_failure(
|
||||
cli()
|
||||
.arg("cluster")
|
||||
.arg("refresh")
|
||||
.arg("--config")
|
||||
.arg(temp.path())
|
||||
.arg("--json"),
|
||||
);
|
||||
let json = parse_stdout_json(&output);
|
||||
assert_eq!(json["ok"], false);
|
||||
assert!(
|
||||
json["diagnostics"]
|
||||
.as_array()
|
||||
.unwrap()
|
||||
.iter()
|
||||
.any(|diagnostic| diagnostic["code"] == "state_missing"),
|
||||
"missing state should produce a useful diagnostic: {json}"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cluster_import_existing_state_exits_nonzero() {
|
||||
let temp = tempdir().unwrap();
|
||||
write_cluster_config_fixture(temp.path());
|
||||
let state_dir = temp.path().join("__cluster");
|
||||
fs::create_dir_all(&state_dir).unwrap();
|
||||
fs::write(
|
||||
state_dir.join("state.json"),
|
||||
r#"{"version":1,"applied_revision":{"resources":{}}}"#,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let output = output_failure(
|
||||
cli()
|
||||
.arg("cluster")
|
||||
.arg("import")
|
||||
.arg("--config")
|
||||
.arg(temp.path())
|
||||
.arg("--json"),
|
||||
);
|
||||
let json = parse_stdout_json(&output);
|
||||
assert_eq!(json["ok"], false);
|
||||
assert!(
|
||||
json["diagnostics"]
|
||||
.as_array()
|
||||
.unwrap()
|
||||
.iter()
|
||||
.any(|diagnostic| diagnostic["code"] == "state_already_exists"),
|
||||
"existing state should produce a useful diagnostic: {json}"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cluster_refresh_and_import_locked_state_exit_nonzero() {
|
||||
let temp = tempdir().unwrap();
|
||||
write_cluster_config_fixture(temp.path());
|
||||
let state_dir = temp.path().join("__cluster");
|
||||
fs::create_dir_all(&state_dir).unwrap();
|
||||
fs::write(
|
||||
state_dir.join("state.json"),
|
||||
r#"{"version":1,"applied_revision":{"resources":{}}}"#,
|
||||
)
|
||||
.unwrap();
|
||||
fs::write(
|
||||
state_dir.join("lock.json"),
|
||||
r#"{"version":1,"lock_id":"held-lock","operation":"refresh","created_at":"2026-06-08T00:00:00Z","pid":123}"#,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let refresh = parse_stdout_json(&output_failure(
|
||||
cli()
|
||||
.arg("cluster")
|
||||
.arg("refresh")
|
||||
.arg("--config")
|
||||
.arg(temp.path())
|
||||
.arg("--json"),
|
||||
));
|
||||
assert_eq!(refresh["state_observations"]["lock_id"], "held-lock");
|
||||
assert!(
|
||||
refresh["diagnostics"]
|
||||
.as_array()
|
||||
.unwrap()
|
||||
.iter()
|
||||
.any(|diagnostic| diagnostic["code"] == "state_lock_held")
|
||||
);
|
||||
|
||||
let temp = tempdir().unwrap();
|
||||
write_cluster_config_fixture(temp.path());
|
||||
let state_dir = temp.path().join("__cluster");
|
||||
fs::create_dir_all(&state_dir).unwrap();
|
||||
fs::write(
|
||||
state_dir.join("lock.json"),
|
||||
r#"{"version":1,"lock_id":"held-lock","operation":"import","created_at":"2026-06-08T00:00:00Z","pid":123}"#,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let imported = parse_stdout_json(&output_failure(
|
||||
cli()
|
||||
.arg("cluster")
|
||||
.arg("import")
|
||||
.arg("--config")
|
||||
.arg(temp.path())
|
||||
.arg("--json"),
|
||||
));
|
||||
assert_eq!(imported["state_observations"]["lock_id"], "held-lock");
|
||||
assert!(
|
||||
imported["diagnostics"]
|
||||
.as_array()
|
||||
.unwrap()
|
||||
.iter()
|
||||
.any(|diagnostic| diagnostic["code"] == "state_lock_held")
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cluster_validate_invalid_config_exits_nonzero() {
|
||||
let temp = tempdir().unwrap();
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue