mirror of
https://github.com/ModernRelay/omnigraph.git
synced 2026-06-12 01:45:14 +02:00
refactor(cli): split the test monolith into command-area suites
tests/cli.rs (4,548 lines, 112 tests) becomes five area files — cli_cluster (24), cli_cluster_e2e (10, the spawned-binary lifecycle compositions), cli_data (49), cli_schema_config (16), cli_queries (13) — with the file-local helpers joining the existing tests/support harness. Verbatim moves + visibility bumps; 161 crate tests green. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
parent
916015c416
commit
d5e75df272
7 changed files with 4521 additions and 4548 deletions
File diff suppressed because it is too large
Load diff
884
crates/omnigraph-cli/tests/cli_cluster.rs
Normal file
884
crates/omnigraph-cli/tests/cli_cluster.rs
Normal file
|
|
@ -0,0 +1,884 @@
|
|||
//! Cluster command surface: validate/plan/apply/approve/status/sync/force-unlock.
|
||||
//! Moved verbatim from tests/cli.rs in the modularization.
|
||||
|
||||
use std::fs;
|
||||
|
||||
use tempfile::tempdir;
|
||||
|
||||
mod support;
|
||||
|
||||
use support::*;
|
||||
|
||||
|
||||
#[test]
|
||||
fn cluster_validate_config_success() {
|
||||
let temp = tempdir().unwrap();
|
||||
write_cluster_config_fixture(temp.path());
|
||||
|
||||
let output = output_success(
|
||||
cli()
|
||||
.arg("cluster")
|
||||
.arg("validate")
|
||||
.arg("--config")
|
||||
.arg(temp.path()),
|
||||
);
|
||||
let stdout = stdout_string(&output);
|
||||
assert!(stdout.contains("cluster config valid"), "{stdout}");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cluster_validate_json_is_stable() {
|
||||
let temp = tempdir().unwrap();
|
||||
write_cluster_config_fixture(temp.path());
|
||||
|
||||
let json = parse_stdout_json(&output_success(
|
||||
cli()
|
||||
.arg("cluster")
|
||||
.arg("validate")
|
||||
.arg("--config")
|
||||
.arg(temp.path())
|
||||
.arg("--json"),
|
||||
));
|
||||
assert_eq!(json["ok"], true);
|
||||
assert!(json["resource_digests"]["graph.knowledge"].is_string());
|
||||
assert!(json["resource_digests"]["query.knowledge.find_person"].is_string());
|
||||
assert_eq!(json["dependencies"][0]["from"], "policy.base");
|
||||
assert_eq!(json["dependencies"][0]["to"], "graph.knowledge");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cluster_plan_json_reads_inferred_local_state() {
|
||||
let temp = tempdir().unwrap();
|
||||
write_cluster_config_fixture(temp.path());
|
||||
let state_dir = temp.path().join("__cluster");
|
||||
fs::create_dir_all(&state_dir).unwrap();
|
||||
fs::write(
|
||||
state_dir.join("state.json"),
|
||||
r#"
|
||||
{
|
||||
"version": 1,
|
||||
"applied_revision": {
|
||||
"config_digest": "old",
|
||||
"resources": {
|
||||
"graph.knowledge": { "digest": "old-graph" },
|
||||
"policy.old": { "digest": "old-policy" }
|
||||
}
|
||||
}
|
||||
}
|
||||
"#,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let json = parse_stdout_json(&output_success(
|
||||
cli()
|
||||
.arg("cluster")
|
||||
.arg("plan")
|
||||
.arg("--config")
|
||||
.arg(temp.path())
|
||||
.arg("--json"),
|
||||
));
|
||||
assert_eq!(json["ok"], true);
|
||||
assert_eq!(json["state_observations"]["state_found"], true);
|
||||
assert!(
|
||||
json["changes"]
|
||||
.as_array()
|
||||
.unwrap()
|
||||
.iter()
|
||||
.any(|change| change["resource"] == "policy.old" && change["operation"] == "delete"),
|
||||
"plan should read state and delete stale resources: {json}"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cluster_status_json_reports_missing_state() {
|
||||
let temp = tempdir().unwrap();
|
||||
write_cluster_config_fixture(temp.path());
|
||||
|
||||
let json = parse_stdout_json(&output_success(
|
||||
cli()
|
||||
.arg("cluster")
|
||||
.arg("status")
|
||||
.arg("--config")
|
||||
.arg(temp.path())
|
||||
.arg("--json"),
|
||||
));
|
||||
assert_eq!(json["ok"], true);
|
||||
assert_eq!(json["state_observations"]["state_found"], false);
|
||||
assert!(
|
||||
json["diagnostics"]
|
||||
.as_array()
|
||||
.unwrap()
|
||||
.iter()
|
||||
.any(|diagnostic| diagnostic["code"] == "state_missing"),
|
||||
"missing state should be a warning diagnostic: {json}"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cluster_status_json_reports_lock_metadata() {
|
||||
let temp = tempdir().unwrap();
|
||||
write_cluster_config_fixture(temp.path());
|
||||
write_cluster_lock(temp.path(), "held-lock", "refresh");
|
||||
|
||||
let json = parse_stdout_json(&output_success(
|
||||
cli()
|
||||
.arg("cluster")
|
||||
.arg("status")
|
||||
.arg("--config")
|
||||
.arg(temp.path())
|
||||
.arg("--json"),
|
||||
));
|
||||
assert_eq!(json["ok"], true);
|
||||
assert_eq!(json["state_observations"]["locked"], true);
|
||||
assert_eq!(json["state_observations"]["lock_id"], "held-lock");
|
||||
assert_eq!(json["state_observations"]["lock_operation"], "refresh");
|
||||
assert_eq!(json["state_observations"]["lock_pid"], 123);
|
||||
assert_eq!(
|
||||
json["state_observations"]["lock_created_at"],
|
||||
"1970-01-01T00:00:00Z"
|
||||
);
|
||||
assert!(json["state_observations"]["lock_age_seconds"].is_number());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cluster_status_json_reports_extended_state() {
|
||||
let temp = tempdir().unwrap();
|
||||
write_cluster_config_fixture(temp.path());
|
||||
let state_dir = temp.path().join("__cluster");
|
||||
fs::create_dir_all(&state_dir).unwrap();
|
||||
fs::write(
|
||||
state_dir.join("state.json"),
|
||||
r#"
|
||||
{
|
||||
"version": 1,
|
||||
"state_revision": 5,
|
||||
"applied_revision": {
|
||||
"config_digest": "applied",
|
||||
"resources": {
|
||||
"graph.knowledge": { "digest": "graph-digest" }
|
||||
}
|
||||
},
|
||||
"resource_statuses": {
|
||||
"graph.knowledge": { "status": "applied", "conditions": ["healthy"] }
|
||||
},
|
||||
"approval_records": {},
|
||||
"recovery_records": {},
|
||||
"observations": {}
|
||||
}
|
||||
"#,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let json = parse_stdout_json(&output_success(
|
||||
cli()
|
||||
.arg("cluster")
|
||||
.arg("status")
|
||||
.arg("--config")
|
||||
.arg(temp.path())
|
||||
.arg("--json"),
|
||||
));
|
||||
assert_eq!(json["ok"], true);
|
||||
assert_eq!(json["state_observations"]["state_revision"], 5);
|
||||
assert!(
|
||||
json["state_observations"]["state_cas"]
|
||||
.as_str()
|
||||
.unwrap()
|
||||
.starts_with("sha256:")
|
||||
);
|
||||
assert_eq!(json["resource_digests"]["graph.knowledge"], "graph-digest");
|
||||
assert_eq!(
|
||||
json["resource_statuses"]["graph.knowledge"]["status"],
|
||||
"applied"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cluster_plan_json_includes_state_cas_revision_and_lock_observation() {
|
||||
let temp = tempdir().unwrap();
|
||||
write_cluster_config_fixture(temp.path());
|
||||
let state_dir = temp.path().join("__cluster");
|
||||
fs::create_dir_all(&state_dir).unwrap();
|
||||
fs::write(
|
||||
state_dir.join("state.json"),
|
||||
r#"
|
||||
{
|
||||
"version": 1,
|
||||
"state_revision": 9,
|
||||
"applied_revision": {
|
||||
"config_digest": "old",
|
||||
"resources": {
|
||||
"graph.knowledge": { "digest": "old-graph" }
|
||||
}
|
||||
}
|
||||
}
|
||||
"#,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let json = parse_stdout_json(&output_success(
|
||||
cli()
|
||||
.arg("cluster")
|
||||
.arg("plan")
|
||||
.arg("--config")
|
||||
.arg(temp.path())
|
||||
.arg("--json"),
|
||||
));
|
||||
assert_eq!(json["ok"], true);
|
||||
assert_eq!(json["state_observations"]["state_revision"], 9);
|
||||
assert!(
|
||||
json["state_observations"]["state_cas"]
|
||||
.as_str()
|
||||
.unwrap()
|
||||
.starts_with("sha256:")
|
||||
);
|
||||
assert_eq!(json["state_observations"]["locked"], false);
|
||||
assert_eq!(json["state_observations"]["lock_acquired"], true);
|
||||
assert!(json["state_observations"]["acquired_lock_id"].is_string());
|
||||
assert!(!state_dir.join("lock.json").exists());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cluster_plan_locked_state_exits_nonzero() {
|
||||
let temp = tempdir().unwrap();
|
||||
write_cluster_config_fixture(temp.path());
|
||||
write_cluster_lock(temp.path(), "held-lock", "plan");
|
||||
|
||||
let output = output_failure(
|
||||
cli()
|
||||
.arg("cluster")
|
||||
.arg("plan")
|
||||
.arg("--config")
|
||||
.arg(temp.path())
|
||||
.arg("--json"),
|
||||
);
|
||||
let json = parse_stdout_json(&output);
|
||||
assert_eq!(json["ok"], false);
|
||||
assert_eq!(json["state_observations"]["locked"], true);
|
||||
assert_eq!(json["state_observations"]["lock_acquired"], false);
|
||||
assert_eq!(json["state_observations"]["lock_id"], "held-lock");
|
||||
assert_eq!(json["state_observations"]["lock_operation"], "plan");
|
||||
assert_eq!(json["state_observations"]["lock_pid"], 123);
|
||||
assert_eq!(
|
||||
json["state_observations"]["lock_created_at"],
|
||||
"1970-01-01T00:00:00Z"
|
||||
);
|
||||
assert!(json["state_observations"]["lock_age_seconds"].is_number());
|
||||
assert!(
|
||||
json["diagnostics"]
|
||||
.as_array()
|
||||
.unwrap()
|
||||
.iter()
|
||||
.any(|diagnostic| diagnostic["code"] == "state_lock_held"
|
||||
&& diagnostic["message"]
|
||||
.as_str()
|
||||
.unwrap()
|
||||
.contains("force-unlock held-lock")),
|
||||
"locked state should produce a useful diagnostic: {json}"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cluster_force_unlock_json_removes_lock() {
|
||||
let temp = tempdir().unwrap();
|
||||
write_cluster_config_fixture(temp.path());
|
||||
write_cluster_lock(temp.path(), "held-lock", "plan");
|
||||
|
||||
let json = parse_stdout_json(&output_success(
|
||||
cli()
|
||||
.arg("cluster")
|
||||
.arg("force-unlock")
|
||||
.arg("held-lock")
|
||||
.arg("--config")
|
||||
.arg(temp.path())
|
||||
.arg("--json"),
|
||||
));
|
||||
assert_eq!(json["ok"], true);
|
||||
assert_eq!(json["lock_removed"], true);
|
||||
assert_eq!(json["state_observations"]["lock_id"], "held-lock");
|
||||
assert_eq!(json["state_observations"]["lock_operation"], "plan");
|
||||
assert!(!temp.path().join("__cluster/lock.json").exists());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cluster_force_unlock_wrong_id_exits_nonzero() {
|
||||
let temp = tempdir().unwrap();
|
||||
write_cluster_config_fixture(temp.path());
|
||||
write_cluster_lock(temp.path(), "held-lock", "plan");
|
||||
|
||||
let json = parse_stdout_json(&output_failure(
|
||||
cli()
|
||||
.arg("cluster")
|
||||
.arg("force-unlock")
|
||||
.arg("other-lock")
|
||||
.arg("--config")
|
||||
.arg(temp.path())
|
||||
.arg("--json"),
|
||||
));
|
||||
assert_eq!(json["ok"], false);
|
||||
assert_eq!(json["lock_removed"], false);
|
||||
assert!(
|
||||
json["diagnostics"]
|
||||
.as_array()
|
||||
.unwrap()
|
||||
.iter()
|
||||
.any(|diagnostic| diagnostic["code"] == "state_lock_id_mismatch")
|
||||
);
|
||||
assert!(temp.path().join("__cluster/lock.json").exists());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cluster_locked_plan_then_force_unlock_then_plan_succeeds() {
|
||||
let temp = tempdir().unwrap();
|
||||
write_cluster_config_fixture(temp.path());
|
||||
write_cluster_lock(temp.path(), "held-lock", "plan");
|
||||
|
||||
let locked = parse_stdout_json(&output_failure(
|
||||
cli()
|
||||
.arg("cluster")
|
||||
.arg("plan")
|
||||
.arg("--config")
|
||||
.arg(temp.path())
|
||||
.arg("--json"),
|
||||
));
|
||||
assert_eq!(locked["ok"], false);
|
||||
assert_eq!(locked["state_observations"]["lock_id"], "held-lock");
|
||||
|
||||
let unlocked = parse_stdout_json(&output_success(
|
||||
cli()
|
||||
.arg("cluster")
|
||||
.arg("force-unlock")
|
||||
.arg("held-lock")
|
||||
.arg("--config")
|
||||
.arg(temp.path())
|
||||
.arg("--json"),
|
||||
));
|
||||
assert_eq!(unlocked["lock_removed"], true);
|
||||
|
||||
let planned = parse_stdout_json(&output_success(
|
||||
cli()
|
||||
.arg("cluster")
|
||||
.arg("plan")
|
||||
.arg("--config")
|
||||
.arg(temp.path())
|
||||
.arg("--json"),
|
||||
));
|
||||
assert_eq!(planned["ok"], true);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cluster_import_json_bootstraps_missing_state() {
|
||||
let temp = tempdir().unwrap();
|
||||
write_cluster_config_fixture(temp.path());
|
||||
init_cluster_derived_graph(temp.path());
|
||||
|
||||
let json = parse_stdout_json(&output_success(
|
||||
cli()
|
||||
.arg("cluster")
|
||||
.arg("import")
|
||||
.arg("--config")
|
||||
.arg(temp.path())
|
||||
.arg("--json"),
|
||||
));
|
||||
assert_eq!(json["ok"], true);
|
||||
assert_eq!(json["operation"], "import");
|
||||
assert_eq!(json["state_observations"]["state_revision"], 1);
|
||||
assert!(
|
||||
json["state_observations"]["state_cas"]
|
||||
.as_str()
|
||||
.unwrap()
|
||||
.starts_with("sha256:")
|
||||
);
|
||||
assert_eq!(json["state_observations"]["locked"], false);
|
||||
assert_eq!(json["state_observations"]["lock_acquired"], true);
|
||||
assert!(json["state_observations"]["acquired_lock_id"].is_string());
|
||||
assert!(json["observations"]["graph.knowledge"]["manifest_version"].is_number());
|
||||
assert_eq!(
|
||||
json["resource_statuses"]["graph.knowledge"]["status"],
|
||||
"applied"
|
||||
);
|
||||
assert!(temp.path().join("__cluster/state.json").exists());
|
||||
assert!(!temp.path().join("__cluster/lock.json").exists());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cluster_refresh_json_updates_revision_cas_and_removes_lock() {
|
||||
let temp = tempdir().unwrap();
|
||||
write_cluster_config_fixture(temp.path());
|
||||
init_cluster_derived_graph(temp.path());
|
||||
let state_dir = temp.path().join("__cluster");
|
||||
fs::create_dir_all(&state_dir).unwrap();
|
||||
fs::write(
|
||||
state_dir.join("state.json"),
|
||||
r#"
|
||||
{
|
||||
"version": 1,
|
||||
"state_revision": 2,
|
||||
"applied_revision": { "resources": {} }
|
||||
}
|
||||
"#,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let json = parse_stdout_json(&output_success(
|
||||
cli()
|
||||
.arg("cluster")
|
||||
.arg("refresh")
|
||||
.arg("--config")
|
||||
.arg(temp.path())
|
||||
.arg("--json"),
|
||||
));
|
||||
assert_eq!(json["ok"], true);
|
||||
assert_eq!(json["operation"], "refresh");
|
||||
assert_eq!(json["state_observations"]["state_revision"], 3);
|
||||
assert!(
|
||||
json["state_observations"]["state_cas"]
|
||||
.as_str()
|
||||
.unwrap()
|
||||
.starts_with("sha256:")
|
||||
);
|
||||
assert_eq!(json["state_observations"]["locked"], false);
|
||||
assert_eq!(json["state_observations"]["lock_acquired"], true);
|
||||
assert!(json["state_observations"]["acquired_lock_id"].is_string());
|
||||
assert!(!state_dir.join("lock.json").exists());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cluster_refresh_missing_state_exits_nonzero() {
|
||||
let temp = tempdir().unwrap();
|
||||
write_cluster_config_fixture(temp.path());
|
||||
|
||||
let output = output_failure(
|
||||
cli()
|
||||
.arg("cluster")
|
||||
.arg("refresh")
|
||||
.arg("--config")
|
||||
.arg(temp.path())
|
||||
.arg("--json"),
|
||||
);
|
||||
let json = parse_stdout_json(&output);
|
||||
assert_eq!(json["ok"], false);
|
||||
assert!(
|
||||
json["diagnostics"]
|
||||
.as_array()
|
||||
.unwrap()
|
||||
.iter()
|
||||
.any(|diagnostic| diagnostic["code"] == "state_missing"),
|
||||
"missing state should produce a useful diagnostic: {json}"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cluster_import_existing_state_exits_nonzero() {
|
||||
let temp = tempdir().unwrap();
|
||||
write_cluster_config_fixture(temp.path());
|
||||
let state_dir = temp.path().join("__cluster");
|
||||
fs::create_dir_all(&state_dir).unwrap();
|
||||
fs::write(
|
||||
state_dir.join("state.json"),
|
||||
r#"{"version":1,"applied_revision":{"resources":{}}}"#,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let output = output_failure(
|
||||
cli()
|
||||
.arg("cluster")
|
||||
.arg("import")
|
||||
.arg("--config")
|
||||
.arg(temp.path())
|
||||
.arg("--json"),
|
||||
);
|
||||
let json = parse_stdout_json(&output);
|
||||
assert_eq!(json["ok"], false);
|
||||
assert!(
|
||||
json["diagnostics"]
|
||||
.as_array()
|
||||
.unwrap()
|
||||
.iter()
|
||||
.any(|diagnostic| diagnostic["code"] == "state_already_exists"),
|
||||
"existing state should produce a useful diagnostic: {json}"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cluster_refresh_and_import_locked_state_exit_nonzero() {
|
||||
let temp = tempdir().unwrap();
|
||||
write_cluster_config_fixture(temp.path());
|
||||
let state_dir = temp.path().join("__cluster");
|
||||
fs::create_dir_all(&state_dir).unwrap();
|
||||
fs::write(
|
||||
state_dir.join("state.json"),
|
||||
r#"{"version":1,"applied_revision":{"resources":{}}}"#,
|
||||
)
|
||||
.unwrap();
|
||||
fs::write(
|
||||
state_dir.join("lock.json"),
|
||||
r#"{"version":1,"lock_id":"held-lock","operation":"refresh","created_at":"2026-06-08T00:00:00Z","pid":123}"#,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let refresh = parse_stdout_json(&output_failure(
|
||||
cli()
|
||||
.arg("cluster")
|
||||
.arg("refresh")
|
||||
.arg("--config")
|
||||
.arg(temp.path())
|
||||
.arg("--json"),
|
||||
));
|
||||
assert_eq!(refresh["state_observations"]["locked"], true);
|
||||
assert_eq!(refresh["state_observations"]["lock_id"], "held-lock");
|
||||
assert_eq!(refresh["state_observations"]["lock_acquired"], false);
|
||||
assert!(
|
||||
refresh["diagnostics"]
|
||||
.as_array()
|
||||
.unwrap()
|
||||
.iter()
|
||||
.any(|diagnostic| diagnostic["code"] == "state_lock_held")
|
||||
);
|
||||
|
||||
let temp = tempdir().unwrap();
|
||||
write_cluster_config_fixture(temp.path());
|
||||
let state_dir = temp.path().join("__cluster");
|
||||
fs::create_dir_all(&state_dir).unwrap();
|
||||
fs::write(
|
||||
state_dir.join("lock.json"),
|
||||
r#"{"version":1,"lock_id":"held-lock","operation":"import","created_at":"2026-06-08T00:00:00Z","pid":123}"#,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let imported = parse_stdout_json(&output_failure(
|
||||
cli()
|
||||
.arg("cluster")
|
||||
.arg("import")
|
||||
.arg("--config")
|
||||
.arg(temp.path())
|
||||
.arg("--json"),
|
||||
));
|
||||
assert_eq!(imported["state_observations"]["locked"], true);
|
||||
assert_eq!(imported["state_observations"]["lock_id"], "held-lock");
|
||||
assert_eq!(imported["state_observations"]["lock_acquired"], false);
|
||||
assert!(
|
||||
imported["diagnostics"]
|
||||
.as_array()
|
||||
.unwrap()
|
||||
.iter()
|
||||
.any(|diagnostic| diagnostic["code"] == "state_lock_held")
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cluster_validate_invalid_config_exits_nonzero() {
|
||||
let temp = tempdir().unwrap();
|
||||
fs::write(
|
||||
temp.path().join("cluster.yaml"),
|
||||
"version: 1\ngraphs: {}\npipelines: {}\n",
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let output = output_failure(
|
||||
cli()
|
||||
.arg("cluster")
|
||||
.arg("validate")
|
||||
.arg("--config")
|
||||
.arg(temp.path()),
|
||||
);
|
||||
let stdout = stdout_string(&output);
|
||||
assert!(stdout.contains("future_phase_field"), "{stdout}");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cluster_apply_json_applies_query_and_policy() {
|
||||
let temp = tempdir().unwrap();
|
||||
write_cluster_config_fixture(temp.path());
|
||||
let validate = write_cluster_applyable_state(temp.path());
|
||||
|
||||
let json = parse_stdout_json(&output_success(
|
||||
cli()
|
||||
.arg("cluster")
|
||||
.arg("apply")
|
||||
.arg("--config")
|
||||
.arg(temp.path())
|
||||
.arg("--json"),
|
||||
));
|
||||
assert_eq!(json["ok"], true, "{json}");
|
||||
assert_eq!(json["applied_count"], 2, "{json}");
|
||||
assert_eq!(json["converged"], true, "{json}");
|
||||
assert_eq!(json["state_written"], true, "{json}");
|
||||
assert_eq!(
|
||||
json["resource_statuses"]["query.knowledge.find_person"]["status"],
|
||||
"applied"
|
||||
);
|
||||
|
||||
let query_digest = validate["resource_digests"]["query.knowledge.find_person"]
|
||||
.as_str()
|
||||
.unwrap();
|
||||
let payload = temp
|
||||
.path()
|
||||
.join("__cluster/resources/query/knowledge/find_person")
|
||||
.join(format!("{query_digest}.gq"));
|
||||
assert!(payload.exists(), "missing payload {}", payload.display());
|
||||
|
||||
let state: serde_json::Value = serde_json::from_str(
|
||||
&fs::read_to_string(temp.path().join("__cluster/state.json")).unwrap(),
|
||||
)
|
||||
.unwrap();
|
||||
assert_eq!(state["state_revision"], 2);
|
||||
assert_eq!(
|
||||
state["applied_revision"]["resources"]["query.knowledge.find_person"]["digest"],
|
||||
*query_digest
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cluster_apply_missing_state_exits_nonzero() {
|
||||
let temp = tempdir().unwrap();
|
||||
write_cluster_config_fixture(temp.path());
|
||||
|
||||
let output = output_failure(
|
||||
cli()
|
||||
.arg("cluster")
|
||||
.arg("apply")
|
||||
.arg("--config")
|
||||
.arg(temp.path())
|
||||
.arg("--json"),
|
||||
);
|
||||
let json = parse_stdout_json(&output);
|
||||
assert_eq!(json["ok"], false);
|
||||
assert!(
|
||||
json["diagnostics"]
|
||||
.as_array()
|
||||
.unwrap()
|
||||
.iter()
|
||||
.any(|diagnostic| diagnostic["code"] == "state_missing"),
|
||||
"{json}"
|
||||
);
|
||||
assert!(!temp.path().join("__cluster/resources").exists());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cluster_apply_locked_exits_nonzero() {
|
||||
let temp = tempdir().unwrap();
|
||||
write_cluster_config_fixture(temp.path());
|
||||
write_cluster_applyable_state(temp.path());
|
||||
write_cluster_lock(temp.path(), "held-lock", "plan");
|
||||
|
||||
let output = output_failure(
|
||||
cli()
|
||||
.arg("cluster")
|
||||
.arg("apply")
|
||||
.arg("--config")
|
||||
.arg(temp.path())
|
||||
.arg("--json"),
|
||||
);
|
||||
let json = parse_stdout_json(&output);
|
||||
assert_eq!(json["ok"], false);
|
||||
assert!(
|
||||
json["diagnostics"]
|
||||
.as_array()
|
||||
.unwrap()
|
||||
.iter()
|
||||
.any(|diagnostic| diagnostic["code"] == "state_lock_held"),
|
||||
"{json}"
|
||||
);
|
||||
assert!(temp.path().join("__cluster/lock.json").exists());
|
||||
assert!(!temp.path().join("__cluster/resources").exists());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cluster_apply_uses_cli_actor_from_local_config() {
|
||||
let temp = tempdir().unwrap();
|
||||
write_cluster_config_fixture(temp.path());
|
||||
fs::write(
|
||||
temp.path().join("omnigraph.yaml"),
|
||||
"cli:\n actor: act-local\n",
|
||||
)
|
||||
.unwrap();
|
||||
// Phase 1: import once (setup, not under test).
|
||||
let output = cli()
|
||||
.current_dir(temp.path())
|
||||
.arg("cluster")
|
||||
.arg("import")
|
||||
.arg("--config")
|
||||
.arg(temp.path())
|
||||
.output()
|
||||
.unwrap();
|
||||
assert!(output.status.success(), "{output:?}");
|
||||
|
||||
// Phase 2: apply alone, capturing the echoed actor (idempotent re-runs).
|
||||
let apply = |extra: &[&str]| {
|
||||
let mut command = cli();
|
||||
command.current_dir(temp.path());
|
||||
for arg in extra {
|
||||
command.arg(arg);
|
||||
}
|
||||
let output = command
|
||||
.arg("cluster")
|
||||
.arg("apply")
|
||||
.arg("--config")
|
||||
.arg(temp.path())
|
||||
.arg("--json")
|
||||
.output()
|
||||
.unwrap();
|
||||
let json: serde_json::Value =
|
||||
serde_json::from_str(String::from_utf8_lossy(&output.stdout).trim()).unwrap();
|
||||
json["actor"].clone()
|
||||
};
|
||||
assert_eq!(apply(&[]), "act-local", "cli.actor is the no-flag default");
|
||||
assert_eq!(apply(&["--as", "andrew"]), "andrew", "--as overrides cli.actor");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cluster_approve_uses_cli_actor_fallback() {
|
||||
let temp = tempdir().unwrap();
|
||||
write_cluster_config_fixture(temp.path());
|
||||
fs::write(
|
||||
temp.path().join("omnigraph.yaml"),
|
||||
"cli:\n actor: act-local\n",
|
||||
)
|
||||
.unwrap();
|
||||
// Converge, then remove the graph so a gated delete is pending.
|
||||
for command in ["import", "apply"] {
|
||||
let output = cli()
|
||||
.current_dir(temp.path())
|
||||
.arg("cluster")
|
||||
.arg(command)
|
||||
.arg("--config")
|
||||
.arg(temp.path())
|
||||
.output()
|
||||
.unwrap();
|
||||
assert!(output.status.success(), "cluster {command} failed");
|
||||
}
|
||||
fs::write(temp.path().join("cluster.yaml"), "version: 1\ngraphs: {}\n").unwrap();
|
||||
|
||||
let output = cli()
|
||||
.current_dir(temp.path())
|
||||
.arg("cluster")
|
||||
.arg("approve")
|
||||
.arg("graph.knowledge")
|
||||
.arg("--config")
|
||||
.arg(temp.path())
|
||||
.arg("--json")
|
||||
.output()
|
||||
.unwrap();
|
||||
assert!(output.status.success(), "{output:?}");
|
||||
let json: serde_json::Value =
|
||||
serde_json::from_str(String::from_utf8_lossy(&output.stdout).trim()).unwrap();
|
||||
assert_eq!(json["approved_by"], "act-local");
|
||||
|
||||
// With neither flag nor config: refused with the actionable message.
|
||||
let bare = tempdir().unwrap();
|
||||
write_cluster_config_fixture(bare.path());
|
||||
let output = output_failure(
|
||||
cli()
|
||||
.current_dir(bare.path())
|
||||
.arg("cluster")
|
||||
.arg("approve")
|
||||
.arg("graph.knowledge")
|
||||
.arg("--config")
|
||||
.arg(bare.path()),
|
||||
);
|
||||
let stderr = String::from_utf8_lossy(&output.stderr);
|
||||
assert!(stderr.contains("--as"), "{stderr}");
|
||||
assert!(stderr.contains("cli.actor"), "{stderr}");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cluster_commands_ignore_malformed_local_config() {
|
||||
let temp = tempdir().unwrap();
|
||||
write_cluster_config_fixture(temp.path());
|
||||
fs::write(temp.path().join("omnigraph.yaml"), "{{{{ not yaml").unwrap();
|
||||
|
||||
for command in ["validate", "plan", "status"] {
|
||||
let output = cli()
|
||||
.current_dir(temp.path())
|
||||
.arg("cluster")
|
||||
.arg(command)
|
||||
.arg("--config")
|
||||
.arg(temp.path())
|
||||
.arg("--json")
|
||||
.output()
|
||||
.unwrap();
|
||||
assert!(
|
||||
output.status.success() || command == "plan", // plan warns state-missing pre-import; still must not config-error
|
||||
"cluster {command} affected by malformed omnigraph.yaml: {output:?}"
|
||||
);
|
||||
assert!(
|
||||
!String::from_utf8_lossy(&output.stderr).contains("omnigraph.yaml"),
|
||||
"cluster {command} touched omnigraph.yaml"
|
||||
);
|
||||
}
|
||||
// import + apply with an explicit --as: the config is never loaded.
|
||||
for (command, args) in [("import", vec![]), ("apply", vec!["--as", "andrew"])] {
|
||||
let mut invocation = cli();
|
||||
invocation.current_dir(temp.path());
|
||||
for arg in &args {
|
||||
invocation.arg(arg);
|
||||
}
|
||||
let output = invocation
|
||||
.arg("cluster")
|
||||
.arg(command)
|
||||
.arg("--config")
|
||||
.arg(temp.path())
|
||||
.output()
|
||||
.unwrap();
|
||||
assert!(
|
||||
output.status.success(),
|
||||
"cluster {command} affected by malformed omnigraph.yaml: {}",
|
||||
String::from_utf8_lossy(&output.stderr)
|
||||
);
|
||||
}
|
||||
// Only the no-flag actor lookup is allowed to fail, and loudly.
|
||||
let output = output_failure(
|
||||
cli()
|
||||
.current_dir(temp.path())
|
||||
.arg("cluster")
|
||||
.arg("apply")
|
||||
.arg("--config")
|
||||
.arg(temp.path()),
|
||||
);
|
||||
let stderr = String::from_utf8_lossy(&output.stderr);
|
||||
assert!(
|
||||
stderr.contains("omnigraph.yaml") && stderr.contains("--as"),
|
||||
"the actor-default config read must fail loudly and actionably: {stderr}"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cluster_commands_ignore_conflicting_local_config() {
|
||||
let baseline = tempdir().unwrap();
|
||||
write_cluster_config_fixture(baseline.path());
|
||||
let with_config = tempdir().unwrap();
|
||||
write_cluster_config_fixture(with_config.path());
|
||||
fs::write(
|
||||
with_config.path().join("omnigraph.yaml"),
|
||||
r#"
|
||||
server:
|
||||
bind: 0.0.0.0:9999
|
||||
graphs:
|
||||
phantom:
|
||||
uri: ./phantom.omni
|
||||
"#,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let validate = |dir: &std::path::Path| {
|
||||
let output = cli()
|
||||
.current_dir(dir)
|
||||
.arg("cluster")
|
||||
.arg("validate")
|
||||
.arg("--config")
|
||||
.arg(dir)
|
||||
.arg("--json")
|
||||
.output()
|
||||
.unwrap();
|
||||
assert!(output.status.success(), "{output:?}");
|
||||
serde_json::from_str::<serde_json::Value>(String::from_utf8_lossy(&output.stdout).trim())
|
||||
.unwrap()
|
||||
};
|
||||
let (a, b) = (validate(baseline.path()), validate(with_config.path()));
|
||||
// Compare the path-free invariants (paths embed each tempdir).
|
||||
for key in ["ok", "diagnostics", "resource_digests", "dependencies"] {
|
||||
assert_eq!(a[key], b[key], "conflicting omnigraph.yaml leaked into cluster validate ({key})");
|
||||
}
|
||||
let leaked = b.to_string();
|
||||
assert!(!leaked.contains("phantom") && !leaked.contains("9999"), "{leaked}");
|
||||
}
|
||||
621
crates/omnigraph-cli/tests/cli_cluster_e2e.rs
Normal file
621
crates/omnigraph-cli/tests/cli_cluster_e2e.rs
Normal file
|
|
@ -0,0 +1,621 @@
|
|||
//! Cluster lifecycle compositions over the spawned binary (recovery, drift, convergence).
|
||||
//! Moved verbatim from tests/cli.rs in the modularization.
|
||||
|
||||
use std::fs;
|
||||
|
||||
use tempfile::tempdir;
|
||||
|
||||
mod support;
|
||||
|
||||
use support::*;
|
||||
|
||||
|
||||
#[test]
|
||||
fn cluster_e2e_lifecycle_import_apply_status_refresh_converges() {
|
||||
let temp = tempdir().unwrap();
|
||||
write_cluster_config_fixture(temp.path());
|
||||
init_cluster_derived_graph(temp.path());
|
||||
|
||||
let import = cluster_json(temp.path(), "import");
|
||||
assert_eq!(import["ok"], true, "{import}");
|
||||
assert_eq!(import["state_observations"]["state_revision"], 1);
|
||||
|
||||
let plan = cluster_json(temp.path(), "plan");
|
||||
let changes = plan["changes"].as_array().unwrap();
|
||||
assert_eq!(changes.len(), 3, "{plan}");
|
||||
let disposition_of = |resource: &str| {
|
||||
changes
|
||||
.iter()
|
||||
.find(|change| change["resource"] == resource)
|
||||
.unwrap_or_else(|| panic!("missing change for {resource}: {plan}"))["disposition"]
|
||||
.clone()
|
||||
};
|
||||
assert_eq!(disposition_of("graph.knowledge"), "derived");
|
||||
assert_eq!(disposition_of("query.knowledge.find_person"), "applied");
|
||||
assert_eq!(disposition_of("policy.base"), "applied");
|
||||
|
||||
let apply = cluster_json(temp.path(), "apply");
|
||||
assert_eq!(apply["ok"], true, "{apply}");
|
||||
assert_eq!(apply["applied_count"], 2, "{apply}");
|
||||
assert_eq!(apply["converged"], true, "{apply}");
|
||||
|
||||
let status = cluster_json(temp.path(), "status");
|
||||
assert_eq!(
|
||||
status["resource_statuses"]["query.knowledge.find_person"]["status"],
|
||||
"applied"
|
||||
);
|
||||
assert_eq!(status["resource_statuses"]["policy.base"]["status"], "applied");
|
||||
assert!(
|
||||
status["state_observations"]["applied_config_digest"].is_string(),
|
||||
"converged apply must record the applied config digest: {status}"
|
||||
);
|
||||
|
||||
// Refresh re-observes the live graph; it must not undo apply's work.
|
||||
let refresh = cluster_json(temp.path(), "refresh");
|
||||
assert_eq!(refresh["ok"], true, "{refresh}");
|
||||
let replan = cluster_json(temp.path(), "plan");
|
||||
assert!(
|
||||
replan["changes"].as_array().unwrap().is_empty(),
|
||||
"refresh after a converged apply must not re-open the plan: {replan}"
|
||||
);
|
||||
|
||||
// A query edit round-trips: plan update -> apply -> converged again.
|
||||
fs::write(
|
||||
temp.path().join("people.gq"),
|
||||
r#"
|
||||
query find_person($name: String) {
|
||||
match { $p: Person { name: $name } }
|
||||
return { $p.name }
|
||||
}
|
||||
"#,
|
||||
)
|
||||
.unwrap();
|
||||
let apply_edit = cluster_json(temp.path(), "apply");
|
||||
assert_eq!(apply_edit["applied_count"], 1, "{apply_edit}");
|
||||
assert_eq!(apply_edit["converged"], true, "{apply_edit}");
|
||||
|
||||
let final_apply = cluster_json(temp.path(), "apply");
|
||||
assert_eq!(final_apply["state_written"], false, "{final_apply}");
|
||||
assert!(final_apply["changes"].as_array().unwrap().is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cluster_e2e_schema_change_applied_by_cluster() {
|
||||
let temp = tempdir().unwrap();
|
||||
write_cluster_config_fixture(temp.path());
|
||||
init_cluster_derived_graph(temp.path());
|
||||
let import = cluster_json(temp.path(), "import");
|
||||
assert_eq!(import["ok"], true, "{import}");
|
||||
let apply = cluster_json(temp.path(), "apply");
|
||||
assert_eq!(apply["converged"], true, "{apply}");
|
||||
|
||||
// Additive schema change: Stage 4B applies it from the cluster — no
|
||||
// manual schema apply, no refresh round-trip.
|
||||
fs::write(
|
||||
temp.path().join("people.pg"),
|
||||
r#"
|
||||
node Person {
|
||||
name: String @key
|
||||
age: I32?
|
||||
bio: String?
|
||||
}
|
||||
"#,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
// Plan previews the real migration steps (RFC-004 §D7).
|
||||
let plan = cluster_json(temp.path(), "plan");
|
||||
let schema_change = change_for(&plan, "schema.knowledge");
|
||||
assert_eq!(schema_change["disposition"], "applied", "{plan}");
|
||||
let migration = &schema_change["migration"];
|
||||
assert_eq!(migration["supported"], true, "{plan}");
|
||||
assert!(
|
||||
migration["steps"]
|
||||
.as_array()
|
||||
.unwrap()
|
||||
.iter()
|
||||
.any(|step| step["kind"] == "add_property"),
|
||||
"{plan}"
|
||||
);
|
||||
|
||||
let evolve = cluster_json(temp.path(), "apply");
|
||||
assert_eq!(evolve["ok"], true, "{evolve}");
|
||||
assert_eq!(evolve["converged"], true, "{evolve}");
|
||||
assert_eq!(change_for(&evolve, "schema.knowledge")["disposition"], "applied");
|
||||
|
||||
// The live graph carries the new schema; the plan is empty.
|
||||
let schema_show = output_success(
|
||||
cli()
|
||||
.arg("schema")
|
||||
.arg("show")
|
||||
.arg(temp.path().join("graphs/knowledge.omni")),
|
||||
);
|
||||
assert!(stdout_string(&schema_show).contains("bio"), "live schema updated");
|
||||
let replan = cluster_json(temp.path(), "plan");
|
||||
assert!(
|
||||
replan["changes"].as_array().unwrap().is_empty(),
|
||||
"one cluster apply converges a schema change: {replan}"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cluster_e2e_force_unlock_unblocks_apply() {
|
||||
let temp = tempdir().unwrap();
|
||||
write_cluster_config_fixture(temp.path());
|
||||
write_cluster_applyable_state(temp.path());
|
||||
write_cluster_lock(temp.path(), "stuck-lock", "apply");
|
||||
|
||||
let refused = parse_stdout_json(&output_failure(
|
||||
cli()
|
||||
.arg("cluster")
|
||||
.arg("apply")
|
||||
.arg("--config")
|
||||
.arg(temp.path())
|
||||
.arg("--json"),
|
||||
));
|
||||
assert_eq!(refused["ok"], false);
|
||||
|
||||
let unlocked = parse_stdout_json(&output_success(
|
||||
cli()
|
||||
.arg("cluster")
|
||||
.arg("force-unlock")
|
||||
.arg("stuck-lock")
|
||||
.arg("--config")
|
||||
.arg(temp.path())
|
||||
.arg("--json"),
|
||||
));
|
||||
assert_eq!(unlocked["lock_removed"], true, "{unlocked}");
|
||||
|
||||
let retried = cluster_json(temp.path(), "apply");
|
||||
assert_eq!(retried["ok"], true, "{retried}");
|
||||
assert_eq!(retried["converged"], true, "{retried}");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cluster_e2e_lost_state_reimport_recovers_catalog() {
|
||||
let temp = tempdir().unwrap();
|
||||
write_cluster_config_fixture(temp.path());
|
||||
init_cluster_derived_graph(temp.path());
|
||||
let import = cluster_json(temp.path(), "import");
|
||||
assert_eq!(import["ok"], true, "{import}");
|
||||
let apply = cluster_json(temp.path(), "apply");
|
||||
assert_eq!(apply["converged"], true, "{apply}");
|
||||
|
||||
let query_digest = change_for(&apply, "query.knowledge.find_person")["after_digest"]
|
||||
.as_str()
|
||||
.unwrap()
|
||||
.to_string();
|
||||
let blob = temp
|
||||
.path()
|
||||
.join("__cluster/resources/query/knowledge/find_person")
|
||||
.join(format!("{query_digest}.gq"));
|
||||
let blob_content = fs::read_to_string(&blob).unwrap();
|
||||
|
||||
// Disaster: the state ledger is lost.
|
||||
fs::remove_file(temp.path().join("__cluster/state.json")).unwrap();
|
||||
|
||||
let reimport = cluster_json(temp.path(), "import");
|
||||
assert_eq!(reimport["ok"], true, "{reimport}");
|
||||
assert_eq!(reimport["state_observations"]["state_revision"], 1);
|
||||
// Import observes graph/schema only; query/policy digests are not invented.
|
||||
assert!(
|
||||
reimport["resource_digests"]
|
||||
.get("query.knowledge.find_person")
|
||||
.is_none(),
|
||||
"{reimport}"
|
||||
);
|
||||
|
||||
let plan = cluster_json(temp.path(), "plan");
|
||||
assert_eq!(
|
||||
change_for(&plan, "query.knowledge.find_person")["disposition"],
|
||||
"applied"
|
||||
);
|
||||
assert_eq!(change_for(&plan, "policy.base")["disposition"], "applied");
|
||||
|
||||
let reapply = cluster_json(temp.path(), "apply");
|
||||
assert_eq!(reapply["ok"], true, "{reapply}");
|
||||
assert_eq!(reapply["converged"], true, "{reapply}");
|
||||
assert!(
|
||||
reapply["state_observations"]["applied_config_digest"].is_string(),
|
||||
"{reapply}"
|
||||
);
|
||||
// The catalog blob was reused, not rewritten with different content.
|
||||
assert_eq!(fs::read_to_string(&blob).unwrap(), blob_content);
|
||||
|
||||
let replan = cluster_json(temp.path(), "plan");
|
||||
assert!(replan["changes"].as_array().unwrap().is_empty(), "{replan}");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cluster_e2e_out_of_band_schema_drift_then_apply_converges_it() {
|
||||
let temp = tempdir().unwrap();
|
||||
write_cluster_config_fixture(temp.path());
|
||||
init_cluster_derived_graph(temp.path());
|
||||
let import = cluster_json(temp.path(), "import");
|
||||
assert_eq!(import["ok"], true, "{import}");
|
||||
let apply = cluster_json(temp.path(), "apply");
|
||||
assert_eq!(apply["converged"], true, "{apply}");
|
||||
|
||||
// Out-of-band: the live graph evolves, cluster.yaml stays put.
|
||||
fs::write(
|
||||
temp.path().join("people_v2.pg"),
|
||||
r#"
|
||||
node Person {
|
||||
name: String @key
|
||||
age: I32?
|
||||
bio: String?
|
||||
}
|
||||
"#,
|
||||
)
|
||||
.unwrap();
|
||||
output_success(
|
||||
cli()
|
||||
.arg("schema")
|
||||
.arg("apply")
|
||||
.arg(temp.path().join("graphs/knowledge.omni"))
|
||||
.arg("--schema")
|
||||
.arg(temp.path().join("people_v2.pg"))
|
||||
.arg("--json"),
|
||||
);
|
||||
|
||||
// Drift is visible...
|
||||
let refresh = cluster_json(temp.path(), "refresh");
|
||||
assert_eq!(
|
||||
refresh["resource_statuses"]["schema.knowledge"]["status"],
|
||||
"drifted"
|
||||
);
|
||||
// ...the plan proposes converging back to desired, with a migration
|
||||
// preview (a soft drop of the out-of-band field)...
|
||||
let plan = cluster_json(temp.path(), "plan");
|
||||
let schema_change = change_for(&plan, "schema.knowledge");
|
||||
assert_eq!(schema_change["disposition"], "applied", "{plan}");
|
||||
assert!(
|
||||
schema_change["migration"]["steps"]
|
||||
.as_array()
|
||||
.unwrap()
|
||||
.iter()
|
||||
.any(|step| step["kind"] == "drop_property" && step["mode"] == "soft"),
|
||||
"{plan}"
|
||||
);
|
||||
// ...and apply converges the live schema back (axiom 8: drift correction
|
||||
// is gated like any change; a soft migration is the recoverable tier).
|
||||
let converge = cluster_json(temp.path(), "apply");
|
||||
assert_eq!(converge["ok"], true, "{converge}");
|
||||
assert_eq!(converge["converged"], true, "{converge}");
|
||||
let schema_show = output_success(
|
||||
cli()
|
||||
.arg("schema")
|
||||
.arg("show")
|
||||
.arg(temp.path().join("graphs/knowledge.omni")),
|
||||
);
|
||||
assert!(
|
||||
!stdout_string(&schema_show).contains("bio"),
|
||||
"out-of-band field soft-dropped back to desired"
|
||||
);
|
||||
let replan = cluster_json(temp.path(), "plan");
|
||||
assert!(replan["changes"].as_array().unwrap().is_empty(), "{replan}");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cluster_e2e_graph_root_destruction_drifts_then_apply_recreates_empty_graph() {
|
||||
let temp = tempdir().unwrap();
|
||||
write_cluster_config_fixture(temp.path());
|
||||
init_cluster_derived_graph(temp.path());
|
||||
let import = cluster_json(temp.path(), "import");
|
||||
assert_eq!(import["ok"], true, "{import}");
|
||||
let apply = cluster_json(temp.path(), "apply");
|
||||
assert_eq!(apply["converged"], true, "{apply}");
|
||||
let query_digest = change_for(&apply, "query.knowledge.find_person")["after_digest"]
|
||||
.as_str()
|
||||
.unwrap()
|
||||
.to_string();
|
||||
|
||||
fs::remove_dir_all(temp.path().join("graphs/knowledge.omni")).unwrap();
|
||||
|
||||
// Missing root is drift, not an error.
|
||||
let refresh = cluster_json(temp.path(), "refresh");
|
||||
assert_eq!(refresh["ok"], true, "{refresh}");
|
||||
assert_eq!(
|
||||
refresh["resource_statuses"]["graph.knowledge"]["status"],
|
||||
"drifted"
|
||||
);
|
||||
assert!(
|
||||
refresh["resource_statuses"]["graph.knowledge"]["conditions"]
|
||||
.as_array()
|
||||
.unwrap()
|
||||
.iter()
|
||||
.any(|condition| condition == "graph_missing"),
|
||||
"{refresh}"
|
||||
);
|
||||
// Graph/schema digests removed; query/policy digests preserved.
|
||||
assert!(refresh["resource_digests"].get("graph.knowledge").is_none());
|
||||
assert!(refresh["resource_digests"].get("schema.knowledge").is_none());
|
||||
assert!(
|
||||
refresh["resource_digests"]
|
||||
.get("query.knowledge.find_person")
|
||||
.is_some(),
|
||||
"{refresh}"
|
||||
);
|
||||
|
||||
let plan = cluster_json(temp.path(), "plan");
|
||||
assert_eq!(change_for(&plan, "graph.knowledge")["operation"], "create");
|
||||
// Stage 4A: the re-create is executable and the plan says so — nothing
|
||||
// hidden about converging a destroyed root back to an EMPTY graph (the
|
||||
// data was already lost; this is declarative convergence, RFC-004 §D1).
|
||||
assert_eq!(change_for(&plan, "graph.knowledge")["disposition"], "applied");
|
||||
assert_eq!(change_for(&plan, "schema.knowledge")["disposition"], "applied");
|
||||
// Converged-then-destroyed: query/policy are already in state at the
|
||||
// desired digests, so they are not changes at all.
|
||||
assert_eq!(plan["changes"].as_array().unwrap().len(), 2, "{plan}");
|
||||
|
||||
let recreate = cluster_json(temp.path(), "apply");
|
||||
assert_eq!(recreate["ok"], true, "{recreate}");
|
||||
assert_eq!(recreate["converged"], true, "{recreate}");
|
||||
// The empty graph is back on disk; catalog state survived throughout.
|
||||
assert!(temp.path().join("graphs/knowledge.omni").exists());
|
||||
let state: serde_json::Value = serde_json::from_str(
|
||||
&fs::read_to_string(temp.path().join("__cluster/state.json")).unwrap(),
|
||||
)
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
state["applied_revision"]["resources"]["query.knowledge.find_person"]["digest"],
|
||||
query_digest
|
||||
);
|
||||
assert!(
|
||||
temp.path()
|
||||
.join("__cluster/resources/query/knowledge/find_person")
|
||||
.join(format!("{query_digest}.gq"))
|
||||
.exists()
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cluster_e2e_multi_graph_mixed_dispositions_then_approve_and_converge() {
|
||||
let temp = tempdir().unwrap();
|
||||
write_multi_graph_cluster_fixture(temp.path());
|
||||
// No manual init: Stage 4A creates both graphs.
|
||||
|
||||
let import = cluster_json(temp.path(), "import");
|
||||
assert_eq!(import["ok"], true, "{import}");
|
||||
|
||||
let apply = cluster_json(temp.path(), "apply");
|
||||
assert_eq!(apply["ok"], true, "{apply}");
|
||||
assert_eq!(apply["converged"], true, "{apply}");
|
||||
assert_eq!(change_for(&apply, "graph.knowledge")["disposition"], "applied");
|
||||
assert_eq!(
|
||||
change_for(&apply, "graph.engineering")["disposition"],
|
||||
"applied"
|
||||
);
|
||||
assert_eq!(
|
||||
change_for(&apply, "query.engineering.find_service")["disposition"],
|
||||
"applied"
|
||||
);
|
||||
// The graph-spanning and cluster-scoped policies ride the same run.
|
||||
assert_eq!(change_for(&apply, "policy.shared")["disposition"], "applied");
|
||||
assert_eq!(
|
||||
change_for(&apply, "policy.cluster_wide")["disposition"],
|
||||
"applied"
|
||||
);
|
||||
assert!(temp.path().join("graphs/knowledge.omni").exists());
|
||||
assert!(temp.path().join("graphs/engineering.omni").exists());
|
||||
|
||||
// Mixed run: a graph REMOVAL (4C territory — deferred) gates its query
|
||||
// delete (blocked), while a knowledge query update is independent
|
||||
// (applied) and re-derives its composite. All four dispositions at once.
|
||||
fs::write(
|
||||
temp.path().join("cluster.yaml"),
|
||||
r#"
|
||||
version: 1
|
||||
metadata:
|
||||
name: company-brain
|
||||
state:
|
||||
backend: cluster
|
||||
lock: true
|
||||
graphs:
|
||||
knowledge:
|
||||
schema: ./people.pg
|
||||
queries:
|
||||
find_person:
|
||||
file: ./people.gq
|
||||
policies:
|
||||
shared:
|
||||
file: ./shared.policy.yaml
|
||||
applies_to: [knowledge]
|
||||
cluster_wide:
|
||||
file: ./cluster_wide.policy.yaml
|
||||
applies_to: [cluster]
|
||||
"#,
|
||||
)
|
||||
.unwrap();
|
||||
fs::write(
|
||||
temp.path().join("people.gq"),
|
||||
"\nquery find_person($name: String) {\n match { $p: Person { name: $name } }\n return { $p.name }\n}\n",
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let mixed = cluster_json(temp.path(), "apply");
|
||||
assert_eq!(mixed["ok"], true, "{mixed}");
|
||||
assert_eq!(mixed["converged"], false, "{mixed}");
|
||||
// Stage 4C: deletes are gated on a digest-bound approval, one gate per
|
||||
// subtree (the graph-level approval carries schema + queries).
|
||||
assert_eq!(
|
||||
change_for(&mixed, "graph.engineering")["disposition"],
|
||||
"blocked"
|
||||
);
|
||||
assert_eq!(
|
||||
change_for(&mixed, "graph.engineering")["reason"],
|
||||
"approval_required"
|
||||
);
|
||||
assert_eq!(
|
||||
change_for(&mixed, "schema.engineering")["reason"],
|
||||
"approval_required"
|
||||
);
|
||||
assert_eq!(
|
||||
change_for(&mixed, "query.engineering.find_service")["reason"],
|
||||
"approval_required"
|
||||
);
|
||||
let gate_plan = cluster_json(temp.path(), "plan");
|
||||
let gates = gate_plan["approvals_required"].as_array().unwrap();
|
||||
assert_eq!(gates.len(), 1, "{gate_plan}");
|
||||
assert_eq!(gates[0]["resource"], "graph.engineering");
|
||||
assert_eq!(gates[0]["satisfied"], false);
|
||||
assert_eq!(
|
||||
change_for(&mixed, "query.knowledge.find_person")["disposition"],
|
||||
"applied"
|
||||
);
|
||||
// 5A: policy.shared's applies_to narrowed with an unchanged file digest
|
||||
// — now a first-class binding change, applied in the same run.
|
||||
assert_eq!(change_for(&mixed, "policy.shared")["binding_change"], true);
|
||||
assert_eq!(change_for(&mixed, "policy.shared")["disposition"], "applied");
|
||||
assert_eq!(
|
||||
change_for(&mixed, "graph.knowledge")["disposition"],
|
||||
"derived"
|
||||
);
|
||||
// Deterministic ordering: changes sorted by resource address.
|
||||
let order: Vec<&str> = mixed["changes"]
|
||||
.as_array()
|
||||
.unwrap()
|
||||
.iter()
|
||||
.map(|change| change["resource"].as_str().unwrap())
|
||||
.collect();
|
||||
let mut sorted = order.clone();
|
||||
sorted.sort_unstable();
|
||||
assert_eq!(order, sorted, "{mixed}");
|
||||
// The conclusion: an apply without approval stays blocked; the approved
|
||||
// delete converges the cluster, tombstoning the removed graph.
|
||||
let still_blocked = cluster_json(temp.path(), "apply");
|
||||
assert_eq!(still_blocked["converged"], false, "{still_blocked}");
|
||||
|
||||
let approve = parse_stdout_json(&output_success(
|
||||
cli()
|
||||
.arg("--as")
|
||||
.arg("andrew")
|
||||
.arg("cluster")
|
||||
.arg("approve")
|
||||
.arg("graph.engineering")
|
||||
.arg("--config")
|
||||
.arg(temp.path())
|
||||
.arg("--json"),
|
||||
));
|
||||
assert_eq!(approve["ok"], true, "{approve}");
|
||||
assert_eq!(approve["approved_by"], "andrew");
|
||||
|
||||
let converge = cluster_json(temp.path(), "apply");
|
||||
assert_eq!(converge["ok"], true, "{converge}");
|
||||
assert_eq!(converge["converged"], true, "{converge}");
|
||||
assert!(!temp.path().join("graphs/engineering.omni").exists());
|
||||
|
||||
let status = cluster_json(temp.path(), "status");
|
||||
assert_eq!(status["observations"]["graph.engineering"]["kind"], "tombstone");
|
||||
let final_plan = cluster_json(temp.path(), "plan");
|
||||
assert!(
|
||||
final_plan["changes"].as_array().unwrap().is_empty(),
|
||||
"{final_plan}"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cluster_e2e_approve_requires_actor() {
|
||||
let temp = tempdir().unwrap();
|
||||
write_cluster_config_fixture(temp.path());
|
||||
|
||||
let output = output_failure(
|
||||
cli()
|
||||
.arg("cluster")
|
||||
.arg("approve")
|
||||
.arg("graph.knowledge")
|
||||
.arg("--config")
|
||||
.arg(temp.path()),
|
||||
);
|
||||
let stderr = String::from_utf8_lossy(&output.stderr);
|
||||
assert!(stderr.contains("--as"), "{stderr}");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cluster_e2e_declared_graph_created_by_apply() {
|
||||
let temp = tempdir().unwrap();
|
||||
write_cluster_config_fixture(temp.path());
|
||||
|
||||
let import = cluster_json(temp.path(), "import");
|
||||
assert_eq!(import["ok"], true, "{import}");
|
||||
|
||||
let apply = cluster_json(temp.path(), "apply");
|
||||
assert_eq!(apply["ok"], true, "{apply}");
|
||||
assert_eq!(apply["converged"], true, "{apply}");
|
||||
assert_eq!(change_for(&apply, "graph.knowledge")["disposition"], "applied");
|
||||
assert!(temp.path().join("graphs/knowledge.omni").exists());
|
||||
|
||||
// The created graph is a real graph: the per-graph CLI can open it.
|
||||
let snapshot = output_success(
|
||||
cli()
|
||||
.arg("snapshot")
|
||||
.arg(temp.path().join("graphs/knowledge.omni")),
|
||||
);
|
||||
assert!(!stdout_string(&snapshot).is_empty());
|
||||
|
||||
let plan = cluster_json(temp.path(), "plan");
|
||||
assert!(plan["changes"].as_array().unwrap().is_empty(), "{plan}");
|
||||
let status = cluster_json(temp.path(), "status");
|
||||
assert_eq!(
|
||||
status["resource_statuses"]["graph.knowledge"]["status"],
|
||||
"applied"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cluster_e2e_payload_drift_self_heals() {
|
||||
let temp = tempdir().unwrap();
|
||||
write_cluster_config_fixture(temp.path());
|
||||
init_cluster_derived_graph(temp.path());
|
||||
let import = cluster_json(temp.path(), "import");
|
||||
assert_eq!(import["ok"], true, "{import}");
|
||||
let apply = cluster_json(temp.path(), "apply");
|
||||
assert_eq!(apply["converged"], true, "{apply}");
|
||||
|
||||
let query_digest = change_for(&apply, "query.knowledge.find_person")["after_digest"]
|
||||
.as_str()
|
||||
.unwrap()
|
||||
.to_string();
|
||||
let blob = temp
|
||||
.path()
|
||||
.join("__cluster/resources/query/knowledge/find_person")
|
||||
.join(format!("{query_digest}.gq"));
|
||||
fs::remove_file(&blob).unwrap();
|
||||
|
||||
let status = cluster_json(temp.path(), "status");
|
||||
assert_eq!(status["ok"], true, "{status}");
|
||||
assert!(
|
||||
status["diagnostics"]
|
||||
.as_array()
|
||||
.unwrap()
|
||||
.iter()
|
||||
.any(|diagnostic| diagnostic["code"] == "catalog_payload_missing"),
|
||||
"{status}"
|
||||
);
|
||||
|
||||
let refresh = cluster_json(temp.path(), "refresh");
|
||||
assert_eq!(refresh["ok"], true, "{refresh}");
|
||||
assert_eq!(
|
||||
refresh["resource_statuses"]["query.knowledge.find_person"]["status"],
|
||||
"drifted"
|
||||
);
|
||||
|
||||
let heal = cluster_json(temp.path(), "apply");
|
||||
assert_eq!(heal["ok"], true, "{heal}");
|
||||
assert_eq!(heal["converged"], true, "{heal}");
|
||||
assert!(blob.exists(), "blob republished");
|
||||
|
||||
let clean = cluster_json(temp.path(), "status");
|
||||
assert!(
|
||||
!clean["diagnostics"]
|
||||
.as_array()
|
||||
.unwrap()
|
||||
.iter()
|
||||
.any(|diagnostic| {
|
||||
diagnostic["code"]
|
||||
.as_str()
|
||||
.is_some_and(|code| code.starts_with("catalog_payload"))
|
||||
}),
|
||||
"{clean}"
|
||||
);
|
||||
}
|
||||
1631
crates/omnigraph-cli/tests/cli_data.rs
Normal file
1631
crates/omnigraph-cli/tests/cli_data.rs
Normal file
File diff suppressed because it is too large
Load diff
535
crates/omnigraph-cli/tests/cli_queries.rs
Normal file
535
crates/omnigraph-cli/tests/cli_queries.rs
Normal file
|
|
@ -0,0 +1,535 @@
|
|||
//! Stored-query commands and alias resolution.
|
||||
//! Moved verbatim from tests/cli.rs in the modularization.
|
||||
|
||||
|
||||
use serde_json::Value;
|
||||
use tempfile::tempdir;
|
||||
|
||||
mod support;
|
||||
|
||||
use support::*;
|
||||
|
||||
|
||||
#[test]
|
||||
fn query_check_alias_matches_lint_output() {
|
||||
let temp = tempdir().unwrap();
|
||||
let schema_path = temp.path().join("schema.pg");
|
||||
let query_path = temp.path().join("queries.gq");
|
||||
write_file(
|
||||
&schema_path,
|
||||
r#"
|
||||
node Person {
|
||||
name: String
|
||||
}
|
||||
"#,
|
||||
);
|
||||
write_query_file(
|
||||
&query_path,
|
||||
r#"
|
||||
query list_people() {
|
||||
match { $p: Person }
|
||||
return { $p.name }
|
||||
}
|
||||
"#,
|
||||
);
|
||||
|
||||
let lint_output = output_success(
|
||||
cli()
|
||||
.arg("query")
|
||||
.arg("lint")
|
||||
.arg("--query")
|
||||
.arg(&query_path)
|
||||
.arg("--schema")
|
||||
.arg(&schema_path)
|
||||
.arg("--json"),
|
||||
);
|
||||
let check_output = output_success(
|
||||
cli()
|
||||
.arg("query")
|
||||
.arg("check")
|
||||
.arg("--query")
|
||||
.arg(&query_path)
|
||||
.arg("--schema")
|
||||
.arg(&schema_path)
|
||||
.arg("--json"),
|
||||
);
|
||||
|
||||
assert_eq!(stdout_string(&lint_output), stdout_string(&check_output));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn read_alias_from_yaml_config_runs_with_kv_output() {
|
||||
let temp = tempdir().unwrap();
|
||||
let graph = graph_path(temp.path());
|
||||
let config = temp.path().join("omnigraph.yaml");
|
||||
let query = temp.path().join("aliases.gq");
|
||||
init_graph(&graph);
|
||||
load_fixture(&graph);
|
||||
write_query_file(
|
||||
&query,
|
||||
&std::fs::read_to_string(fixture("test.gq")).unwrap(),
|
||||
);
|
||||
write_config(
|
||||
&config,
|
||||
&format!(
|
||||
"{}aliases:\n owner:\n command: read\n query: aliases.gq\n name: get_person\n args: [name]\n format: kv\n",
|
||||
local_yaml_config(&graph)
|
||||
),
|
||||
);
|
||||
|
||||
let output = output_success(
|
||||
cli()
|
||||
.arg("read")
|
||||
.arg("--config")
|
||||
.arg(&config)
|
||||
.arg("--alias")
|
||||
.arg("owner")
|
||||
.arg("Alice"),
|
||||
);
|
||||
let stdout = stdout_string(&output);
|
||||
|
||||
assert!(stdout.contains("row 1"));
|
||||
assert!(stdout.contains("p.name: Alice"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn read_alias_uses_alias_target_without_cli_default_and_accepts_url_like_arg() {
|
||||
let temp = tempdir().unwrap();
|
||||
let graph = graph_path(temp.path());
|
||||
let config = temp.path().join("omnigraph.yaml");
|
||||
let query = temp.path().join("aliases.gq");
|
||||
let data = temp.path().join("url-like.jsonl");
|
||||
init_graph(&graph);
|
||||
write_jsonl(
|
||||
&data,
|
||||
r#"{"type":"Person","data":{"name":"https://example.com","age":30}}"#,
|
||||
);
|
||||
output_success(
|
||||
cli()
|
||||
.arg("load")
|
||||
.arg("--mode")
|
||||
.arg("overwrite")
|
||||
.arg("--data")
|
||||
.arg(&data)
|
||||
.arg(&graph),
|
||||
);
|
||||
write_query_file(
|
||||
&query,
|
||||
&std::fs::read_to_string(fixture("test.gq")).unwrap(),
|
||||
);
|
||||
write_config(
|
||||
&config,
|
||||
&format!(
|
||||
"graphs:\n local:\n uri: '{}'\nquery:\n roots:\n - .\npolicy: {{}}\naliases:\n owner:\n command: read\n query: aliases.gq\n name: get_person\n args: [name]\n graph: local\n format: kv\n",
|
||||
graph.to_string_lossy()
|
||||
),
|
||||
);
|
||||
|
||||
let output = output_success(
|
||||
cli()
|
||||
.arg("read")
|
||||
.arg("--config")
|
||||
.arg(&config)
|
||||
.arg("--alias")
|
||||
.arg("owner")
|
||||
.arg("https://example.com"),
|
||||
);
|
||||
let stdout = stdout_string(&output);
|
||||
|
||||
assert!(stdout.contains("row 1"));
|
||||
assert!(stdout.contains("p.name: https://example.com"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn change_alias_from_yaml_config_persists_changes() {
|
||||
let temp = tempdir().unwrap();
|
||||
let graph = graph_path(temp.path());
|
||||
let config = temp.path().join("omnigraph.yaml");
|
||||
let query = temp.path().join("mutations.gq");
|
||||
init_graph(&graph);
|
||||
load_fixture(&graph);
|
||||
write_query_file(
|
||||
&query,
|
||||
r#"
|
||||
query insert_person($name: String, $age: I32) {
|
||||
insert Person { name: $name, age: $age }
|
||||
}
|
||||
"#,
|
||||
);
|
||||
write_config(
|
||||
&config,
|
||||
&format!(
|
||||
"{}aliases:\n add_person:\n command: change\n query: mutations.gq\n name: insert_person\n args: [name, age]\n",
|
||||
local_yaml_config(&graph)
|
||||
),
|
||||
);
|
||||
|
||||
let output = output_success(
|
||||
cli()
|
||||
.arg("change")
|
||||
.arg("--config")
|
||||
.arg(&config)
|
||||
.arg("--alias")
|
||||
.arg("add_person")
|
||||
.arg("Eve")
|
||||
.arg("29")
|
||||
.arg("--json"),
|
||||
);
|
||||
let payload: Value = serde_json::from_slice(&output.stdout).unwrap();
|
||||
assert_eq!(payload["affected_nodes"], 1);
|
||||
|
||||
let verify = output_success(
|
||||
cli()
|
||||
.arg("read")
|
||||
.arg(&graph)
|
||||
.arg("--query")
|
||||
.arg(fixture("test.gq"))
|
||||
.arg("--name")
|
||||
.arg("get_person")
|
||||
.arg("--params")
|
||||
.arg(r#"{"name":"Eve"}"#)
|
||||
.arg("--json"),
|
||||
);
|
||||
let verify_payload: Value = serde_json::from_slice(&verify.stdout).unwrap();
|
||||
assert_eq!(verify_payload["row_count"], 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn queries_validate_exits_zero_on_clean_registry() {
|
||||
let graph = SystemGraph::loaded();
|
||||
graph.write_query(
|
||||
"find_person.gq",
|
||||
"query find_person($name: String) { match { $p: Person { name: $name } } return { $p.age } }",
|
||||
);
|
||||
let config = graph.write_config(
|
||||
"omnigraph.yaml",
|
||||
&queries_test_config(
|
||||
&graph.path().to_string_lossy(),
|
||||
"find_person",
|
||||
"find_person.gq",
|
||||
),
|
||||
);
|
||||
let output = output_success(
|
||||
cli()
|
||||
.arg("queries")
|
||||
.arg("validate")
|
||||
.arg("--config")
|
||||
.arg(&config),
|
||||
);
|
||||
let stdout = stdout_string(&output);
|
||||
assert!(stdout.contains("OK"), "stdout:\n{stdout}");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn queries_validate_exits_nonzero_on_type_broken_query() {
|
||||
let graph = SystemGraph::loaded();
|
||||
// `Widget` is not in the fixture schema.
|
||||
graph.write_query(
|
||||
"ghost.gq",
|
||||
"query ghost() { match { $w: Widget } return { $w.name } }",
|
||||
);
|
||||
let config = graph.write_config(
|
||||
"omnigraph.yaml",
|
||||
&queries_test_config(&graph.path().to_string_lossy(), "ghost", "ghost.gq"),
|
||||
);
|
||||
let output = output_failure(
|
||||
cli()
|
||||
.arg("queries")
|
||||
.arg("validate")
|
||||
.arg("--config")
|
||||
.arg(&config),
|
||||
);
|
||||
let stdout = stdout_string(&output);
|
||||
assert!(
|
||||
stdout.contains("ghost"),
|
||||
"validation should name the broken query; stdout:\n{stdout}"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn queries_list_prints_registered_query() {
|
||||
let graph = SystemGraph::loaded();
|
||||
graph.write_query(
|
||||
"find_person.gq",
|
||||
"query find_person($name: String) { match { $p: Person { name: $name } } return { $p.age } }",
|
||||
);
|
||||
// Exposed with an explicit tool name so the list shows the MCP suffix.
|
||||
let config = graph.write_config(
|
||||
"omnigraph.yaml",
|
||||
&format!(
|
||||
concat!(
|
||||
"graphs:\n",
|
||||
" local:\n",
|
||||
" uri: '{}'\n",
|
||||
" queries:\n",
|
||||
" find_person:\n",
|
||||
" file: ./find_person.gq\n",
|
||||
" mcp: {{ expose: true, tool_name: lookup_person }}\n",
|
||||
"cli:\n",
|
||||
" graph: local\n",
|
||||
"policy: {{}}\n",
|
||||
),
|
||||
graph.path().to_string_lossy().replace('\'', "''")
|
||||
),
|
||||
);
|
||||
let output = output_success(
|
||||
cli()
|
||||
.arg("queries")
|
||||
.arg("list")
|
||||
.arg("--config")
|
||||
.arg(&config),
|
||||
);
|
||||
let stdout = stdout_string(&output);
|
||||
assert!(stdout.contains("find_person"), "stdout:\n{stdout}");
|
||||
assert!(
|
||||
stdout.contains("$name: String"),
|
||||
"list should show typed params; stdout:\n{stdout}"
|
||||
);
|
||||
assert!(
|
||||
stdout.contains("[mcp: lookup_person]"),
|
||||
"list should show the MCP tool name for exposed queries; stdout:\n{stdout}"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn queries_list_requires_graph_selection_for_per_graph_only_registries() {
|
||||
let graph = SystemGraph::loaded();
|
||||
graph.write_query(
|
||||
"find_person.gq",
|
||||
"query find_person($name: String) { match { $p: Person { name: $name } } return { $p.age } }",
|
||||
);
|
||||
let config = graph.write_config(
|
||||
"omnigraph.yaml",
|
||||
&format!(
|
||||
concat!(
|
||||
"graphs:\n",
|
||||
" local:\n",
|
||||
" uri: '{}'\n",
|
||||
" queries:\n",
|
||||
" find_person:\n",
|
||||
" file: ./find_person.gq\n",
|
||||
"policy: {{}}\n",
|
||||
),
|
||||
graph.path().to_string_lossy().replace('\'', "''")
|
||||
),
|
||||
);
|
||||
|
||||
let output = output_failure(
|
||||
cli()
|
||||
.arg("queries")
|
||||
.arg("list")
|
||||
.arg("--config")
|
||||
.arg(&config),
|
||||
);
|
||||
let stderr = String::from_utf8_lossy(&output.stderr);
|
||||
assert!(
|
||||
stderr.contains("local") && stderr.contains("--target local"),
|
||||
"error must name the graph and give a concrete selection hint; stderr:\n{stderr}"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn queries_list_without_graph_selection_lists_top_level_registry() {
|
||||
let graph = SystemGraph::loaded();
|
||||
graph.write_query(
|
||||
"top_find.gq",
|
||||
"query top_find($name: String) { match { $p: Person { name: $name } } return { $p.age } }",
|
||||
);
|
||||
let config = graph.write_config(
|
||||
"omnigraph.yaml",
|
||||
concat!(
|
||||
"queries:\n",
|
||||
" top_find:\n",
|
||||
" file: ./top_find.gq\n",
|
||||
"policy: {}\n",
|
||||
),
|
||||
);
|
||||
|
||||
let output = output_success(
|
||||
cli()
|
||||
.arg("queries")
|
||||
.arg("list")
|
||||
.arg("--config")
|
||||
.arg(&config),
|
||||
);
|
||||
let stdout = stdout_string(&output);
|
||||
assert!(stdout.contains("top_find"), "stdout:\n{stdout}");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn queries_list_unknown_target_errors() {
|
||||
// `queries list` opens no graph URI, so unknown-graph validation can't ride
|
||||
// along on URI resolution the way it does for every other command. An
|
||||
// unknown `--target` must still error (naming the graph) instead of
|
||||
// silently falling back to the top-level registry and showing the wrong
|
||||
// (or empty) catalog.
|
||||
let graph = SystemGraph::loaded();
|
||||
graph.write_query(
|
||||
"find_person.gq",
|
||||
"query find_person($name: String) { match { $p: Person { name: $name } } return { $p.age } }",
|
||||
);
|
||||
let config = graph.write_config(
|
||||
"omnigraph.yaml",
|
||||
&queries_test_config(
|
||||
&graph.path().to_string_lossy(),
|
||||
"find_person",
|
||||
"find_person.gq",
|
||||
),
|
||||
);
|
||||
let output = output_failure(
|
||||
cli()
|
||||
.arg("queries")
|
||||
.arg("list")
|
||||
.arg("--target")
|
||||
.arg("nonexistent")
|
||||
.arg("--config")
|
||||
.arg(&config),
|
||||
);
|
||||
let stderr = String::from_utf8_lossy(&output.stderr);
|
||||
assert!(
|
||||
stderr.contains("nonexistent"),
|
||||
"error must name the unknown graph; stderr:\n{stderr}"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn queries_commands_reject_named_graph_with_populated_top_level_block() {
|
||||
// A named graph (here via `cli.graph`) uses its own `graphs.<name>` block,
|
||||
// so a populated top-level `queries:` block would be silently ignored — a
|
||||
// config the server REFUSES to boot. `queries validate`/`list` must reject
|
||||
// it too (matching boot) instead of validating/listing the per-graph block
|
||||
// and giving a false green.
|
||||
let graph = SystemGraph::loaded();
|
||||
graph.write_query(
|
||||
"find_person.gq",
|
||||
"query find_person($name: String) { match { $p: Person { name: $name } } return { $p.age } }",
|
||||
);
|
||||
let config = graph.write_config(
|
||||
"omnigraph.yaml",
|
||||
&format!(
|
||||
concat!(
|
||||
"graphs:\n",
|
||||
" local:\n",
|
||||
" uri: '{}'\n",
|
||||
" queries:\n",
|
||||
" find_person:\n",
|
||||
" file: ./find_person.gq\n",
|
||||
"cli:\n",
|
||||
" graph: local\n",
|
||||
"queries:\n", // populated top-level block: the coherence violation
|
||||
" legacy:\n",
|
||||
" file: ./legacy.gq\n",
|
||||
"policy: {{}}\n",
|
||||
),
|
||||
graph.path().to_string_lossy().replace('\'', "''")
|
||||
),
|
||||
);
|
||||
// Both resolve `local` from cli.graph (no positional URI), so both must
|
||||
// error and name the graph + the ignored block — like server boot does.
|
||||
for sub in ["validate", "list"] {
|
||||
let output = output_failure(cli().arg("queries").arg(sub).arg("--config").arg(&config));
|
||||
let stderr = String::from_utf8_lossy(&output.stderr);
|
||||
assert!(
|
||||
stderr.contains("local") && stderr.contains("queries"),
|
||||
"`queries {sub}` must reject a named graph with a populated top-level block; stderr:\n{stderr}"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn queries_validate_exits_nonzero_on_duplicate_tool_name() {
|
||||
// Two exposed queries claiming one MCP tool name is a load-time
|
||||
// collision — `queries validate` must fail (offline, before the engine
|
||||
// opens) and name both queries plus the contested tool.
|
||||
let graph = SystemGraph::loaded();
|
||||
graph.write_query(
|
||||
"a.gq",
|
||||
"query a() { match { $p: Person } return { $p.name } }",
|
||||
);
|
||||
graph.write_query(
|
||||
"b.gq",
|
||||
"query b() { match { $p: Person } return { $p.name } }",
|
||||
);
|
||||
let config = graph.write_config(
|
||||
"omnigraph.yaml",
|
||||
&format!(
|
||||
concat!(
|
||||
"graphs:\n",
|
||||
" local:\n",
|
||||
" uri: '{}'\n",
|
||||
" queries:\n",
|
||||
" a:\n",
|
||||
" file: ./a.gq\n",
|
||||
" mcp: {{ expose: true, tool_name: dup }}\n",
|
||||
" b:\n",
|
||||
" file: ./b.gq\n",
|
||||
" mcp: {{ expose: true, tool_name: dup }}\n",
|
||||
"cli:\n",
|
||||
" graph: local\n",
|
||||
"policy: {{}}\n",
|
||||
),
|
||||
graph.path().to_string_lossy().replace('\'', "''")
|
||||
),
|
||||
);
|
||||
let output = output_failure(
|
||||
cli()
|
||||
.arg("queries")
|
||||
.arg("validate")
|
||||
.arg("--config")
|
||||
.arg(&config),
|
||||
);
|
||||
let stderr = String::from_utf8_lossy(&output.stderr);
|
||||
assert!(
|
||||
stderr.contains("dup") && stderr.contains("'a'") && stderr.contains("'b'"),
|
||||
"duplicate tool name should be reported naming both queries; stderr:\n{stderr}"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn queries_validate_positional_uri_ignores_default_graph() {
|
||||
// A positional URI is anonymous → the schema AND the registry both come
|
||||
// from top-level, even when `cli.graph` names a graph whose per-graph
|
||||
// queries would fail. Pins that the URI and registry can't diverge.
|
||||
let graph = SystemGraph::loaded();
|
||||
graph.write_query(
|
||||
"clean.gq",
|
||||
"query clean($name: String) { match { $p: Person { name: $name } } return { $p.age } }",
|
||||
);
|
||||
// `Widget` is not in the fixture schema — the default graph's per-graph
|
||||
// query would break validate if it were (wrongly) selected.
|
||||
graph.write_query(
|
||||
"broken.gq",
|
||||
"query broken() { match { $w: Widget } return { $w.name } }",
|
||||
);
|
||||
let config = graph.write_config(
|
||||
"omnigraph.yaml",
|
||||
concat!(
|
||||
"cli:\n graph: prod\n",
|
||||
"graphs:\n",
|
||||
" prod:\n",
|
||||
" uri: /nonexistent-prod.omni\n",
|
||||
" queries:\n",
|
||||
" broken:\n",
|
||||
" file: ./broken.gq\n",
|
||||
"queries:\n",
|
||||
" clean:\n",
|
||||
" file: ./clean.gq\n",
|
||||
"policy: {}\n",
|
||||
),
|
||||
);
|
||||
// Positional URI = the real loaded graph; selection is anonymous, so the
|
||||
// CLEAN top-level registry validates (not prod's broken one).
|
||||
let output = output_success(
|
||||
cli()
|
||||
.arg("queries")
|
||||
.arg("validate")
|
||||
.arg(graph.path())
|
||||
.arg("--config")
|
||||
.arg(&config),
|
||||
);
|
||||
let stdout = stdout_string(&output);
|
||||
assert!(
|
||||
stdout.contains("OK"),
|
||||
"positional URI must validate the top-level registry, not the cli.graph default; stdout:\n{stdout}"
|
||||
);
|
||||
}
|
||||
500
crates/omnigraph-cli/tests/cli_schema_config.rs
Normal file
500
crates/omnigraph-cli/tests/cli_schema_config.rs
Normal file
|
|
@ -0,0 +1,500 @@
|
|||
//! init/config scaffolding, schema plan/apply, graphs listing, version.
|
||||
//! Moved verbatim from tests/cli.rs in the modularization.
|
||||
|
||||
use std::fs;
|
||||
|
||||
use lance::index::DatasetIndexExt;
|
||||
use omnigraph::db::{Omnigraph, ReadTarget};
|
||||
use serde_json::Value;
|
||||
use tempfile::tempdir;
|
||||
|
||||
mod support;
|
||||
|
||||
use support::*;
|
||||
|
||||
|
||||
#[test]
|
||||
fn version_command_prints_current_cli_version() {
|
||||
let output = output_success(cli().arg("version"));
|
||||
let stdout = stdout_string(&output);
|
||||
|
||||
assert_eq!(
|
||||
stdout.trim(),
|
||||
format!("omnigraph {}", env!("CARGO_PKG_VERSION"))
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn init_creates_graph_successfully_on_missing_local_directory() {
|
||||
let temp = tempdir().unwrap();
|
||||
let graph = graph_path(temp.path());
|
||||
let schema = fixture("test.pg");
|
||||
|
||||
let output = output_success(cli().arg("init").arg("--schema").arg(&schema).arg(&graph));
|
||||
let stdout = stdout_string(&output);
|
||||
|
||||
assert!(stdout.contains("initialized"));
|
||||
assert!(graph.join("_schema.pg").exists());
|
||||
assert!(graph.join("__manifest").exists());
|
||||
assert!(temp.path().join("omnigraph.yaml").exists());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn schema_plan_json_reports_supported_additive_change() {
|
||||
let temp = tempdir().unwrap();
|
||||
let graph = graph_path(temp.path());
|
||||
let schema_path = temp.path().join("next.pg");
|
||||
init_graph(&graph);
|
||||
|
||||
let next_schema = fs::read_to_string(fixture("test.pg")).unwrap().replace(
|
||||
" age: I32?\n}",
|
||||
" age: I32?\n nickname: String?\n}",
|
||||
);
|
||||
fs::write(&schema_path, next_schema).unwrap();
|
||||
|
||||
let output = output_success(
|
||||
cli()
|
||||
.arg("schema")
|
||||
.arg("plan")
|
||||
.arg("--schema")
|
||||
.arg(&schema_path)
|
||||
.arg("--json")
|
||||
.arg(&graph),
|
||||
);
|
||||
let payload: Value = serde_json::from_slice(&output.stdout).unwrap();
|
||||
|
||||
assert_eq!(payload["supported"], true);
|
||||
assert_eq!(payload["step_count"], 1);
|
||||
assert_eq!(payload["steps"][0]["kind"], "add_property");
|
||||
assert_eq!(payload["steps"][0]["type_kind"], "node");
|
||||
assert_eq!(payload["steps"][0]["type_name"], "Person");
|
||||
assert_eq!(payload["steps"][0]["property_name"], "nickname");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn schema_plan_json_reports_unsupported_type_change() {
|
||||
let temp = tempdir().unwrap();
|
||||
let graph = graph_path(temp.path());
|
||||
let schema_path = temp.path().join("breaking.pg");
|
||||
init_graph(&graph);
|
||||
|
||||
let breaking_schema = fs::read_to_string(fixture("test.pg"))
|
||||
.unwrap()
|
||||
.replace("age: I32?", "age: I64?");
|
||||
fs::write(&schema_path, breaking_schema).unwrap();
|
||||
|
||||
let output = output_success(
|
||||
cli()
|
||||
.arg("schema")
|
||||
.arg("plan")
|
||||
.arg("--schema")
|
||||
.arg(&schema_path)
|
||||
.arg("--json")
|
||||
.arg(&graph),
|
||||
);
|
||||
let payload: Value = serde_json::from_slice(&output.stdout).unwrap();
|
||||
|
||||
assert_eq!(payload["supported"], false);
|
||||
assert!(payload["steps"].as_array().unwrap().iter().any(|step| {
|
||||
step["kind"] == "unsupported_change"
|
||||
&& step["entity"]
|
||||
.as_str()
|
||||
.unwrap_or_default()
|
||||
.contains("Person.age")
|
||||
}));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn schema_apply_json_applies_supported_migration() {
|
||||
let temp = tempdir().unwrap();
|
||||
let graph = graph_path(temp.path());
|
||||
let schema_path = temp.path().join("next.pg");
|
||||
init_graph(&graph);
|
||||
|
||||
let next_schema = fs::read_to_string(fixture("test.pg")).unwrap().replace(
|
||||
" age: I32?\n}",
|
||||
" age: I32?\n nickname: String?\n}",
|
||||
);
|
||||
fs::write(&schema_path, next_schema).unwrap();
|
||||
|
||||
let output = output_success(
|
||||
cli()
|
||||
.arg("schema")
|
||||
.arg("apply")
|
||||
.arg("--schema")
|
||||
.arg(&schema_path)
|
||||
.arg("--json")
|
||||
.arg(&graph),
|
||||
);
|
||||
let payload: Value = serde_json::from_slice(&output.stdout).unwrap();
|
||||
|
||||
assert_eq!(payload["supported"], true);
|
||||
assert_eq!(payload["applied"], true);
|
||||
assert_eq!(payload["step_count"], 1);
|
||||
|
||||
let db = tokio::runtime::Runtime::new()
|
||||
.unwrap()
|
||||
.block_on(Omnigraph::open(graph.to_string_lossy().as_ref()))
|
||||
.unwrap();
|
||||
assert!(
|
||||
db.catalog().node_types["Person"]
|
||||
.properties
|
||||
.contains_key("nickname")
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn schema_apply_human_reports_noop() {
|
||||
let temp = tempdir().unwrap();
|
||||
let graph = graph_path(temp.path());
|
||||
let schema_path = fixture("test.pg");
|
||||
init_graph(&graph);
|
||||
|
||||
let output = output_success(
|
||||
cli()
|
||||
.arg("schema")
|
||||
.arg("apply")
|
||||
.arg("--schema")
|
||||
.arg(&schema_path)
|
||||
.arg(&graph),
|
||||
);
|
||||
let stdout = stdout_string(&output);
|
||||
|
||||
assert!(stdout.contains("applied: no"));
|
||||
assert!(stdout.contains("no schema changes"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn schema_apply_json_renames_type_and_updates_snapshot() {
|
||||
let temp = tempdir().unwrap();
|
||||
let graph = graph_path(temp.path());
|
||||
let schema_path = temp.path().join("rename.pg");
|
||||
init_graph(&graph);
|
||||
|
||||
let renamed_schema = fs::read_to_string(fixture("test.pg"))
|
||||
.unwrap()
|
||||
.replace("node Person {\n", "node Human @rename_from(\"Person\") {\n")
|
||||
.replace("edge Knows: Person -> Person", "edge Knows: Human -> Human")
|
||||
.replace(
|
||||
"edge WorksAt: Person -> Company",
|
||||
"edge WorksAt: Human -> Company",
|
||||
);
|
||||
fs::write(&schema_path, renamed_schema).unwrap();
|
||||
|
||||
let output = output_success(
|
||||
cli()
|
||||
.arg("schema")
|
||||
.arg("apply")
|
||||
.arg("--schema")
|
||||
.arg(&schema_path)
|
||||
.arg("--json")
|
||||
.arg(&graph),
|
||||
);
|
||||
let payload: Value = serde_json::from_slice(&output.stdout).unwrap();
|
||||
assert_eq!(payload["applied"], true);
|
||||
|
||||
let db = tokio::runtime::Runtime::new()
|
||||
.unwrap()
|
||||
.block_on(Omnigraph::open(graph.to_string_lossy().as_ref()))
|
||||
.unwrap();
|
||||
let snapshot = tokio::runtime::Runtime::new()
|
||||
.unwrap()
|
||||
.block_on(db.snapshot_of(ReadTarget::branch("main")))
|
||||
.unwrap();
|
||||
assert!(snapshot.entry("node:Human").is_some());
|
||||
assert!(snapshot.entry("node:Person").is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn schema_apply_json_renames_property_and_updates_catalog() {
|
||||
let temp = tempdir().unwrap();
|
||||
let graph = graph_path(temp.path());
|
||||
let schema_path = temp.path().join("rename-property.pg");
|
||||
init_graph(&graph);
|
||||
|
||||
let renamed_schema = fs::read_to_string(fixture("test.pg"))
|
||||
.unwrap()
|
||||
.replace("age: I32?", "years: I32? @rename_from(\"age\")");
|
||||
fs::write(&schema_path, renamed_schema).unwrap();
|
||||
|
||||
let output = output_success(
|
||||
cli()
|
||||
.arg("schema")
|
||||
.arg("apply")
|
||||
.arg("--schema")
|
||||
.arg(&schema_path)
|
||||
.arg("--json")
|
||||
.arg(&graph),
|
||||
);
|
||||
let payload: Value = serde_json::from_slice(&output.stdout).unwrap();
|
||||
assert_eq!(payload["applied"], true);
|
||||
|
||||
let db = tokio::runtime::Runtime::new()
|
||||
.unwrap()
|
||||
.block_on(Omnigraph::open(graph.to_string_lossy().as_ref()))
|
||||
.unwrap();
|
||||
let person = &db.catalog().node_types["Person"];
|
||||
assert!(person.properties.contains_key("years"));
|
||||
assert!(!person.properties.contains_key("age"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn schema_apply_json_adds_index_for_existing_property() {
|
||||
let temp = tempdir().unwrap();
|
||||
let graph = graph_path(temp.path());
|
||||
let schema_path = temp.path().join("index.pg");
|
||||
init_graph(&graph);
|
||||
|
||||
let before_index_count = tokio::runtime::Runtime::new().unwrap().block_on(async {
|
||||
let db = Omnigraph::open(graph.to_string_lossy().as_ref())
|
||||
.await
|
||||
.unwrap();
|
||||
let snapshot = db.snapshot_of(ReadTarget::branch("main")).await.unwrap();
|
||||
let dataset = snapshot.open("node:Person").await.unwrap();
|
||||
dataset.load_indices().await.unwrap().len()
|
||||
});
|
||||
|
||||
let indexed_schema = fs::read_to_string(fixture("test.pg"))
|
||||
.unwrap()
|
||||
.replace("name: String @key", "name: String @key @index");
|
||||
fs::write(&schema_path, indexed_schema).unwrap();
|
||||
|
||||
let output = output_success(
|
||||
cli()
|
||||
.arg("schema")
|
||||
.arg("apply")
|
||||
.arg("--schema")
|
||||
.arg(&schema_path)
|
||||
.arg("--json")
|
||||
.arg(&graph),
|
||||
);
|
||||
let payload: Value = serde_json::from_slice(&output.stdout).unwrap();
|
||||
assert_eq!(payload["applied"], true);
|
||||
|
||||
let after_index_count = tokio::runtime::Runtime::new().unwrap().block_on(async {
|
||||
let db = Omnigraph::open(graph.to_string_lossy().as_ref())
|
||||
.await
|
||||
.unwrap();
|
||||
let snapshot = db.snapshot_of(ReadTarget::branch("main")).await.unwrap();
|
||||
let dataset = snapshot.open("node:Person").await.unwrap();
|
||||
dataset.load_indices().await.unwrap().len()
|
||||
});
|
||||
assert!(after_index_count > before_index_count);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn schema_apply_rejects_unsupported_plan() {
|
||||
let temp = tempdir().unwrap();
|
||||
let graph = graph_path(temp.path());
|
||||
let schema_path = temp.path().join("breaking.pg");
|
||||
init_graph(&graph);
|
||||
|
||||
let breaking_schema = fs::read_to_string(fixture("test.pg"))
|
||||
.unwrap()
|
||||
.replace("age: I32?", "age: I64?");
|
||||
fs::write(&schema_path, breaking_schema).unwrap();
|
||||
|
||||
let output = output_failure(
|
||||
cli()
|
||||
.arg("schema")
|
||||
.arg("apply")
|
||||
.arg("--schema")
|
||||
.arg(&schema_path)
|
||||
.arg(&graph),
|
||||
);
|
||||
let stderr = String::from_utf8_lossy(&output.stderr);
|
||||
assert!(stderr.contains("changing property type"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn schema_apply_rejects_when_non_main_branch_exists() {
|
||||
let temp = tempdir().unwrap();
|
||||
let graph = graph_path(temp.path());
|
||||
let schema_path = temp.path().join("next.pg");
|
||||
init_graph(&graph);
|
||||
output_success(
|
||||
cli()
|
||||
.arg("branch")
|
||||
.arg("create")
|
||||
.arg("--from")
|
||||
.arg("main")
|
||||
.arg("--uri")
|
||||
.arg(&graph)
|
||||
.arg("feature"),
|
||||
);
|
||||
|
||||
let next_schema = fs::read_to_string(fixture("test.pg")).unwrap().replace(
|
||||
" age: I32?\n}",
|
||||
" age: I32?\n nickname: String?\n}",
|
||||
);
|
||||
fs::write(&schema_path, next_schema).unwrap();
|
||||
|
||||
let output = output_failure(
|
||||
cli()
|
||||
.arg("schema")
|
||||
.arg("apply")
|
||||
.arg("--schema")
|
||||
.arg(&schema_path)
|
||||
.arg(&graph),
|
||||
);
|
||||
let stderr = String::from_utf8_lossy(&output.stderr);
|
||||
assert!(stderr.contains("schema apply requires a graph with only main"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn schema_apply_allow_data_loss_flag_promotes_drops_to_hard() {
|
||||
let temp = tempdir().unwrap();
|
||||
let graph = graph_path(temp.path());
|
||||
let schema_path = temp.path().join("drop-age.pg");
|
||||
init_graph(&graph);
|
||||
|
||||
// Drop the nullable `age` column.
|
||||
let next_schema = fs::read_to_string(fixture("test.pg"))
|
||||
.unwrap()
|
||||
.replace(" age: I32?\n", "");
|
||||
fs::write(&schema_path, next_schema).unwrap();
|
||||
|
||||
let output = output_success(
|
||||
cli()
|
||||
.arg("schema")
|
||||
.arg("apply")
|
||||
.arg("--schema")
|
||||
.arg(&schema_path)
|
||||
.arg("--allow-data-loss")
|
||||
.arg("--json")
|
||||
.arg(&graph),
|
||||
);
|
||||
let payload: Value = serde_json::from_slice(&output.stdout).unwrap();
|
||||
assert_eq!(payload["applied"], true);
|
||||
|
||||
let drop_step = payload["steps"]
|
||||
.as_array()
|
||||
.unwrap()
|
||||
.iter()
|
||||
.find(|s| s["kind"] == "drop_property")
|
||||
.expect("plan should include a drop_property step");
|
||||
assert_eq!(
|
||||
drop_step["mode"], "hard",
|
||||
"--allow-data-loss should promote Soft → Hard; full step: {drop_step}",
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn schema_apply_without_allow_data_loss_keeps_soft_drops() {
|
||||
// Symmetric to the above: same schema change without the flag →
|
||||
// drops stay Soft. Pins default semantics against accidental Hard
|
||||
// promotion if a future refactor changes the option threading.
|
||||
let temp = tempdir().unwrap();
|
||||
let graph = graph_path(temp.path());
|
||||
let schema_path = temp.path().join("drop-age-soft.pg");
|
||||
init_graph(&graph);
|
||||
|
||||
let next_schema = fs::read_to_string(fixture("test.pg"))
|
||||
.unwrap()
|
||||
.replace(" age: I32?\n", "");
|
||||
fs::write(&schema_path, next_schema).unwrap();
|
||||
|
||||
let output = output_success(
|
||||
cli()
|
||||
.arg("schema")
|
||||
.arg("apply")
|
||||
.arg("--schema")
|
||||
.arg(&schema_path)
|
||||
.arg("--json")
|
||||
.arg(&graph),
|
||||
);
|
||||
let payload: Value = serde_json::from_slice(&output.stdout).unwrap();
|
||||
assert_eq!(payload["applied"], true);
|
||||
|
||||
let drop_step = payload["steps"]
|
||||
.as_array()
|
||||
.unwrap()
|
||||
.iter()
|
||||
.find(|s| s["kind"] == "drop_property")
|
||||
.expect("plan should include a drop_property step");
|
||||
assert_eq!(
|
||||
drop_step["mode"], "soft",
|
||||
"no flag should leave drops Soft; full step: {drop_step}",
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn schema_plan_parity_cli_and_sdk() {
|
||||
// Same .pg through `Omnigraph::plan_schema_with_options` (SDK) and
|
||||
// `omnigraph schema plan --json` (CLI). Asserts the steps array is
|
||||
// byte-identical after JSON round-trip. HTTP doesn't expose a
|
||||
// separate /schema/plan route — that side of parity is covered by
|
||||
// the HTTP soft/hard drop tests, which exercise apply with
|
||||
// identical fixtures.
|
||||
let temp = tempdir().unwrap();
|
||||
let graph = graph_path(temp.path());
|
||||
init_graph(&graph);
|
||||
let schema_path = temp.path().join("plan-parity.pg");
|
||||
let next_schema = fs::read_to_string(fixture("test.pg")).unwrap().replace(
|
||||
" age: I32?\n}",
|
||||
" age: I32?\n nickname: String?\n}",
|
||||
);
|
||||
fs::write(&schema_path, &next_schema).unwrap();
|
||||
|
||||
// CLI side.
|
||||
let cli_output = output_success(
|
||||
cli()
|
||||
.arg("schema")
|
||||
.arg("plan")
|
||||
.arg("--schema")
|
||||
.arg(&schema_path)
|
||||
.arg("--json")
|
||||
.arg(&graph),
|
||||
);
|
||||
let cli_payload: Value = serde_json::from_slice(&cli_output.stdout).unwrap();
|
||||
|
||||
// SDK side: open graph, call plan_schema.
|
||||
let plan = tokio::runtime::Runtime::new().unwrap().block_on(async {
|
||||
let db = Omnigraph::open(graph.to_string_lossy().as_ref())
|
||||
.await
|
||||
.unwrap();
|
||||
db.plan_schema(&next_schema).await.unwrap()
|
||||
});
|
||||
let sdk_steps = serde_json::to_value(&plan.steps).unwrap();
|
||||
|
||||
assert_eq!(
|
||||
cli_payload["steps"], sdk_steps,
|
||||
"CLI plan steps must match SDK plan steps for identical input",
|
||||
);
|
||||
assert_eq!(cli_payload["supported"], plan.supported);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn graphs_subcommand_help_lists_list_only() {
|
||||
let output = output_success(cli().arg("graphs").arg("--help"));
|
||||
let stdout = stdout_string(&output);
|
||||
assert!(
|
||||
stdout.contains("list"),
|
||||
"expected `list` subcommand in help output:\n{stdout}"
|
||||
);
|
||||
let lowered = stdout.to_lowercase();
|
||||
assert!(
|
||||
!lowered.contains("create a new graph"),
|
||||
"graph create should not be in v0.6.0 help; got:\n{stdout}"
|
||||
);
|
||||
assert!(
|
||||
!lowered.contains("delete a graph"),
|
||||
"graph delete should not be in v0.6.0 help; got:\n{stdout}"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn graphs_list_against_local_uri_errors_with_remote_only_message() {
|
||||
let output = output_failure(
|
||||
cli()
|
||||
.arg("graphs")
|
||||
.arg("list")
|
||||
.arg("--uri")
|
||||
.arg("/tmp/local"),
|
||||
);
|
||||
let stderr = String::from_utf8_lossy(&output.stderr).into_owned();
|
||||
assert!(
|
||||
stderr.contains("remote multi-graph server URL"),
|
||||
"expected 'remote multi-graph server URL' rejection in stderr; got:\n{stderr}"
|
||||
);
|
||||
}
|
||||
|
|
@ -317,3 +317,353 @@ impl SystemGraph {
|
|||
spawn_server_with_config_env(config, envs)
|
||||
}
|
||||
}
|
||||
|
||||
// ---- helpers moved from the monolithic tests/cli.rs ----
|
||||
#[allow(unused_imports)]
|
||||
use lance::Dataset;
|
||||
#[allow(unused_imports)]
|
||||
use lance::index::DatasetIndexExt;
|
||||
#[allow(unused_imports)]
|
||||
use omnigraph::db::{Omnigraph, ReadTarget};
|
||||
|
||||
pub const POLICY_YAML: &str = r#"
|
||||
version: 1
|
||||
groups:
|
||||
team: [act-andrew, act-bruno]
|
||||
admins: [act-andrew]
|
||||
protected_branches: [main]
|
||||
rules:
|
||||
- id: team-read
|
||||
allow:
|
||||
actors: { group: team }
|
||||
actions: [read]
|
||||
branch_scope: any
|
||||
- id: team-write
|
||||
allow:
|
||||
actors: { group: team }
|
||||
actions: [change]
|
||||
branch_scope: unprotected
|
||||
- id: admins-promote
|
||||
allow:
|
||||
actors: { group: admins }
|
||||
actions: [branch_merge]
|
||||
target_branch_scope: protected
|
||||
"#;
|
||||
|
||||
pub const POLICY_TESTS_YAML: &str = r#"
|
||||
version: 1
|
||||
cases:
|
||||
- id: allow-feature-write
|
||||
actor: act-andrew
|
||||
action: change
|
||||
branch: feature
|
||||
expect: allow
|
||||
- id: deny-main-write
|
||||
actor: act-bruno
|
||||
action: change
|
||||
branch: main
|
||||
expect: deny
|
||||
"#;
|
||||
|
||||
pub fn manifest_dataset_version(graph: &std::path::Path) -> u64 {
|
||||
tokio::runtime::Runtime::new().unwrap().block_on(async {
|
||||
Omnigraph::open(graph.to_string_lossy().as_ref())
|
||||
.await
|
||||
.unwrap()
|
||||
.snapshot_of(ReadTarget::branch("main"))
|
||||
.await
|
||||
.unwrap()
|
||||
.version()
|
||||
})
|
||||
}
|
||||
|
||||
pub fn forge_person_delete_drift(graph: &std::path::Path) -> (u64, u64) {
|
||||
tokio::runtime::Runtime::new().unwrap().block_on(async {
|
||||
let uri = graph.to_string_lossy();
|
||||
let db = Omnigraph::open(uri.as_ref()).await.unwrap();
|
||||
let snap = db
|
||||
.snapshot_of(ReadTarget::branch("main"))
|
||||
.await
|
||||
.unwrap();
|
||||
let entry = snap.entry("node:Person").unwrap();
|
||||
let full_path = format!("{}/{}", uri.trim_end_matches('/'), entry.table_path);
|
||||
let mut ds = Dataset::open(&full_path).await.unwrap();
|
||||
let deleted = ds.delete("name = 'Alice'").await.unwrap();
|
||||
assert_eq!(deleted.num_deleted_rows, 1);
|
||||
let head = deleted.new_dataset.version().version;
|
||||
assert!(head > entry.table_version);
|
||||
(entry.table_version, head)
|
||||
})
|
||||
}
|
||||
|
||||
pub fn write_policy_config_fixture(root: &std::path::Path) -> (std::path::PathBuf, std::path::PathBuf) {
|
||||
let config = root.join("omnigraph.yaml");
|
||||
let policy = root.join("policy.yaml");
|
||||
fs::write(
|
||||
&config,
|
||||
r#"
|
||||
project:
|
||||
name: policy-test-graph
|
||||
policy:
|
||||
file: ./policy.yaml
|
||||
"#,
|
||||
)
|
||||
.unwrap();
|
||||
fs::write(&policy, POLICY_YAML).unwrap();
|
||||
fs::write(root.join("policy.tests.yaml"), POLICY_TESTS_YAML).unwrap();
|
||||
(config, policy)
|
||||
}
|
||||
|
||||
pub fn write_cluster_config_fixture(root: &std::path::Path) {
|
||||
fs::write(
|
||||
root.join("people.pg"),
|
||||
r#"
|
||||
node Person {
|
||||
name: String @key
|
||||
age: I32?
|
||||
}
|
||||
"#,
|
||||
)
|
||||
.unwrap();
|
||||
fs::write(
|
||||
root.join("people.gq"),
|
||||
r#"
|
||||
query find_person($name: String) {
|
||||
match { $p: Person { name: $name } }
|
||||
return { $p.name, $p.age }
|
||||
}
|
||||
"#,
|
||||
)
|
||||
.unwrap();
|
||||
fs::write(root.join("base.policy.yaml"), "rules: []\n").unwrap();
|
||||
fs::write(
|
||||
root.join("cluster.yaml"),
|
||||
r#"
|
||||
version: 1
|
||||
metadata:
|
||||
name: company-brain
|
||||
state:
|
||||
backend: cluster
|
||||
lock: true
|
||||
graphs:
|
||||
knowledge:
|
||||
schema: ./people.pg
|
||||
queries:
|
||||
find_person:
|
||||
file: ./people.gq
|
||||
policies:
|
||||
base:
|
||||
file: ./base.policy.yaml
|
||||
applies_to: [knowledge]
|
||||
"#,
|
||||
)
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
pub fn init_cluster_derived_graph(root: &std::path::Path) {
|
||||
init_named_cluster_graph(root, "knowledge", "people.pg");
|
||||
}
|
||||
|
||||
pub fn init_named_cluster_graph(root: &std::path::Path, graph_id: &str, schema_file: &str) {
|
||||
let graph_dir = root.join("graphs");
|
||||
fs::create_dir_all(&graph_dir).unwrap();
|
||||
output_success(
|
||||
cli()
|
||||
.arg("init")
|
||||
.arg("--schema")
|
||||
.arg(root.join(schema_file))
|
||||
.arg(graph_dir.join(format!("{graph_id}.omni"))),
|
||||
);
|
||||
}
|
||||
|
||||
pub fn write_cluster_lock(root: &std::path::Path, lock_id: &str, operation: &str) {
|
||||
let state_dir = root.join("__cluster");
|
||||
fs::create_dir_all(&state_dir).unwrap();
|
||||
fs::write(
|
||||
state_dir.join("lock.json"),
|
||||
format!(
|
||||
r#"{{"version":1,"lock_id":"{lock_id}","operation":"{operation}","created_at":"1970-01-01T00:00:00Z","pid":123}}"#
|
||||
),
|
||||
)
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
pub fn write_cluster_applyable_state(root: &std::path::Path) -> serde_json::Value {
|
||||
let validate = parse_stdout_json(&output_success(
|
||||
cli()
|
||||
.arg("cluster")
|
||||
.arg("validate")
|
||||
.arg("--config")
|
||||
.arg(root)
|
||||
.arg("--json"),
|
||||
));
|
||||
let schema_digest = validate["resource_digests"]["schema.knowledge"]
|
||||
.as_str()
|
||||
.unwrap()
|
||||
.to_string();
|
||||
let state_dir = root.join("__cluster");
|
||||
fs::create_dir_all(&state_dir).unwrap();
|
||||
fs::write(
|
||||
state_dir.join("state.json"),
|
||||
format!(
|
||||
r#"{{
|
||||
"version": 1,
|
||||
"state_revision": 1,
|
||||
"applied_revision": {{
|
||||
"resources": {{
|
||||
"graph.knowledge": {{ "digest": "seed" }},
|
||||
"schema.knowledge": {{ "digest": "{schema_digest}" }}
|
||||
}}
|
||||
}}
|
||||
}}
|
||||
"#
|
||||
),
|
||||
)
|
||||
.unwrap();
|
||||
validate
|
||||
}
|
||||
|
||||
pub fn cluster_json(root: &std::path::Path, command: &str) -> serde_json::Value {
|
||||
parse_stdout_json(&output_success(
|
||||
cli()
|
||||
.arg("cluster")
|
||||
.arg(command)
|
||||
.arg("--config")
|
||||
.arg(root)
|
||||
.arg("--json"),
|
||||
))
|
||||
}
|
||||
|
||||
pub fn write_multi_graph_cluster_fixture(root: &std::path::Path) {
|
||||
write_cluster_config_fixture(root);
|
||||
fs::write(
|
||||
root.join("services.pg"),
|
||||
r#"
|
||||
node Service {
|
||||
name: String @key
|
||||
}
|
||||
"#,
|
||||
)
|
||||
.unwrap();
|
||||
fs::write(
|
||||
root.join("services.gq"),
|
||||
r#"
|
||||
query find_service($name: String) {
|
||||
match { $s: Service { name: $name } }
|
||||
return { $s.name }
|
||||
}
|
||||
"#,
|
||||
)
|
||||
.unwrap();
|
||||
fs::write(root.join("cluster_wide.policy.yaml"), "rules: []\n").unwrap();
|
||||
fs::write(root.join("shared.policy.yaml"), "rules: []\n").unwrap();
|
||||
fs::write(
|
||||
root.join("cluster.yaml"),
|
||||
r#"
|
||||
version: 1
|
||||
metadata:
|
||||
name: company-brain
|
||||
state:
|
||||
backend: cluster
|
||||
lock: true
|
||||
graphs:
|
||||
knowledge:
|
||||
schema: ./people.pg
|
||||
queries:
|
||||
find_person:
|
||||
file: ./people.gq
|
||||
engineering:
|
||||
schema: ./services.pg
|
||||
queries:
|
||||
find_service:
|
||||
file: ./services.gq
|
||||
policies:
|
||||
shared:
|
||||
file: ./shared.policy.yaml
|
||||
applies_to: [knowledge, engineering]
|
||||
cluster_wide:
|
||||
file: ./cluster_wide.policy.yaml
|
||||
applies_to: [cluster]
|
||||
"#,
|
||||
)
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
pub fn change_for<'j>(json: &'j serde_json::Value, resource: &str) -> &'j serde_json::Value {
|
||||
json["changes"]
|
||||
.as_array()
|
||||
.unwrap()
|
||||
.iter()
|
||||
.find(|change| change["resource"] == resource)
|
||||
.unwrap_or_else(|| panic!("missing change for {resource}: {json}"))
|
||||
}
|
||||
|
||||
pub fn write_seed_fixture(root: &std::path::Path) -> std::path::PathBuf {
|
||||
fs::create_dir_all(root.join("data")).unwrap();
|
||||
fs::create_dir_all(root.join("build")).unwrap();
|
||||
let raw_seed = root.join("data/seed.jsonl");
|
||||
let seed = root.join("seed.yaml");
|
||||
|
||||
fs::write(
|
||||
&raw_seed,
|
||||
concat!(
|
||||
"{\"type\":\"Decision\",\"data\":{\"slug\":\"dec-alpha\",\"intent\":\"Alpha ship\"}}\n",
|
||||
"{\"type\":\"Decision\",\"data\":{\"slug\":\"dec-beta\",\"intent\":\"Beta ship\",\"embedding\":[0.1,0.2]}}\n"
|
||||
),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
fs::write(
|
||||
&seed,
|
||||
concat!(
|
||||
"graph:\n",
|
||||
" slug: mr-context-graph\n",
|
||||
"sources:\n",
|
||||
" raw_seed: ./data/seed.jsonl\n",
|
||||
"artifacts:\n",
|
||||
" embedded_seed: ./build/seed.embedded.jsonl\n",
|
||||
"embeddings:\n",
|
||||
" model: gemini-embedding-2-preview\n",
|
||||
" dimension: 4\n",
|
||||
" types:\n",
|
||||
" Decision:\n",
|
||||
" target: embedding\n",
|
||||
" fields: [slug, intent]\n"
|
||||
),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
seed
|
||||
}
|
||||
|
||||
pub fn write_seed_fixture_with_edge(root: &std::path::Path) -> std::path::PathBuf {
|
||||
let seed = write_seed_fixture(root);
|
||||
let raw_seed = root.join("data/seed.jsonl");
|
||||
fs::write(
|
||||
&raw_seed,
|
||||
concat!(
|
||||
"{\"type\":\"Decision\",\"data\":{\"slug\":\"dec-alpha\",\"intent\":\"Alpha ship\"}}\n",
|
||||
"{\"type\":\"Decision\",\"data\":{\"slug\":\"dec-beta\",\"intent\":\"Beta ship\",\"embedding\":[0.1,0.2]}}\n",
|
||||
"{\"edge\":\"Triggered\",\"from\":\"sig-alpha\",\"to\":\"dec-alpha\"}\n"
|
||||
),
|
||||
)
|
||||
.unwrap();
|
||||
seed
|
||||
}
|
||||
|
||||
pub fn read_embedded_rows(path: std::path::PathBuf) -> Vec<Value> {
|
||||
fs::read_to_string(path)
|
||||
.unwrap()
|
||||
.lines()
|
||||
.filter(|line| !line.trim().is_empty())
|
||||
.map(|line| serde_json::from_str(line).unwrap())
|
||||
.collect()
|
||||
}
|
||||
|
||||
pub fn queries_test_config(graph_uri: &str, entry: &str, gq_file: &str) -> String {
|
||||
format!(
|
||||
"graphs:\n local:\n uri: '{}'\n queries:\n {entry}:\n file: ./{gq_file}\n\
|
||||
cli:\n graph: local\npolicy: {{}}\n",
|
||||
graph_uri.replace('\'', "''")
|
||||
)
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue