Merge origin/main (v0.6.2, cluster Stage 2B) into ragnorc/scrutinize-rfc-002

Conflict resolutions:
- cli/main.rs: keep the omnigraph_api_types import (branch extraction) and
  add main's omnigraph_cluster import; test-import list takes the branch's
  ResolvedCliGraph/is_remote_uri additions
- Cargo manifests: union of deps — branch's config/queries/api-types crates
  plus main's cluster crate; all versions unified at 0.6.2 (new crates bumped
  from 0.6.1)
- cli/Cargo.toml: omnigraph-server dep stays dropped (branch decision; CLI
  references it only in comments and the test harness binary spawn)
- AGENTS.md: 0.6.2 + union crate list
- cli-reference.md: branch's --graph/layered-config sections + main's
  cluster/repair rows
- Cargo.lock regenerated

cargo test --workspace --locked passes; scripts/check-agents-md.sh passes.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
aaltshuler 2026-06-09 22:02:50 +03:00
commit 48912167d0
91 changed files with 10889 additions and 736 deletions

View file

@ -1,6 +1,6 @@
[package]
name = "omnigraph-cli"
version = "0.6.1"
version = "0.6.2"
edition = "2024"
description = "CLI for the Omnigraph graph database."
license = "MIT"
@ -13,12 +13,13 @@ name = "omnigraph"
path = "src/main.rs"
[dependencies]
omnigraph = { package = "omnigraph-engine", path = "../omnigraph", version = "0.6.1" }
omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.6.1" }
omnigraph-policy = { path = "../omnigraph-policy", version = "0.6.1" }
omnigraph-config = { path = "../omnigraph-config", version = "0.6.1" }
omnigraph-queries = { path = "../omnigraph-queries", version = "0.6.1" }
omnigraph-api-types = { path = "../omnigraph-api-types", version = "0.6.1" }
omnigraph = { package = "omnigraph-engine", path = "../omnigraph", version = "0.6.2" }
omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.6.2" }
omnigraph-cluster = { path = "../omnigraph-cluster", version = "0.6.2" }
omnigraph-policy = { path = "../omnigraph-policy", version = "0.6.2" }
omnigraph-config = { path = "../omnigraph-config", version = "0.6.2" }
omnigraph-queries = { path = "../omnigraph-queries", version = "0.6.2" }
omnigraph-api-types = { path = "../omnigraph-api-types", version = "0.6.2" }
clap = { workspace = true }
color-eyre = { workspace = true }
serde = { workspace = true }

View file

@ -18,6 +18,10 @@ use omnigraph_api_types::{
SnapshotTableOutput, commit_output, ingest_output, read_output, schema_apply_output,
snapshot_payload,
};
use omnigraph_cluster::{
DiagnosticSeverity, PlanOutput, StateSyncOutput, StatusOutput, ValidateOutput,
import_config_dir, plan_config_dir, refresh_config_dir, status_config_dir, validate_config_dir,
};
use omnigraph_compiler::query::parser::parse_query;
use omnigraph_compiler::schema::parser::parse_schema;
use omnigraph_compiler::{
@ -286,6 +290,25 @@ enum Command {
#[arg(long)]
json: bool,
},
/// Classify and explicitly repair manifest/head drift
Repair {
/// Graph URI
uri: Option<String>,
#[arg(long)]
target: Option<String>,
#[arg(long)]
config: Option<PathBuf>,
/// Publish verified maintenance drift. Without this flag, repair only
/// previews what it would do.
#[arg(long)]
confirm: bool,
/// Also publish suspicious or unverifiable drift. Requires
/// `--confirm`; use only after operator review.
#[arg(long, requires = "confirm")]
force: bool,
#[arg(long)]
json: bool,
},
/// Remove old Lance versions from every table of the graph (destructive)
Cleanup {
/// Graph URI
@ -308,6 +331,11 @@ enum Command {
#[arg(long)]
json: bool,
},
/// Validate and plan read-only cluster configuration.
Cluster {
#[command(subcommand)]
command: ClusterCommand,
},
/// Manage graphs on a multi-graph server (MR-668)
Graphs {
#[command(subcommand)]
@ -328,6 +356,55 @@ enum Command {
},
}
#[derive(Debug, Subcommand)]
enum ClusterCommand {
/// Validate cluster.yaml and referenced schemas, queries, and policy files.
Validate {
/// Cluster config directory containing cluster.yaml.
#[arg(long, default_value = ".")]
config: PathBuf,
/// Emit JSON instead of human text.
#[arg(long)]
json: bool,
},
/// Produce a read-only plan by diffing cluster.yaml against __cluster/state.json.
Plan {
/// Cluster config directory containing cluster.yaml.
#[arg(long, default_value = ".")]
config: PathBuf,
/// Emit JSON instead of human text.
#[arg(long)]
json: bool,
},
/// Read the local JSON state ledger without scanning live graph resources.
Status {
/// Cluster config directory containing cluster.yaml.
#[arg(long, default_value = ".")]
config: PathBuf,
/// Emit JSON instead of human text.
#[arg(long)]
json: bool,
},
/// Refresh existing local JSON state from declared graph observations.
Refresh {
/// Cluster config directory containing cluster.yaml.
#[arg(long, default_value = ".")]
config: PathBuf,
/// Emit JSON instead of human text.
#[arg(long)]
json: bool,
},
/// Import initial local JSON state from declared graph observations.
Import {
/// Cluster config directory containing cluster.yaml.
#[arg(long, default_value = ".")]
config: PathBuf,
/// Emit JSON instead of human text.
#[arg(long)]
json: bool,
},
}
/// Operations on the graph registry of a multi-graph server (MR-668).
///
/// All operations target a remote multi-graph server URL (http:// or
@ -720,6 +797,159 @@ fn print_json<T: Serialize>(value: &T) -> Result<()> {
Ok(())
}
fn print_cluster_validate_human(output: &ValidateOutput) {
if output.ok {
println!(
"cluster config valid: {} resource(s), {} dependency edge(s)",
output.resources.len(),
output.dependencies.len()
);
} else {
println!("cluster config invalid");
}
print_cluster_diagnostics(&output.diagnostics);
}
fn print_cluster_plan_human(output: &PlanOutput) {
if output.ok {
println!(
"cluster plan: {} change(s), {} approval gate(s)",
output.changes.len(),
output.approvals_required.len()
);
for change in &output.changes {
println!(" {:?} {}", change.operation, change.resource);
}
if output.changes.is_empty() {
println!(" no changes");
}
} else {
println!("cluster plan failed");
}
print_cluster_diagnostics(&output.diagnostics);
}
fn print_cluster_status_human(output: &StatusOutput) {
if output.ok {
let state = &output.state_observations;
if state.state_found {
println!(
"cluster state: revision {}, {} resource(s)",
state.state_revision, state.resource_count
);
if let Some(digest) = state.applied_config_digest.as_deref() {
println!(" applied config: {digest}");
}
if state.locked {
match state.lock_id.as_deref() {
Some(lock_id) => println!(" lock: held ({lock_id})"),
None => println!(" lock: held"),
}
} else {
println!(" lock: not held");
}
} else {
println!("cluster state missing");
}
} else {
println!("cluster status failed");
}
print_cluster_diagnostics(&output.diagnostics);
}
fn print_cluster_state_sync_human(output: &StateSyncOutput) {
let operation = match output.operation {
omnigraph_cluster::StateSyncOperation::Refresh => "refresh",
omnigraph_cluster::StateSyncOperation::Import => "import",
};
if output.ok {
let state = &output.state_observations;
println!(
"cluster {operation}: revision {}, {} resource(s)",
state.state_revision, state.resource_count
);
if let Some(cas) = state.state_cas.as_deref() {
println!(" state_cas: {cas}");
}
if state.locked {
match state.lock_id.as_deref() {
Some(lock_id) => println!(" lock: acquired ({lock_id})"),
None => println!(" lock: acquired"),
}
} else {
println!(" lock: not acquired");
}
} else {
println!("cluster {operation} failed");
}
print_cluster_diagnostics(&output.diagnostics);
}
fn print_cluster_diagnostics(diagnostics: &[omnigraph_cluster::Diagnostic]) {
for diagnostic in diagnostics {
let label = match diagnostic.severity {
DiagnosticSeverity::Error => "ERROR",
DiagnosticSeverity::Warning => "WARN ",
};
println!(
"{label} {} {}: {}",
diagnostic.code, diagnostic.path, diagnostic.message
);
}
}
fn finish_cluster_validate(output: &ValidateOutput, json: bool) -> Result<()> {
if json {
print_json(output)?;
} else {
print_cluster_validate_human(output);
}
if !output.ok {
io::stdout().flush()?;
std::process::exit(1);
}
Ok(())
}
fn finish_cluster_plan(output: &PlanOutput, json: bool) -> Result<()> {
if json {
print_json(output)?;
} else {
print_cluster_plan_human(output);
}
if !output.ok {
io::stdout().flush()?;
std::process::exit(1);
}
Ok(())
}
fn finish_cluster_status(output: &StatusOutput, json: bool) -> Result<()> {
if json {
print_json(output)?;
} else {
print_cluster_status_human(output);
}
if !output.ok {
io::stdout().flush()?;
std::process::exit(1);
}
Ok(())
}
fn finish_cluster_state_sync(output: &StateSyncOutput, json: bool) -> Result<()> {
if json {
print_json(output)?;
} else {
print_cluster_state_sync_human(output);
}
if !output.ok {
io::stdout().flush()?;
std::process::exit(1);
}
Ok(())
}
fn is_remote_uri(uri: &str) -> bool {
uri.starts_with("http://") || uri.starts_with("https://")
}
@ -3188,6 +3418,8 @@ async fn main() -> Result<()> {
"fragments_added": s.fragments_added,
"committed": s.committed,
"skipped": s.skipped.map(|r| r.as_str()),
"manifest_version": s.manifest_version,
"lance_head_version": s.lance_head_version,
})).collect::<Vec<_>>(),
});
print_json(&value)?;
@ -3207,6 +3439,89 @@ async fn main() -> Result<()> {
}
}
}
Command::Repair {
uri,
target,
config,
confirm,
force,
json,
} => {
let config = load_cli_config(config.as_ref())?;
let uri = resolve_uri(&config, uri, target.as_deref())?;
let db = Omnigraph::open(&uri).await?;
let stats = db
.repair(omnigraph::db::RepairOptions { confirm, force })
.await?;
let refused_count = stats
.tables
.iter()
.filter(|s| matches!(s.action, omnigraph::db::RepairAction::Refused))
.count();
if json {
let value = serde_json::json!({
"uri": uri,
"confirm": confirm,
"force": force,
"manifest_version": stats.manifest_version,
"tables": stats.tables.iter().map(|s| serde_json::json!({
"table_key": s.table_key,
"manifest_version": s.manifest_version,
"lance_head_version": s.lance_head_version,
"classification": s.classification.as_str(),
"action": s.action.as_str(),
"operations": s.operations,
"error": s.error,
})).collect::<Vec<_>>(),
});
print_json(&value)?;
} else {
let mode = if confirm { "confirm" } else { "preview" };
println!(
"repair {} — {} mode, {} tables",
uri,
mode,
stats.tables.len()
);
for s in &stats.tables {
let drift = if s.manifest_version == s.lance_head_version {
format!("{}", s.manifest_version)
} else {
format!("{}{}", s.manifest_version, s.lance_head_version)
};
let ops = if s.operations.is_empty() {
String::new()
} else {
format!(" [{}]", s.operations.join(", "))
};
let err = s
.error
.as_ref()
.map(|err| format!(" ({err})"))
.unwrap_or_default();
println!(
" {:<40} {:<12} {:<22} {}{}{}",
s.table_key,
s.action.as_str(),
s.classification.as_str(),
drift,
ops,
err
);
}
if !confirm {
println!("rerun with --confirm to publish verified maintenance drift");
}
}
if refused_count > 0 {
bail!(
"repair refused {} suspicious or unverifiable table(s); review the preview \
output and rerun with --force --confirm only if publishing that drift is \
intentional",
refused_count
);
}
}
Command::Cleanup {
uri,
target,
@ -3287,6 +3602,28 @@ async fn main() -> Result<()> {
}
}
}
Command::Cluster { command } => match command {
ClusterCommand::Validate { config, json } => {
let output = validate_config_dir(config);
finish_cluster_validate(&output, json)?;
}
ClusterCommand::Plan { config, json } => {
let output = plan_config_dir(config);
finish_cluster_plan(&output, json)?;
}
ClusterCommand::Status { config, json } => {
let output = status_config_dir(config);
finish_cluster_status(&output, json)?;
}
ClusterCommand::Refresh { config, json } => {
let output = refresh_config_dir(config).await;
finish_cluster_state_sync(&output, json)?;
}
ClusterCommand::Import { config, json } => {
let output = import_config_dir(config).await;
finish_cluster_state_sync(&output, json)?;
}
},
Command::Graphs { command } => match command {
GraphsCommand::List {
uri,

View file

@ -1,5 +1,6 @@
use std::fs;
use lance::Dataset;
use lance::index::DatasetIndexExt;
use omnigraph::db::{Omnigraph, ReadTarget};
use serde_json::Value;
@ -60,6 +61,25 @@ fn manifest_dataset_version(graph: &std::path::Path) -> u64 {
})
}
fn forge_person_delete_drift(graph: &std::path::Path) -> (u64, u64) {
tokio::runtime::Runtime::new().unwrap().block_on(async {
let uri = graph.to_string_lossy();
let db = Omnigraph::open(uri.as_ref()).await.unwrap();
let snap = db
.snapshot_of(ReadTarget::branch("main"))
.await
.unwrap();
let entry = snap.entry("node:Person").unwrap();
let full_path = format!("{}/{}", uri.trim_end_matches('/'), entry.table_path);
let mut ds = Dataset::open(&full_path).await.unwrap();
let deleted = ds.delete("name = 'Alice'").await.unwrap();
assert_eq!(deleted.num_deleted_rows, 1);
let head = deleted.new_dataset.version().version;
assert!(head > entry.table_version);
(entry.table_version, head)
})
}
fn write_policy_config_fixture(root: &std::path::Path) -> (std::path::PathBuf, std::path::PathBuf) {
let config = root.join("omnigraph.yaml");
let policy = root.join("policy.yaml");
@ -78,6 +98,64 @@ policy:
(config, policy)
}
fn write_cluster_config_fixture(root: &std::path::Path) {
fs::write(
root.join("people.pg"),
r#"
node Person {
name: String @key
age: I32?
}
"#,
)
.unwrap();
fs::write(
root.join("people.gq"),
r#"
query find_person($name: String) {
match { $p: Person { name: $name } }
return { $p.name, $p.age }
}
"#,
)
.unwrap();
fs::write(root.join("base.policy.yaml"), "rules: []\n").unwrap();
fs::write(
root.join("cluster.yaml"),
r#"
version: 1
metadata:
name: company-brain
state:
backend: cluster
lock: true
graphs:
knowledge:
schema: ./people.pg
queries:
find_person:
file: ./people.gq
policies:
base:
file: ./base.policy.yaml
applies_to: [knowledge]
"#,
)
.unwrap();
}
fn init_cluster_derived_graph(root: &std::path::Path) {
let graph_dir = root.join("graphs");
fs::create_dir_all(&graph_dir).unwrap();
output_success(
cli()
.arg("init")
.arg("--schema")
.arg(root.join("people.pg"))
.arg(graph_dir.join("knowledge.omni")),
);
}
#[test]
fn version_command_prints_current_cli_version() {
let output = output_success(cli().arg("version"));
@ -89,6 +167,470 @@ fn version_command_prints_current_cli_version() {
);
}
#[test]
fn cluster_validate_config_success() {
let temp = tempdir().unwrap();
write_cluster_config_fixture(temp.path());
let output = output_success(
cli()
.arg("cluster")
.arg("validate")
.arg("--config")
.arg(temp.path()),
);
let stdout = stdout_string(&output);
assert!(stdout.contains("cluster config valid"), "{stdout}");
}
#[test]
fn cluster_validate_json_is_stable() {
let temp = tempdir().unwrap();
write_cluster_config_fixture(temp.path());
let json = parse_stdout_json(&output_success(
cli()
.arg("cluster")
.arg("validate")
.arg("--config")
.arg(temp.path())
.arg("--json"),
));
assert_eq!(json["ok"], true);
assert!(json["resource_digests"]["graph.knowledge"].is_string());
assert!(json["resource_digests"]["query.knowledge.find_person"].is_string());
assert_eq!(json["dependencies"][0]["from"], "policy.base");
assert_eq!(json["dependencies"][0]["to"], "graph.knowledge");
}
#[test]
fn cluster_plan_json_reads_inferred_local_state() {
let temp = tempdir().unwrap();
write_cluster_config_fixture(temp.path());
let state_dir = temp.path().join("__cluster");
fs::create_dir_all(&state_dir).unwrap();
fs::write(
state_dir.join("state.json"),
r#"
{
"version": 1,
"applied_revision": {
"config_digest": "old",
"resources": {
"graph.knowledge": { "digest": "old-graph" },
"policy.old": { "digest": "old-policy" }
}
}
}
"#,
)
.unwrap();
let json = parse_stdout_json(&output_success(
cli()
.arg("cluster")
.arg("plan")
.arg("--config")
.arg(temp.path())
.arg("--json"),
));
assert_eq!(json["ok"], true);
assert_eq!(json["state_observations"]["state_found"], true);
assert!(
json["changes"]
.as_array()
.unwrap()
.iter()
.any(|change| change["resource"] == "policy.old" && change["operation"] == "delete"),
"plan should read state and delete stale resources: {json}"
);
}
#[test]
fn cluster_status_json_reports_missing_state() {
let temp = tempdir().unwrap();
write_cluster_config_fixture(temp.path());
let json = parse_stdout_json(&output_success(
cli()
.arg("cluster")
.arg("status")
.arg("--config")
.arg(temp.path())
.arg("--json"),
));
assert_eq!(json["ok"], true);
assert_eq!(json["state_observations"]["state_found"], false);
assert!(
json["diagnostics"]
.as_array()
.unwrap()
.iter()
.any(|diagnostic| diagnostic["code"] == "state_missing"),
"missing state should be a warning diagnostic: {json}"
);
}
#[test]
fn cluster_status_json_reports_extended_state() {
let temp = tempdir().unwrap();
write_cluster_config_fixture(temp.path());
let state_dir = temp.path().join("__cluster");
fs::create_dir_all(&state_dir).unwrap();
fs::write(
state_dir.join("state.json"),
r#"
{
"version": 1,
"state_revision": 5,
"applied_revision": {
"config_digest": "applied",
"resources": {
"graph.knowledge": { "digest": "graph-digest" }
}
},
"resource_statuses": {
"graph.knowledge": { "status": "applied", "conditions": ["healthy"] }
},
"approval_records": {},
"recovery_records": {},
"observations": {}
}
"#,
)
.unwrap();
let json = parse_stdout_json(&output_success(
cli()
.arg("cluster")
.arg("status")
.arg("--config")
.arg(temp.path())
.arg("--json"),
));
assert_eq!(json["ok"], true);
assert_eq!(json["state_observations"]["state_revision"], 5);
assert!(
json["state_observations"]["state_cas"]
.as_str()
.unwrap()
.starts_with("sha256:")
);
assert_eq!(json["resource_digests"]["graph.knowledge"], "graph-digest");
assert_eq!(
json["resource_statuses"]["graph.knowledge"]["status"],
"applied"
);
}
#[test]
fn cluster_plan_json_includes_state_cas_revision_and_lock_observation() {
let temp = tempdir().unwrap();
write_cluster_config_fixture(temp.path());
let state_dir = temp.path().join("__cluster");
fs::create_dir_all(&state_dir).unwrap();
fs::write(
state_dir.join("state.json"),
r#"
{
"version": 1,
"state_revision": 9,
"applied_revision": {
"config_digest": "old",
"resources": {
"graph.knowledge": { "digest": "old-graph" }
}
}
}
"#,
)
.unwrap();
let json = parse_stdout_json(&output_success(
cli()
.arg("cluster")
.arg("plan")
.arg("--config")
.arg(temp.path())
.arg("--json"),
));
assert_eq!(json["ok"], true);
assert_eq!(json["state_observations"]["state_revision"], 9);
assert!(
json["state_observations"]["state_cas"]
.as_str()
.unwrap()
.starts_with("sha256:")
);
assert_eq!(json["state_observations"]["locked"], false);
assert_eq!(json["state_observations"]["lock_acquired"], true);
assert!(json["state_observations"]["acquired_lock_id"].is_string());
assert!(!state_dir.join("lock.json").exists());
}
#[test]
fn cluster_plan_locked_state_exits_nonzero() {
let temp = tempdir().unwrap();
write_cluster_config_fixture(temp.path());
let state_dir = temp.path().join("__cluster");
fs::create_dir_all(&state_dir).unwrap();
fs::write(
state_dir.join("lock.json"),
r#"
{
"version": 1,
"lock_id": "held-lock",
"operation": "plan",
"created_at": "2026-06-08T00:00:00Z",
"pid": 123
}
"#,
)
.unwrap();
let output = output_failure(
cli()
.arg("cluster")
.arg("plan")
.arg("--config")
.arg(temp.path())
.arg("--json"),
);
let json = parse_stdout_json(&output);
assert_eq!(json["ok"], false);
assert_eq!(json["state_observations"]["locked"], true);
assert_eq!(json["state_observations"]["lock_acquired"], false);
assert_eq!(json["state_observations"]["lock_id"], "held-lock");
assert!(
json["diagnostics"]
.as_array()
.unwrap()
.iter()
.any(|diagnostic| diagnostic["code"] == "state_lock_held"),
"locked state should produce a useful diagnostic: {json}"
);
}
#[test]
fn cluster_import_json_bootstraps_missing_state() {
let temp = tempdir().unwrap();
write_cluster_config_fixture(temp.path());
init_cluster_derived_graph(temp.path());
let json = parse_stdout_json(&output_success(
cli()
.arg("cluster")
.arg("import")
.arg("--config")
.arg(temp.path())
.arg("--json"),
));
assert_eq!(json["ok"], true);
assert_eq!(json["operation"], "import");
assert_eq!(json["state_observations"]["state_revision"], 1);
assert!(
json["state_observations"]["state_cas"]
.as_str()
.unwrap()
.starts_with("sha256:")
);
assert_eq!(json["state_observations"]["locked"], false);
assert_eq!(json["state_observations"]["lock_acquired"], true);
assert!(json["state_observations"]["acquired_lock_id"].is_string());
assert!(json["observations"]["graph.knowledge"]["manifest_version"].is_number());
assert_eq!(
json["resource_statuses"]["graph.knowledge"]["status"],
"applied"
);
assert!(temp.path().join("__cluster/state.json").exists());
assert!(!temp.path().join("__cluster/lock.json").exists());
}
#[test]
fn cluster_refresh_json_updates_revision_cas_and_removes_lock() {
let temp = tempdir().unwrap();
write_cluster_config_fixture(temp.path());
init_cluster_derived_graph(temp.path());
let state_dir = temp.path().join("__cluster");
fs::create_dir_all(&state_dir).unwrap();
fs::write(
state_dir.join("state.json"),
r#"
{
"version": 1,
"state_revision": 2,
"applied_revision": { "resources": {} }
}
"#,
)
.unwrap();
let json = parse_stdout_json(&output_success(
cli()
.arg("cluster")
.arg("refresh")
.arg("--config")
.arg(temp.path())
.arg("--json"),
));
assert_eq!(json["ok"], true);
assert_eq!(json["operation"], "refresh");
assert_eq!(json["state_observations"]["state_revision"], 3);
assert!(
json["state_observations"]["state_cas"]
.as_str()
.unwrap()
.starts_with("sha256:")
);
assert_eq!(json["state_observations"]["locked"], false);
assert_eq!(json["state_observations"]["lock_acquired"], true);
assert!(json["state_observations"]["acquired_lock_id"].is_string());
assert!(!state_dir.join("lock.json").exists());
}
#[test]
fn cluster_refresh_missing_state_exits_nonzero() {
let temp = tempdir().unwrap();
write_cluster_config_fixture(temp.path());
let output = output_failure(
cli()
.arg("cluster")
.arg("refresh")
.arg("--config")
.arg(temp.path())
.arg("--json"),
);
let json = parse_stdout_json(&output);
assert_eq!(json["ok"], false);
assert!(
json["diagnostics"]
.as_array()
.unwrap()
.iter()
.any(|diagnostic| diagnostic["code"] == "state_missing"),
"missing state should produce a useful diagnostic: {json}"
);
}
#[test]
fn cluster_import_existing_state_exits_nonzero() {
let temp = tempdir().unwrap();
write_cluster_config_fixture(temp.path());
let state_dir = temp.path().join("__cluster");
fs::create_dir_all(&state_dir).unwrap();
fs::write(
state_dir.join("state.json"),
r#"{"version":1,"applied_revision":{"resources":{}}}"#,
)
.unwrap();
let output = output_failure(
cli()
.arg("cluster")
.arg("import")
.arg("--config")
.arg(temp.path())
.arg("--json"),
);
let json = parse_stdout_json(&output);
assert_eq!(json["ok"], false);
assert!(
json["diagnostics"]
.as_array()
.unwrap()
.iter()
.any(|diagnostic| diagnostic["code"] == "state_already_exists"),
"existing state should produce a useful diagnostic: {json}"
);
}
#[test]
fn cluster_refresh_and_import_locked_state_exit_nonzero() {
let temp = tempdir().unwrap();
write_cluster_config_fixture(temp.path());
let state_dir = temp.path().join("__cluster");
fs::create_dir_all(&state_dir).unwrap();
fs::write(
state_dir.join("state.json"),
r#"{"version":1,"applied_revision":{"resources":{}}}"#,
)
.unwrap();
fs::write(
state_dir.join("lock.json"),
r#"{"version":1,"lock_id":"held-lock","operation":"refresh","created_at":"2026-06-08T00:00:00Z","pid":123}"#,
)
.unwrap();
let refresh = parse_stdout_json(&output_failure(
cli()
.arg("cluster")
.arg("refresh")
.arg("--config")
.arg(temp.path())
.arg("--json"),
));
assert_eq!(refresh["state_observations"]["locked"], true);
assert_eq!(refresh["state_observations"]["lock_id"], "held-lock");
assert_eq!(refresh["state_observations"]["lock_acquired"], false);
assert!(
refresh["diagnostics"]
.as_array()
.unwrap()
.iter()
.any(|diagnostic| diagnostic["code"] == "state_lock_held")
);
let temp = tempdir().unwrap();
write_cluster_config_fixture(temp.path());
let state_dir = temp.path().join("__cluster");
fs::create_dir_all(&state_dir).unwrap();
fs::write(
state_dir.join("lock.json"),
r#"{"version":1,"lock_id":"held-lock","operation":"import","created_at":"2026-06-08T00:00:00Z","pid":123}"#,
)
.unwrap();
let imported = parse_stdout_json(&output_failure(
cli()
.arg("cluster")
.arg("import")
.arg("--config")
.arg(temp.path())
.arg("--json"),
));
assert_eq!(imported["state_observations"]["locked"], true);
assert_eq!(imported["state_observations"]["lock_id"], "held-lock");
assert_eq!(imported["state_observations"]["lock_acquired"], false);
assert!(
imported["diagnostics"]
.as_array()
.unwrap()
.iter()
.any(|diagnostic| diagnostic["code"] == "state_lock_held")
);
}
#[test]
fn cluster_validate_invalid_config_exits_nonzero() {
let temp = tempdir().unwrap();
fs::write(
temp.path().join("cluster.yaml"),
"version: 1\ngraphs: {}\npipelines: {}\n",
)
.unwrap();
let output = output_failure(
cli()
.arg("cluster")
.arg("validate")
.arg("--config")
.arg(temp.path()),
);
let stdout = stdout_string(&output);
assert!(stdout.contains("future_phase_field"), "{stdout}");
}
#[test]
fn short_version_flag_prints_current_cli_version() {
let output = output_success(cli().arg("-v"));
@ -450,6 +992,83 @@ fn explicit_omnigraph_config_pointing_at_missing_file_errors() {
);
}
#[test]
fn repair_json_reports_noop_on_clean_graph() {
let temp = tempdir().unwrap();
let graph = graph_path(temp.path());
init_graph(&graph);
load_fixture(&graph);
let output = output_success(cli().arg("repair").arg("--json").arg(&graph));
let payload: Value = serde_json::from_slice(&output.stdout).unwrap();
assert_eq!(payload["confirm"], false);
assert_eq!(payload["force"], false);
assert_eq!(payload["manifest_version"], Value::Null);
let tables = payload["tables"].as_array().unwrap();
assert_eq!(tables.len(), 4);
assert!(tables.iter().all(|table| {
table["classification"] == "no_drift" && table["action"] == "no_op"
}));
}
#[test]
fn repair_confirm_json_refuses_suspicious_drift_with_nonzero_exit_then_force_succeeds() {
let temp = tempdir().unwrap();
let graph = graph_path(temp.path());
init_graph(&graph);
load_fixture(&graph);
let graph_manifest_before = manifest_dataset_version(&graph);
let (table_manifest_before, table_head_before) = forge_person_delete_drift(&graph);
let refused = output_failure(
cli()
.arg("repair")
.arg("--confirm")
.arg("--json")
.arg(&graph),
);
let refused_payload: Value = serde_json::from_slice(&refused.stdout).unwrap();
assert_eq!(refused_payload["manifest_version"], Value::Null);
let person = refused_payload["tables"]
.as_array()
.unwrap()
.iter()
.find(|table| table["table_key"] == "node:Person")
.unwrap();
assert_eq!(person["classification"], "suspicious");
assert_eq!(person["action"], "refused");
assert!(
String::from_utf8_lossy(&refused.stderr).contains("repair refused"),
"stderr should explain the non-zero exit; got: {}",
String::from_utf8_lossy(&refused.stderr)
);
assert_eq!(manifest_dataset_version(&graph), graph_manifest_before);
let forced = output_success(
cli()
.arg("repair")
.arg("--force")
.arg("--confirm")
.arg("--json")
.arg(&graph),
);
let forced_payload: Value = serde_json::from_slice(&forced.stdout).unwrap();
let forced_manifest = forced_payload["manifest_version"].as_u64().unwrap();
assert!(forced_manifest > graph_manifest_before);
let person = forced_payload["tables"]
.as_array()
.unwrap()
.iter()
.find(|table| table["table_key"] == "node:Person")
.unwrap();
assert_eq!(person["classification"], "suspicious");
assert_eq!(person["action"], "forced");
assert_eq!(person["manifest_version"], table_manifest_before);
assert_eq!(person["lance_head_version"], table_head_before);
assert_eq!(manifest_dataset_version(&graph), forced_manifest);
}
#[test]
fn schema_plan_json_reports_supported_additive_change() {
let temp = tempdir().unwrap();