Merge pull request #193 from ModernRelay/refactor/cli-modularize

refactor(cli): modularize main.rs and the test monolith — pure code movement
This commit is contained in:
Andrew Altshuler 2026-06-11 15:37:28 +03:00 committed by GitHub
commit 7af3697397
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
12 changed files with 7511 additions and 7570 deletions

View file

@ -0,0 +1,650 @@
//! The clap surface: every command, subcommand, and argument struct
//! (moved verbatim from main.rs in the modularization).
use super::*;
pub(crate) const DEFAULT_BEARER_TOKEN_ENV: &str = "OMNIGRAPH_BEARER_TOKEN";
#[derive(Debug, Parser)]
#[command(name = "omnigraph")]
#[command(about = "Omnigraph graph database CLI")]
#[command(version = env!("CARGO_PKG_VERSION"), disable_version_flag = true)]
pub(crate) struct Cli {
/// Actor identity for direct-engine writes (MR-722). Overrides
/// `cli.actor` from `omnigraph.yaml`. When the configured policy
/// is in effect, Cedar evaluates this actor against the requested
/// action and scope; with policy configured but neither this flag
/// nor `cli.actor` set, the engine-layer footgun guard fires and
/// the write is denied (no silent bypass). Has no effect on remote
/// HTTP writes — those resolve their actor server-side from the
/// bearer token.
#[arg(long = "as", global = true, value_name = "ACTOR")]
pub(crate) as_actor: Option<String>,
#[command(subcommand)]
pub(crate) command: Command,
}
#[derive(Debug, Subcommand)]
pub(crate) enum Command {
/// Print the CLI version
Version,
/// Generate, clean, or refresh explicit seed embeddings
Embed(EmbedArgs),
/// Initialize a new graph from a schema
Init {
#[arg(long)]
schema: PathBuf,
/// Graph URI (local path or s3://)
uri: String,
/// Overwrite existing schema artifacts at the URI. Without
/// this flag, init refuses to touch a URI that already holds
/// `_schema.pg`, `_schema.ir.json`, or `__schema_state.json`
/// — closes the re-init footgun (MR-668 follow-up). With the
/// flag, the operator opts in to destructive semantics.
#[arg(long)]
force: bool,
},
/// Load data into a graph (local or remote)
Load {
/// Graph URI
uri: Option<String>,
#[arg(long)]
target: Option<String>,
#[arg(long)]
config: Option<PathBuf>,
#[arg(long)]
data: PathBuf,
/// Target branch (defaults to main). Without --from it must exist.
#[arg(long)]
branch: Option<String>,
/// Base branch to fork --branch from when it doesn't exist yet.
/// Without this flag a missing branch is an error, never a fork.
#[arg(long)]
from: Option<String>,
/// How existing rows are handled: overwrite | append | merge.
/// Required — overwrite is destructive, so there is no default.
#[arg(long)]
mode: CliLoadMode,
#[arg(long)]
json: bool,
},
/// Deprecated alias of `load --from <base>` (defaults: --mode merge, --from main)
Ingest {
/// Graph URI
uri: Option<String>,
#[arg(long)]
target: Option<String>,
#[arg(long)]
config: Option<PathBuf>,
#[arg(long)]
data: PathBuf,
#[arg(long)]
branch: Option<String>,
#[arg(long)]
from: Option<String>,
#[arg(long, default_value = "merge")]
mode: CliLoadMode,
#[arg(long)]
json: bool,
},
/// Branch operations
Branch {
#[command(subcommand)]
command: BranchCommand,
},
/// Schema planning operations
Schema {
#[command(subcommand)]
command: SchemaCommand,
},
/// Validate queries against a schema (offline) or repo (repo-backed).
///
/// Canonical name is `lint` (matches the `omnigraph_compiler::lint`
/// module and the `OG-XXX-NNN` lint-code vocabulary). Replaces the
/// deprecated `omnigraph query lint` / `omnigraph query check` /
/// `omnigraph check` invocations — each is kept as an argv-level
/// shim that prints a one-line stderr warning and rewrites to
/// `omnigraph lint`. Aliases are deliberately *not* exposed via
/// clap's `visible_alias` because that would advertise two
/// equivalent canonical names, which agents emit interchangeably
/// (see MR-981).
Lint {
/// Graph URI
uri: Option<String>,
#[arg(long)]
target: Option<String>,
#[arg(long)]
config: Option<PathBuf>,
#[arg(long)]
query: PathBuf,
#[arg(long)]
schema: Option<PathBuf>,
#[arg(long)]
json: bool,
},
/// Operate on the server-side stored-query registry (`queries:`).
Queries {
#[command(subcommand)]
command: QueriesCommand,
},
/// Show graph snapshot
Snapshot {
/// Graph URI
uri: Option<String>,
#[arg(long)]
target: Option<String>,
#[arg(long)]
config: Option<PathBuf>,
#[arg(long)]
branch: Option<String>,
#[arg(long)]
json: bool,
},
/// Export a full graph snapshot as JSONL
Export {
/// Graph URI
uri: Option<String>,
#[arg(long)]
target: Option<String>,
#[arg(long)]
config: Option<PathBuf>,
#[arg(long)]
branch: Option<String>,
#[arg(long, hide = true)]
jsonl: bool,
#[arg(long = "type")]
type_names: Vec<String>,
#[arg(long = "table")]
table_keys: Vec<String>,
},
/// Commit history operations
Commit {
#[command(subcommand)]
command: CommitCommand,
},
/// Execute a read query against a branch or snapshot.
///
/// Canonical read endpoint. The previous name `omnigraph read` is
/// kept as a visible alias and prints a one-line deprecation warning
/// when used. Pairs with `omnigraph mutate` on the write side.
#[command(visible_alias = "read")]
Query {
/// Graph URI
#[arg(long)]
uri: Option<String>,
#[arg(hide = true)]
legacy_uri: Option<String>,
#[arg(long)]
target: Option<String>,
#[arg(long)]
config: Option<PathBuf>,
#[arg(long, conflicts_with_all = ["query", "query_string"])]
alias: Option<String>,
#[arg(long, conflicts_with_all = ["alias", "query_string"])]
query: Option<PathBuf>,
/// Inline GQ source — alternative to `--query <path>` and `--alias <name>`.
#[arg(short = 'e', long = "query-string", value_name = "GQ", conflicts_with_all = ["query", "alias"])]
query_string: Option<String>,
#[arg(long)]
name: Option<String>,
#[command(flatten)]
params: ParamsArgs,
#[arg(long, conflicts_with = "snapshot")]
branch: Option<String>,
#[arg(long, conflicts_with = "branch")]
snapshot: Option<String>,
#[arg(long, conflicts_with = "json")]
format: Option<ReadOutputFormat>,
#[arg(long, conflicts_with = "format")]
json: bool,
#[arg()]
alias_args: Vec<String>,
},
/// Execute a graph mutation query against a branch.
///
/// Canonical mutation endpoint. The previous name `omnigraph change`
/// is kept as a visible alias and prints a one-line deprecation
/// warning when used. Pairs with `omnigraph query` on the read side.
#[command(visible_alias = "change")]
Mutate {
/// Graph URI
#[arg(long)]
uri: Option<String>,
#[arg(hide = true)]
legacy_uri: Option<String>,
#[arg(long)]
target: Option<String>,
#[arg(long)]
config: Option<PathBuf>,
#[arg(long, conflicts_with_all = ["query", "query_string"])]
alias: Option<String>,
#[arg(long, conflicts_with_all = ["alias", "query_string"])]
query: Option<PathBuf>,
/// Inline GQ source — alternative to `--query <path>` and `--alias <name>`.
#[arg(short = 'e', long = "query-string", value_name = "GQ", conflicts_with_all = ["query", "alias"])]
query_string: Option<String>,
#[arg(long)]
name: Option<String>,
#[command(flatten)]
params: ParamsArgs,
#[arg(long)]
branch: Option<String>,
#[arg(long)]
json: bool,
#[arg()]
alias_args: Vec<String>,
},
/// Policy administration and diagnostics
Policy {
#[command(subcommand)]
command: PolicyCommand,
},
/// Compact small Lance fragments in every table of the graph
Optimize {
/// Graph URI
uri: Option<String>,
#[arg(long)]
target: Option<String>,
#[arg(long)]
config: Option<PathBuf>,
#[arg(long)]
json: bool,
},
/// Classify and explicitly repair manifest/head drift
Repair {
/// Graph URI
uri: Option<String>,
#[arg(long)]
target: Option<String>,
#[arg(long)]
config: Option<PathBuf>,
/// Publish verified maintenance drift. Without this flag, repair only
/// previews what it would do.
#[arg(long)]
confirm: bool,
/// Also publish suspicious or unverifiable drift. Requires
/// `--confirm`; use only after operator review.
#[arg(long, requires = "confirm")]
force: bool,
#[arg(long)]
json: bool,
},
/// Remove old Lance versions from every table of the graph (destructive)
Cleanup {
/// Graph URI
uri: Option<String>,
#[arg(long)]
target: Option<String>,
#[arg(long)]
config: Option<PathBuf>,
/// Number of recent versions to keep per table. Either `--keep` or
/// `--older-than` (or both) must be set.
#[arg(long)]
keep: Option<u32>,
/// Only remove versions older than this duration. Accepts Go-style
/// durations: `7d`, `24h`, `90m`. At least one of --keep / --older-than.
#[arg(long)]
older_than: Option<String>,
/// Required to actually run; without it, prints what would be removed
#[arg(long)]
confirm: bool,
#[arg(long)]
json: bool,
},
/// Validate and plan read-only cluster configuration.
Cluster {
#[command(subcommand)]
command: ClusterCommand,
},
/// Manage graphs on a multi-graph server (MR-668)
Graphs {
#[command(subcommand)]
command: GraphsCommand,
},
}
#[derive(Debug, Subcommand)]
pub(crate) enum ClusterCommand {
/// Validate cluster.yaml and referenced schemas, queries, and policy files.
Validate {
/// Cluster config directory containing cluster.yaml.
#[arg(long, default_value = ".")]
config: PathBuf,
/// Emit JSON instead of human text.
#[arg(long)]
json: bool,
},
/// Produce a read-only plan by diffing cluster.yaml against __cluster/state.json.
Plan {
/// Cluster config directory containing cluster.yaml.
#[arg(long, default_value = ".")]
config: PathBuf,
/// Emit JSON instead of human text.
#[arg(long)]
json: bool,
},
/// Apply the config-only (query/policy) subset of the plan to the local
/// cluster catalog. Graph/schema changes are deferred to a later stage.
Apply {
/// Cluster config directory containing cluster.yaml.
#[arg(long, default_value = ".")]
config: PathBuf,
/// Emit JSON instead of human text.
#[arg(long)]
json: bool,
},
/// Record a digest-bound approval for a gated (irreversible) change,
/// e.g. a graph delete. Requires the global --as actor.
Approve {
/// Typed resource address of the gated change (e.g. graph.scratch).
resource: String,
/// Cluster config directory containing cluster.yaml.
#[arg(long, default_value = ".")]
config: PathBuf,
/// Emit JSON instead of human text.
#[arg(long)]
json: bool,
},
/// Read the local JSON state ledger without scanning live graph resources.
Status {
/// Cluster config directory containing cluster.yaml.
#[arg(long, default_value = ".")]
config: PathBuf,
/// Emit JSON instead of human text.
#[arg(long)]
json: bool,
},
/// Refresh existing local JSON state from declared graph observations.
Refresh {
/// Cluster config directory containing cluster.yaml.
#[arg(long, default_value = ".")]
config: PathBuf,
/// Emit JSON instead of human text.
#[arg(long)]
json: bool,
},
/// Import initial local JSON state from declared graph observations.
Import {
/// Cluster config directory containing cluster.yaml.
#[arg(long, default_value = ".")]
config: PathBuf,
/// Emit JSON instead of human text.
#[arg(long)]
json: bool,
},
/// Remove a held local JSON state lock after operator confirmation.
ForceUnlock {
/// Exact lock id from cluster status or a state_lock_held diagnostic.
lock_id: String,
/// Cluster config directory containing cluster.yaml.
#[arg(long, default_value = ".")]
config: PathBuf,
/// Emit JSON instead of human text.
#[arg(long)]
json: bool,
},
}
/// Operations on the graph registry of a multi-graph server (MR-668).
///
/// All operations target a remote multi-graph server URL (http:// or
/// https://). Local-URI invocations return a clear error. To add or
/// remove graphs, operators edit `omnigraph.yaml` directly and restart
/// the server — runtime mutation is not exposed in v0.6.0.
#[derive(Debug, Subcommand)]
pub(crate) enum GraphsCommand {
/// List every graph registered with the multi-graph server.
List {
/// Remote server URL (e.g. `https://server.example.com`).
#[arg(long)]
uri: Option<String>,
#[arg(long)]
target: Option<String>,
#[arg(long)]
config: Option<PathBuf>,
#[arg(long)]
json: bool,
},
}
#[derive(Debug, Subcommand)]
pub(crate) enum BranchCommand {
/// Create a new branch
Create {
/// Graph URI
#[arg(long)]
uri: Option<String>,
#[arg(long)]
target: Option<String>,
#[arg(long)]
config: Option<PathBuf>,
#[arg(long)]
from: Option<String>,
name: String,
#[arg(long)]
json: bool,
},
/// List branches
List {
/// Graph URI
#[arg(long)]
uri: Option<String>,
#[arg(long)]
target: Option<String>,
#[arg(long)]
config: Option<PathBuf>,
#[arg(long)]
json: bool,
},
/// Delete a branch
Delete {
/// Graph URI
#[arg(long)]
uri: Option<String>,
#[arg(long)]
target: Option<String>,
#[arg(long)]
config: Option<PathBuf>,
name: String,
#[arg(long)]
json: bool,
},
/// Merge a source branch into a target branch
Merge {
/// Graph URI
#[arg(long)]
uri: Option<String>,
#[arg(long)]
target: Option<String>,
#[arg(long)]
config: Option<PathBuf>,
source: String,
#[arg(long)]
into: Option<String>,
#[arg(long)]
json: bool,
},
}
#[derive(Debug, Subcommand)]
pub(crate) enum SchemaCommand {
/// Plan a schema migration against the accepted persisted schema
Plan {
/// Graph URI
uri: Option<String>,
#[arg(long)]
target: Option<String>,
#[arg(long)]
config: Option<PathBuf>,
#[arg(long)]
schema: PathBuf,
#[arg(long)]
json: bool,
/// Show the plan as it would execute with `--allow-data-loss`.
/// Promotes every `DropMode::Soft` step to `DropMode::Hard`
/// so the plan output reflects the destructive intent.
#[arg(long, default_value_t = false)]
allow_data_loss: bool,
},
/// Apply a supported schema migration
Apply {
/// Graph URI
uri: Option<String>,
#[arg(long)]
target: Option<String>,
#[arg(long)]
config: Option<PathBuf>,
#[arg(long)]
schema: PathBuf,
#[arg(long)]
json: bool,
/// Allow destructive (data-loss) schema changes.
///
/// Without this flag, drops are "soft": the column or table
/// is removed from the current manifest version but prior
/// versions are retained, so `snapshot_at_version(pre_drop)`
/// can still read the dropped data until `omnigraph cleanup`
/// runs. With this flag, drops are "hard": `cleanup_old_versions`
/// runs on the affected datasets immediately after the apply,
/// making the prior data unreachable.
#[arg(long, default_value_t = false)]
allow_data_loss: bool,
},
/// Show the current accepted schema source
#[command(alias = "get")]
Show {
/// Graph URI
uri: Option<String>,
#[arg(long)]
target: Option<String>,
#[arg(long)]
config: Option<PathBuf>,
#[arg(long)]
json: bool,
},
}
#[derive(Debug, Subcommand)]
pub(crate) enum CommitCommand {
/// List graph commits
List {
/// Graph URI
uri: Option<String>,
#[arg(long)]
target: Option<String>,
#[arg(long)]
config: Option<PathBuf>,
#[arg(long)]
branch: Option<String>,
#[arg(long)]
json: bool,
},
/// Show a graph commit
Show {
/// Graph URI
#[arg(long)]
uri: Option<String>,
#[arg(long)]
target: Option<String>,
#[arg(long)]
config: Option<PathBuf>,
commit_id: String,
#[arg(long)]
json: bool,
},
}
#[derive(Debug, Subcommand)]
pub(crate) enum PolicyCommand {
/// Validate policy YAML and compiled Cedar policy state
Validate {
#[arg(long)]
config: Option<PathBuf>,
},
/// Run declarative policy tests from policy.tests.yaml
Test {
#[arg(long)]
config: Option<PathBuf>,
},
/// Explain one policy decision locally
Explain {
#[arg(long)]
config: Option<PathBuf>,
#[arg(long)]
actor: String,
#[arg(long)]
action: PolicyAction,
#[arg(long)]
branch: Option<String>,
#[arg(long = "target-branch")]
target_branch: Option<String>,
},
}
#[derive(Debug, Subcommand)]
pub(crate) enum QueriesCommand {
/// Type-check the stored-query registry against the live schema.
///
/// Distinct from `omnigraph lint` (which lints one `.gq` file):
/// this validates the whole `queries:` registry — opening the graph
/// to read its schema and confirming every stored query still
/// type-checks. Exits non-zero on any breakage.
Validate {
/// Graph URI
uri: Option<String>,
#[arg(long)]
target: Option<String>,
#[arg(long)]
config: Option<PathBuf>,
#[arg(long)]
json: bool,
},
/// List the registered stored queries (name, MCP exposure, params).
List {
#[arg(long)]
target: Option<String>,
#[arg(long)]
config: Option<PathBuf>,
#[arg(long)]
json: bool,
},
}
#[derive(Debug, Args, Clone)]
pub(crate) struct ParamsArgs {
#[arg(long, conflicts_with = "params_file")]
pub(crate) params: Option<String>,
#[arg(long, conflicts_with = "params")]
pub(crate) params_file: Option<PathBuf>,
}
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, ValueEnum)]
#[serde(rename_all = "snake_case")]
pub(crate) enum CliLoadMode {
Overwrite,
Append,
Merge,
}
impl From<CliLoadMode> for LoadMode {
fn from(value: CliLoadMode) -> Self {
match value {
CliLoadMode::Overwrite => LoadMode::Overwrite,
CliLoadMode::Append => LoadMode::Append,
CliLoadMode::Merge => LoadMode::Merge,
}
}
}
impl CliLoadMode {
pub(crate) fn as_str(self) -> &'static str {
match self {
CliLoadMode::Overwrite => "overwrite",
CliLoadMode::Append => "append",
CliLoadMode::Merge => "merge",
}
}
}

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,416 @@
//! In-source test suite for the CLI binary (moved verbatim from
//! main.rs; `use super::*` resolves through the #[path] declaration).
use std::fs;
use super::{
DEFAULT_BEARER_TOKEN_ENV, apply_bearer_token, bearer_token_from_env_file,
legacy_change_request_body, load_cli_config, load_env_file_into_process,
normalize_bearer_token, parse_env_assignment, resolve_cli_graph, resolve_policy_context,
resolve_remote_bearer_token,
};
use omnigraph_server::load_config;
use reqwest::header::AUTHORIZATION;
use serde_json::json;
use tempfile::tempdir;
#[test]
fn legacy_change_request_body_uses_legacy_field_names() {
// `execute_change_remote` hits `POST /change`, which old
// `omnigraph-server` builds deserialize as `ChangeRequest` with
// **required** `query_source` and optional `query_name` keys.
// Newer servers accept both spellings via serde alias, but a
// newer CLI must still emit the legacy keys on the wire so it
// can talk to an old server during a rolling upgrade.
let body = legacy_change_request_body(
"query insert_person($n: String) { insert Person { name: $n } }",
Some("insert_person"),
"main",
Some(&json!({ "n": "Alice" })),
);
assert_eq!(
body["query_source"].as_str(),
Some("query insert_person($n: String) { insert Person { name: $n } }"),
);
assert_eq!(body["query_name"].as_str(), Some("insert_person"));
assert_eq!(body["branch"].as_str(), Some("main"));
assert_eq!(body["params"]["n"].as_str(), Some("Alice"));
// Crucially, the **new** field names must NOT appear -- old
// servers would silently treat them as unknown fields and then
// fail on missing required `query_source`.
assert!(
body.get("query").is_none(),
"legacy /change body must not carry the renamed `query` key; got {body}"
);
assert!(
body.get("name").is_none(),
"legacy /change body must not carry the renamed `name` key; got {body}"
);
}
#[test]
fn legacy_change_request_body_omits_optional_fields_when_unset() {
let body = legacy_change_request_body(
"query find() { match { $p: Person } return { $p.name } }",
None,
"main",
None,
);
assert_eq!(body["branch"].as_str(), Some("main"));
assert!(body.get("query_name").is_none());
assert!(body.get("params").is_none());
}
#[test]
fn apply_bearer_token_adds_header_when_configured() {
let client = reqwest::Client::new();
let request = apply_bearer_token(client.get("http://example.com"), Some("demo-token"))
.build()
.unwrap();
assert_eq!(
request
.headers()
.get(AUTHORIZATION)
.and_then(|value| value.to_str().ok()),
Some("Bearer demo-token")
);
}
#[test]
fn apply_bearer_token_leaves_request_unchanged_when_not_configured() {
let client = reqwest::Client::new();
let request = apply_bearer_token(client.get("http://example.com"), None)
.build()
.unwrap();
assert!(request.headers().get(AUTHORIZATION).is_none());
}
#[test]
fn normalize_bearer_token_trims_and_filters_blank_values() {
assert_eq!(normalize_bearer_token(None), None);
assert_eq!(normalize_bearer_token(Some(" ".to_string())), None);
assert_eq!(
normalize_bearer_token(Some(" demo-token ".to_string())).as_deref(),
Some("demo-token")
);
}
#[test]
fn parse_env_assignment_supports_plain_and_exported_values() {
assert_eq!(
parse_env_assignment("DEMO_TOKEN=demo-token"),
Some(("DEMO_TOKEN".to_string(), "demo-token".to_string()))
);
assert_eq!(
parse_env_assignment("export DEMO_TOKEN=\"quoted-token\""),
Some(("DEMO_TOKEN".to_string(), "quoted-token".to_string()))
);
assert_eq!(parse_env_assignment("# comment"), None);
assert_eq!(parse_env_assignment(" "), None);
}
#[test]
fn bearer_token_from_env_file_reads_named_value() {
let temp = tempdir().unwrap();
let env_file = temp.path().join(".env.omni");
fs::write(
&env_file,
"FIRST=ignore\nexport DEMO_TOKEN=\" demo-token \"\n",
)
.unwrap();
assert_eq!(
bearer_token_from_env_file(&env_file, "DEMO_TOKEN")
.unwrap()
.as_deref(),
Some("demo-token")
);
assert_eq!(
bearer_token_from_env_file(&env_file, "MISSING").unwrap(),
None
);
}
#[test]
fn load_env_file_into_process_sets_missing_values_without_overriding_existing_ones() {
let temp = tempdir().unwrap();
let env_file = temp.path().join(".env.omni");
fs::write(
&env_file,
"AUTOLOAD_ONLY=from-file\nAUTOLOAD_PRESET=from-file\n",
)
.unwrap();
let missing_key = "AUTOLOAD_ONLY";
let preset_key = "AUTOLOAD_PRESET";
let previous_missing = std::env::var_os(missing_key);
let previous_preset = std::env::var_os(preset_key);
unsafe {
std::env::remove_var(missing_key);
std::env::set_var(preset_key, "from-env");
}
load_env_file_into_process(&env_file).unwrap();
assert_eq!(std::env::var(missing_key).unwrap(), "from-file");
assert_eq!(std::env::var(preset_key).unwrap(), "from-env");
unsafe {
if let Some(value) = previous_missing {
std::env::set_var(missing_key, value);
} else {
std::env::remove_var(missing_key);
}
if let Some(value) = previous_preset {
std::env::set_var(preset_key, value);
} else {
std::env::remove_var(preset_key);
}
}
}
#[test]
fn resolve_remote_bearer_token_uses_scoped_env_file_with_global_fallback() {
let temp = tempdir().unwrap();
fs::write(
temp.path().join("omnigraph.yaml"),
r#"
graphs:
demo:
uri: https://example.com
bearer_token_env: DEMO_TOKEN
auth:
env_file: .env.omni
cli:
graph: demo
"#,
)
.unwrap();
fs::write(
temp.path().join(".env.omni"),
"DEMO_TOKEN=scoped-token\nOMNIGRAPH_BEARER_TOKEN=global-token\n",
)
.unwrap();
let previous = std::env::var_os(DEFAULT_BEARER_TOKEN_ENV);
unsafe {
std::env::remove_var(DEFAULT_BEARER_TOKEN_ENV);
}
let config_path = temp.path().join("omnigraph.yaml");
let config = load_config(Some(&config_path)).unwrap();
assert_eq!(
resolve_remote_bearer_token(&config, None, Some("demo"))
.unwrap()
.as_deref(),
Some("scoped-token")
);
assert_eq!(
resolve_remote_bearer_token(&config, Some("https://override.example.com"), None)
.unwrap()
.as_deref(),
Some("global-token")
);
unsafe {
if let Some(value) = previous {
std::env::set_var(DEFAULT_BEARER_TOKEN_ENV, value);
} else {
std::env::remove_var(DEFAULT_BEARER_TOKEN_ENV);
}
}
}
#[test]
fn load_cli_config_autoloads_env_file_into_process() {
let temp = tempdir().unwrap();
fs::write(
temp.path().join("omnigraph.yaml"),
r#"
auth:
env_file: .env.omni
graphs:
demo:
uri: s3://bucket/prefix
"#,
)
.unwrap();
fs::write(
temp.path().join(".env.omni"),
"AUTOLOAD_FROM_CONFIG=loaded\n",
)
.unwrap();
let key = "AUTOLOAD_FROM_CONFIG";
let previous = std::env::var_os(key);
unsafe {
std::env::remove_var(key);
}
let config_path = temp.path().join("omnigraph.yaml");
let config = load_cli_config(Some(&config_path)).unwrap();
assert_eq!(
config.resolve_target_uri(None, Some("demo"), None).unwrap(),
"s3://bucket/prefix"
);
assert_eq!(std::env::var(key).unwrap(), "loaded");
unsafe {
if let Some(value) = previous {
std::env::set_var(key, value);
} else {
std::env::remove_var(key);
}
}
}
#[test]
fn graph_identity_resolve_policy_context_named_cli_graph_uses_graph_key_not_project_name_or_uri()
{
let temp = tempdir().unwrap();
let config_path = temp.path().join("omnigraph.yaml");
fs::write(
&config_path,
r#"
project:
name: misleading-project
graphs:
local:
uri: /tmp/local-policy-graph.omni
policy:
file: ./policy.yaml
cli:
graph: local
"#,
)
.unwrap();
let config = load_config(Some(&config_path)).unwrap();
let context = resolve_policy_context(&config).unwrap();
assert_eq!(context.graph_id, "local");
}
#[test]
fn graph_identity_resolve_policy_context_server_graph_uses_graph_key_when_cli_graph_absent() {
let temp = tempdir().unwrap();
let config_path = temp.path().join("omnigraph.yaml");
fs::write(
&config_path,
r#"
project:
name: misleading-project
graphs:
local:
uri: /tmp/local-policy-graph.omni
policy:
file: ./server-policy.yaml
server:
graph: local
"#,
)
.unwrap();
let config = load_config(Some(&config_path)).unwrap();
let context = resolve_policy_context(&config).unwrap();
assert_eq!(context.graph_id, "local");
assert!(context.policy_file.ends_with("server-policy.yaml"));
}
#[test]
fn graph_identity_resolve_policy_context_anonymous_uses_top_level_default_identity() {
let temp = tempdir().unwrap();
let config_path = temp.path().join("omnigraph.yaml");
fs::write(
&config_path,
r#"
project:
name: misleading-project
graphs:
local:
uri: /tmp/local-policy-graph.omni
policy:
file: ./top-policy.yaml
"#,
)
.unwrap();
let config = load_config(Some(&config_path)).unwrap();
let context = resolve_policy_context(&config).unwrap();
assert_eq!(context.graph_id, "default");
assert!(context.policy_file.ends_with("top-policy.yaml"));
}
#[test]
fn graph_identity_resolve_cli_graph_named_target_uses_graph_key_not_project_name_or_uri() {
let temp = tempdir().unwrap();
let config_path = temp.path().join("omnigraph.yaml");
fs::write(
&config_path,
r#"
project:
name: misleading-project
graphs:
prod:
uri: s3://bucket/prod-graph/
policy:
file: ./prod-policy.yaml
"#,
)
.unwrap();
let config = load_config(Some(&config_path)).unwrap();
let graph = resolve_cli_graph(&config, None, Some("prod")).unwrap();
assert_eq!(graph.selected(), Some("prod"));
assert_eq!(graph.graph_id, "prod");
assert_eq!(graph.uri, "s3://bucket/prod-graph/");
}
#[test]
fn graph_identity_resolve_cli_graph_positional_uri_uses_anonymous_normalized_uri() {
let temp = tempdir().unwrap();
let config_path = temp.path().join("omnigraph.yaml");
fs::write(
&config_path,
r#"
project:
name: misleading-project
graphs:
local:
uri: /tmp/configured-graph.omni
policy:
file: ./policy.yaml
cli:
graph: local
"#,
)
.unwrap();
let config = load_config(Some(&config_path)).unwrap();
let local_graph_path = temp.path().join("explicit-graph.omni");
let local_graph = resolve_cli_graph(
&config,
Some(format!("file://{}", local_graph_path.display())),
None,
)
.unwrap();
assert_eq!(local_graph.selected(), None);
assert_eq!(
local_graph.graph_id,
local_graph_path.to_string_lossy().as_ref()
);
assert_eq!(local_graph.policy_file, None);
let s3_graph = resolve_cli_graph(
&config,
Some("s3://bucket/anonymous-graph/".to_string()),
None,
)
.unwrap();
assert_eq!(s3_graph.selected(), None);
assert_eq!(s3_graph.graph_id, "s3://bucket/anonymous-graph");
assert_eq!(s3_graph.policy_file, None);
}

View file

@ -0,0 +1,830 @@
//! Human/JSON output formatting for every command (moved verbatim from
//! main.rs in the modularization).
use super::*;
#[derive(Debug, Serialize)]
pub(crate) struct LoadOutput {
pub(crate) uri: String,
pub(crate) branch: String,
pub(crate) mode: &'static str,
/// Present only when `--from` was given; echoes the requested base.
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) base_branch: Option<String>,
pub(crate) branch_created: bool,
pub(crate) nodes_loaded: usize,
pub(crate) edges_loaded: usize,
pub(crate) node_types_loaded: usize,
pub(crate) edge_types_loaded: usize,
}
pub(crate) fn load_output_from_tables(
uri: &str,
branch: &str,
mode: CliLoadMode,
output: &IngestOutput,
) -> LoadOutput {
let mut nodes_loaded = 0;
let mut edges_loaded = 0;
let mut node_types_loaded = 0;
let mut edge_types_loaded = 0;
for table in &output.tables {
if table.table_key.starts_with("node:") {
nodes_loaded += table.rows_loaded;
node_types_loaded += 1;
} else if table.table_key.starts_with("edge:") {
edges_loaded += table.rows_loaded;
edge_types_loaded += 1;
}
}
LoadOutput {
uri: uri.to_string(),
branch: branch.to_string(),
mode: mode.as_str(),
base_branch: output.base_branch.clone(),
branch_created: output.branch_created,
nodes_loaded,
edges_loaded,
node_types_loaded,
edge_types_loaded,
}
}
#[derive(Debug, Serialize)]
pub(crate) struct SchemaPlanOutput<'a> {
pub(crate) uri: &'a str,
pub(crate) supported: bool,
pub(crate) step_count: usize,
pub(crate) steps: &'a [SchemaMigrationStep],
}
pub(crate) fn print_schema_apply_human(output: &SchemaApplyOutput) {
println!("schema apply for {}", output.uri);
println!("supported: {}", if output.supported { "yes" } else { "no" });
println!("applied: {}", if output.applied { "yes" } else { "no" });
println!("manifest_version: {}", output.manifest_version);
if output.steps.is_empty() {
println!("no schema changes");
return;
}
for step in &output.steps {
println!("- {}", render_schema_plan_step(step));
}
}
pub(crate) fn query_kind_label(kind: QueryLintQueryKind) -> &'static str {
match kind {
QueryLintQueryKind::Read => "read",
QueryLintQueryKind::Mutation => "mutation",
}
}
pub(crate) fn severity_label(severity: QueryLintSeverity) -> &'static str {
match severity {
QueryLintSeverity::Error => "ERROR",
QueryLintSeverity::Warning => "WARN ",
QueryLintSeverity::Info => "INFO ",
}
}
pub(crate) fn print_query_lint_human(output: &QueryLintOutput) {
for result in &output.results {
match result.status {
QueryLintStatus::Ok => {
println!(
"OK query `{}` ({})",
result.name,
query_kind_label(result.kind)
);
}
QueryLintStatus::Error => {
println!(
"ERROR query `{}`: {}",
result.name,
result.error.as_deref().unwrap_or("unknown error")
);
}
}
for warning in &result.warnings {
println!("WARN query `{}`: {}", result.name, warning);
}
}
for finding in &output.findings {
println!("{} {}", severity_label(finding.severity), finding.message);
}
println!(
"INFO Lint complete: {} queries processed ({} error(s), {} warning(s), {} info item(s))",
output.queries_processed, output.errors, output.warnings, output.infos
);
}
pub(crate) fn finish_query_lint(output: &QueryLintOutput, json: bool) -> Result<()> {
if json {
print_json(output)?;
} else {
print_query_lint_human(output);
}
if output.status == QueryLintStatus::Error {
io::stdout().flush()?;
std::process::exit(1);
}
Ok(())
}
pub(crate) fn print_json<T: Serialize>(value: &T) -> Result<()> {
println!("{}", serde_json::to_string_pretty(value)?);
Ok(())
}
pub(crate) fn print_cluster_validate_human(output: &ValidateOutput) {
if output.ok {
println!(
"cluster config valid: {} resource(s), {} dependency edge(s)",
output.resources.len(),
output.dependencies.len()
);
} else {
println!("cluster config invalid");
}
print_cluster_diagnostics(&output.diagnostics);
}
pub(crate) fn print_cluster_plan_human(output: &PlanOutput) {
if output.ok {
println!(
"cluster plan: {} change(s), {} approval gate(s)",
output.changes.len(),
output.approvals_required.len()
);
for change in &output.changes {
let bindings = if change.binding_change { " [bindings]" } else { "" };
println!(" {:?} {}{bindings}", change.operation, change.resource);
if let Some(migration) = &change.migration {
if !migration.supported {
println!(" migration UNSUPPORTED:");
}
for step in &migration.steps {
println!(
" {}",
serde_json::to_string(step).unwrap_or_else(|_| format!("{step:?}"))
);
}
}
}
if output.changes.is_empty() {
println!(" no changes");
}
} else {
println!("cluster plan failed");
}
print_cluster_diagnostics(&output.diagnostics);
}
pub(crate) fn print_cluster_apply_human(output: &ApplyOutput) {
if output.ok {
println!(
"cluster apply: {} applied, {} deferred/blocked",
output.applied_count, output.deferred_count
);
} else {
println!("cluster apply failed");
}
// The change list prints on failure too: an operator debugging a partial
// apply (payload or state-write error) needs to see what was attempted.
print_cluster_apply_changes(&output.changes);
if output.ok {
let state = &output.state_observations;
println!(
" state: revision {}, converged: {}, written: {}",
state.state_revision, output.converged, output.state_written
);
println!(" note: cluster-booted servers (--cluster) serve this on their next restart; omnigraph.yaml deployments are unaffected");
}
print_cluster_diagnostics(&output.diagnostics);
}
pub(crate) fn print_cluster_apply_changes(changes: &[omnigraph_cluster::PlanChange]) {
for change in changes {
let bindings = if change.binding_change { " [bindings]" } else { "" };
match (&change.disposition, change.reason.as_deref()) {
(Some(disposition), Some(reason)) => println!(
" {:?} {}{bindings} [{disposition:?}: {reason}]",
change.operation, change.resource
),
(Some(disposition), None) => println!(
" {:?} {}{bindings} [{disposition:?}]",
change.operation, change.resource
),
_ => println!(" {:?} {}{bindings}", change.operation, change.resource),
}
}
if changes.is_empty() {
println!(" no changes");
}
}
pub(crate) fn print_cluster_status_human(output: &StatusOutput) {
if output.ok {
let state = &output.state_observations;
if state.state_found {
println!(
"cluster state: revision {}, {} resource(s)",
state.state_revision, state.resource_count
);
if let Some(digest) = state.applied_config_digest.as_deref() {
println!(" applied config: {digest}");
}
if state.locked {
println!(" lock: held{}", cluster_lock_summary(state));
} else {
println!(" lock: not held");
}
} else {
println!("cluster state missing");
}
} else {
println!("cluster status failed");
}
print_cluster_diagnostics(&output.diagnostics);
}
pub(crate) fn print_cluster_state_sync_human(output: &StateSyncOutput) {
let operation = match output.operation {
omnigraph_cluster::StateSyncOperation::Refresh => "refresh",
omnigraph_cluster::StateSyncOperation::Import => "import",
};
if output.ok {
let state = &output.state_observations;
println!(
"cluster {operation}: revision {}, {} resource(s)",
state.state_revision, state.resource_count
);
if let Some(cas) = state.state_cas.as_deref() {
println!(" state_cas: {cas}");
}
if state.locked {
println!(" lock: acquired{}", cluster_lock_summary(state));
} else {
println!(" lock: not acquired");
}
} else {
println!("cluster {operation} failed");
}
print_cluster_diagnostics(&output.diagnostics);
}
pub(crate) fn print_cluster_force_unlock_human(output: &ForceUnlockOutput) {
if output.ok {
if output.lock_removed {
println!(
"cluster force-unlock: removed lock{}",
cluster_lock_summary(&output.state_observations)
);
} else {
println!("cluster force-unlock: no lock removed");
}
} else {
println!("cluster force-unlock failed");
if output.state_observations.locked {
println!(
" lock: held{}",
cluster_lock_summary(&output.state_observations)
);
}
}
print_cluster_diagnostics(&output.diagnostics);
}
pub(crate) fn cluster_lock_summary(state: &omnigraph_cluster::StateObservations) -> String {
let Some(lock_id) = state.lock_id.as_deref() else {
return String::new();
};
let mut parts = vec![format!("id={lock_id}")];
if let Some(operation) = state.lock_operation.as_deref() {
parts.push(format!("operation={operation}"));
}
if let Some(pid) = state.lock_pid {
parts.push(format!("pid={pid}"));
}
if let Some(created_at) = state.lock_created_at.as_deref() {
parts.push(format!("created_at={created_at}"));
}
if let Some(age_seconds) = state.lock_age_seconds {
parts.push(format!("age_seconds={age_seconds}"));
}
format!(" ({})", parts.join(", "))
}
pub(crate) fn print_cluster_diagnostics(diagnostics: &[omnigraph_cluster::Diagnostic]) {
for diagnostic in diagnostics {
let label = match diagnostic.severity {
DiagnosticSeverity::Error => "ERROR",
DiagnosticSeverity::Warning => "WARN ",
};
println!(
"{label} {} {}: {}",
diagnostic.code, diagnostic.path, diagnostic.message
);
}
}
pub(crate) fn finish_cluster_validate(output: &ValidateOutput, json: bool) -> Result<()> {
if json {
print_json(output)?;
} else {
print_cluster_validate_human(output);
}
if !output.ok {
io::stdout().flush()?;
std::process::exit(1);
}
Ok(())
}
pub(crate) fn finish_cluster_plan(output: &PlanOutput, json: bool) -> Result<()> {
if json {
print_json(output)?;
} else {
print_cluster_plan_human(output);
}
if !output.ok {
io::stdout().flush()?;
std::process::exit(1);
}
Ok(())
}
pub(crate) fn finish_cluster_apply(output: &ApplyOutput, json: bool) -> Result<()> {
if json {
print_json(output)?;
} else {
print_cluster_apply_human(output);
}
if !output.ok {
io::stdout().flush()?;
std::process::exit(1);
}
Ok(())
}
pub(crate) fn finish_cluster_approve(output: &ApproveOutput, json: bool) -> Result<()> {
if json {
print_json(output)?;
} else if output.ok {
println!(
"cluster approve: {} {} approved by {} (approval {})",
output
.operation
.as_ref()
.map(|operation| format!("{operation:?}").to_lowercase())
.unwrap_or_default(),
output.resource.as_deref().unwrap_or("?"),
output.approved_by.as_deref().unwrap_or("?"),
output.approval_id.as_deref().unwrap_or("?"),
);
print_cluster_diagnostics(&output.diagnostics);
} else {
println!("cluster approve failed");
print_cluster_diagnostics(&output.diagnostics);
}
if !output.ok {
io::stdout().flush()?;
std::process::exit(1);
}
Ok(())
}
pub(crate) fn finish_cluster_status(output: &StatusOutput, json: bool) -> Result<()> {
if json {
print_json(output)?;
} else {
print_cluster_status_human(output);
}
if !output.ok {
io::stdout().flush()?;
std::process::exit(1);
}
Ok(())
}
pub(crate) fn finish_cluster_state_sync(output: &StateSyncOutput, json: bool) -> Result<()> {
if json {
print_json(output)?;
} else {
print_cluster_state_sync_human(output);
}
if !output.ok {
io::stdout().flush()?;
std::process::exit(1);
}
Ok(())
}
pub(crate) fn finish_cluster_force_unlock(output: &ForceUnlockOutput, json: bool) -> Result<()> {
if json {
print_json(output)?;
} else {
print_cluster_force_unlock_human(output);
}
if !output.ok {
io::stdout().flush()?;
std::process::exit(1);
}
Ok(())
}
pub(crate) fn print_load_human(payload: &LoadOutput) {
println!(
"loaded {} on branch {} with {}: {} nodes across {} node types, {} edges across {} edge types",
payload.uri,
payload.branch,
payload.mode,
payload.nodes_loaded,
payload.node_types_loaded,
payload.edges_loaded,
payload.edge_types_loaded
);
if payload.branch_created {
if let Some(base) = &payload.base_branch {
println!("branch {} created from {}", payload.branch, base);
}
}
}
pub(crate) fn print_ingest_human(output: &IngestOutput) {
println!(
"ingested {} into branch {} from {} with {} ({})",
output.uri,
output.branch,
output.base_branch.as_deref().unwrap_or("main"),
output.mode.as_str(),
if output.branch_created {
"branch created"
} else {
"branch exists"
}
);
for table in &output.tables {
println!("{} rows_loaded={}", table.table_key, table.rows_loaded);
}
if let Some(actor_id) = &output.actor_id {
println!("actor_id: {}", actor_id);
}
}
pub(crate) fn print_schema_plan_human(uri: &str, plan: &SchemaMigrationPlan) {
println!("schema plan for {}", uri);
println!("supported: {}", if plan.supported { "yes" } else { "no" });
if plan.steps.is_empty() {
println!("no schema changes");
return;
}
for step in &plan.steps {
println!("- {}", render_schema_plan_step(step));
}
}
pub(crate) fn render_schema_plan_step(step: &SchemaMigrationStep) -> String {
match step {
SchemaMigrationStep::AddType { type_kind, name } => {
format!("add {} type '{}'", schema_type_kind_label(*type_kind), name)
}
SchemaMigrationStep::RenameType {
type_kind,
from,
to,
} => format!(
"rename {} type '{}' -> '{}'",
schema_type_kind_label(*type_kind),
from,
to
),
SchemaMigrationStep::AddProperty {
type_kind,
type_name,
property_name,
property_type,
} => format!(
"add property '{}.{}' ({}) on {} '{}'",
type_name,
property_name,
render_prop_type(property_type),
schema_type_kind_label(*type_kind),
type_name
),
SchemaMigrationStep::RenameProperty {
type_kind,
type_name,
from,
to,
} => format!(
"rename property '{}.{}' -> '{}.{}' on {} '{}'",
type_name,
from,
type_name,
to,
schema_type_kind_label(*type_kind),
type_name
),
SchemaMigrationStep::AddConstraint {
type_kind,
type_name,
constraint,
} => format!(
"add constraint {} on {} '{}'",
render_constraint(constraint),
schema_type_kind_label(*type_kind),
type_name
),
SchemaMigrationStep::UpdateTypeMetadata {
type_kind,
name,
annotations,
} => format!(
"update metadata on {} '{}' ({})",
schema_type_kind_label(*type_kind),
name,
render_annotations(annotations)
),
SchemaMigrationStep::UpdatePropertyMetadata {
type_kind,
type_name,
property_name,
annotations,
} => format!(
"update metadata on property '{}.{}' of {} '{}' ({})",
type_name,
property_name,
schema_type_kind_label(*type_kind),
type_name,
render_annotations(annotations)
),
SchemaMigrationStep::DropType {
type_kind,
name,
mode,
} => format!(
"drop {} type '{}' ({} mode)",
schema_type_kind_label(*type_kind),
name,
drop_mode_label(*mode),
),
SchemaMigrationStep::DropProperty {
type_kind,
type_name,
property_name,
mode,
} => format!(
"drop property '{}.{}' of {} '{}' ({} mode)",
type_name,
property_name,
schema_type_kind_label(*type_kind),
type_name,
drop_mode_label(*mode),
),
SchemaMigrationStep::UnsupportedChange { entity, reason, .. } => {
// When a schema-lint code is attached, render code + tier
// so operators see at-a-glance the kind of risk (destructive
// / validated / safe) — not just the rule identifier.
// Reach the diagnostic via the `diagnostic()` helper so the
// CLI doesn't need to know how the lookup works.
match step.diagnostic() {
Some(diag) => format!(
"unsupported change on {} [{}, {}]: {}",
entity,
diag.code,
schema_lint_tier_label(diag.tier),
reason,
),
None => format!("unsupported change on {}: {}", entity, reason),
}
}
}
}
pub(crate) fn schema_type_kind_label(kind: omnigraph_compiler::SchemaTypeKind) -> &'static str {
match kind {
omnigraph_compiler::SchemaTypeKind::Interface => "interface",
omnigraph_compiler::SchemaTypeKind::Node => "node",
omnigraph_compiler::SchemaTypeKind::Edge => "edge",
}
}
pub(crate) fn schema_lint_tier_label(tier: omnigraph_compiler::SafetyTier) -> &'static str {
match tier {
omnigraph_compiler::SafetyTier::Safe => "safe",
omnigraph_compiler::SafetyTier::Validated => "validated",
omnigraph_compiler::SafetyTier::Destructive => "destructive",
}
}
pub(crate) fn drop_mode_label(mode: omnigraph_compiler::DropMode) -> &'static str {
match mode {
omnigraph_compiler::DropMode::Soft => "soft",
omnigraph_compiler::DropMode::Hard => "hard",
}
}
pub(crate) fn render_prop_type(prop_type: &omnigraph_compiler::PropType) -> String {
let base = if let Some(values) = &prop_type.enum_values {
format!("Enum({})", values.join("|"))
} else {
prop_type.scalar.to_string()
};
let base = if prop_type.list {
format!("[{}]", base)
} else {
base
};
if prop_type.nullable {
format!("{}?", base)
} else {
base
}
}
pub(crate) fn render_constraint(constraint: &omnigraph_compiler::schema::ast::Constraint) -> String {
match constraint {
omnigraph_compiler::schema::ast::Constraint::Key(columns) => {
format!("@key({})", columns.join(", "))
}
omnigraph_compiler::schema::ast::Constraint::Unique(columns) => {
format!("@unique({})", columns.join(", "))
}
omnigraph_compiler::schema::ast::Constraint::Index(columns) => {
format!("@index({})", columns.join(", "))
}
omnigraph_compiler::schema::ast::Constraint::Range { property, min, max } => {
format!("@range({}, {:?}, {:?})", property, min, max)
}
omnigraph_compiler::schema::ast::Constraint::Check { property, pattern } => {
format!("@check({}, {:?})", property, pattern)
}
}
}
pub(crate) fn render_annotations(annotations: &[omnigraph_compiler::schema::ast::Annotation]) -> String {
annotations
.iter()
.map(|annotation| match &annotation.value {
Some(value) => format!("@{}({})", annotation.name, value),
None => format!("@{}", annotation.name),
})
.collect::<Vec<_>>()
.join(", ")
}
pub(crate) fn print_embed_human(output: &EmbedOutput) {
println!(
"embedded {} rows (selected {}, cleaned {}) from {} -> {} [{} {}d]",
output.embedded_rows,
output.selected_rows,
output.cleaned_rows,
output.input,
output.output,
output.mode,
output.dimension
);
}
pub(crate) fn print_snapshot_human(branch: &str, manifest_version: u64, entries: &[SnapshotTableOutput]) {
println!("branch: {}", branch);
println!("manifest_version: {}", manifest_version);
for entry in entries {
println!(
"{} v{} branch={} rows={}",
entry.table_key,
entry.table_version,
entry.table_branch.as_deref().unwrap_or("main"),
entry.row_count
);
}
}
pub(crate) fn print_read_output(
output: &ReadOutput,
format: ReadOutputFormat,
config: &OmnigraphConfig,
) -> Result<()> {
println!(
"{}",
render_read(
output,
format,
&ReadRenderOptions {
max_column_width: config.table_max_column_width(),
cell_layout: config.table_cell_layout(),
},
)?
);
Ok(())
}
pub(crate) fn print_change_human(output: &ChangeOutput) {
println!(
"changed {} via {}: {} nodes, {} edges",
output.branch, output.query_name, output.affected_nodes, output.affected_edges
);
if let Some(actor_id) = &output.actor_id {
println!("actor_id: {}", actor_id);
}
}
pub(crate) fn print_commit_list_human(commits: &[CommitOutput]) {
for commit in commits {
let branch = commit.manifest_branch.as_deref().unwrap_or("main");
println!(
"{} branch={} version={}{}",
commit.graph_commit_id,
branch,
commit.manifest_version,
commit
.actor_id
.as_deref()
.map(|actor| format!(" actor={}", actor))
.unwrap_or_default()
);
}
}
pub(crate) fn print_commit_human(commit: &CommitOutput) {
println!("graph_commit_id: {}", commit.graph_commit_id);
println!(
"manifest_branch: {}",
commit.manifest_branch.as_deref().unwrap_or("main")
);
println!("manifest_version: {}", commit.manifest_version);
if let Some(parent_commit_id) = &commit.parent_commit_id {
println!("parent_commit_id: {}", parent_commit_id);
}
if let Some(merged_parent_commit_id) = &commit.merged_parent_commit_id {
println!("merged_parent_commit_id: {}", merged_parent_commit_id);
}
if let Some(actor_id) = &commit.actor_id {
println!("actor_id: {}", actor_id);
}
println!("created_at: {}", commit.created_at);
}
pub(crate) fn print_policy_explain(decision: &PolicyDecision, actor_id: &str, request: &PolicyRequest) {
println!(
"decision: {}",
if decision.allowed { "allow" } else { "deny" }
);
println!("actor: {}", actor_id);
println!("action: {}", request.action);
if let Some(branch) = &request.branch {
println!("branch: {}", branch);
}
if let Some(target_branch) = &request.target_branch {
println!("target_branch: {}", target_branch);
}
if let Some(rule_id) = &decision.matched_rule_id {
println!("matched_rule: {}", rule_id);
}
println!("message: {}", decision.message);
}
pub(crate) fn yaml_string(value: &str) -> String {
format!("'{}'", value.replace('\'', "''"))
}
#[derive(serde::Serialize)]
pub(crate) struct QueriesIssue {
pub(crate) query: String,
pub(crate) message: String,
}
#[derive(serde::Serialize)]
pub(crate) struct QueriesValidateOutput {
pub(crate) ok: bool,
pub(crate) breakages: Vec<QueriesIssue>,
pub(crate) warnings: Vec<QueriesIssue>,
}
#[derive(serde::Serialize)]
pub(crate) struct QueriesParam {
pub(crate) name: String,
#[serde(rename = "type")]
pub(crate) type_name: String,
pub(crate) nullable: bool,
}
#[derive(serde::Serialize)]
pub(crate) struct QueriesListItem {
pub(crate) name: String,
pub(crate) mcp_expose: bool,
pub(crate) tool_name: Option<String>,
pub(crate) mutation: bool,
pub(crate) params: Vec<QueriesParam>,
}
#[derive(serde::Serialize)]
pub(crate) struct QueriesListOutput {
pub(crate) queries: Vec<QueriesListItem>,
}

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,884 @@
//! Cluster command surface: validate/plan/apply/approve/status/sync/force-unlock.
//! Moved verbatim from tests/cli.rs in the modularization.
use std::fs;
use tempfile::tempdir;
mod support;
use support::*;
#[test]
fn cluster_validate_config_success() {
let temp = tempdir().unwrap();
write_cluster_config_fixture(temp.path());
let output = output_success(
cli()
.arg("cluster")
.arg("validate")
.arg("--config")
.arg(temp.path()),
);
let stdout = stdout_string(&output);
assert!(stdout.contains("cluster config valid"), "{stdout}");
}
#[test]
fn cluster_validate_json_is_stable() {
let temp = tempdir().unwrap();
write_cluster_config_fixture(temp.path());
let json = parse_stdout_json(&output_success(
cli()
.arg("cluster")
.arg("validate")
.arg("--config")
.arg(temp.path())
.arg("--json"),
));
assert_eq!(json["ok"], true);
assert!(json["resource_digests"]["graph.knowledge"].is_string());
assert!(json["resource_digests"]["query.knowledge.find_person"].is_string());
assert_eq!(json["dependencies"][0]["from"], "policy.base");
assert_eq!(json["dependencies"][0]["to"], "graph.knowledge");
}
#[test]
fn cluster_plan_json_reads_inferred_local_state() {
let temp = tempdir().unwrap();
write_cluster_config_fixture(temp.path());
let state_dir = temp.path().join("__cluster");
fs::create_dir_all(&state_dir).unwrap();
fs::write(
state_dir.join("state.json"),
r#"
{
"version": 1,
"applied_revision": {
"config_digest": "old",
"resources": {
"graph.knowledge": { "digest": "old-graph" },
"policy.old": { "digest": "old-policy" }
}
}
}
"#,
)
.unwrap();
let json = parse_stdout_json(&output_success(
cli()
.arg("cluster")
.arg("plan")
.arg("--config")
.arg(temp.path())
.arg("--json"),
));
assert_eq!(json["ok"], true);
assert_eq!(json["state_observations"]["state_found"], true);
assert!(
json["changes"]
.as_array()
.unwrap()
.iter()
.any(|change| change["resource"] == "policy.old" && change["operation"] == "delete"),
"plan should read state and delete stale resources: {json}"
);
}
#[test]
fn cluster_status_json_reports_missing_state() {
let temp = tempdir().unwrap();
write_cluster_config_fixture(temp.path());
let json = parse_stdout_json(&output_success(
cli()
.arg("cluster")
.arg("status")
.arg("--config")
.arg(temp.path())
.arg("--json"),
));
assert_eq!(json["ok"], true);
assert_eq!(json["state_observations"]["state_found"], false);
assert!(
json["diagnostics"]
.as_array()
.unwrap()
.iter()
.any(|diagnostic| diagnostic["code"] == "state_missing"),
"missing state should be a warning diagnostic: {json}"
);
}
#[test]
fn cluster_status_json_reports_lock_metadata() {
let temp = tempdir().unwrap();
write_cluster_config_fixture(temp.path());
write_cluster_lock(temp.path(), "held-lock", "refresh");
let json = parse_stdout_json(&output_success(
cli()
.arg("cluster")
.arg("status")
.arg("--config")
.arg(temp.path())
.arg("--json"),
));
assert_eq!(json["ok"], true);
assert_eq!(json["state_observations"]["locked"], true);
assert_eq!(json["state_observations"]["lock_id"], "held-lock");
assert_eq!(json["state_observations"]["lock_operation"], "refresh");
assert_eq!(json["state_observations"]["lock_pid"], 123);
assert_eq!(
json["state_observations"]["lock_created_at"],
"1970-01-01T00:00:00Z"
);
assert!(json["state_observations"]["lock_age_seconds"].is_number());
}
#[test]
fn cluster_status_json_reports_extended_state() {
let temp = tempdir().unwrap();
write_cluster_config_fixture(temp.path());
let state_dir = temp.path().join("__cluster");
fs::create_dir_all(&state_dir).unwrap();
fs::write(
state_dir.join("state.json"),
r#"
{
"version": 1,
"state_revision": 5,
"applied_revision": {
"config_digest": "applied",
"resources": {
"graph.knowledge": { "digest": "graph-digest" }
}
},
"resource_statuses": {
"graph.knowledge": { "status": "applied", "conditions": ["healthy"] }
},
"approval_records": {},
"recovery_records": {},
"observations": {}
}
"#,
)
.unwrap();
let json = parse_stdout_json(&output_success(
cli()
.arg("cluster")
.arg("status")
.arg("--config")
.arg(temp.path())
.arg("--json"),
));
assert_eq!(json["ok"], true);
assert_eq!(json["state_observations"]["state_revision"], 5);
assert!(
json["state_observations"]["state_cas"]
.as_str()
.unwrap()
.starts_with("sha256:")
);
assert_eq!(json["resource_digests"]["graph.knowledge"], "graph-digest");
assert_eq!(
json["resource_statuses"]["graph.knowledge"]["status"],
"applied"
);
}
#[test]
fn cluster_plan_json_includes_state_cas_revision_and_lock_observation() {
let temp = tempdir().unwrap();
write_cluster_config_fixture(temp.path());
let state_dir = temp.path().join("__cluster");
fs::create_dir_all(&state_dir).unwrap();
fs::write(
state_dir.join("state.json"),
r#"
{
"version": 1,
"state_revision": 9,
"applied_revision": {
"config_digest": "old",
"resources": {
"graph.knowledge": { "digest": "old-graph" }
}
}
}
"#,
)
.unwrap();
let json = parse_stdout_json(&output_success(
cli()
.arg("cluster")
.arg("plan")
.arg("--config")
.arg(temp.path())
.arg("--json"),
));
assert_eq!(json["ok"], true);
assert_eq!(json["state_observations"]["state_revision"], 9);
assert!(
json["state_observations"]["state_cas"]
.as_str()
.unwrap()
.starts_with("sha256:")
);
assert_eq!(json["state_observations"]["locked"], false);
assert_eq!(json["state_observations"]["lock_acquired"], true);
assert!(json["state_observations"]["acquired_lock_id"].is_string());
assert!(!state_dir.join("lock.json").exists());
}
#[test]
fn cluster_plan_locked_state_exits_nonzero() {
let temp = tempdir().unwrap();
write_cluster_config_fixture(temp.path());
write_cluster_lock(temp.path(), "held-lock", "plan");
let output = output_failure(
cli()
.arg("cluster")
.arg("plan")
.arg("--config")
.arg(temp.path())
.arg("--json"),
);
let json = parse_stdout_json(&output);
assert_eq!(json["ok"], false);
assert_eq!(json["state_observations"]["locked"], true);
assert_eq!(json["state_observations"]["lock_acquired"], false);
assert_eq!(json["state_observations"]["lock_id"], "held-lock");
assert_eq!(json["state_observations"]["lock_operation"], "plan");
assert_eq!(json["state_observations"]["lock_pid"], 123);
assert_eq!(
json["state_observations"]["lock_created_at"],
"1970-01-01T00:00:00Z"
);
assert!(json["state_observations"]["lock_age_seconds"].is_number());
assert!(
json["diagnostics"]
.as_array()
.unwrap()
.iter()
.any(|diagnostic| diagnostic["code"] == "state_lock_held"
&& diagnostic["message"]
.as_str()
.unwrap()
.contains("force-unlock held-lock")),
"locked state should produce a useful diagnostic: {json}"
);
}
#[test]
fn cluster_force_unlock_json_removes_lock() {
let temp = tempdir().unwrap();
write_cluster_config_fixture(temp.path());
write_cluster_lock(temp.path(), "held-lock", "plan");
let json = parse_stdout_json(&output_success(
cli()
.arg("cluster")
.arg("force-unlock")
.arg("held-lock")
.arg("--config")
.arg(temp.path())
.arg("--json"),
));
assert_eq!(json["ok"], true);
assert_eq!(json["lock_removed"], true);
assert_eq!(json["state_observations"]["lock_id"], "held-lock");
assert_eq!(json["state_observations"]["lock_operation"], "plan");
assert!(!temp.path().join("__cluster/lock.json").exists());
}
#[test]
fn cluster_force_unlock_wrong_id_exits_nonzero() {
let temp = tempdir().unwrap();
write_cluster_config_fixture(temp.path());
write_cluster_lock(temp.path(), "held-lock", "plan");
let json = parse_stdout_json(&output_failure(
cli()
.arg("cluster")
.arg("force-unlock")
.arg("other-lock")
.arg("--config")
.arg(temp.path())
.arg("--json"),
));
assert_eq!(json["ok"], false);
assert_eq!(json["lock_removed"], false);
assert!(
json["diagnostics"]
.as_array()
.unwrap()
.iter()
.any(|diagnostic| diagnostic["code"] == "state_lock_id_mismatch")
);
assert!(temp.path().join("__cluster/lock.json").exists());
}
#[test]
fn cluster_locked_plan_then_force_unlock_then_plan_succeeds() {
let temp = tempdir().unwrap();
write_cluster_config_fixture(temp.path());
write_cluster_lock(temp.path(), "held-lock", "plan");
let locked = parse_stdout_json(&output_failure(
cli()
.arg("cluster")
.arg("plan")
.arg("--config")
.arg(temp.path())
.arg("--json"),
));
assert_eq!(locked["ok"], false);
assert_eq!(locked["state_observations"]["lock_id"], "held-lock");
let unlocked = parse_stdout_json(&output_success(
cli()
.arg("cluster")
.arg("force-unlock")
.arg("held-lock")
.arg("--config")
.arg(temp.path())
.arg("--json"),
));
assert_eq!(unlocked["lock_removed"], true);
let planned = parse_stdout_json(&output_success(
cli()
.arg("cluster")
.arg("plan")
.arg("--config")
.arg(temp.path())
.arg("--json"),
));
assert_eq!(planned["ok"], true);
}
#[test]
fn cluster_import_json_bootstraps_missing_state() {
let temp = tempdir().unwrap();
write_cluster_config_fixture(temp.path());
init_cluster_derived_graph(temp.path());
let json = parse_stdout_json(&output_success(
cli()
.arg("cluster")
.arg("import")
.arg("--config")
.arg(temp.path())
.arg("--json"),
));
assert_eq!(json["ok"], true);
assert_eq!(json["operation"], "import");
assert_eq!(json["state_observations"]["state_revision"], 1);
assert!(
json["state_observations"]["state_cas"]
.as_str()
.unwrap()
.starts_with("sha256:")
);
assert_eq!(json["state_observations"]["locked"], false);
assert_eq!(json["state_observations"]["lock_acquired"], true);
assert!(json["state_observations"]["acquired_lock_id"].is_string());
assert!(json["observations"]["graph.knowledge"]["manifest_version"].is_number());
assert_eq!(
json["resource_statuses"]["graph.knowledge"]["status"],
"applied"
);
assert!(temp.path().join("__cluster/state.json").exists());
assert!(!temp.path().join("__cluster/lock.json").exists());
}
#[test]
fn cluster_refresh_json_updates_revision_cas_and_removes_lock() {
let temp = tempdir().unwrap();
write_cluster_config_fixture(temp.path());
init_cluster_derived_graph(temp.path());
let state_dir = temp.path().join("__cluster");
fs::create_dir_all(&state_dir).unwrap();
fs::write(
state_dir.join("state.json"),
r#"
{
"version": 1,
"state_revision": 2,
"applied_revision": { "resources": {} }
}
"#,
)
.unwrap();
let json = parse_stdout_json(&output_success(
cli()
.arg("cluster")
.arg("refresh")
.arg("--config")
.arg(temp.path())
.arg("--json"),
));
assert_eq!(json["ok"], true);
assert_eq!(json["operation"], "refresh");
assert_eq!(json["state_observations"]["state_revision"], 3);
assert!(
json["state_observations"]["state_cas"]
.as_str()
.unwrap()
.starts_with("sha256:")
);
assert_eq!(json["state_observations"]["locked"], false);
assert_eq!(json["state_observations"]["lock_acquired"], true);
assert!(json["state_observations"]["acquired_lock_id"].is_string());
assert!(!state_dir.join("lock.json").exists());
}
#[test]
fn cluster_refresh_missing_state_exits_nonzero() {
let temp = tempdir().unwrap();
write_cluster_config_fixture(temp.path());
let output = output_failure(
cli()
.arg("cluster")
.arg("refresh")
.arg("--config")
.arg(temp.path())
.arg("--json"),
);
let json = parse_stdout_json(&output);
assert_eq!(json["ok"], false);
assert!(
json["diagnostics"]
.as_array()
.unwrap()
.iter()
.any(|diagnostic| diagnostic["code"] == "state_missing"),
"missing state should produce a useful diagnostic: {json}"
);
}
#[test]
fn cluster_import_existing_state_exits_nonzero() {
let temp = tempdir().unwrap();
write_cluster_config_fixture(temp.path());
let state_dir = temp.path().join("__cluster");
fs::create_dir_all(&state_dir).unwrap();
fs::write(
state_dir.join("state.json"),
r#"{"version":1,"applied_revision":{"resources":{}}}"#,
)
.unwrap();
let output = output_failure(
cli()
.arg("cluster")
.arg("import")
.arg("--config")
.arg(temp.path())
.arg("--json"),
);
let json = parse_stdout_json(&output);
assert_eq!(json["ok"], false);
assert!(
json["diagnostics"]
.as_array()
.unwrap()
.iter()
.any(|diagnostic| diagnostic["code"] == "state_already_exists"),
"existing state should produce a useful diagnostic: {json}"
);
}
#[test]
fn cluster_refresh_and_import_locked_state_exit_nonzero() {
let temp = tempdir().unwrap();
write_cluster_config_fixture(temp.path());
let state_dir = temp.path().join("__cluster");
fs::create_dir_all(&state_dir).unwrap();
fs::write(
state_dir.join("state.json"),
r#"{"version":1,"applied_revision":{"resources":{}}}"#,
)
.unwrap();
fs::write(
state_dir.join("lock.json"),
r#"{"version":1,"lock_id":"held-lock","operation":"refresh","created_at":"2026-06-08T00:00:00Z","pid":123}"#,
)
.unwrap();
let refresh = parse_stdout_json(&output_failure(
cli()
.arg("cluster")
.arg("refresh")
.arg("--config")
.arg(temp.path())
.arg("--json"),
));
assert_eq!(refresh["state_observations"]["locked"], true);
assert_eq!(refresh["state_observations"]["lock_id"], "held-lock");
assert_eq!(refresh["state_observations"]["lock_acquired"], false);
assert!(
refresh["diagnostics"]
.as_array()
.unwrap()
.iter()
.any(|diagnostic| diagnostic["code"] == "state_lock_held")
);
let temp = tempdir().unwrap();
write_cluster_config_fixture(temp.path());
let state_dir = temp.path().join("__cluster");
fs::create_dir_all(&state_dir).unwrap();
fs::write(
state_dir.join("lock.json"),
r#"{"version":1,"lock_id":"held-lock","operation":"import","created_at":"2026-06-08T00:00:00Z","pid":123}"#,
)
.unwrap();
let imported = parse_stdout_json(&output_failure(
cli()
.arg("cluster")
.arg("import")
.arg("--config")
.arg(temp.path())
.arg("--json"),
));
assert_eq!(imported["state_observations"]["locked"], true);
assert_eq!(imported["state_observations"]["lock_id"], "held-lock");
assert_eq!(imported["state_observations"]["lock_acquired"], false);
assert!(
imported["diagnostics"]
.as_array()
.unwrap()
.iter()
.any(|diagnostic| diagnostic["code"] == "state_lock_held")
);
}
#[test]
fn cluster_validate_invalid_config_exits_nonzero() {
let temp = tempdir().unwrap();
fs::write(
temp.path().join("cluster.yaml"),
"version: 1\ngraphs: {}\npipelines: {}\n",
)
.unwrap();
let output = output_failure(
cli()
.arg("cluster")
.arg("validate")
.arg("--config")
.arg(temp.path()),
);
let stdout = stdout_string(&output);
assert!(stdout.contains("future_phase_field"), "{stdout}");
}
#[test]
fn cluster_apply_json_applies_query_and_policy() {
let temp = tempdir().unwrap();
write_cluster_config_fixture(temp.path());
let validate = write_cluster_applyable_state(temp.path());
let json = parse_stdout_json(&output_success(
cli()
.arg("cluster")
.arg("apply")
.arg("--config")
.arg(temp.path())
.arg("--json"),
));
assert_eq!(json["ok"], true, "{json}");
assert_eq!(json["applied_count"], 2, "{json}");
assert_eq!(json["converged"], true, "{json}");
assert_eq!(json["state_written"], true, "{json}");
assert_eq!(
json["resource_statuses"]["query.knowledge.find_person"]["status"],
"applied"
);
let query_digest = validate["resource_digests"]["query.knowledge.find_person"]
.as_str()
.unwrap();
let payload = temp
.path()
.join("__cluster/resources/query/knowledge/find_person")
.join(format!("{query_digest}.gq"));
assert!(payload.exists(), "missing payload {}", payload.display());
let state: serde_json::Value = serde_json::from_str(
&fs::read_to_string(temp.path().join("__cluster/state.json")).unwrap(),
)
.unwrap();
assert_eq!(state["state_revision"], 2);
assert_eq!(
state["applied_revision"]["resources"]["query.knowledge.find_person"]["digest"],
*query_digest
);
}
#[test]
fn cluster_apply_missing_state_exits_nonzero() {
let temp = tempdir().unwrap();
write_cluster_config_fixture(temp.path());
let output = output_failure(
cli()
.arg("cluster")
.arg("apply")
.arg("--config")
.arg(temp.path())
.arg("--json"),
);
let json = parse_stdout_json(&output);
assert_eq!(json["ok"], false);
assert!(
json["diagnostics"]
.as_array()
.unwrap()
.iter()
.any(|diagnostic| diagnostic["code"] == "state_missing"),
"{json}"
);
assert!(!temp.path().join("__cluster/resources").exists());
}
#[test]
fn cluster_apply_locked_exits_nonzero() {
let temp = tempdir().unwrap();
write_cluster_config_fixture(temp.path());
write_cluster_applyable_state(temp.path());
write_cluster_lock(temp.path(), "held-lock", "plan");
let output = output_failure(
cli()
.arg("cluster")
.arg("apply")
.arg("--config")
.arg(temp.path())
.arg("--json"),
);
let json = parse_stdout_json(&output);
assert_eq!(json["ok"], false);
assert!(
json["diagnostics"]
.as_array()
.unwrap()
.iter()
.any(|diagnostic| diagnostic["code"] == "state_lock_held"),
"{json}"
);
assert!(temp.path().join("__cluster/lock.json").exists());
assert!(!temp.path().join("__cluster/resources").exists());
}
#[test]
fn cluster_apply_uses_cli_actor_from_local_config() {
let temp = tempdir().unwrap();
write_cluster_config_fixture(temp.path());
fs::write(
temp.path().join("omnigraph.yaml"),
"cli:\n actor: act-local\n",
)
.unwrap();
// Phase 1: import once (setup, not under test).
let output = cli()
.current_dir(temp.path())
.arg("cluster")
.arg("import")
.arg("--config")
.arg(temp.path())
.output()
.unwrap();
assert!(output.status.success(), "{output:?}");
// Phase 2: apply alone, capturing the echoed actor (idempotent re-runs).
let apply = |extra: &[&str]| {
let mut command = cli();
command.current_dir(temp.path());
for arg in extra {
command.arg(arg);
}
let output = command
.arg("cluster")
.arg("apply")
.arg("--config")
.arg(temp.path())
.arg("--json")
.output()
.unwrap();
let json: serde_json::Value =
serde_json::from_str(String::from_utf8_lossy(&output.stdout).trim()).unwrap();
json["actor"].clone()
};
assert_eq!(apply(&[]), "act-local", "cli.actor is the no-flag default");
assert_eq!(apply(&["--as", "andrew"]), "andrew", "--as overrides cli.actor");
}
#[test]
fn cluster_approve_uses_cli_actor_fallback() {
let temp = tempdir().unwrap();
write_cluster_config_fixture(temp.path());
fs::write(
temp.path().join("omnigraph.yaml"),
"cli:\n actor: act-local\n",
)
.unwrap();
// Converge, then remove the graph so a gated delete is pending.
for command in ["import", "apply"] {
let output = cli()
.current_dir(temp.path())
.arg("cluster")
.arg(command)
.arg("--config")
.arg(temp.path())
.output()
.unwrap();
assert!(output.status.success(), "cluster {command} failed");
}
fs::write(temp.path().join("cluster.yaml"), "version: 1\ngraphs: {}\n").unwrap();
let output = cli()
.current_dir(temp.path())
.arg("cluster")
.arg("approve")
.arg("graph.knowledge")
.arg("--config")
.arg(temp.path())
.arg("--json")
.output()
.unwrap();
assert!(output.status.success(), "{output:?}");
let json: serde_json::Value =
serde_json::from_str(String::from_utf8_lossy(&output.stdout).trim()).unwrap();
assert_eq!(json["approved_by"], "act-local");
// With neither flag nor config: refused with the actionable message.
let bare = tempdir().unwrap();
write_cluster_config_fixture(bare.path());
let output = output_failure(
cli()
.current_dir(bare.path())
.arg("cluster")
.arg("approve")
.arg("graph.knowledge")
.arg("--config")
.arg(bare.path()),
);
let stderr = String::from_utf8_lossy(&output.stderr);
assert!(stderr.contains("--as"), "{stderr}");
assert!(stderr.contains("cli.actor"), "{stderr}");
}
#[test]
fn cluster_commands_ignore_malformed_local_config() {
let temp = tempdir().unwrap();
write_cluster_config_fixture(temp.path());
fs::write(temp.path().join("omnigraph.yaml"), "{{{{ not yaml").unwrap();
for command in ["validate", "plan", "status"] {
let output = cli()
.current_dir(temp.path())
.arg("cluster")
.arg(command)
.arg("--config")
.arg(temp.path())
.arg("--json")
.output()
.unwrap();
assert!(
output.status.success() || command == "plan", // plan warns state-missing pre-import; still must not config-error
"cluster {command} affected by malformed omnigraph.yaml: {output:?}"
);
assert!(
!String::from_utf8_lossy(&output.stderr).contains("omnigraph.yaml"),
"cluster {command} touched omnigraph.yaml"
);
}
// import + apply with an explicit --as: the config is never loaded.
for (command, args) in [("import", vec![]), ("apply", vec!["--as", "andrew"])] {
let mut invocation = cli();
invocation.current_dir(temp.path());
for arg in &args {
invocation.arg(arg);
}
let output = invocation
.arg("cluster")
.arg(command)
.arg("--config")
.arg(temp.path())
.output()
.unwrap();
assert!(
output.status.success(),
"cluster {command} affected by malformed omnigraph.yaml: {}",
String::from_utf8_lossy(&output.stderr)
);
}
// Only the no-flag actor lookup is allowed to fail, and loudly.
let output = output_failure(
cli()
.current_dir(temp.path())
.arg("cluster")
.arg("apply")
.arg("--config")
.arg(temp.path()),
);
let stderr = String::from_utf8_lossy(&output.stderr);
assert!(
stderr.contains("omnigraph.yaml") && stderr.contains("--as"),
"the actor-default config read must fail loudly and actionably: {stderr}"
);
}
#[test]
fn cluster_commands_ignore_conflicting_local_config() {
let baseline = tempdir().unwrap();
write_cluster_config_fixture(baseline.path());
let with_config = tempdir().unwrap();
write_cluster_config_fixture(with_config.path());
fs::write(
with_config.path().join("omnigraph.yaml"),
r#"
server:
bind: 0.0.0.0:9999
graphs:
phantom:
uri: ./phantom.omni
"#,
)
.unwrap();
let validate = |dir: &std::path::Path| {
let output = cli()
.current_dir(dir)
.arg("cluster")
.arg("validate")
.arg("--config")
.arg(dir)
.arg("--json")
.output()
.unwrap();
assert!(output.status.success(), "{output:?}");
serde_json::from_str::<serde_json::Value>(String::from_utf8_lossy(&output.stdout).trim())
.unwrap()
};
let (a, b) = (validate(baseline.path()), validate(with_config.path()));
// Compare the path-free invariants (paths embed each tempdir).
for key in ["ok", "diagnostics", "resource_digests", "dependencies"] {
assert_eq!(a[key], b[key], "conflicting omnigraph.yaml leaked into cluster validate ({key})");
}
let leaked = b.to_string();
assert!(!leaked.contains("phantom") && !leaked.contains("9999"), "{leaked}");
}

View file

@ -0,0 +1,621 @@
//! Cluster lifecycle compositions over the spawned binary (recovery, drift, convergence).
//! Moved verbatim from tests/cli.rs in the modularization.
use std::fs;
use tempfile::tempdir;
mod support;
use support::*;
#[test]
fn cluster_e2e_lifecycle_import_apply_status_refresh_converges() {
let temp = tempdir().unwrap();
write_cluster_config_fixture(temp.path());
init_cluster_derived_graph(temp.path());
let import = cluster_json(temp.path(), "import");
assert_eq!(import["ok"], true, "{import}");
assert_eq!(import["state_observations"]["state_revision"], 1);
let plan = cluster_json(temp.path(), "plan");
let changes = plan["changes"].as_array().unwrap();
assert_eq!(changes.len(), 3, "{plan}");
let disposition_of = |resource: &str| {
changes
.iter()
.find(|change| change["resource"] == resource)
.unwrap_or_else(|| panic!("missing change for {resource}: {plan}"))["disposition"]
.clone()
};
assert_eq!(disposition_of("graph.knowledge"), "derived");
assert_eq!(disposition_of("query.knowledge.find_person"), "applied");
assert_eq!(disposition_of("policy.base"), "applied");
let apply = cluster_json(temp.path(), "apply");
assert_eq!(apply["ok"], true, "{apply}");
assert_eq!(apply["applied_count"], 2, "{apply}");
assert_eq!(apply["converged"], true, "{apply}");
let status = cluster_json(temp.path(), "status");
assert_eq!(
status["resource_statuses"]["query.knowledge.find_person"]["status"],
"applied"
);
assert_eq!(status["resource_statuses"]["policy.base"]["status"], "applied");
assert!(
status["state_observations"]["applied_config_digest"].is_string(),
"converged apply must record the applied config digest: {status}"
);
// Refresh re-observes the live graph; it must not undo apply's work.
let refresh = cluster_json(temp.path(), "refresh");
assert_eq!(refresh["ok"], true, "{refresh}");
let replan = cluster_json(temp.path(), "plan");
assert!(
replan["changes"].as_array().unwrap().is_empty(),
"refresh after a converged apply must not re-open the plan: {replan}"
);
// A query edit round-trips: plan update -> apply -> converged again.
fs::write(
temp.path().join("people.gq"),
r#"
query find_person($name: String) {
match { $p: Person { name: $name } }
return { $p.name }
}
"#,
)
.unwrap();
let apply_edit = cluster_json(temp.path(), "apply");
assert_eq!(apply_edit["applied_count"], 1, "{apply_edit}");
assert_eq!(apply_edit["converged"], true, "{apply_edit}");
let final_apply = cluster_json(temp.path(), "apply");
assert_eq!(final_apply["state_written"], false, "{final_apply}");
assert!(final_apply["changes"].as_array().unwrap().is_empty());
}
#[test]
fn cluster_e2e_schema_change_applied_by_cluster() {
let temp = tempdir().unwrap();
write_cluster_config_fixture(temp.path());
init_cluster_derived_graph(temp.path());
let import = cluster_json(temp.path(), "import");
assert_eq!(import["ok"], true, "{import}");
let apply = cluster_json(temp.path(), "apply");
assert_eq!(apply["converged"], true, "{apply}");
// Additive schema change: Stage 4B applies it from the cluster — no
// manual schema apply, no refresh round-trip.
fs::write(
temp.path().join("people.pg"),
r#"
node Person {
name: String @key
age: I32?
bio: String?
}
"#,
)
.unwrap();
// Plan previews the real migration steps (RFC-004 §D7).
let plan = cluster_json(temp.path(), "plan");
let schema_change = change_for(&plan, "schema.knowledge");
assert_eq!(schema_change["disposition"], "applied", "{plan}");
let migration = &schema_change["migration"];
assert_eq!(migration["supported"], true, "{plan}");
assert!(
migration["steps"]
.as_array()
.unwrap()
.iter()
.any(|step| step["kind"] == "add_property"),
"{plan}"
);
let evolve = cluster_json(temp.path(), "apply");
assert_eq!(evolve["ok"], true, "{evolve}");
assert_eq!(evolve["converged"], true, "{evolve}");
assert_eq!(change_for(&evolve, "schema.knowledge")["disposition"], "applied");
// The live graph carries the new schema; the plan is empty.
let schema_show = output_success(
cli()
.arg("schema")
.arg("show")
.arg(temp.path().join("graphs/knowledge.omni")),
);
assert!(stdout_string(&schema_show).contains("bio"), "live schema updated");
let replan = cluster_json(temp.path(), "plan");
assert!(
replan["changes"].as_array().unwrap().is_empty(),
"one cluster apply converges a schema change: {replan}"
);
}
#[test]
fn cluster_e2e_force_unlock_unblocks_apply() {
let temp = tempdir().unwrap();
write_cluster_config_fixture(temp.path());
write_cluster_applyable_state(temp.path());
write_cluster_lock(temp.path(), "stuck-lock", "apply");
let refused = parse_stdout_json(&output_failure(
cli()
.arg("cluster")
.arg("apply")
.arg("--config")
.arg(temp.path())
.arg("--json"),
));
assert_eq!(refused["ok"], false);
let unlocked = parse_stdout_json(&output_success(
cli()
.arg("cluster")
.arg("force-unlock")
.arg("stuck-lock")
.arg("--config")
.arg(temp.path())
.arg("--json"),
));
assert_eq!(unlocked["lock_removed"], true, "{unlocked}");
let retried = cluster_json(temp.path(), "apply");
assert_eq!(retried["ok"], true, "{retried}");
assert_eq!(retried["converged"], true, "{retried}");
}
#[test]
fn cluster_e2e_lost_state_reimport_recovers_catalog() {
let temp = tempdir().unwrap();
write_cluster_config_fixture(temp.path());
init_cluster_derived_graph(temp.path());
let import = cluster_json(temp.path(), "import");
assert_eq!(import["ok"], true, "{import}");
let apply = cluster_json(temp.path(), "apply");
assert_eq!(apply["converged"], true, "{apply}");
let query_digest = change_for(&apply, "query.knowledge.find_person")["after_digest"]
.as_str()
.unwrap()
.to_string();
let blob = temp
.path()
.join("__cluster/resources/query/knowledge/find_person")
.join(format!("{query_digest}.gq"));
let blob_content = fs::read_to_string(&blob).unwrap();
// Disaster: the state ledger is lost.
fs::remove_file(temp.path().join("__cluster/state.json")).unwrap();
let reimport = cluster_json(temp.path(), "import");
assert_eq!(reimport["ok"], true, "{reimport}");
assert_eq!(reimport["state_observations"]["state_revision"], 1);
// Import observes graph/schema only; query/policy digests are not invented.
assert!(
reimport["resource_digests"]
.get("query.knowledge.find_person")
.is_none(),
"{reimport}"
);
let plan = cluster_json(temp.path(), "plan");
assert_eq!(
change_for(&plan, "query.knowledge.find_person")["disposition"],
"applied"
);
assert_eq!(change_for(&plan, "policy.base")["disposition"], "applied");
let reapply = cluster_json(temp.path(), "apply");
assert_eq!(reapply["ok"], true, "{reapply}");
assert_eq!(reapply["converged"], true, "{reapply}");
assert!(
reapply["state_observations"]["applied_config_digest"].is_string(),
"{reapply}"
);
// The catalog blob was reused, not rewritten with different content.
assert_eq!(fs::read_to_string(&blob).unwrap(), blob_content);
let replan = cluster_json(temp.path(), "plan");
assert!(replan["changes"].as_array().unwrap().is_empty(), "{replan}");
}
#[test]
fn cluster_e2e_out_of_band_schema_drift_then_apply_converges_it() {
let temp = tempdir().unwrap();
write_cluster_config_fixture(temp.path());
init_cluster_derived_graph(temp.path());
let import = cluster_json(temp.path(), "import");
assert_eq!(import["ok"], true, "{import}");
let apply = cluster_json(temp.path(), "apply");
assert_eq!(apply["converged"], true, "{apply}");
// Out-of-band: the live graph evolves, cluster.yaml stays put.
fs::write(
temp.path().join("people_v2.pg"),
r#"
node Person {
name: String @key
age: I32?
bio: String?
}
"#,
)
.unwrap();
output_success(
cli()
.arg("schema")
.arg("apply")
.arg(temp.path().join("graphs/knowledge.omni"))
.arg("--schema")
.arg(temp.path().join("people_v2.pg"))
.arg("--json"),
);
// Drift is visible...
let refresh = cluster_json(temp.path(), "refresh");
assert_eq!(
refresh["resource_statuses"]["schema.knowledge"]["status"],
"drifted"
);
// ...the plan proposes converging back to desired, with a migration
// preview (a soft drop of the out-of-band field)...
let plan = cluster_json(temp.path(), "plan");
let schema_change = change_for(&plan, "schema.knowledge");
assert_eq!(schema_change["disposition"], "applied", "{plan}");
assert!(
schema_change["migration"]["steps"]
.as_array()
.unwrap()
.iter()
.any(|step| step["kind"] == "drop_property" && step["mode"] == "soft"),
"{plan}"
);
// ...and apply converges the live schema back (axiom 8: drift correction
// is gated like any change; a soft migration is the recoverable tier).
let converge = cluster_json(temp.path(), "apply");
assert_eq!(converge["ok"], true, "{converge}");
assert_eq!(converge["converged"], true, "{converge}");
let schema_show = output_success(
cli()
.arg("schema")
.arg("show")
.arg(temp.path().join("graphs/knowledge.omni")),
);
assert!(
!stdout_string(&schema_show).contains("bio"),
"out-of-band field soft-dropped back to desired"
);
let replan = cluster_json(temp.path(), "plan");
assert!(replan["changes"].as_array().unwrap().is_empty(), "{replan}");
}
#[test]
fn cluster_e2e_graph_root_destruction_drifts_then_apply_recreates_empty_graph() {
let temp = tempdir().unwrap();
write_cluster_config_fixture(temp.path());
init_cluster_derived_graph(temp.path());
let import = cluster_json(temp.path(), "import");
assert_eq!(import["ok"], true, "{import}");
let apply = cluster_json(temp.path(), "apply");
assert_eq!(apply["converged"], true, "{apply}");
let query_digest = change_for(&apply, "query.knowledge.find_person")["after_digest"]
.as_str()
.unwrap()
.to_string();
fs::remove_dir_all(temp.path().join("graphs/knowledge.omni")).unwrap();
// Missing root is drift, not an error.
let refresh = cluster_json(temp.path(), "refresh");
assert_eq!(refresh["ok"], true, "{refresh}");
assert_eq!(
refresh["resource_statuses"]["graph.knowledge"]["status"],
"drifted"
);
assert!(
refresh["resource_statuses"]["graph.knowledge"]["conditions"]
.as_array()
.unwrap()
.iter()
.any(|condition| condition == "graph_missing"),
"{refresh}"
);
// Graph/schema digests removed; query/policy digests preserved.
assert!(refresh["resource_digests"].get("graph.knowledge").is_none());
assert!(refresh["resource_digests"].get("schema.knowledge").is_none());
assert!(
refresh["resource_digests"]
.get("query.knowledge.find_person")
.is_some(),
"{refresh}"
);
let plan = cluster_json(temp.path(), "plan");
assert_eq!(change_for(&plan, "graph.knowledge")["operation"], "create");
// Stage 4A: the re-create is executable and the plan says so — nothing
// hidden about converging a destroyed root back to an EMPTY graph (the
// data was already lost; this is declarative convergence, RFC-004 §D1).
assert_eq!(change_for(&plan, "graph.knowledge")["disposition"], "applied");
assert_eq!(change_for(&plan, "schema.knowledge")["disposition"], "applied");
// Converged-then-destroyed: query/policy are already in state at the
// desired digests, so they are not changes at all.
assert_eq!(plan["changes"].as_array().unwrap().len(), 2, "{plan}");
let recreate = cluster_json(temp.path(), "apply");
assert_eq!(recreate["ok"], true, "{recreate}");
assert_eq!(recreate["converged"], true, "{recreate}");
// The empty graph is back on disk; catalog state survived throughout.
assert!(temp.path().join("graphs/knowledge.omni").exists());
let state: serde_json::Value = serde_json::from_str(
&fs::read_to_string(temp.path().join("__cluster/state.json")).unwrap(),
)
.unwrap();
assert_eq!(
state["applied_revision"]["resources"]["query.knowledge.find_person"]["digest"],
query_digest
);
assert!(
temp.path()
.join("__cluster/resources/query/knowledge/find_person")
.join(format!("{query_digest}.gq"))
.exists()
);
}
#[test]
fn cluster_e2e_multi_graph_mixed_dispositions_then_approve_and_converge() {
let temp = tempdir().unwrap();
write_multi_graph_cluster_fixture(temp.path());
// No manual init: Stage 4A creates both graphs.
let import = cluster_json(temp.path(), "import");
assert_eq!(import["ok"], true, "{import}");
let apply = cluster_json(temp.path(), "apply");
assert_eq!(apply["ok"], true, "{apply}");
assert_eq!(apply["converged"], true, "{apply}");
assert_eq!(change_for(&apply, "graph.knowledge")["disposition"], "applied");
assert_eq!(
change_for(&apply, "graph.engineering")["disposition"],
"applied"
);
assert_eq!(
change_for(&apply, "query.engineering.find_service")["disposition"],
"applied"
);
// The graph-spanning and cluster-scoped policies ride the same run.
assert_eq!(change_for(&apply, "policy.shared")["disposition"], "applied");
assert_eq!(
change_for(&apply, "policy.cluster_wide")["disposition"],
"applied"
);
assert!(temp.path().join("graphs/knowledge.omni").exists());
assert!(temp.path().join("graphs/engineering.omni").exists());
// Mixed run: a graph REMOVAL (4C territory — deferred) gates its query
// delete (blocked), while a knowledge query update is independent
// (applied) and re-derives its composite. All four dispositions at once.
fs::write(
temp.path().join("cluster.yaml"),
r#"
version: 1
metadata:
name: company-brain
state:
backend: cluster
lock: true
graphs:
knowledge:
schema: ./people.pg
queries:
find_person:
file: ./people.gq
policies:
shared:
file: ./shared.policy.yaml
applies_to: [knowledge]
cluster_wide:
file: ./cluster_wide.policy.yaml
applies_to: [cluster]
"#,
)
.unwrap();
fs::write(
temp.path().join("people.gq"),
"\nquery find_person($name: String) {\n match { $p: Person { name: $name } }\n return { $p.name }\n}\n",
)
.unwrap();
let mixed = cluster_json(temp.path(), "apply");
assert_eq!(mixed["ok"], true, "{mixed}");
assert_eq!(mixed["converged"], false, "{mixed}");
// Stage 4C: deletes are gated on a digest-bound approval, one gate per
// subtree (the graph-level approval carries schema + queries).
assert_eq!(
change_for(&mixed, "graph.engineering")["disposition"],
"blocked"
);
assert_eq!(
change_for(&mixed, "graph.engineering")["reason"],
"approval_required"
);
assert_eq!(
change_for(&mixed, "schema.engineering")["reason"],
"approval_required"
);
assert_eq!(
change_for(&mixed, "query.engineering.find_service")["reason"],
"approval_required"
);
let gate_plan = cluster_json(temp.path(), "plan");
let gates = gate_plan["approvals_required"].as_array().unwrap();
assert_eq!(gates.len(), 1, "{gate_plan}");
assert_eq!(gates[0]["resource"], "graph.engineering");
assert_eq!(gates[0]["satisfied"], false);
assert_eq!(
change_for(&mixed, "query.knowledge.find_person")["disposition"],
"applied"
);
// 5A: policy.shared's applies_to narrowed with an unchanged file digest
// — now a first-class binding change, applied in the same run.
assert_eq!(change_for(&mixed, "policy.shared")["binding_change"], true);
assert_eq!(change_for(&mixed, "policy.shared")["disposition"], "applied");
assert_eq!(
change_for(&mixed, "graph.knowledge")["disposition"],
"derived"
);
// Deterministic ordering: changes sorted by resource address.
let order: Vec<&str> = mixed["changes"]
.as_array()
.unwrap()
.iter()
.map(|change| change["resource"].as_str().unwrap())
.collect();
let mut sorted = order.clone();
sorted.sort_unstable();
assert_eq!(order, sorted, "{mixed}");
// The conclusion: an apply without approval stays blocked; the approved
// delete converges the cluster, tombstoning the removed graph.
let still_blocked = cluster_json(temp.path(), "apply");
assert_eq!(still_blocked["converged"], false, "{still_blocked}");
let approve = parse_stdout_json(&output_success(
cli()
.arg("--as")
.arg("andrew")
.arg("cluster")
.arg("approve")
.arg("graph.engineering")
.arg("--config")
.arg(temp.path())
.arg("--json"),
));
assert_eq!(approve["ok"], true, "{approve}");
assert_eq!(approve["approved_by"], "andrew");
let converge = cluster_json(temp.path(), "apply");
assert_eq!(converge["ok"], true, "{converge}");
assert_eq!(converge["converged"], true, "{converge}");
assert!(!temp.path().join("graphs/engineering.omni").exists());
let status = cluster_json(temp.path(), "status");
assert_eq!(status["observations"]["graph.engineering"]["kind"], "tombstone");
let final_plan = cluster_json(temp.path(), "plan");
assert!(
final_plan["changes"].as_array().unwrap().is_empty(),
"{final_plan}"
);
}
#[test]
fn cluster_e2e_approve_requires_actor() {
let temp = tempdir().unwrap();
write_cluster_config_fixture(temp.path());
let output = output_failure(
cli()
.arg("cluster")
.arg("approve")
.arg("graph.knowledge")
.arg("--config")
.arg(temp.path()),
);
let stderr = String::from_utf8_lossy(&output.stderr);
assert!(stderr.contains("--as"), "{stderr}");
}
#[test]
fn cluster_e2e_declared_graph_created_by_apply() {
let temp = tempdir().unwrap();
write_cluster_config_fixture(temp.path());
let import = cluster_json(temp.path(), "import");
assert_eq!(import["ok"], true, "{import}");
let apply = cluster_json(temp.path(), "apply");
assert_eq!(apply["ok"], true, "{apply}");
assert_eq!(apply["converged"], true, "{apply}");
assert_eq!(change_for(&apply, "graph.knowledge")["disposition"], "applied");
assert!(temp.path().join("graphs/knowledge.omni").exists());
// The created graph is a real graph: the per-graph CLI can open it.
let snapshot = output_success(
cli()
.arg("snapshot")
.arg(temp.path().join("graphs/knowledge.omni")),
);
assert!(!stdout_string(&snapshot).is_empty());
let plan = cluster_json(temp.path(), "plan");
assert!(plan["changes"].as_array().unwrap().is_empty(), "{plan}");
let status = cluster_json(temp.path(), "status");
assert_eq!(
status["resource_statuses"]["graph.knowledge"]["status"],
"applied"
);
}
#[test]
fn cluster_e2e_payload_drift_self_heals() {
let temp = tempdir().unwrap();
write_cluster_config_fixture(temp.path());
init_cluster_derived_graph(temp.path());
let import = cluster_json(temp.path(), "import");
assert_eq!(import["ok"], true, "{import}");
let apply = cluster_json(temp.path(), "apply");
assert_eq!(apply["converged"], true, "{apply}");
let query_digest = change_for(&apply, "query.knowledge.find_person")["after_digest"]
.as_str()
.unwrap()
.to_string();
let blob = temp
.path()
.join("__cluster/resources/query/knowledge/find_person")
.join(format!("{query_digest}.gq"));
fs::remove_file(&blob).unwrap();
let status = cluster_json(temp.path(), "status");
assert_eq!(status["ok"], true, "{status}");
assert!(
status["diagnostics"]
.as_array()
.unwrap()
.iter()
.any(|diagnostic| diagnostic["code"] == "catalog_payload_missing"),
"{status}"
);
let refresh = cluster_json(temp.path(), "refresh");
assert_eq!(refresh["ok"], true, "{refresh}");
assert_eq!(
refresh["resource_statuses"]["query.knowledge.find_person"]["status"],
"drifted"
);
let heal = cluster_json(temp.path(), "apply");
assert_eq!(heal["ok"], true, "{heal}");
assert_eq!(heal["converged"], true, "{heal}");
assert!(blob.exists(), "blob republished");
let clean = cluster_json(temp.path(), "status");
assert!(
!clean["diagnostics"]
.as_array()
.unwrap()
.iter()
.any(|diagnostic| {
diagnostic["code"]
.as_str()
.is_some_and(|code| code.starts_with("catalog_payload"))
}),
"{clean}"
);
}

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,535 @@
//! Stored-query commands and alias resolution.
//! Moved verbatim from tests/cli.rs in the modularization.
use serde_json::Value;
use tempfile::tempdir;
mod support;
use support::*;
#[test]
fn query_check_alias_matches_lint_output() {
let temp = tempdir().unwrap();
let schema_path = temp.path().join("schema.pg");
let query_path = temp.path().join("queries.gq");
write_file(
&schema_path,
r#"
node Person {
name: String
}
"#,
);
write_query_file(
&query_path,
r#"
query list_people() {
match { $p: Person }
return { $p.name }
}
"#,
);
let lint_output = output_success(
cli()
.arg("query")
.arg("lint")
.arg("--query")
.arg(&query_path)
.arg("--schema")
.arg(&schema_path)
.arg("--json"),
);
let check_output = output_success(
cli()
.arg("query")
.arg("check")
.arg("--query")
.arg(&query_path)
.arg("--schema")
.arg(&schema_path)
.arg("--json"),
);
assert_eq!(stdout_string(&lint_output), stdout_string(&check_output));
}
#[test]
fn read_alias_from_yaml_config_runs_with_kv_output() {
let temp = tempdir().unwrap();
let graph = graph_path(temp.path());
let config = temp.path().join("omnigraph.yaml");
let query = temp.path().join("aliases.gq");
init_graph(&graph);
load_fixture(&graph);
write_query_file(
&query,
&std::fs::read_to_string(fixture("test.gq")).unwrap(),
);
write_config(
&config,
&format!(
"{}aliases:\n owner:\n command: read\n query: aliases.gq\n name: get_person\n args: [name]\n format: kv\n",
local_yaml_config(&graph)
),
);
let output = output_success(
cli()
.arg("read")
.arg("--config")
.arg(&config)
.arg("--alias")
.arg("owner")
.arg("Alice"),
);
let stdout = stdout_string(&output);
assert!(stdout.contains("row 1"));
assert!(stdout.contains("p.name: Alice"));
}
#[test]
fn read_alias_uses_alias_target_without_cli_default_and_accepts_url_like_arg() {
let temp = tempdir().unwrap();
let graph = graph_path(temp.path());
let config = temp.path().join("omnigraph.yaml");
let query = temp.path().join("aliases.gq");
let data = temp.path().join("url-like.jsonl");
init_graph(&graph);
write_jsonl(
&data,
r#"{"type":"Person","data":{"name":"https://example.com","age":30}}"#,
);
output_success(
cli()
.arg("load")
.arg("--mode")
.arg("overwrite")
.arg("--data")
.arg(&data)
.arg(&graph),
);
write_query_file(
&query,
&std::fs::read_to_string(fixture("test.gq")).unwrap(),
);
write_config(
&config,
&format!(
"graphs:\n local:\n uri: '{}'\nquery:\n roots:\n - .\npolicy: {{}}\naliases:\n owner:\n command: read\n query: aliases.gq\n name: get_person\n args: [name]\n graph: local\n format: kv\n",
graph.to_string_lossy()
),
);
let output = output_success(
cli()
.arg("read")
.arg("--config")
.arg(&config)
.arg("--alias")
.arg("owner")
.arg("https://example.com"),
);
let stdout = stdout_string(&output);
assert!(stdout.contains("row 1"));
assert!(stdout.contains("p.name: https://example.com"));
}
#[test]
fn change_alias_from_yaml_config_persists_changes() {
let temp = tempdir().unwrap();
let graph = graph_path(temp.path());
let config = temp.path().join("omnigraph.yaml");
let query = temp.path().join("mutations.gq");
init_graph(&graph);
load_fixture(&graph);
write_query_file(
&query,
r#"
query insert_person($name: String, $age: I32) {
insert Person { name: $name, age: $age }
}
"#,
);
write_config(
&config,
&format!(
"{}aliases:\n add_person:\n command: change\n query: mutations.gq\n name: insert_person\n args: [name, age]\n",
local_yaml_config(&graph)
),
);
let output = output_success(
cli()
.arg("change")
.arg("--config")
.arg(&config)
.arg("--alias")
.arg("add_person")
.arg("Eve")
.arg("29")
.arg("--json"),
);
let payload: Value = serde_json::from_slice(&output.stdout).unwrap();
assert_eq!(payload["affected_nodes"], 1);
let verify = output_success(
cli()
.arg("read")
.arg(&graph)
.arg("--query")
.arg(fixture("test.gq"))
.arg("--name")
.arg("get_person")
.arg("--params")
.arg(r#"{"name":"Eve"}"#)
.arg("--json"),
);
let verify_payload: Value = serde_json::from_slice(&verify.stdout).unwrap();
assert_eq!(verify_payload["row_count"], 1);
}
#[test]
fn queries_validate_exits_zero_on_clean_registry() {
let graph = SystemGraph::loaded();
graph.write_query(
"find_person.gq",
"query find_person($name: String) { match { $p: Person { name: $name } } return { $p.age } }",
);
let config = graph.write_config(
"omnigraph.yaml",
&queries_test_config(
&graph.path().to_string_lossy(),
"find_person",
"find_person.gq",
),
);
let output = output_success(
cli()
.arg("queries")
.arg("validate")
.arg("--config")
.arg(&config),
);
let stdout = stdout_string(&output);
assert!(stdout.contains("OK"), "stdout:\n{stdout}");
}
#[test]
fn queries_validate_exits_nonzero_on_type_broken_query() {
let graph = SystemGraph::loaded();
// `Widget` is not in the fixture schema.
graph.write_query(
"ghost.gq",
"query ghost() { match { $w: Widget } return { $w.name } }",
);
let config = graph.write_config(
"omnigraph.yaml",
&queries_test_config(&graph.path().to_string_lossy(), "ghost", "ghost.gq"),
);
let output = output_failure(
cli()
.arg("queries")
.arg("validate")
.arg("--config")
.arg(&config),
);
let stdout = stdout_string(&output);
assert!(
stdout.contains("ghost"),
"validation should name the broken query; stdout:\n{stdout}"
);
}
#[test]
fn queries_list_prints_registered_query() {
let graph = SystemGraph::loaded();
graph.write_query(
"find_person.gq",
"query find_person($name: String) { match { $p: Person { name: $name } } return { $p.age } }",
);
// Exposed with an explicit tool name so the list shows the MCP suffix.
let config = graph.write_config(
"omnigraph.yaml",
&format!(
concat!(
"graphs:\n",
" local:\n",
" uri: '{}'\n",
" queries:\n",
" find_person:\n",
" file: ./find_person.gq\n",
" mcp: {{ expose: true, tool_name: lookup_person }}\n",
"cli:\n",
" graph: local\n",
"policy: {{}}\n",
),
graph.path().to_string_lossy().replace('\'', "''")
),
);
let output = output_success(
cli()
.arg("queries")
.arg("list")
.arg("--config")
.arg(&config),
);
let stdout = stdout_string(&output);
assert!(stdout.contains("find_person"), "stdout:\n{stdout}");
assert!(
stdout.contains("$name: String"),
"list should show typed params; stdout:\n{stdout}"
);
assert!(
stdout.contains("[mcp: lookup_person]"),
"list should show the MCP tool name for exposed queries; stdout:\n{stdout}"
);
}
#[test]
fn queries_list_requires_graph_selection_for_per_graph_only_registries() {
let graph = SystemGraph::loaded();
graph.write_query(
"find_person.gq",
"query find_person($name: String) { match { $p: Person { name: $name } } return { $p.age } }",
);
let config = graph.write_config(
"omnigraph.yaml",
&format!(
concat!(
"graphs:\n",
" local:\n",
" uri: '{}'\n",
" queries:\n",
" find_person:\n",
" file: ./find_person.gq\n",
"policy: {{}}\n",
),
graph.path().to_string_lossy().replace('\'', "''")
),
);
let output = output_failure(
cli()
.arg("queries")
.arg("list")
.arg("--config")
.arg(&config),
);
let stderr = String::from_utf8_lossy(&output.stderr);
assert!(
stderr.contains("local") && stderr.contains("--target local"),
"error must name the graph and give a concrete selection hint; stderr:\n{stderr}"
);
}
#[test]
fn queries_list_without_graph_selection_lists_top_level_registry() {
let graph = SystemGraph::loaded();
graph.write_query(
"top_find.gq",
"query top_find($name: String) { match { $p: Person { name: $name } } return { $p.age } }",
);
let config = graph.write_config(
"omnigraph.yaml",
concat!(
"queries:\n",
" top_find:\n",
" file: ./top_find.gq\n",
"policy: {}\n",
),
);
let output = output_success(
cli()
.arg("queries")
.arg("list")
.arg("--config")
.arg(&config),
);
let stdout = stdout_string(&output);
assert!(stdout.contains("top_find"), "stdout:\n{stdout}");
}
#[test]
fn queries_list_unknown_target_errors() {
// `queries list` opens no graph URI, so unknown-graph validation can't ride
// along on URI resolution the way it does for every other command. An
// unknown `--target` must still error (naming the graph) instead of
// silently falling back to the top-level registry and showing the wrong
// (or empty) catalog.
let graph = SystemGraph::loaded();
graph.write_query(
"find_person.gq",
"query find_person($name: String) { match { $p: Person { name: $name } } return { $p.age } }",
);
let config = graph.write_config(
"omnigraph.yaml",
&queries_test_config(
&graph.path().to_string_lossy(),
"find_person",
"find_person.gq",
),
);
let output = output_failure(
cli()
.arg("queries")
.arg("list")
.arg("--target")
.arg("nonexistent")
.arg("--config")
.arg(&config),
);
let stderr = String::from_utf8_lossy(&output.stderr);
assert!(
stderr.contains("nonexistent"),
"error must name the unknown graph; stderr:\n{stderr}"
);
}
#[test]
fn queries_commands_reject_named_graph_with_populated_top_level_block() {
// A named graph (here via `cli.graph`) uses its own `graphs.<name>` block,
// so a populated top-level `queries:` block would be silently ignored — a
// config the server REFUSES to boot. `queries validate`/`list` must reject
// it too (matching boot) instead of validating/listing the per-graph block
// and giving a false green.
let graph = SystemGraph::loaded();
graph.write_query(
"find_person.gq",
"query find_person($name: String) { match { $p: Person { name: $name } } return { $p.age } }",
);
let config = graph.write_config(
"omnigraph.yaml",
&format!(
concat!(
"graphs:\n",
" local:\n",
" uri: '{}'\n",
" queries:\n",
" find_person:\n",
" file: ./find_person.gq\n",
"cli:\n",
" graph: local\n",
"queries:\n", // populated top-level block: the coherence violation
" legacy:\n",
" file: ./legacy.gq\n",
"policy: {{}}\n",
),
graph.path().to_string_lossy().replace('\'', "''")
),
);
// Both resolve `local` from cli.graph (no positional URI), so both must
// error and name the graph + the ignored block — like server boot does.
for sub in ["validate", "list"] {
let output = output_failure(cli().arg("queries").arg(sub).arg("--config").arg(&config));
let stderr = String::from_utf8_lossy(&output.stderr);
assert!(
stderr.contains("local") && stderr.contains("queries"),
"`queries {sub}` must reject a named graph with a populated top-level block; stderr:\n{stderr}"
);
}
}
#[test]
fn queries_validate_exits_nonzero_on_duplicate_tool_name() {
// Two exposed queries claiming one MCP tool name is a load-time
// collision — `queries validate` must fail (offline, before the engine
// opens) and name both queries plus the contested tool.
let graph = SystemGraph::loaded();
graph.write_query(
"a.gq",
"query a() { match { $p: Person } return { $p.name } }",
);
graph.write_query(
"b.gq",
"query b() { match { $p: Person } return { $p.name } }",
);
let config = graph.write_config(
"omnigraph.yaml",
&format!(
concat!(
"graphs:\n",
" local:\n",
" uri: '{}'\n",
" queries:\n",
" a:\n",
" file: ./a.gq\n",
" mcp: {{ expose: true, tool_name: dup }}\n",
" b:\n",
" file: ./b.gq\n",
" mcp: {{ expose: true, tool_name: dup }}\n",
"cli:\n",
" graph: local\n",
"policy: {{}}\n",
),
graph.path().to_string_lossy().replace('\'', "''")
),
);
let output = output_failure(
cli()
.arg("queries")
.arg("validate")
.arg("--config")
.arg(&config),
);
let stderr = String::from_utf8_lossy(&output.stderr);
assert!(
stderr.contains("dup") && stderr.contains("'a'") && stderr.contains("'b'"),
"duplicate tool name should be reported naming both queries; stderr:\n{stderr}"
);
}
#[test]
fn queries_validate_positional_uri_ignores_default_graph() {
// A positional URI is anonymous → the schema AND the registry both come
// from top-level, even when `cli.graph` names a graph whose per-graph
// queries would fail. Pins that the URI and registry can't diverge.
let graph = SystemGraph::loaded();
graph.write_query(
"clean.gq",
"query clean($name: String) { match { $p: Person { name: $name } } return { $p.age } }",
);
// `Widget` is not in the fixture schema — the default graph's per-graph
// query would break validate if it were (wrongly) selected.
graph.write_query(
"broken.gq",
"query broken() { match { $w: Widget } return { $w.name } }",
);
let config = graph.write_config(
"omnigraph.yaml",
concat!(
"cli:\n graph: prod\n",
"graphs:\n",
" prod:\n",
" uri: /nonexistent-prod.omni\n",
" queries:\n",
" broken:\n",
" file: ./broken.gq\n",
"queries:\n",
" clean:\n",
" file: ./clean.gq\n",
"policy: {}\n",
),
);
// Positional URI = the real loaded graph; selection is anonymous, so the
// CLEAN top-level registry validates (not prod's broken one).
let output = output_success(
cli()
.arg("queries")
.arg("validate")
.arg(graph.path())
.arg("--config")
.arg(&config),
);
let stdout = stdout_string(&output);
assert!(
stdout.contains("OK"),
"positional URI must validate the top-level registry, not the cli.graph default; stdout:\n{stdout}"
);
}

View file

@ -0,0 +1,500 @@
//! init/config scaffolding, schema plan/apply, graphs listing, version.
//! Moved verbatim from tests/cli.rs in the modularization.
use std::fs;
use lance::index::DatasetIndexExt;
use omnigraph::db::{Omnigraph, ReadTarget};
use serde_json::Value;
use tempfile::tempdir;
mod support;
use support::*;
#[test]
fn version_command_prints_current_cli_version() {
let output = output_success(cli().arg("version"));
let stdout = stdout_string(&output);
assert_eq!(
stdout.trim(),
format!("omnigraph {}", env!("CARGO_PKG_VERSION"))
);
}
#[test]
fn init_creates_graph_successfully_on_missing_local_directory() {
let temp = tempdir().unwrap();
let graph = graph_path(temp.path());
let schema = fixture("test.pg");
let output = output_success(cli().arg("init").arg("--schema").arg(&schema).arg(&graph));
let stdout = stdout_string(&output);
assert!(stdout.contains("initialized"));
assert!(graph.join("_schema.pg").exists());
assert!(graph.join("__manifest").exists());
assert!(temp.path().join("omnigraph.yaml").exists());
}
#[test]
fn schema_plan_json_reports_supported_additive_change() {
let temp = tempdir().unwrap();
let graph = graph_path(temp.path());
let schema_path = temp.path().join("next.pg");
init_graph(&graph);
let next_schema = fs::read_to_string(fixture("test.pg")).unwrap().replace(
" age: I32?\n}",
" age: I32?\n nickname: String?\n}",
);
fs::write(&schema_path, next_schema).unwrap();
let output = output_success(
cli()
.arg("schema")
.arg("plan")
.arg("--schema")
.arg(&schema_path)
.arg("--json")
.arg(&graph),
);
let payload: Value = serde_json::from_slice(&output.stdout).unwrap();
assert_eq!(payload["supported"], true);
assert_eq!(payload["step_count"], 1);
assert_eq!(payload["steps"][0]["kind"], "add_property");
assert_eq!(payload["steps"][0]["type_kind"], "node");
assert_eq!(payload["steps"][0]["type_name"], "Person");
assert_eq!(payload["steps"][0]["property_name"], "nickname");
}
#[test]
fn schema_plan_json_reports_unsupported_type_change() {
let temp = tempdir().unwrap();
let graph = graph_path(temp.path());
let schema_path = temp.path().join("breaking.pg");
init_graph(&graph);
let breaking_schema = fs::read_to_string(fixture("test.pg"))
.unwrap()
.replace("age: I32?", "age: I64?");
fs::write(&schema_path, breaking_schema).unwrap();
let output = output_success(
cli()
.arg("schema")
.arg("plan")
.arg("--schema")
.arg(&schema_path)
.arg("--json")
.arg(&graph),
);
let payload: Value = serde_json::from_slice(&output.stdout).unwrap();
assert_eq!(payload["supported"], false);
assert!(payload["steps"].as_array().unwrap().iter().any(|step| {
step["kind"] == "unsupported_change"
&& step["entity"]
.as_str()
.unwrap_or_default()
.contains("Person.age")
}));
}
#[test]
fn schema_apply_json_applies_supported_migration() {
let temp = tempdir().unwrap();
let graph = graph_path(temp.path());
let schema_path = temp.path().join("next.pg");
init_graph(&graph);
let next_schema = fs::read_to_string(fixture("test.pg")).unwrap().replace(
" age: I32?\n}",
" age: I32?\n nickname: String?\n}",
);
fs::write(&schema_path, next_schema).unwrap();
let output = output_success(
cli()
.arg("schema")
.arg("apply")
.arg("--schema")
.arg(&schema_path)
.arg("--json")
.arg(&graph),
);
let payload: Value = serde_json::from_slice(&output.stdout).unwrap();
assert_eq!(payload["supported"], true);
assert_eq!(payload["applied"], true);
assert_eq!(payload["step_count"], 1);
let db = tokio::runtime::Runtime::new()
.unwrap()
.block_on(Omnigraph::open(graph.to_string_lossy().as_ref()))
.unwrap();
assert!(
db.catalog().node_types["Person"]
.properties
.contains_key("nickname")
);
}
#[test]
fn schema_apply_human_reports_noop() {
let temp = tempdir().unwrap();
let graph = graph_path(temp.path());
let schema_path = fixture("test.pg");
init_graph(&graph);
let output = output_success(
cli()
.arg("schema")
.arg("apply")
.arg("--schema")
.arg(&schema_path)
.arg(&graph),
);
let stdout = stdout_string(&output);
assert!(stdout.contains("applied: no"));
assert!(stdout.contains("no schema changes"));
}
#[test]
fn schema_apply_json_renames_type_and_updates_snapshot() {
let temp = tempdir().unwrap();
let graph = graph_path(temp.path());
let schema_path = temp.path().join("rename.pg");
init_graph(&graph);
let renamed_schema = fs::read_to_string(fixture("test.pg"))
.unwrap()
.replace("node Person {\n", "node Human @rename_from(\"Person\") {\n")
.replace("edge Knows: Person -> Person", "edge Knows: Human -> Human")
.replace(
"edge WorksAt: Person -> Company",
"edge WorksAt: Human -> Company",
);
fs::write(&schema_path, renamed_schema).unwrap();
let output = output_success(
cli()
.arg("schema")
.arg("apply")
.arg("--schema")
.arg(&schema_path)
.arg("--json")
.arg(&graph),
);
let payload: Value = serde_json::from_slice(&output.stdout).unwrap();
assert_eq!(payload["applied"], true);
let db = tokio::runtime::Runtime::new()
.unwrap()
.block_on(Omnigraph::open(graph.to_string_lossy().as_ref()))
.unwrap();
let snapshot = tokio::runtime::Runtime::new()
.unwrap()
.block_on(db.snapshot_of(ReadTarget::branch("main")))
.unwrap();
assert!(snapshot.entry("node:Human").is_some());
assert!(snapshot.entry("node:Person").is_none());
}
#[test]
fn schema_apply_json_renames_property_and_updates_catalog() {
let temp = tempdir().unwrap();
let graph = graph_path(temp.path());
let schema_path = temp.path().join("rename-property.pg");
init_graph(&graph);
let renamed_schema = fs::read_to_string(fixture("test.pg"))
.unwrap()
.replace("age: I32?", "years: I32? @rename_from(\"age\")");
fs::write(&schema_path, renamed_schema).unwrap();
let output = output_success(
cli()
.arg("schema")
.arg("apply")
.arg("--schema")
.arg(&schema_path)
.arg("--json")
.arg(&graph),
);
let payload: Value = serde_json::from_slice(&output.stdout).unwrap();
assert_eq!(payload["applied"], true);
let db = tokio::runtime::Runtime::new()
.unwrap()
.block_on(Omnigraph::open(graph.to_string_lossy().as_ref()))
.unwrap();
let person = &db.catalog().node_types["Person"];
assert!(person.properties.contains_key("years"));
assert!(!person.properties.contains_key("age"));
}
#[test]
fn schema_apply_json_adds_index_for_existing_property() {
let temp = tempdir().unwrap();
let graph = graph_path(temp.path());
let schema_path = temp.path().join("index.pg");
init_graph(&graph);
let before_index_count = tokio::runtime::Runtime::new().unwrap().block_on(async {
let db = Omnigraph::open(graph.to_string_lossy().as_ref())
.await
.unwrap();
let snapshot = db.snapshot_of(ReadTarget::branch("main")).await.unwrap();
let dataset = snapshot.open("node:Person").await.unwrap();
dataset.load_indices().await.unwrap().len()
});
let indexed_schema = fs::read_to_string(fixture("test.pg"))
.unwrap()
.replace("name: String @key", "name: String @key @index");
fs::write(&schema_path, indexed_schema).unwrap();
let output = output_success(
cli()
.arg("schema")
.arg("apply")
.arg("--schema")
.arg(&schema_path)
.arg("--json")
.arg(&graph),
);
let payload: Value = serde_json::from_slice(&output.stdout).unwrap();
assert_eq!(payload["applied"], true);
let after_index_count = tokio::runtime::Runtime::new().unwrap().block_on(async {
let db = Omnigraph::open(graph.to_string_lossy().as_ref())
.await
.unwrap();
let snapshot = db.snapshot_of(ReadTarget::branch("main")).await.unwrap();
let dataset = snapshot.open("node:Person").await.unwrap();
dataset.load_indices().await.unwrap().len()
});
assert!(after_index_count > before_index_count);
}
#[test]
fn schema_apply_rejects_unsupported_plan() {
let temp = tempdir().unwrap();
let graph = graph_path(temp.path());
let schema_path = temp.path().join("breaking.pg");
init_graph(&graph);
let breaking_schema = fs::read_to_string(fixture("test.pg"))
.unwrap()
.replace("age: I32?", "age: I64?");
fs::write(&schema_path, breaking_schema).unwrap();
let output = output_failure(
cli()
.arg("schema")
.arg("apply")
.arg("--schema")
.arg(&schema_path)
.arg(&graph),
);
let stderr = String::from_utf8_lossy(&output.stderr);
assert!(stderr.contains("changing property type"));
}
#[test]
fn schema_apply_rejects_when_non_main_branch_exists() {
let temp = tempdir().unwrap();
let graph = graph_path(temp.path());
let schema_path = temp.path().join("next.pg");
init_graph(&graph);
output_success(
cli()
.arg("branch")
.arg("create")
.arg("--from")
.arg("main")
.arg("--uri")
.arg(&graph)
.arg("feature"),
);
let next_schema = fs::read_to_string(fixture("test.pg")).unwrap().replace(
" age: I32?\n}",
" age: I32?\n nickname: String?\n}",
);
fs::write(&schema_path, next_schema).unwrap();
let output = output_failure(
cli()
.arg("schema")
.arg("apply")
.arg("--schema")
.arg(&schema_path)
.arg(&graph),
);
let stderr = String::from_utf8_lossy(&output.stderr);
assert!(stderr.contains("schema apply requires a graph with only main"));
}
#[test]
fn schema_apply_allow_data_loss_flag_promotes_drops_to_hard() {
let temp = tempdir().unwrap();
let graph = graph_path(temp.path());
let schema_path = temp.path().join("drop-age.pg");
init_graph(&graph);
// Drop the nullable `age` column.
let next_schema = fs::read_to_string(fixture("test.pg"))
.unwrap()
.replace(" age: I32?\n", "");
fs::write(&schema_path, next_schema).unwrap();
let output = output_success(
cli()
.arg("schema")
.arg("apply")
.arg("--schema")
.arg(&schema_path)
.arg("--allow-data-loss")
.arg("--json")
.arg(&graph),
);
let payload: Value = serde_json::from_slice(&output.stdout).unwrap();
assert_eq!(payload["applied"], true);
let drop_step = payload["steps"]
.as_array()
.unwrap()
.iter()
.find(|s| s["kind"] == "drop_property")
.expect("plan should include a drop_property step");
assert_eq!(
drop_step["mode"], "hard",
"--allow-data-loss should promote Soft → Hard; full step: {drop_step}",
);
}
#[test]
fn schema_apply_without_allow_data_loss_keeps_soft_drops() {
// Symmetric to the above: same schema change without the flag →
// drops stay Soft. Pins default semantics against accidental Hard
// promotion if a future refactor changes the option threading.
let temp = tempdir().unwrap();
let graph = graph_path(temp.path());
let schema_path = temp.path().join("drop-age-soft.pg");
init_graph(&graph);
let next_schema = fs::read_to_string(fixture("test.pg"))
.unwrap()
.replace(" age: I32?\n", "");
fs::write(&schema_path, next_schema).unwrap();
let output = output_success(
cli()
.arg("schema")
.arg("apply")
.arg("--schema")
.arg(&schema_path)
.arg("--json")
.arg(&graph),
);
let payload: Value = serde_json::from_slice(&output.stdout).unwrap();
assert_eq!(payload["applied"], true);
let drop_step = payload["steps"]
.as_array()
.unwrap()
.iter()
.find(|s| s["kind"] == "drop_property")
.expect("plan should include a drop_property step");
assert_eq!(
drop_step["mode"], "soft",
"no flag should leave drops Soft; full step: {drop_step}",
);
}
#[test]
fn schema_plan_parity_cli_and_sdk() {
// Same .pg through `Omnigraph::plan_schema_with_options` (SDK) and
// `omnigraph schema plan --json` (CLI). Asserts the steps array is
// byte-identical after JSON round-trip. HTTP doesn't expose a
// separate /schema/plan route — that side of parity is covered by
// the HTTP soft/hard drop tests, which exercise apply with
// identical fixtures.
let temp = tempdir().unwrap();
let graph = graph_path(temp.path());
init_graph(&graph);
let schema_path = temp.path().join("plan-parity.pg");
let next_schema = fs::read_to_string(fixture("test.pg")).unwrap().replace(
" age: I32?\n}",
" age: I32?\n nickname: String?\n}",
);
fs::write(&schema_path, &next_schema).unwrap();
// CLI side.
let cli_output = output_success(
cli()
.arg("schema")
.arg("plan")
.arg("--schema")
.arg(&schema_path)
.arg("--json")
.arg(&graph),
);
let cli_payload: Value = serde_json::from_slice(&cli_output.stdout).unwrap();
// SDK side: open graph, call plan_schema.
let plan = tokio::runtime::Runtime::new().unwrap().block_on(async {
let db = Omnigraph::open(graph.to_string_lossy().as_ref())
.await
.unwrap();
db.plan_schema(&next_schema).await.unwrap()
});
let sdk_steps = serde_json::to_value(&plan.steps).unwrap();
assert_eq!(
cli_payload["steps"], sdk_steps,
"CLI plan steps must match SDK plan steps for identical input",
);
assert_eq!(cli_payload["supported"], plan.supported);
}
#[test]
fn graphs_subcommand_help_lists_list_only() {
let output = output_success(cli().arg("graphs").arg("--help"));
let stdout = stdout_string(&output);
assert!(
stdout.contains("list"),
"expected `list` subcommand in help output:\n{stdout}"
);
let lowered = stdout.to_lowercase();
assert!(
!lowered.contains("create a new graph"),
"graph create should not be in v0.6.0 help; got:\n{stdout}"
);
assert!(
!lowered.contains("delete a graph"),
"graph delete should not be in v0.6.0 help; got:\n{stdout}"
);
}
#[test]
fn graphs_list_against_local_uri_errors_with_remote_only_message() {
let output = output_failure(
cli()
.arg("graphs")
.arg("list")
.arg("--uri")
.arg("/tmp/local"),
);
let stderr = String::from_utf8_lossy(&output.stderr).into_owned();
assert!(
stderr.contains("remote multi-graph server URL"),
"expected 'remote multi-graph server URL' rejection in stderr; got:\n{stderr}"
);
}

View file

@ -317,3 +317,353 @@ impl SystemGraph {
spawn_server_with_config_env(config, envs)
}
}
// ---- helpers moved from the monolithic tests/cli.rs ----
#[allow(unused_imports)]
use lance::Dataset;
#[allow(unused_imports)]
use lance::index::DatasetIndexExt;
#[allow(unused_imports)]
use omnigraph::db::{Omnigraph, ReadTarget};
pub const POLICY_YAML: &str = r#"
version: 1
groups:
team: [act-andrew, act-bruno]
admins: [act-andrew]
protected_branches: [main]
rules:
- id: team-read
allow:
actors: { group: team }
actions: [read]
branch_scope: any
- id: team-write
allow:
actors: { group: team }
actions: [change]
branch_scope: unprotected
- id: admins-promote
allow:
actors: { group: admins }
actions: [branch_merge]
target_branch_scope: protected
"#;
pub const POLICY_TESTS_YAML: &str = r#"
version: 1
cases:
- id: allow-feature-write
actor: act-andrew
action: change
branch: feature
expect: allow
- id: deny-main-write
actor: act-bruno
action: change
branch: main
expect: deny
"#;
pub fn manifest_dataset_version(graph: &std::path::Path) -> u64 {
tokio::runtime::Runtime::new().unwrap().block_on(async {
Omnigraph::open(graph.to_string_lossy().as_ref())
.await
.unwrap()
.snapshot_of(ReadTarget::branch("main"))
.await
.unwrap()
.version()
})
}
pub fn forge_person_delete_drift(graph: &std::path::Path) -> (u64, u64) {
tokio::runtime::Runtime::new().unwrap().block_on(async {
let uri = graph.to_string_lossy();
let db = Omnigraph::open(uri.as_ref()).await.unwrap();
let snap = db
.snapshot_of(ReadTarget::branch("main"))
.await
.unwrap();
let entry = snap.entry("node:Person").unwrap();
let full_path = format!("{}/{}", uri.trim_end_matches('/'), entry.table_path);
let mut ds = Dataset::open(&full_path).await.unwrap();
let deleted = ds.delete("name = 'Alice'").await.unwrap();
assert_eq!(deleted.num_deleted_rows, 1);
let head = deleted.new_dataset.version().version;
assert!(head > entry.table_version);
(entry.table_version, head)
})
}
pub fn write_policy_config_fixture(root: &std::path::Path) -> (std::path::PathBuf, std::path::PathBuf) {
let config = root.join("omnigraph.yaml");
let policy = root.join("policy.yaml");
fs::write(
&config,
r#"
project:
name: policy-test-graph
policy:
file: ./policy.yaml
"#,
)
.unwrap();
fs::write(&policy, POLICY_YAML).unwrap();
fs::write(root.join("policy.tests.yaml"), POLICY_TESTS_YAML).unwrap();
(config, policy)
}
pub fn write_cluster_config_fixture(root: &std::path::Path) {
fs::write(
root.join("people.pg"),
r#"
node Person {
name: String @key
age: I32?
}
"#,
)
.unwrap();
fs::write(
root.join("people.gq"),
r#"
query find_person($name: String) {
match { $p: Person { name: $name } }
return { $p.name, $p.age }
}
"#,
)
.unwrap();
fs::write(root.join("base.policy.yaml"), "rules: []\n").unwrap();
fs::write(
root.join("cluster.yaml"),
r#"
version: 1
metadata:
name: company-brain
state:
backend: cluster
lock: true
graphs:
knowledge:
schema: ./people.pg
queries:
find_person:
file: ./people.gq
policies:
base:
file: ./base.policy.yaml
applies_to: [knowledge]
"#,
)
.unwrap();
}
pub fn init_cluster_derived_graph(root: &std::path::Path) {
init_named_cluster_graph(root, "knowledge", "people.pg");
}
pub fn init_named_cluster_graph(root: &std::path::Path, graph_id: &str, schema_file: &str) {
let graph_dir = root.join("graphs");
fs::create_dir_all(&graph_dir).unwrap();
output_success(
cli()
.arg("init")
.arg("--schema")
.arg(root.join(schema_file))
.arg(graph_dir.join(format!("{graph_id}.omni"))),
);
}
pub fn write_cluster_lock(root: &std::path::Path, lock_id: &str, operation: &str) {
let state_dir = root.join("__cluster");
fs::create_dir_all(&state_dir).unwrap();
fs::write(
state_dir.join("lock.json"),
format!(
r#"{{"version":1,"lock_id":"{lock_id}","operation":"{operation}","created_at":"1970-01-01T00:00:00Z","pid":123}}"#
),
)
.unwrap();
}
pub fn write_cluster_applyable_state(root: &std::path::Path) -> serde_json::Value {
let validate = parse_stdout_json(&output_success(
cli()
.arg("cluster")
.arg("validate")
.arg("--config")
.arg(root)
.arg("--json"),
));
let schema_digest = validate["resource_digests"]["schema.knowledge"]
.as_str()
.unwrap()
.to_string();
let state_dir = root.join("__cluster");
fs::create_dir_all(&state_dir).unwrap();
fs::write(
state_dir.join("state.json"),
format!(
r#"{{
"version": 1,
"state_revision": 1,
"applied_revision": {{
"resources": {{
"graph.knowledge": {{ "digest": "seed" }},
"schema.knowledge": {{ "digest": "{schema_digest}" }}
}}
}}
}}
"#
),
)
.unwrap();
validate
}
pub fn cluster_json(root: &std::path::Path, command: &str) -> serde_json::Value {
parse_stdout_json(&output_success(
cli()
.arg("cluster")
.arg(command)
.arg("--config")
.arg(root)
.arg("--json"),
))
}
pub fn write_multi_graph_cluster_fixture(root: &std::path::Path) {
write_cluster_config_fixture(root);
fs::write(
root.join("services.pg"),
r#"
node Service {
name: String @key
}
"#,
)
.unwrap();
fs::write(
root.join("services.gq"),
r#"
query find_service($name: String) {
match { $s: Service { name: $name } }
return { $s.name }
}
"#,
)
.unwrap();
fs::write(root.join("cluster_wide.policy.yaml"), "rules: []\n").unwrap();
fs::write(root.join("shared.policy.yaml"), "rules: []\n").unwrap();
fs::write(
root.join("cluster.yaml"),
r#"
version: 1
metadata:
name: company-brain
state:
backend: cluster
lock: true
graphs:
knowledge:
schema: ./people.pg
queries:
find_person:
file: ./people.gq
engineering:
schema: ./services.pg
queries:
find_service:
file: ./services.gq
policies:
shared:
file: ./shared.policy.yaml
applies_to: [knowledge, engineering]
cluster_wide:
file: ./cluster_wide.policy.yaml
applies_to: [cluster]
"#,
)
.unwrap();
}
pub fn change_for<'j>(json: &'j serde_json::Value, resource: &str) -> &'j serde_json::Value {
json["changes"]
.as_array()
.unwrap()
.iter()
.find(|change| change["resource"] == resource)
.unwrap_or_else(|| panic!("missing change for {resource}: {json}"))
}
pub fn write_seed_fixture(root: &std::path::Path) -> std::path::PathBuf {
fs::create_dir_all(root.join("data")).unwrap();
fs::create_dir_all(root.join("build")).unwrap();
let raw_seed = root.join("data/seed.jsonl");
let seed = root.join("seed.yaml");
fs::write(
&raw_seed,
concat!(
"{\"type\":\"Decision\",\"data\":{\"slug\":\"dec-alpha\",\"intent\":\"Alpha ship\"}}\n",
"{\"type\":\"Decision\",\"data\":{\"slug\":\"dec-beta\",\"intent\":\"Beta ship\",\"embedding\":[0.1,0.2]}}\n"
),
)
.unwrap();
fs::write(
&seed,
concat!(
"graph:\n",
" slug: mr-context-graph\n",
"sources:\n",
" raw_seed: ./data/seed.jsonl\n",
"artifacts:\n",
" embedded_seed: ./build/seed.embedded.jsonl\n",
"embeddings:\n",
" model: gemini-embedding-2-preview\n",
" dimension: 4\n",
" types:\n",
" Decision:\n",
" target: embedding\n",
" fields: [slug, intent]\n"
),
)
.unwrap();
seed
}
pub fn write_seed_fixture_with_edge(root: &std::path::Path) -> std::path::PathBuf {
let seed = write_seed_fixture(root);
let raw_seed = root.join("data/seed.jsonl");
fs::write(
&raw_seed,
concat!(
"{\"type\":\"Decision\",\"data\":{\"slug\":\"dec-alpha\",\"intent\":\"Alpha ship\"}}\n",
"{\"type\":\"Decision\",\"data\":{\"slug\":\"dec-beta\",\"intent\":\"Beta ship\",\"embedding\":[0.1,0.2]}}\n",
"{\"edge\":\"Triggered\",\"from\":\"sig-alpha\",\"to\":\"dec-alpha\"}\n"
),
)
.unwrap();
seed
}
pub fn read_embedded_rows(path: std::path::PathBuf) -> Vec<Value> {
fs::read_to_string(path)
.unwrap()
.lines()
.filter(|line| !line.trim().is_empty())
.map(|line| serde_json::from_str(line).unwrap())
.collect()
}
pub fn queries_test_config(graph_uri: &str, entry: &str, gq_file: &str) -> String {
format!(
"graphs:\n local:\n uri: '{}'\n queries:\n {entry}:\n file: ./{gq_file}\n\
cli:\n graph: local\npolicy: {{}}\n",
graph_uri.replace('\'', "''")
)
}