use std::ffi::OsString; use std::fs; use std::io::{self, Write}; use std::path::Path; use std::path::PathBuf; use std::sync::Arc; use clap::{Arg, ArgAction, Args, CommandFactory, FromArgMatches, Parser, Subcommand, ValueEnum}; use color_eyre::eyre::{Result, bail}; use omnigraph::db::{Omnigraph, ReadTarget, SnapshotId}; use omnigraph::loader::LoadMode; use omnigraph::storage::normalize_root_uri; use omnigraph_cluster::{ ApplyOutput, DiagnosticSeverity, ForceUnlockOutput, PlanOutput, StateSyncOutput, StatusOutput, ValidateOutput, apply_config_dir, force_unlock_config_dir, import_config_dir, plan_config_dir, refresh_config_dir, status_config_dir, validate_config_dir, }; use omnigraph_compiler::query::parser::parse_query; use omnigraph_compiler::schema::parser::parse_schema; use omnigraph_compiler::{ JsonParamMode, ParamMap, QueryLintOutput, QueryLintQueryKind, QueryLintSchemaSource, QueryLintSeverity, QueryLintStatus, SchemaMigrationPlan, SchemaMigrationStep, build_catalog, json_params_to_param_map, lint_query_file, }; use omnigraph_server::api::{ BranchCreateOutput, BranchCreateRequest, BranchDeleteOutput, BranchListOutput, BranchMergeOutput, BranchMergeRequest, ChangeOutput, CommitListOutput, CommitOutput, ErrorOutput, ExportRequest, GraphListResponse, IngestOutput, IngestRequest, ReadOutput, ReadRequest, SchemaApplyOutput, SchemaApplyRequest, SchemaOutput, SnapshotOutput, SnapshotTableOutput, commit_output, ingest_output, read_output, schema_apply_output, snapshot_payload, }; use omnigraph_server::queries::{QueryRegistry, check, format_check_breakages}; use omnigraph_server::{ AliasCommand, OmnigraphConfig, PolicyAction, PolicyDecision, PolicyEngine, PolicyRequest, PolicyTestConfig, ReadOutputFormat, graph_resource_id_for_selection, load_config, }; use reqwest::Method; use reqwest::header::AUTHORIZATION; use serde::Serialize; use serde::de::DeserializeOwned; use serde_json::Value; mod embed; mod read_format; use embed::{EmbedArgs, EmbedOutput, execute_embed}; use read_format::{ReadRenderOptions, render_read}; const DEFAULT_BEARER_TOKEN_ENV: &str = "OMNIGRAPH_BEARER_TOKEN"; #[derive(Debug, Parser)] #[command(name = "omnigraph")] #[command(about = "Omnigraph graph database CLI")] #[command(version = env!("CARGO_PKG_VERSION"), disable_version_flag = true)] struct Cli { /// Actor identity for direct-engine writes (MR-722). Overrides /// `cli.actor` from `omnigraph.yaml`. When the configured policy /// is in effect, Cedar evaluates this actor against the requested /// action and scope; with policy configured but neither this flag /// nor `cli.actor` set, the engine-layer footgun guard fires and /// the write is denied (no silent bypass). Has no effect on remote /// HTTP writes — those resolve their actor server-side from the /// bearer token. #[arg(long = "as", global = true, value_name = "ACTOR")] as_actor: Option, #[command(subcommand)] command: Command, } #[derive(Debug, Subcommand)] enum Command { /// Print the CLI version Version, /// Generate, clean, or refresh explicit seed embeddings Embed(EmbedArgs), /// Initialize a new graph from a schema Init { #[arg(long)] schema: PathBuf, /// Graph URI (local path or s3://) uri: String, /// Overwrite existing schema artifacts at the URI. Without /// this flag, init refuses to touch a URI that already holds /// `_schema.pg`, `_schema.ir.json`, or `__schema_state.json` /// — closes the re-init footgun (MR-668 follow-up). With the /// flag, the operator opts in to destructive semantics. #[arg(long)] force: bool, }, /// Load data into a graph Load { /// Graph URI uri: Option, #[arg(long)] target: Option, #[arg(long)] config: Option, #[arg(long)] data: PathBuf, #[arg(long)] branch: Option, #[arg(long, default_value = "overwrite")] mode: CliLoadMode, #[arg(long)] json: bool, }, /// Ingest data into a reviewable named branch Ingest { /// Graph URI uri: Option, #[arg(long)] target: Option, #[arg(long)] config: Option, #[arg(long)] data: PathBuf, #[arg(long)] branch: Option, #[arg(long)] from: Option, #[arg(long, default_value = "merge")] mode: CliLoadMode, #[arg(long)] json: bool, }, /// Branch operations Branch { #[command(subcommand)] command: BranchCommand, }, /// Schema planning operations Schema { #[command(subcommand)] command: SchemaCommand, }, /// Validate queries against a schema (offline) or repo (repo-backed). /// /// Canonical name is `lint` (matches the `omnigraph_compiler::lint` /// module and the `OG-XXX-NNN` lint-code vocabulary). Replaces the /// deprecated `omnigraph query lint` / `omnigraph query check` / /// `omnigraph check` invocations — each is kept as an argv-level /// shim that prints a one-line stderr warning and rewrites to /// `omnigraph lint`. Aliases are deliberately *not* exposed via /// clap's `visible_alias` because that would advertise two /// equivalent canonical names, which agents emit interchangeably /// (see MR-981). Lint { /// Graph URI uri: Option, #[arg(long)] target: Option, #[arg(long)] config: Option, #[arg(long)] query: PathBuf, #[arg(long)] schema: Option, #[arg(long)] json: bool, }, /// Operate on the server-side stored-query registry (`queries:`). Queries { #[command(subcommand)] command: QueriesCommand, }, /// Show graph snapshot Snapshot { /// Graph URI uri: Option, #[arg(long)] target: Option, #[arg(long)] config: Option, #[arg(long)] branch: Option, #[arg(long)] json: bool, }, /// Export a full graph snapshot as JSONL Export { /// Graph URI uri: Option, #[arg(long)] target: Option, #[arg(long)] config: Option, #[arg(long)] branch: Option, #[arg(long, hide = true)] jsonl: bool, #[arg(long = "type")] type_names: Vec, #[arg(long = "table")] table_keys: Vec, }, /// Commit history operations Commit { #[command(subcommand)] command: CommitCommand, }, /// Execute a read query against a branch or snapshot. /// /// Canonical read endpoint. The previous name `omnigraph read` is /// kept as a visible alias and prints a one-line deprecation warning /// when used. Pairs with `omnigraph mutate` on the write side. #[command(visible_alias = "read")] Query { /// Graph URI #[arg(long)] uri: Option, #[arg(hide = true)] legacy_uri: Option, #[arg(long)] target: Option, #[arg(long)] config: Option, #[arg(long, conflicts_with_all = ["query", "query_string"])] alias: Option, #[arg(long, conflicts_with_all = ["alias", "query_string"])] query: Option, /// Inline GQ source — alternative to `--query ` and `--alias `. #[arg(short = 'e', long = "query-string", value_name = "GQ", conflicts_with_all = ["query", "alias"])] query_string: Option, #[arg(long)] name: Option, #[command(flatten)] params: ParamsArgs, #[arg(long, conflicts_with = "snapshot")] branch: Option, #[arg(long, conflicts_with = "branch")] snapshot: Option, #[arg(long, conflicts_with = "json")] format: Option, #[arg(long, conflicts_with = "format")] json: bool, #[arg()] alias_args: Vec, }, /// Execute a graph mutation query against a branch. /// /// Canonical mutation endpoint. The previous name `omnigraph change` /// is kept as a visible alias and prints a one-line deprecation /// warning when used. Pairs with `omnigraph query` on the read side. #[command(visible_alias = "change")] Mutate { /// Graph URI #[arg(long)] uri: Option, #[arg(hide = true)] legacy_uri: Option, #[arg(long)] target: Option, #[arg(long)] config: Option, #[arg(long, conflicts_with_all = ["query", "query_string"])] alias: Option, #[arg(long, conflicts_with_all = ["alias", "query_string"])] query: Option, /// Inline GQ source — alternative to `--query ` and `--alias `. #[arg(short = 'e', long = "query-string", value_name = "GQ", conflicts_with_all = ["query", "alias"])] query_string: Option, #[arg(long)] name: Option, #[command(flatten)] params: ParamsArgs, #[arg(long)] branch: Option, #[arg(long)] json: bool, #[arg()] alias_args: Vec, }, /// Policy administration and diagnostics Policy { #[command(subcommand)] command: PolicyCommand, }, /// Compact small Lance fragments in every table of the graph Optimize { /// Graph URI uri: Option, #[arg(long)] target: Option, #[arg(long)] config: Option, #[arg(long)] json: bool, }, /// Classify and explicitly repair manifest/head drift Repair { /// Graph URI uri: Option, #[arg(long)] target: Option, #[arg(long)] config: Option, /// Publish verified maintenance drift. Without this flag, repair only /// previews what it would do. #[arg(long)] confirm: bool, /// Also publish suspicious or unverifiable drift. Requires /// `--confirm`; use only after operator review. #[arg(long, requires = "confirm")] force: bool, #[arg(long)] json: bool, }, /// Remove old Lance versions from every table of the graph (destructive) Cleanup { /// Graph URI uri: Option, #[arg(long)] target: Option, #[arg(long)] config: Option, /// Number of recent versions to keep per table. Either `--keep` or /// `--older-than` (or both) must be set. #[arg(long)] keep: Option, /// Only remove versions older than this duration. Accepts Go-style /// durations: `7d`, `24h`, `90m`. At least one of --keep / --older-than. #[arg(long)] older_than: Option, /// Required to actually run; without it, prints what would be removed #[arg(long)] confirm: bool, #[arg(long)] json: bool, }, /// Validate and plan read-only cluster configuration. Cluster { #[command(subcommand)] command: ClusterCommand, }, /// Manage graphs on a multi-graph server (MR-668) Graphs { #[command(subcommand)] command: GraphsCommand, }, } #[derive(Debug, Subcommand)] enum ClusterCommand { /// Validate cluster.yaml and referenced schemas, queries, and policy files. Validate { /// Cluster config directory containing cluster.yaml. #[arg(long, default_value = ".")] config: PathBuf, /// Emit JSON instead of human text. #[arg(long)] json: bool, }, /// Produce a read-only plan by diffing cluster.yaml against __cluster/state.json. Plan { /// Cluster config directory containing cluster.yaml. #[arg(long, default_value = ".")] config: PathBuf, /// Emit JSON instead of human text. #[arg(long)] json: bool, }, /// Apply the config-only (query/policy) subset of the plan to the local /// cluster catalog. Graph/schema changes are deferred to a later stage. Apply { /// Cluster config directory containing cluster.yaml. #[arg(long, default_value = ".")] config: PathBuf, /// Emit JSON instead of human text. #[arg(long)] json: bool, }, /// Read the local JSON state ledger without scanning live graph resources. Status { /// Cluster config directory containing cluster.yaml. #[arg(long, default_value = ".")] config: PathBuf, /// Emit JSON instead of human text. #[arg(long)] json: bool, }, /// Refresh existing local JSON state from declared graph observations. Refresh { /// Cluster config directory containing cluster.yaml. #[arg(long, default_value = ".")] config: PathBuf, /// Emit JSON instead of human text. #[arg(long)] json: bool, }, /// Import initial local JSON state from declared graph observations. Import { /// Cluster config directory containing cluster.yaml. #[arg(long, default_value = ".")] config: PathBuf, /// Emit JSON instead of human text. #[arg(long)] json: bool, }, /// Remove a held local JSON state lock after operator confirmation. ForceUnlock { /// Exact lock id from cluster status or a state_lock_held diagnostic. lock_id: String, /// Cluster config directory containing cluster.yaml. #[arg(long, default_value = ".")] config: PathBuf, /// Emit JSON instead of human text. #[arg(long)] json: bool, }, } /// Operations on the graph registry of a multi-graph server (MR-668). /// /// All operations target a remote multi-graph server URL (http:// or /// https://). Local-URI invocations return a clear error. To add or /// remove graphs, operators edit `omnigraph.yaml` directly and restart /// the server — runtime mutation is not exposed in v0.6.0. #[derive(Debug, Subcommand)] enum GraphsCommand { /// List every graph registered with the multi-graph server. List { /// Remote server URL (e.g. `https://server.example.com`). #[arg(long)] uri: Option, #[arg(long)] target: Option, #[arg(long)] config: Option, #[arg(long)] json: bool, }, } #[derive(Debug, Subcommand)] enum BranchCommand { /// Create a new branch Create { /// Graph URI #[arg(long)] uri: Option, #[arg(long)] target: Option, #[arg(long)] config: Option, #[arg(long)] from: Option, name: String, #[arg(long)] json: bool, }, /// List branches List { /// Graph URI #[arg(long)] uri: Option, #[arg(long)] target: Option, #[arg(long)] config: Option, #[arg(long)] json: bool, }, /// Delete a branch Delete { /// Graph URI #[arg(long)] uri: Option, #[arg(long)] target: Option, #[arg(long)] config: Option, name: String, #[arg(long)] json: bool, }, /// Merge a source branch into a target branch Merge { /// Graph URI #[arg(long)] uri: Option, #[arg(long)] target: Option, #[arg(long)] config: Option, source: String, #[arg(long)] into: Option, #[arg(long)] json: bool, }, } #[derive(Debug, Subcommand)] enum SchemaCommand { /// Plan a schema migration against the accepted persisted schema Plan { /// Graph URI uri: Option, #[arg(long)] target: Option, #[arg(long)] config: Option, #[arg(long)] schema: PathBuf, #[arg(long)] json: bool, /// Show the plan as it would execute with `--allow-data-loss`. /// Promotes every `DropMode::Soft` step to `DropMode::Hard` /// so the plan output reflects the destructive intent. #[arg(long, default_value_t = false)] allow_data_loss: bool, }, /// Apply a supported schema migration Apply { /// Graph URI uri: Option, #[arg(long)] target: Option, #[arg(long)] config: Option, #[arg(long)] schema: PathBuf, #[arg(long)] json: bool, /// Allow destructive (data-loss) schema changes. /// /// Without this flag, drops are "soft": the column or table /// is removed from the current manifest version but prior /// versions are retained, so `snapshot_at_version(pre_drop)` /// can still read the dropped data until `omnigraph cleanup` /// runs. With this flag, drops are "hard": `cleanup_old_versions` /// runs on the affected datasets immediately after the apply, /// making the prior data unreachable. #[arg(long, default_value_t = false)] allow_data_loss: bool, }, /// Show the current accepted schema source #[command(alias = "get")] Show { /// Graph URI uri: Option, #[arg(long)] target: Option, #[arg(long)] config: Option, #[arg(long)] json: bool, }, } #[derive(Debug, Subcommand)] enum CommitCommand { /// List graph commits List { /// Graph URI uri: Option, #[arg(long)] target: Option, #[arg(long)] config: Option, #[arg(long)] branch: Option, #[arg(long)] json: bool, }, /// Show a graph commit Show { /// Graph URI #[arg(long)] uri: Option, #[arg(long)] target: Option, #[arg(long)] config: Option, commit_id: String, #[arg(long)] json: bool, }, } #[derive(Debug, Subcommand)] enum PolicyCommand { /// Validate policy YAML and compiled Cedar policy state Validate { #[arg(long)] config: Option, }, /// Run declarative policy tests from policy.tests.yaml Test { #[arg(long)] config: Option, }, /// Explain one policy decision locally Explain { #[arg(long)] config: Option, #[arg(long)] actor: String, #[arg(long)] action: PolicyAction, #[arg(long)] branch: Option, #[arg(long = "target-branch")] target_branch: Option, }, } #[derive(Debug, Subcommand)] enum QueriesCommand { /// Type-check the stored-query registry against the live schema. /// /// Distinct from `omnigraph lint` (which lints one `.gq` file): /// this validates the whole `queries:` registry — opening the graph /// to read its schema and confirming every stored query still /// type-checks. Exits non-zero on any breakage. Validate { /// Graph URI uri: Option, #[arg(long)] target: Option, #[arg(long)] config: Option, #[arg(long)] json: bool, }, /// List the registered stored queries (name, MCP exposure, params). List { #[arg(long)] target: Option, #[arg(long)] config: Option, #[arg(long)] json: bool, }, } #[derive(Debug, Args, Clone)] struct ParamsArgs { #[arg(long, conflicts_with = "params_file")] params: Option, #[arg(long, conflicts_with = "params")] params_file: Option, } #[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, ValueEnum)] #[serde(rename_all = "snake_case")] enum CliLoadMode { Overwrite, Append, Merge, } impl From for LoadMode { fn from(value: CliLoadMode) -> Self { match value { CliLoadMode::Overwrite => LoadMode::Overwrite, CliLoadMode::Append => LoadMode::Append, CliLoadMode::Merge => LoadMode::Merge, } } } impl CliLoadMode { fn as_str(self) -> &'static str { match self { CliLoadMode::Overwrite => "overwrite", CliLoadMode::Append => "append", CliLoadMode::Merge => "merge", } } } #[derive(Debug, Serialize)] struct LoadOutput<'a> { uri: &'a str, branch: &'a str, mode: &'a str, nodes_loaded: usize, edges_loaded: usize, node_types_loaded: usize, edge_types_loaded: usize, } #[derive(Debug, Serialize)] struct SchemaPlanOutput<'a> { uri: &'a str, supported: bool, step_count: usize, steps: &'a [SchemaMigrationStep], } fn print_schema_apply_human(output: &SchemaApplyOutput) { println!("schema apply for {}", output.uri); println!("supported: {}", if output.supported { "yes" } else { "no" }); println!("applied: {}", if output.applied { "yes" } else { "no" }); println!("manifest_version: {}", output.manifest_version); if output.steps.is_empty() { println!("no schema changes"); return; } for step in &output.steps { println!("- {}", render_schema_plan_step(step)); } } fn query_kind_label(kind: QueryLintQueryKind) -> &'static str { match kind { QueryLintQueryKind::Read => "read", QueryLintQueryKind::Mutation => "mutation", } } fn severity_label(severity: QueryLintSeverity) -> &'static str { match severity { QueryLintSeverity::Error => "ERROR", QueryLintSeverity::Warning => "WARN ", QueryLintSeverity::Info => "INFO ", } } fn print_query_lint_human(output: &QueryLintOutput) { for result in &output.results { match result.status { QueryLintStatus::Ok => { println!( "OK query `{}` ({})", result.name, query_kind_label(result.kind) ); } QueryLintStatus::Error => { println!( "ERROR query `{}`: {}", result.name, result.error.as_deref().unwrap_or("unknown error") ); } } for warning in &result.warnings { println!("WARN query `{}`: {}", result.name, warning); } } for finding in &output.findings { println!("{} {}", severity_label(finding.severity), finding.message); } println!( "INFO Lint complete: {} queries processed ({} error(s), {} warning(s), {} info item(s))", output.queries_processed, output.errors, output.warnings, output.infos ); } fn finish_query_lint(output: &QueryLintOutput, json: bool) -> Result<()> { if json { print_json(output)?; } else { print_query_lint_human(output); } if output.status == QueryLintStatus::Error { io::stdout().flush()?; std::process::exit(1); } Ok(()) } fn ensure_local_graph_parent(uri: &str) -> Result<()> { if !uri.contains("://") { fs::create_dir_all(uri)?; } Ok(()) } fn print_json(value: &T) -> Result<()> { println!("{}", serde_json::to_string_pretty(value)?); Ok(()) } fn print_cluster_validate_human(output: &ValidateOutput) { if output.ok { println!( "cluster config valid: {} resource(s), {} dependency edge(s)", output.resources.len(), output.dependencies.len() ); } else { println!("cluster config invalid"); } print_cluster_diagnostics(&output.diagnostics); } fn print_cluster_plan_human(output: &PlanOutput) { if output.ok { println!( "cluster plan: {} change(s), {} approval gate(s)", output.changes.len(), output.approvals_required.len() ); for change in &output.changes { println!(" {:?} {}", change.operation, change.resource); } if output.changes.is_empty() { println!(" no changes"); } } else { println!("cluster plan failed"); } print_cluster_diagnostics(&output.diagnostics); } fn print_cluster_apply_human(output: &ApplyOutput) { if output.ok { println!( "cluster apply: {} applied, {} deferred/blocked", output.applied_count, output.deferred_count ); for change in &output.changes { match (&change.disposition, change.reason.as_deref()) { (Some(disposition), Some(reason)) => println!( " {:?} {} [{disposition:?}: {reason}]", change.operation, change.resource ), (Some(disposition), None) => println!( " {:?} {} [{disposition:?}]", change.operation, change.resource ), _ => println!(" {:?} {}", change.operation, change.resource), } } if output.changes.is_empty() { println!(" no changes"); } let state = &output.state_observations; println!( " state: revision {}, converged: {}, written: {}", state.state_revision, output.converged, output.state_written ); println!(" note: applied = recorded in the cluster catalog; the server still boots from omnigraph.yaml"); } else { println!("cluster apply failed"); } print_cluster_diagnostics(&output.diagnostics); } fn print_cluster_status_human(output: &StatusOutput) { if output.ok { let state = &output.state_observations; if state.state_found { println!( "cluster state: revision {}, {} resource(s)", state.state_revision, state.resource_count ); if let Some(digest) = state.applied_config_digest.as_deref() { println!(" applied config: {digest}"); } if state.locked { println!(" lock: held{}", cluster_lock_summary(state)); } else { println!(" lock: not held"); } } else { println!("cluster state missing"); } } else { println!("cluster status failed"); } print_cluster_diagnostics(&output.diagnostics); } fn print_cluster_state_sync_human(output: &StateSyncOutput) { let operation = match output.operation { omnigraph_cluster::StateSyncOperation::Refresh => "refresh", omnigraph_cluster::StateSyncOperation::Import => "import", }; if output.ok { let state = &output.state_observations; println!( "cluster {operation}: revision {}, {} resource(s)", state.state_revision, state.resource_count ); if let Some(cas) = state.state_cas.as_deref() { println!(" state_cas: {cas}"); } if state.locked { println!(" lock: acquired{}", cluster_lock_summary(state)); } else { println!(" lock: not acquired"); } } else { println!("cluster {operation} failed"); } print_cluster_diagnostics(&output.diagnostics); } fn print_cluster_force_unlock_human(output: &ForceUnlockOutput) { if output.ok { if output.lock_removed { println!( "cluster force-unlock: removed lock{}", cluster_lock_summary(&output.state_observations) ); } else { println!("cluster force-unlock: no lock removed"); } } else { println!("cluster force-unlock failed"); if output.state_observations.locked { println!( " lock: held{}", cluster_lock_summary(&output.state_observations) ); } } print_cluster_diagnostics(&output.diagnostics); } fn cluster_lock_summary(state: &omnigraph_cluster::StateObservations) -> String { let Some(lock_id) = state.lock_id.as_deref() else { return String::new(); }; let mut parts = vec![format!("id={lock_id}")]; if let Some(operation) = state.lock_operation.as_deref() { parts.push(format!("operation={operation}")); } if let Some(pid) = state.lock_pid { parts.push(format!("pid={pid}")); } if let Some(created_at) = state.lock_created_at.as_deref() { parts.push(format!("created_at={created_at}")); } if let Some(age_seconds) = state.lock_age_seconds { parts.push(format!("age_seconds={age_seconds}")); } format!(" ({})", parts.join(", ")) } fn print_cluster_diagnostics(diagnostics: &[omnigraph_cluster::Diagnostic]) { for diagnostic in diagnostics { let label = match diagnostic.severity { DiagnosticSeverity::Error => "ERROR", DiagnosticSeverity::Warning => "WARN ", }; println!( "{label} {} {}: {}", diagnostic.code, diagnostic.path, diagnostic.message ); } } fn finish_cluster_validate(output: &ValidateOutput, json: bool) -> Result<()> { if json { print_json(output)?; } else { print_cluster_validate_human(output); } if !output.ok { io::stdout().flush()?; std::process::exit(1); } Ok(()) } fn finish_cluster_plan(output: &PlanOutput, json: bool) -> Result<()> { if json { print_json(output)?; } else { print_cluster_plan_human(output); } if !output.ok { io::stdout().flush()?; std::process::exit(1); } Ok(()) } fn finish_cluster_apply(output: &ApplyOutput, json: bool) -> Result<()> { if json { print_json(output)?; } else { print_cluster_apply_human(output); } if !output.ok { io::stdout().flush()?; std::process::exit(1); } Ok(()) } fn finish_cluster_status(output: &StatusOutput, json: bool) -> Result<()> { if json { print_json(output)?; } else { print_cluster_status_human(output); } if !output.ok { io::stdout().flush()?; std::process::exit(1); } Ok(()) } fn finish_cluster_state_sync(output: &StateSyncOutput, json: bool) -> Result<()> { if json { print_json(output)?; } else { print_cluster_state_sync_human(output); } if !output.ok { io::stdout().flush()?; std::process::exit(1); } Ok(()) } fn finish_cluster_force_unlock(output: &ForceUnlockOutput, json: bool) -> Result<()> { if json { print_json(output)?; } else { print_cluster_force_unlock_human(output); } if !output.ok { io::stdout().flush()?; std::process::exit(1); } Ok(()) } fn is_remote_uri(uri: &str) -> bool { uri.starts_with("http://") || uri.starts_with("https://") } fn remote_url(base: &str, path: &str) -> String { format!("{}{}", base.trim_end_matches('/'), path) } fn remote_branch_url(base: &str, branch: &str) -> Result { let mut url = reqwest::Url::parse(&format!("{}/", base.trim_end_matches('/')))?; url.path_segments_mut() .map_err(|_| color_eyre::eyre::eyre!("invalid remote base url"))? .extend(["branches", branch]); Ok(url.to_string()) } fn normalize_bearer_token(value: Option) -> Option { value .map(|value| value.trim().to_string()) .filter(|value| !value.is_empty()) } fn bearer_token_from_env(var_name: &str) -> Option { normalize_bearer_token(std::env::var(var_name).ok()) } fn parse_env_assignment(line: &str) -> Option<(String, String)> { let line = line.trim(); if line.is_empty() || line.starts_with('#') { return None; } let line = line.strip_prefix("export ").unwrap_or(line).trim(); let (name, value) = line.split_once('=')?; let name = name.trim(); if name.is_empty() { return None; } let value = value.trim(); let value = if value.len() >= 2 && ((value.starts_with('"') && value.ends_with('"')) || (value.starts_with('\'') && value.ends_with('\''))) { &value[1..value.len() - 1] } else { value }; Some((name.to_string(), value.to_string())) } fn bearer_token_from_env_file(path: &Path, var_name: &str) -> Result> { if !path.exists() { return Ok(None); } for line in fs::read_to_string(path)?.lines() { let Some((name, value)) = parse_env_assignment(line) else { continue; }; if name == var_name { return Ok(normalize_bearer_token(Some(value))); } } Ok(None) } fn load_env_file_into_process(path: &Path) -> Result<()> { if !path.exists() { return Ok(()); } for line in fs::read_to_string(path)?.lines() { let Some((name, value)) = parse_env_assignment(line) else { continue; }; if std::env::var_os(&name).is_none() { unsafe { std::env::set_var(name, value); } } } Ok(()) } fn load_cli_config(config_path: Option<&PathBuf>) -> Result { let config = load_config(config_path)?; if let Some(path) = config.resolve_auth_env_file() { load_env_file_into_process(&path)?; } Ok(config) } #[derive(Debug, Clone)] struct ResolvedCliGraph { uri: String, selected: Option, graph_id: String, policy_file: Option, is_remote: bool, } impl ResolvedCliGraph { fn selected(&self) -> Option<&str> { self.selected.as_deref() } } struct ResolvedPolicyContext { policy_file: PathBuf, graph_id: String, } fn resolve_policy_context(config: &OmnigraphConfig) -> Result { let selected = config.resolve_policy_tooling_graph_selection()?; let policy_file = config.resolve_policy_file_for(selected).ok_or_else(|| { color_eyre::eyre::eyre!( "policy.file or graphs..policy.file must be set in omnigraph.yaml" ) })?; let graph_id = match selected { Some(name) => graph_resource_id_for_selection(Some(name), ""), None => graph_resource_id_for_selection(None, "default"), }; Ok(ResolvedPolicyContext { policy_file, graph_id, }) } fn resolve_policy_engine(context: &ResolvedPolicyContext) -> Result { PolicyEngine::load_graph(&context.policy_file, &context.graph_id) } fn resolve_policy_engine_for_graph(graph: &ResolvedCliGraph) -> Result { let policy_file = graph.policy_file.as_ref().ok_or_else(|| { color_eyre::eyre::eyre!( "policy.file or graphs..policy.file must be set in omnigraph.yaml" ) })?; PolicyEngine::load_graph(policy_file, &graph.graph_id) } /// Open a local graph and install the policy resolved for the same graph /// identity that produced the URI. A named graph uses /// `graphs..policy.file`; an explicit positional URI is anonymous and /// uses the legacy top-level `policy.file`. async fn open_local_db_with_policy(graph: &ResolvedCliGraph) -> Result { let db = Omnigraph::open(&graph.uri).await?; if graph.policy_file.is_some() { let engine = Arc::new(resolve_policy_engine_for_graph(graph)?); Ok(db.with_policy(engine as Arc)) } else { Ok(db) } } /// Resolve the CLI's effective actor identity for engine-layer policy /// (MR-722). Precedence: `--as ` (top-level flag) overrides /// `cli.actor` from `omnigraph.yaml`; both unset returns `None`. When /// policy is configured and this returns `None`, the engine-layer /// footgun guard intentionally denies — silent bypass via "I forgot the /// actor" is what the guard prevents. fn resolve_cli_actor<'a>(cli_as: Option<&'a str>, config: &'a OmnigraphConfig) -> Option<&'a str> { cli_as.or(config.cli.actor.as_deref()) } fn resolve_policy_tests_path(context: &ResolvedPolicyContext) -> PathBuf { context.policy_file.with_file_name("policy.tests.yaml") } fn normalize_policy_graph_uri(uri: &str) -> Result { if is_remote_uri(uri) { Ok(uri.trim_end_matches('/').to_string()) } else { Ok(normalize_root_uri(uri)?) } } fn resolve_remote_bearer_token( config: &OmnigraphConfig, explicit_uri: Option<&str>, explicit_target: Option<&str>, ) -> Result> { let scoped_env = config.graph_bearer_token_env(explicit_uri, explicit_target, config.cli_graph_name()); let mut env_names = Vec::new(); if let Some(name) = scoped_env { env_names.push(name.to_string()); } if env_names .iter() .all(|name| name != DEFAULT_BEARER_TOKEN_ENV) { env_names.push(DEFAULT_BEARER_TOKEN_ENV.to_string()); } let env_file = config.resolve_auth_env_file(); for env_name in env_names { if let Some(token) = bearer_token_from_env(&env_name) { return Ok(Some(token)); } if let Some(path) = env_file.as_ref() { if let Some(token) = bearer_token_from_env_file(path, &env_name)? { return Ok(Some(token)); } } } Ok(None) } fn build_http_client() -> Result { Ok(reqwest::Client::new()) } fn apply_bearer_token( request: reqwest::RequestBuilder, token: Option<&str>, ) -> reqwest::RequestBuilder { if let Some(token) = token { request.header(AUTHORIZATION, format!("Bearer {}", token)) } else { request } } async fn remote_json( client: &reqwest::Client, method: Method, url: String, body: Option, bearer_token: Option<&str>, ) -> Result { let request = apply_bearer_token(client.request(method, url), bearer_token); let request = if let Some(body) = body { request.json(&body) } else { request }; let response = request.send().await?; let status = response.status(); let text = response.text().await?; if !status.is_success() { if let Ok(error) = serde_json::from_str::(&text) { bail!(error.error); } bail!("server returned {}: {}", status, text); } Ok(serde_json::from_str(&text)?) } fn resolve_uri( config: &OmnigraphConfig, cli_uri: Option, cli_target: Option<&str>, ) -> Result { config.resolve_target_uri(cli_uri, cli_target, config.cli_graph_name()) } fn resolve_cli_graph( config: &OmnigraphConfig, cli_uri: Option, cli_target: Option<&str>, ) -> Result { let selected = if cli_uri.is_some() { None } else { cli_target .map(str::to_string) .or_else(|| config.cli_graph_name().map(str::to_string)) }; config.resolve_graph_selection(selected.as_deref())?; let uri = resolve_uri(config, cli_uri, cli_target)?; let normalized_uri = normalize_policy_graph_uri(&uri)?; let graph_id = graph_resource_id_for_selection(selected.as_deref(), &normalized_uri); Ok(ResolvedCliGraph { graph_id, is_remote: is_remote_uri(&uri), policy_file: config.resolve_policy_file_for(selected.as_deref()), selected, uri, }) } fn resolve_local_graph( config: &OmnigraphConfig, cli_uri: Option, cli_target: Option<&str>, operation: &str, ) -> Result { let graph = resolve_cli_graph(config, cli_uri, cli_target)?; if graph.is_remote { bail!( "{} is only supported against local graph URIs in this milestone", operation ); } Ok(graph) } /// Parse a Go-style compact duration: `7d`, `24h`, `30m`, `90s`, or a plain /// integer as seconds. Used by the `cleanup --older-than` flag. fn parse_duration_arg(s: &str) -> Result { let s = s.trim(); if s.is_empty() { bail!("duration is empty"); } let (num_part, unit) = match s .char_indices() .rev() .find(|(_, c)| c.is_ascii_alphabetic()) { Some((i, _)) => ( &s[..i + 1 - s[i..].chars().next().unwrap().len_utf8()], &s[i..], ), None => (s, ""), }; let n: u64 = num_part .parse() .map_err(|e| color_eyre::eyre::eyre!("invalid duration '{}': {}", s, e))?; let secs = match unit { "" | "s" => n, "m" => n * 60, "h" => n * 60 * 60, "d" => n * 60 * 60 * 24, "w" => n * 60 * 60 * 24 * 7, _ => bail!("unknown duration unit '{}'. Supported: s, m, h, d, w", unit), }; Ok(std::time::Duration::from_secs(secs)) } fn resolve_local_uri( config: &OmnigraphConfig, cli_uri: Option, cli_target: Option<&str>, operation: &str, ) -> Result { Ok(resolve_local_graph(config, cli_uri, cli_target, operation)?.uri) } fn resolve_branch( config: &OmnigraphConfig, cli_branch: Option, alias_branch: Option, default_branch: &str, ) -> String { cli_branch .or(alias_branch) .or_else(|| config.cli.branch.clone()) .unwrap_or_else(|| default_branch.to_string()) } fn resolve_read_target( config: &OmnigraphConfig, cli_branch: Option, cli_snapshot: Option, alias_branch: Option, ) -> Result { if cli_branch.is_some() && cli_snapshot.is_some() { bail!("read target may specify branch or snapshot, not both"); } Ok(read_target_from_cli( cli_branch .or(alias_branch) .or_else(|| config.cli.branch.clone()), cli_snapshot, )) } fn resolve_query_path( config: &OmnigraphConfig, explicit_query: Option<&PathBuf>, alias_query: Option<&str>, ) -> Result { explicit_query .map(PathBuf::from) .or_else(|| alias_query.map(PathBuf::from)) .ok_or_else(|| { color_eyre::eyre::eyre!( "exactly one of --query, --query-string, or --alias must be provided" ) }) .and_then(|query_path| config.resolve_query_path(&query_path)) } fn resolve_query_source( config: &OmnigraphConfig, explicit_query: Option<&PathBuf>, inline_query: Option<&str>, alias_query: Option<&str>, ) -> Result { if let Some(inline) = inline_query { if inline.trim().is_empty() { bail!("--query-string must not be empty"); } return Ok(inline.to_string()); } Ok(fs::read_to_string(resolve_query_path( config, explicit_query, alias_query, )?)?) } fn parse_alias_value(value: &str) -> Value { serde_json::from_str(value).unwrap_or_else(|_| Value::String(value.to_string())) } fn merged_params_json( alias_name: Option<&str>, alias_arg_names: &[String], alias_arg_values: &[String], explicit: Option, ) -> Result> { if alias_arg_values.len() > alias_arg_names.len() { let alias = alias_name.unwrap_or(""); bail!( "alias '{}' expects at most {} args but got {}", alias, alias_arg_names.len(), alias_arg_values.len() ); } let mut merged = serde_json::Map::new(); for (arg_name, arg_value) in alias_arg_names.iter().zip(alias_arg_values.iter()) { merged.insert(arg_name.clone(), parse_alias_value(arg_value)); } match explicit { Some(Value::Object(object)) => { for (key, value) in object { merged.insert(key, value); } } Some(_) => bail!("params JSON must be an object"), None => {} } if merged.is_empty() { Ok(None) } else { Ok(Some(Value::Object(merged))) } } fn print_load_human( uri: &str, branch: &str, mode: CliLoadMode, nodes_loaded: usize, edges_loaded: usize, node_types_loaded: usize, edge_types_loaded: usize, ) { println!( "loaded {} on branch {} with {}: {} nodes across {} node types, {} edges across {} edge types", uri, branch, mode.as_str(), nodes_loaded, node_types_loaded, edges_loaded, edge_types_loaded ); } fn print_ingest_human(output: &IngestOutput) { println!( "ingested {} into branch {} from {} with {} ({})", output.uri, output.branch, output.base_branch, output.mode.as_str(), if output.branch_created { "branch created" } else { "branch exists" } ); for table in &output.tables { println!("{} rows_loaded={}", table.table_key, table.rows_loaded); } if let Some(actor_id) = &output.actor_id { println!("actor_id: {}", actor_id); } } fn print_schema_plan_human(uri: &str, plan: &SchemaMigrationPlan) { println!("schema plan for {}", uri); println!("supported: {}", if plan.supported { "yes" } else { "no" }); if plan.steps.is_empty() { println!("no schema changes"); return; } for step in &plan.steps { println!("- {}", render_schema_plan_step(step)); } } fn render_schema_plan_step(step: &SchemaMigrationStep) -> String { match step { SchemaMigrationStep::AddType { type_kind, name } => { format!("add {} type '{}'", schema_type_kind_label(*type_kind), name) } SchemaMigrationStep::RenameType { type_kind, from, to, } => format!( "rename {} type '{}' -> '{}'", schema_type_kind_label(*type_kind), from, to ), SchemaMigrationStep::AddProperty { type_kind, type_name, property_name, property_type, } => format!( "add property '{}.{}' ({}) on {} '{}'", type_name, property_name, render_prop_type(property_type), schema_type_kind_label(*type_kind), type_name ), SchemaMigrationStep::RenameProperty { type_kind, type_name, from, to, } => format!( "rename property '{}.{}' -> '{}.{}' on {} '{}'", type_name, from, type_name, to, schema_type_kind_label(*type_kind), type_name ), SchemaMigrationStep::AddConstraint { type_kind, type_name, constraint, } => format!( "add constraint {} on {} '{}'", render_constraint(constraint), schema_type_kind_label(*type_kind), type_name ), SchemaMigrationStep::UpdateTypeMetadata { type_kind, name, annotations, } => format!( "update metadata on {} '{}' ({})", schema_type_kind_label(*type_kind), name, render_annotations(annotations) ), SchemaMigrationStep::UpdatePropertyMetadata { type_kind, type_name, property_name, annotations, } => format!( "update metadata on property '{}.{}' of {} '{}' ({})", type_name, property_name, schema_type_kind_label(*type_kind), type_name, render_annotations(annotations) ), SchemaMigrationStep::DropType { type_kind, name, mode, } => format!( "drop {} type '{}' ({} mode)", schema_type_kind_label(*type_kind), name, drop_mode_label(*mode), ), SchemaMigrationStep::DropProperty { type_kind, type_name, property_name, mode, } => format!( "drop property '{}.{}' of {} '{}' ({} mode)", type_name, property_name, schema_type_kind_label(*type_kind), type_name, drop_mode_label(*mode), ), SchemaMigrationStep::UnsupportedChange { entity, reason, .. } => { // When a schema-lint code is attached, render code + tier // so operators see at-a-glance the kind of risk (destructive // / validated / safe) — not just the rule identifier. // Reach the diagnostic via the `diagnostic()` helper so the // CLI doesn't need to know how the lookup works. match step.diagnostic() { Some(diag) => format!( "unsupported change on {} [{}, {}]: {}", entity, diag.code, schema_lint_tier_label(diag.tier), reason, ), None => format!("unsupported change on {}: {}", entity, reason), } } } } fn schema_type_kind_label(kind: omnigraph_compiler::SchemaTypeKind) -> &'static str { match kind { omnigraph_compiler::SchemaTypeKind::Interface => "interface", omnigraph_compiler::SchemaTypeKind::Node => "node", omnigraph_compiler::SchemaTypeKind::Edge => "edge", } } fn schema_lint_tier_label(tier: omnigraph_compiler::SafetyTier) -> &'static str { match tier { omnigraph_compiler::SafetyTier::Safe => "safe", omnigraph_compiler::SafetyTier::Validated => "validated", omnigraph_compiler::SafetyTier::Destructive => "destructive", } } fn drop_mode_label(mode: omnigraph_compiler::DropMode) -> &'static str { match mode { omnigraph_compiler::DropMode::Soft => "soft", omnigraph_compiler::DropMode::Hard => "hard", } } fn render_prop_type(prop_type: &omnigraph_compiler::PropType) -> String { let base = if let Some(values) = &prop_type.enum_values { format!("Enum({})", values.join("|")) } else { prop_type.scalar.to_string() }; let base = if prop_type.list { format!("[{}]", base) } else { base }; if prop_type.nullable { format!("{}?", base) } else { base } } fn render_constraint(constraint: &omnigraph_compiler::schema::ast::Constraint) -> String { match constraint { omnigraph_compiler::schema::ast::Constraint::Key(columns) => { format!("@key({})", columns.join(", ")) } omnigraph_compiler::schema::ast::Constraint::Unique(columns) => { format!("@unique({})", columns.join(", ")) } omnigraph_compiler::schema::ast::Constraint::Index(columns) => { format!("@index({})", columns.join(", ")) } omnigraph_compiler::schema::ast::Constraint::Range { property, min, max } => { format!("@range({}, {:?}, {:?})", property, min, max) } omnigraph_compiler::schema::ast::Constraint::Check { property, pattern } => { format!("@check({}, {:?})", property, pattern) } } } fn render_annotations(annotations: &[omnigraph_compiler::schema::ast::Annotation]) -> String { annotations .iter() .map(|annotation| match &annotation.value { Some(value) => format!("@{}({})", annotation.name, value), None => format!("@{}", annotation.name), }) .collect::>() .join(", ") } fn print_embed_human(output: &EmbedOutput) { println!( "embedded {} rows (selected {}, cleaned {}) from {} -> {} [{} {}d]", output.embedded_rows, output.selected_rows, output.cleaned_rows, output.input, output.output, output.mode, output.dimension ); } fn print_snapshot_human(branch: &str, manifest_version: u64, entries: &[SnapshotTableOutput]) { println!("branch: {}", branch); println!("manifest_version: {}", manifest_version); for entry in entries { println!( "{} v{} branch={} rows={}", entry.table_key, entry.table_version, entry.table_branch.as_deref().unwrap_or("main"), entry.row_count ); } } fn print_read_output( output: &ReadOutput, format: ReadOutputFormat, config: &OmnigraphConfig, ) -> Result<()> { println!( "{}", render_read( output, format, &ReadRenderOptions { max_column_width: config.table_max_column_width(), cell_layout: config.table_cell_layout(), }, )? ); Ok(()) } fn print_change_human(output: &ChangeOutput) { println!( "changed {} via {}: {} nodes, {} edges", output.branch, output.query_name, output.affected_nodes, output.affected_edges ); if let Some(actor_id) = &output.actor_id { println!("actor_id: {}", actor_id); } } fn print_commit_list_human(commits: &[CommitOutput]) { for commit in commits { let branch = commit.manifest_branch.as_deref().unwrap_or("main"); println!( "{} branch={} version={}{}", commit.graph_commit_id, branch, commit.manifest_version, commit .actor_id .as_deref() .map(|actor| format!(" actor={}", actor)) .unwrap_or_default() ); } } fn print_commit_human(commit: &CommitOutput) { println!("graph_commit_id: {}", commit.graph_commit_id); println!( "manifest_branch: {}", commit.manifest_branch.as_deref().unwrap_or("main") ); println!("manifest_version: {}", commit.manifest_version); if let Some(parent_commit_id) = &commit.parent_commit_id { println!("parent_commit_id: {}", parent_commit_id); } if let Some(merged_parent_commit_id) = &commit.merged_parent_commit_id { println!("merged_parent_commit_id: {}", merged_parent_commit_id); } if let Some(actor_id) = &commit.actor_id { println!("actor_id: {}", actor_id); } println!("created_at: {}", commit.created_at); } fn print_policy_explain(decision: &PolicyDecision, actor_id: &str, request: &PolicyRequest) { println!( "decision: {}", if decision.allowed { "allow" } else { "deny" } ); println!("actor: {}", actor_id); println!("action: {}", request.action); if let Some(branch) = &request.branch { println!("branch: {}", branch); } if let Some(target_branch) = &request.target_branch { println!("target_branch: {}", target_branch); } if let Some(rule_id) = &decision.matched_rule_id { println!("matched_rule: {}", rule_id); } println!("message: {}", decision.message); } fn resolve_read_format( config: &OmnigraphConfig, cli_format: Option, json: bool, alias_format: Option, ) -> ReadOutputFormat { if json { ReadOutputFormat::Json } else { cli_format .or(alias_format) .unwrap_or_else(|| config.cli_output_format()) } } fn resolve_alias<'a>( config: &'a OmnigraphConfig, alias_name: Option<&'a str>, expected: AliasCommand, ) -> Result> { let Some(alias_name) = alias_name else { return Ok(None); }; let alias = config.alias(alias_name)?; if alias.command != expected { bail!( "alias '{}' is a {:?} alias, not a {:?} alias", alias_name, alias.command, expected ); } Ok(Some((alias_name, alias))) } fn normalize_legacy_alias_uri( uri: Option, target_available: bool, alias_name: Option<&str>, mut alias_args: Vec, ) -> (Option, Vec) { let Some(candidate) = uri else { return (None, alias_args); }; if alias_name.is_some() && target_available { alias_args.insert(0, candidate); return (None, alias_args); } (Some(candidate), alias_args) } fn scaffold_config_if_missing(uri: &str) -> Result<()> { let path = inferred_config_path(uri)?; if path.exists() { return Ok(()); } fs::write( path, format!( "\ project: name: Omnigraph Project graphs: local: uri: {} # bearer_token_env: OMNIGRAPH_BEARER_TOKEN server: graph: local bind: 127.0.0.1:8080 cli: graph: local branch: main output_format: table table_max_column_width: 80 table_cell_layout: truncate query: roots: - queries - . aliases: # owner: # command: read # query: context.gq # name: decision_owner # args: [slug] # graph: local # branch: main # format: kv # # attach_trace: # command: change # query: mutations.gq # name: attach_trace # args: [decision_slug, trace_slug] # graph: local # branch: main # auth: # env_file: ./.env.omni # # policy: # file: ./policy.yaml ", yaml_string(uri), ), )?; Ok(()) } fn yaml_string(value: &str) -> String { format!("'{}'", value.replace('\'', "''")) } fn inferred_config_path(uri: &str) -> Result { if uri.contains("://") { return Ok(omnigraph_server::config::default_config_path()); } let path = Path::new(uri); let base = if path.is_absolute() { path.parent() .map(Path::to_path_buf) .unwrap_or(std::env::current_dir()?) } else { std::env::current_dir()?.join(path.parent().unwrap_or_else(|| Path::new("."))) }; Ok(base.join(omnigraph_server::config::DEFAULT_CONFIG_FILE)) } fn read_target_from_cli(branch: Option, snapshot: Option) -> ReadTarget { if let Some(snapshot) = snapshot { ReadTarget::snapshot(SnapshotId::new(snapshot)) } else { ReadTarget::branch(branch.unwrap_or_else(|| "main".to_string())) } } fn load_params_json(params: &ParamsArgs) -> Result> { match (¶ms.params, ¶ms.params_file) { (Some(inline), None) => Ok(Some(serde_json::from_str(inline)?)), (None, Some(path)) => Ok(Some(serde_json::from_str(&fs::read_to_string(path)?)?)), (None, None) => Ok(None), (Some(_), Some(_)) => bail!("only one of --params or --params-file may be provided"), } } fn select_named_query( query_source: &str, requested_name: Option<&str>, ) -> Result<(String, Vec)> { let parsed = parse_query(query_source)?; let query = if let Some(name) = requested_name { parsed .queries .into_iter() .find(|query| query.name == name) .ok_or_else(|| color_eyre::eyre::eyre!("query '{}' not found", name))? } else if parsed.queries.len() == 1 { parsed.queries.into_iter().next().unwrap() } else { bail!("query file contains multiple queries; pass --name"); }; Ok((query.name, query.params)) } fn query_params_from_json( query_params: &[omnigraph_compiler::query::ast::Param], params_json: Option<&Value>, ) -> Result { json_params_to_param_map(params_json, query_params, JsonParamMode::Standard) .map_err(|err| color_eyre::eyre::eyre!(err.to_string())) } async fn execute_query_lint( config: &OmnigraphConfig, cli_uri: Option, cli_target: Option<&str>, schema_path: Option<&PathBuf>, query_path: &PathBuf, ) -> Result { let resolved_query_path = resolve_query_path(config, Some(query_path), None)?; let query_source = fs::read_to_string(&resolved_query_path)?; let query_path = resolved_query_path.to_string_lossy().into_owned(); if let Some(schema_path) = schema_path { let schema_source = fs::read_to_string(schema_path)?; let schema = parse_schema(&schema_source).map_err(|err| color_eyre::eyre::eyre!(err.to_string()))?; let catalog = build_catalog(&schema).map_err(|err| color_eyre::eyre::eyre!(err.to_string()))?; return Ok(lint_query_file( &catalog, &query_source, query_path, QueryLintSchemaSource::file(schema_path.to_string_lossy().into_owned()), )); } let has_graph_target = cli_uri.is_some() || cli_target.is_some() || config.cli_graph_name().is_some(); if !has_graph_target { bail!("query lint requires --schema or a resolvable graph target"); } let uri = resolve_local_uri(config, cli_uri, cli_target, "query lint")?; let db = Omnigraph::open(&uri).await?; Ok(lint_query_file( &db.catalog(), &query_source, query_path, QueryLintSchemaSource::graph(uri), )) } #[derive(serde::Serialize)] struct QueriesIssue { query: String, message: String, } #[derive(serde::Serialize)] struct QueriesValidateOutput { ok: bool, breakages: Vec, warnings: Vec, } #[derive(serde::Serialize)] struct QueriesParam { name: String, #[serde(rename = "type")] type_name: String, nullable: bool, } #[derive(serde::Serialize)] struct QueriesListItem { name: String, mcp_expose: bool, tool_name: Option, mutation: bool, params: Vec, } #[derive(serde::Serialize)] struct QueriesListOutput { queries: Vec, } /// Resolve the selected graph to `(local URI, registry selection)` from one /// precedence, so a command's schema and its stored-query registry can never /// come from different graphs. A **positional URI is anonymous** (top-level /// registry, ignoring the configured default graph); otherwise `--target` /// or the configured `cli.graph` names the graph (its per-graph block). /// Mirrors the server's single-mode identity rule. fn resolve_selected_graph( config: &OmnigraphConfig, cli_uri: Option, cli_target: Option<&str>, operation: &str, ) -> Result<(String, Option)> { let graph = resolve_local_graph(config, cli_uri, cli_target, operation)?; Ok((graph.uri, graph.selected)) } /// Load the stored-query registry for an already-resolved graph selection /// (`None` = anonymous → top-level; `Some(name)` = that graph's block). fn load_registry_or_report( config: &OmnigraphConfig, selected: Option<&str>, ) -> Result { QueryRegistry::load(config, config.query_entries_for(selected)).map_err(|errors| { color_eyre::eyre::eyre!( "stored-query registry failed to load:\n {}", errors .iter() .map(|e| e.to_string()) .collect::>() .join("\n ") ) }) } fn graph_query_registry_names(config: &OmnigraphConfig) -> Vec<&str> { config .graphs .iter() .filter_map(|(name, graph)| (!graph.queries.is_empty()).then_some(name.as_str())) .collect() } fn resolve_registry_selection_for_list( config: &OmnigraphConfig, target: Option<&str>, ) -> Result> { let selected = target .map(str::to_string) .or_else(|| config.cli_graph_name().map(str::to_string)); if let Some(name) = selected.as_deref() { config.resolve_graph_selection(Some(name))?; return Ok(selected); } if !config.query_entries().is_empty() { return Ok(None); } let graph_names = graph_query_registry_names(config); if graph_names.is_empty() { return Ok(None); } bail!( "stored-query registries are configured for graph{} {} but no graph was selected. Pass `--target {}` or set `cli.graph`.", if graph_names.len() == 1 { "" } else { "s" }, graph_names.join(", "), graph_names[0], ) } fn validate_registry_for_catalog( registry: &QueryRegistry, catalog: &omnigraph_compiler::catalog::Catalog, label: &str, ) -> omnigraph::error::Result<()> { let report = check(registry, catalog); if report.has_breakages() { return Err(omnigraph::error::OmniError::manifest( format_check_breakages(label, &report), )); } Ok(()) } async fn execute_queries_validate( uri: Option, target: Option, config_path: Option<&PathBuf>, json: bool, ) -> Result<()> { let config = load_cli_config(config_path)?; // One selection drives both the schema URI and the registry, so a // positional URI and a `--target` can't validate different graphs. let (uri, selected) = resolve_selected_graph(&config, uri, target.as_deref(), "queries validate")?; let registry = load_registry_or_report(&config, selected.as_deref())?; let db = Omnigraph::open(&uri).await?; let report = check(®istry, &db.catalog()); let output = QueriesValidateOutput { ok: !report.has_breakages(), breakages: report .breakages .iter() .map(|b| QueriesIssue { query: b.query.clone(), message: b.message.clone(), }) .collect(), warnings: report .warnings .iter() .map(|w| QueriesIssue { query: w.query.clone(), message: w.message.clone(), }) .collect(), }; if json { print_json(&output)?; } else { if output.breakages.is_empty() { println!( "OK {} stored quer{} type-check against the schema", registry.len(), if registry.len() == 1 { "y" } else { "ies" } ); } for issue in &output.breakages { println!("ERROR query '{}': {}", issue.query, issue.message); } for issue in &output.warnings { println!("WARN query '{}': {}", issue.query, issue.message); } } if report.has_breakages() { io::stdout().flush()?; std::process::exit(1); } Ok(()) } fn execute_queries_list( target: Option, config_path: Option<&PathBuf>, json: bool, ) -> Result<()> { let config = load_cli_config(config_path)?; let selected = resolve_registry_selection_for_list(&config, target.as_deref())?; let registry = load_registry_or_report(&config, selected.as_deref())?; let output = QueriesListOutput { queries: registry .iter() .map(|q| QueriesListItem { name: q.name.clone(), mcp_expose: q.expose, tool_name: q.tool_name.clone(), mutation: q.is_mutation(), params: q .decl .params .iter() .map(|p| QueriesParam { name: p.name.clone(), type_name: p.type_name.clone(), nullable: p.nullable, }) .collect(), }) .collect(), }; if json { print_json(&output)?; } else if output.queries.is_empty() { println!("(no stored queries registered)"); } else { for q in &output.queries { let kind = if q.mutation { "mutation" } else { "read" }; let params = q .params .iter() .map(|p| { format!( "${}: {}{}", p.name, p.type_name, if p.nullable { "?" } else { "" } ) }) .collect::>() .join(", "); let mcp = if q.mcp_expose { format!(" [mcp: {}]", q.tool_name.as_deref().unwrap_or(&q.name)) } else { String::new() }; println!("{kind} {}({params}){mcp}", q.name); } } Ok(()) } async fn execute_read( uri: &str, query_source: &str, query_name: Option<&str>, target: ReadTarget, params_json: Option<&Value>, ) -> Result { let (selected_name, query_params) = select_named_query(query_source, query_name)?; let params = query_params_from_json(&query_params, params_json)?; let db = Omnigraph::open(uri).await?; let result = db .query(target.clone(), query_source, &selected_name, ¶ms) .await?; Ok(read_output(selected_name, &target, result)) } async fn execute_read_remote( client: &reqwest::Client, uri: &str, query_source: &str, query_name: Option<&str>, target: ReadTarget, params_json: Option<&Value>, bearer_token: Option<&str>, ) -> Result { let (branch, snapshot) = match &target { ReadTarget::Branch(branch) => (Some(branch.clone()), None), ReadTarget::Snapshot(snapshot) => (None, Some(snapshot.as_str().to_string())), }; remote_json( client, Method::POST, remote_url(uri, "/read"), Some(serde_json::to_value(ReadRequest { query_source: query_source.to_string(), query_name: query_name.map(ToOwned::to_owned), params: params_json.cloned(), branch, snapshot, })?), bearer_token, ) .await } async fn execute_change( graph: &ResolvedCliGraph, query_source: &str, query_name: Option<&str>, branch: &str, params_json: Option<&Value>, config: &OmnigraphConfig, cli_as_actor: Option<&str>, ) -> Result { let (selected_name, query_params) = select_named_query(query_source, query_name)?; let params = query_params_from_json(&query_params, params_json)?; let db = open_local_db_with_policy(graph).await?; let actor = resolve_cli_actor(cli_as_actor, config); let result = db .mutate_as(branch, query_source, &selected_name, ¶ms, actor) .await?; Ok(ChangeOutput { branch: branch.to_string(), query_name: selected_name, affected_nodes: result.affected_nodes, affected_edges: result.affected_edges, actor_id: actor.map(String::from), }) } /// Build the JSON body for `POST /change` using the legacy wire shape. /// /// `ChangeRequest`'s Rust field names are now `query` / `name` (the canonical /// wire shape going forward), but old `omnigraph-server` builds still require /// the legacy `query_source` / `query_name` keys on `/change`. Hand-rolling /// the JSON with the legacy names keeps a newer CLI talking to an older /// server intact -- the same byte-stability contract we apply to /// `execute_read_remote` against `/read`. fn legacy_change_request_body( query_source: &str, query_name: Option<&str>, branch: &str, params_json: Option<&Value>, ) -> Value { let mut body = serde_json::json!({ "query_source": query_source, "branch": branch, }); if let Some(name) = query_name { body["query_name"] = Value::String(name.to_string()); } if let Some(params) = params_json { body["params"] = params.clone(); } body } async fn execute_change_remote( client: &reqwest::Client, uri: &str, query_source: &str, query_name: Option<&str>, branch: &str, params_json: Option<&Value>, bearer_token: Option<&str>, ) -> Result { remote_json( client, Method::POST, remote_url(uri, "/change"), Some(legacy_change_request_body( query_source, query_name, branch, params_json, )), bearer_token, ) .await } async fn execute_export_to_writer( uri: &str, branch: &str, type_names: &[String], table_keys: &[String], writer: &mut W, ) -> Result<()> { let db = Omnigraph::open(uri).await?; db.export_jsonl_to_writer(branch, type_names, table_keys, writer) .await?; writer.flush()?; Ok(()) } async fn execute_export_remote_to_writer( client: &reqwest::Client, uri: &str, branch: &str, type_names: &[String], table_keys: &[String], bearer_token: Option<&str>, writer: &mut W, ) -> Result<()> { let request = apply_bearer_token( client.request(Method::POST, remote_url(uri, "/export")), bearer_token, ) .json(&ExportRequest { branch: Some(branch.to_string()), type_names: type_names.to_vec(), table_keys: table_keys.to_vec(), }); let mut response = request.send().await?; let status = response.status(); if !status.is_success() { let text = response.text().await?; if let Ok(error) = serde_json::from_str::(&text) { bail!(error.error); } bail!("server returned {}: {}", status, text); } while let Some(chunk) = response.chunk().await? { writer.write_all(&chunk)?; } writer.flush()?; Ok(()) } /// Rewrite deprecated CLI invocations into their canonical form. /// /// The current rename pass moves four subcommands: /// - `omnigraph read` -> `omnigraph query` (clap `visible_alias` handles parsing; we warn) /// - `omnigraph change` -> `omnigraph mutate` (clap `visible_alias` handles parsing; we warn) /// - `omnigraph check` -> `omnigraph lint` (rewrite required; no visible_alias by design) /// - `omnigraph query lint` -> `omnigraph lint` (rewrite required; `query` is now the read-runner) /// - `omnigraph query check` -> `omnigraph lint` (rewrite required) /// /// `check` is *not* a clap visible_alias on `lint` even though they're /// semantically equivalent. Visible aliases create two canonical names /// that agents emit interchangeably depending on training-data drift /// (see MR-981 §6 for the policy). The argv-shim + stderr warning /// pattern preserves back-compat for human users while pointing every /// caller at the single canonical name in `--help`. /// /// Returns the (possibly rewritten) argv that clap should parse. fn rewrite_deprecated_argv(args: Vec) -> Vec { if args.len() >= 3 { let sub = args[1].to_str(); let sub2 = args[2].to_str(); if sub == Some("query") && matches!(sub2, Some("lint") | Some("check")) { let suffix = sub2.unwrap(); eprintln!( "warning: `omnigraph query {suffix}` is deprecated; use `omnigraph lint` instead" ); // Drop the leading `query` token AND normalize `check` -> `lint`. // `check` is no longer a clap visible_alias (MR-981 §6), so the // rewritten argv must reach the canonical `lint` subcommand // directly. Result for `omnigraph query check --query foo.gq`: // `omnigraph lint --query foo.gq`. let mut out = Vec::with_capacity(args.len() - 1); out.push(args[0].clone()); out.push(OsString::from("lint")); out.extend(args[3..].iter().cloned()); return out; } } if let Some(sub) = args.get(1).and_then(|s| s.to_str()) { match sub { "read" => { eprintln!("warning: `omnigraph read` is deprecated; use `omnigraph query` instead") } "change" => eprintln!( "warning: `omnigraph change` is deprecated; use `omnigraph mutate` instead" ), "check" => { eprintln!("warning: `omnigraph check` is deprecated; use `omnigraph lint` instead"); // Rewrite the top-level subcommand to `lint`; pass through the rest. let mut out = Vec::with_capacity(args.len()); out.push(args[0].clone()); out.push(OsString::from("lint")); out.extend(args[2..].iter().cloned()); return out; } _ => {} } } args } #[tokio::main] async fn main() -> Result<()> { color_eyre::install()?; let cli = { let raw_args = rewrite_deprecated_argv(std::env::args_os().collect()); let matches = Cli::command() .arg( Arg::new("version") .short('v') .long("version") .action(ArgAction::Version) .help("Print version"), ) .get_matches_from(raw_args); Cli::from_arg_matches(&matches)? }; let http_client = build_http_client()?; match cli.command { Command::Version => { println!("omnigraph {}", env!("CARGO_PKG_VERSION")); } Command::Embed(args) => { let output = execute_embed(&args).await?; if args.json { print_json(&output)?; } else { print_embed_human(&output); } } Command::Init { schema, uri, force } => { let schema_source = fs::read_to_string(&schema)?; ensure_local_graph_parent(&uri)?; Omnigraph::init_with_options( &uri, &schema_source, omnigraph::db::InitOptions { force }, ) .await?; scaffold_config_if_missing(&uri)?; println!("initialized {}", uri); } Command::Load { uri, target, config, data, branch, mode, json, } => { let config = load_cli_config(config.as_ref())?; let graph = resolve_local_graph(&config, uri, target.as_deref(), "load")?; let uri = graph.uri.clone(); let branch = resolve_branch(&config, branch, None, "main"); let db = open_local_db_with_policy(&graph).await?; let actor = resolve_cli_actor(cli.as_actor.as_deref(), &config); let result = db .load_file_as(&branch, &data.to_string_lossy(), mode.into(), actor) .await?; let payload = LoadOutput { uri: &uri, branch: &branch, mode: mode.as_str(), nodes_loaded: result.nodes_loaded.values().sum(), edges_loaded: result.edges_loaded.values().sum(), node_types_loaded: result.nodes_loaded.len(), edge_types_loaded: result.edges_loaded.len(), }; if json { print_json(&payload)?; } else { print_load_human( &uri, &branch, mode, payload.nodes_loaded, payload.edges_loaded, payload.node_types_loaded, payload.edge_types_loaded, ); } } Command::Ingest { uri, target, config, data, branch, from, mode, json, } => { let config = load_cli_config(config.as_ref())?; let bearer_token = resolve_remote_bearer_token(&config, uri.as_deref(), target.as_deref())?; let graph = resolve_cli_graph(&config, uri, target.as_deref())?; let uri = graph.uri.clone(); let branch = resolve_branch(&config, branch, None, "main"); let from = resolve_branch(&config, from, None, "main"); let payload = if graph.is_remote { let data = fs::read_to_string(&data)?; remote_json::( &http_client, Method::POST, remote_url(&uri, "/ingest"), Some(serde_json::to_value(IngestRequest { branch: Some(branch.clone()), from: Some(from.clone()), mode: Some(mode.into()), data, })?), bearer_token.as_deref(), ) .await? } else { let db = open_local_db_with_policy(&graph).await?; let actor = resolve_cli_actor(cli.as_actor.as_deref(), &config); let result = db .ingest_file_as( &branch, Some(&from), &data.to_string_lossy(), mode.into(), actor, ) .await?; ingest_output(&uri, &result, None) }; if json { print_json(&payload)?; } else { print_ingest_human(&payload); } } Command::Branch { command } => match command { BranchCommand::Create { uri, target, config, from, name, json, } => { let config = load_cli_config(config.as_ref())?; let bearer_token = resolve_remote_bearer_token(&config, uri.as_deref(), target.as_deref())?; let graph = resolve_cli_graph(&config, uri, target.as_deref())?; let uri = graph.uri.clone(); let from = resolve_branch(&config, from, None, "main"); let payload = if graph.is_remote { remote_json::( &http_client, Method::POST, remote_url(&uri, "/branches"), Some(serde_json::to_value(BranchCreateRequest { from: Some(from.clone()), name: name.clone(), })?), bearer_token.as_deref(), ) .await? } else { let db = open_local_db_with_policy(&graph).await?; let actor = resolve_cli_actor(cli.as_actor.as_deref(), &config); db.branch_create_from_as(ReadTarget::branch(&from), &name, actor) .await?; BranchCreateOutput { uri: uri.clone(), from: from.clone(), name: name.clone(), actor_id: actor.map(String::from), } }; if json { print_json(&payload)?; } else { println!("created branch {} from {}", payload.name, payload.from); } } BranchCommand::List { uri, target, config, json, } => { let config = load_cli_config(config.as_ref())?; let bearer_token = resolve_remote_bearer_token(&config, uri.as_deref(), target.as_deref())?; let graph = resolve_cli_graph(&config, uri, target.as_deref())?; let uri = graph.uri.clone(); let payload = if graph.is_remote { remote_json::( &http_client, Method::GET, remote_url(&uri, "/branches"), None, bearer_token.as_deref(), ) .await? } else { let db = Omnigraph::open(&uri).await?; let mut branches = db.branch_list().await?; branches.sort(); BranchListOutput { branches } }; if json { print_json(&payload)?; } else { for branch in payload.branches { println!("{}", branch); } } } BranchCommand::Delete { uri, target, config, name, json, } => { let config = load_cli_config(config.as_ref())?; let bearer_token = resolve_remote_bearer_token(&config, uri.as_deref(), target.as_deref())?; let graph = resolve_cli_graph(&config, uri, target.as_deref())?; let uri = graph.uri.clone(); let payload = if graph.is_remote { remote_json::( &http_client, Method::DELETE, remote_branch_url(&uri, &name)?, None, bearer_token.as_deref(), ) .await? } else { let db = open_local_db_with_policy(&graph).await?; let actor = resolve_cli_actor(cli.as_actor.as_deref(), &config); db.branch_delete_as(&name, actor).await?; BranchDeleteOutput { uri: uri.clone(), name: name.clone(), actor_id: actor.map(String::from), } }; if json { print_json(&payload)?; } else { println!("deleted branch {}", payload.name); } } BranchCommand::Merge { uri, target, config, source, into, json, } => { let config = load_cli_config(config.as_ref())?; let bearer_token = resolve_remote_bearer_token(&config, uri.as_deref(), target.as_deref())?; let graph = resolve_cli_graph(&config, uri, target.as_deref())?; let uri = graph.uri.clone(); let into = resolve_branch(&config, into, None, "main"); let payload = if graph.is_remote { remote_json::( &http_client, Method::POST, remote_url(&uri, "/branches/merge"), Some(serde_json::to_value(BranchMergeRequest { source: source.clone(), target: Some(into.clone()), })?), bearer_token.as_deref(), ) .await? } else { let db = open_local_db_with_policy(&graph).await?; let actor = resolve_cli_actor(cli.as_actor.as_deref(), &config); let outcome = db.branch_merge_as(&source, &into, actor).await?; BranchMergeOutput { source: source.clone(), target: into.clone(), outcome: outcome.into(), actor_id: actor.map(String::from), } }; if json { print_json(&payload)?; } else { println!( "merged {} into {}: {}", payload.source, payload.target, payload.outcome.as_str() ); } } }, Command::Commit { command } => match command { CommitCommand::List { uri, target, config, branch, json, } => { let config = load_cli_config(config.as_ref())?; let bearer_token = resolve_remote_bearer_token(&config, uri.as_deref(), target.as_deref())?; let uri = resolve_uri(&config, uri, target.as_deref())?; let commits = if is_remote_uri(&uri) { remote_json::( &http_client, Method::GET, if let Some(branch) = branch.as_deref() { format!("{}?branch={}", remote_url(&uri, "/commits"), branch) } else { remote_url(&uri, "/commits") }, None, bearer_token.as_deref(), ) .await? .commits } else { let db = Omnigraph::open(&uri).await?; db.list_commits(branch.as_deref()) .await? .iter() .map(commit_output) .collect::>() }; if json { print_json(&CommitListOutput { commits })?; } else { print_commit_list_human(&commits); } } CommitCommand::Show { uri, target, config, commit_id, json, } => { let config = load_cli_config(config.as_ref())?; let bearer_token = resolve_remote_bearer_token(&config, uri.as_deref(), target.as_deref())?; let uri = resolve_uri(&config, uri, target.as_deref())?; let commit = if is_remote_uri(&uri) { remote_json::( &http_client, Method::GET, remote_url(&uri, &format!("/commits/{}", commit_id)), None, bearer_token.as_deref(), ) .await? } else { let db = Omnigraph::open(&uri).await?; commit_output(&db.get_commit(&commit_id).await?) }; if json { print_json(&commit)?; } else { print_commit_human(&commit); } } }, Command::Schema { command } => match command { SchemaCommand::Plan { uri, target, config, schema, json, allow_data_loss, } => { let config = load_cli_config(config.as_ref())?; let uri = resolve_local_uri(&config, uri, target.as_deref(), "schema plan")?; let schema_source = fs::read_to_string(&schema)?; let db = Omnigraph::open(&uri).await?; let plan = db .plan_schema_with_options( &schema_source, omnigraph::db::SchemaApplyOptions { allow_data_loss }, ) .await?; let output = SchemaPlanOutput { uri: &uri, supported: plan.supported, step_count: plan.steps.len(), steps: &plan.steps, }; if json { print_json(&output)?; } else { print_schema_plan_human(&uri, &plan); } } SchemaCommand::Apply { uri, target, config, schema, json, allow_data_loss, } => { let config = load_cli_config(config.as_ref())?; let bearer_token = resolve_remote_bearer_token(&config, uri.as_deref(), target.as_deref())?; let graph = resolve_cli_graph(&config, uri, target.as_deref())?; let uri = graph.uri.clone(); let schema_source = fs::read_to_string(&schema)?; let output = if graph.is_remote { // MR-694 PR B: SchemaApplyRequest gained an // allow_data_loss field so Hard-mode drops are no // longer CLI-only. The previous bail is gone; the // field is forwarded into the JSON payload, and // the server's `server_schema_apply` honors it. remote_json::( &http_client, Method::POST, remote_url(&uri, "/schema/apply"), Some(serde_json::to_value(SchemaApplyRequest { schema_source: schema_source.clone(), allow_data_loss, })?), bearer_token.as_deref(), ) .await? } else { let db = open_local_db_with_policy(&graph).await?; let actor = resolve_cli_actor(cli.as_actor.as_deref(), &config); let registry = load_registry_or_report(&config, graph.selected())?; let registry = (!registry.is_empty()).then_some(registry); let label = graph.selected().unwrap_or(&uri).to_string(); let result = db .apply_schema_as_with_catalog_check( &schema_source, omnigraph::db::SchemaApplyOptions { allow_data_loss }, actor, |catalog| { if let Some(registry) = registry.as_ref() { validate_registry_for_catalog(registry, catalog, &label)?; } Ok(()) }, ) .await?; schema_apply_output(&uri, result) }; if json { print_json(&output)?; } else { print_schema_apply_human(&output); } } SchemaCommand::Show { uri, target, config, json, } => { let config = load_cli_config(config.as_ref())?; let bearer_token = resolve_remote_bearer_token(&config, uri.as_deref(), target.as_deref())?; let uri = resolve_uri(&config, uri, target.as_deref())?; let output = if is_remote_uri(&uri) { remote_json::( &http_client, Method::GET, remote_url(&uri, "/schema"), None, bearer_token.as_deref(), ) .await? } else { let db = Omnigraph::open(&uri).await?; SchemaOutput { schema_source: db.schema_source().to_string(), } }; if json { print_json(&output)?; } else { println!("{}", output.schema_source); } } }, Command::Lint { uri, target, config, query, schema, json, } => { let config = load_cli_config(config.as_ref())?; let output = execute_query_lint(&config, uri, target.as_deref(), schema.as_ref(), &query) .await?; finish_query_lint(&output, json)?; } Command::Queries { command } => match command { QueriesCommand::Validate { uri, target, config, json, } => { execute_queries_validate(uri, target, config.as_ref(), json).await?; } QueriesCommand::List { target, config, json, } => { execute_queries_list(target, config.as_ref(), json)?; } }, Command::Snapshot { uri, target, config, branch, json, } => { let config = load_cli_config(config.as_ref())?; let bearer_token = resolve_remote_bearer_token(&config, uri.as_deref(), target.as_deref())?; let uri = resolve_uri(&config, uri, target.as_deref())?; let branch = resolve_branch(&config, branch, None, "main"); let payload = if is_remote_uri(&uri) { remote_json::( &http_client, Method::GET, format!("{}?branch={}", remote_url(&uri, "/snapshot"), branch), None, bearer_token.as_deref(), ) .await? } else { let db = Omnigraph::open(&uri).await?; let snapshot = db.snapshot_of(ReadTarget::branch(branch.as_str())).await?; snapshot_payload(&branch, &snapshot) }; if json { print_json(&payload)?; } else { print_snapshot_human(&payload.branch, payload.manifest_version, &payload.tables); } } Command::Export { uri, target, config, branch, jsonl, type_names, table_keys, } => { let config = load_cli_config(config.as_ref())?; let bearer_token = resolve_remote_bearer_token(&config, uri.as_deref(), target.as_deref())?; let uri = resolve_uri(&config, uri, target.as_deref())?; let branch = resolve_branch(&config, branch, None, "main"); if jsonl { eprintln!("warning: --jsonl is deprecated; `omnigraph export` always emits JSONL"); } let stdout = io::stdout(); let mut stdout = stdout.lock(); if is_remote_uri(&uri) { execute_export_remote_to_writer( &http_client, &uri, &branch, &type_names, &table_keys, bearer_token.as_deref(), &mut stdout, ) .await?; } else { execute_export_to_writer(&uri, &branch, &type_names, &table_keys, &mut stdout) .await?; } } Command::Query { uri, legacy_uri, target, config, alias, query, query_string, name, params, branch, snapshot, format, json, alias_args, } => { if alias.is_none() && query.is_none() && query_string.is_none() { bail!("exactly one of --query, --query-string, or --alias must be provided"); } let config = load_cli_config(config.as_ref())?; let alias = resolve_alias(&config, alias.as_deref(), AliasCommand::Read)?; let alias_name = alias.as_ref().map(|(name, _)| *name); let alias_config = alias.as_ref().map(|(_, alias)| *alias); let target_available = target.is_some() || alias_config .and_then(|alias| alias.graph.as_deref()) .is_some() || config.cli_graph_name().is_some(); let (legacy_uri, alias_args) = normalize_legacy_alias_uri(legacy_uri, target_available, alias_name, alias_args); let uri = uri.or(legacy_uri); let target_name = target .as_deref() .or_else(|| alias_config.and_then(|alias| alias.graph.as_deref())); let bearer_token = resolve_remote_bearer_token(&config, uri.as_deref(), target_name)?; let graph = resolve_cli_graph(&config, uri, target_name)?; let uri = graph.uri.clone(); let query_source = resolve_query_source( &config, query.as_ref(), query_string.as_deref(), alias_config.map(|a| a.query.as_str()), )?; let params_json = merged_params_json( alias_name, alias_config .map(|alias| alias.args.as_slice()) .unwrap_or(&[]), &alias_args, load_params_json(¶ms)?, )?; let target = resolve_read_target( &config, branch, snapshot, alias_config.and_then(|alias| alias.branch.clone()), )?; let query_name = name.or_else(|| alias_config.and_then(|alias| alias.name.clone())); let output = if graph.is_remote { execute_read_remote( &http_client, &uri, &query_source, query_name.as_deref(), target, params_json.as_ref(), bearer_token.as_deref(), ) .await? } else { execute_read( &uri, &query_source, query_name.as_deref(), target, params_json.as_ref(), ) .await? }; let format = resolve_read_format( &config, format, json, alias_config.and_then(|alias| alias.format), ); print_read_output(&output, format, &config)?; } Command::Mutate { uri, legacy_uri, target, config, alias, query, query_string, name, params, branch, json, alias_args, } => { if alias.is_none() && query.is_none() && query_string.is_none() { bail!("exactly one of --query, --query-string, or --alias must be provided"); } let config = load_cli_config(config.as_ref())?; let alias = resolve_alias(&config, alias.as_deref(), AliasCommand::Change)?; let alias_name = alias.as_ref().map(|(name, _)| *name); let alias_config = alias.as_ref().map(|(_, alias)| *alias); let target_available = target.is_some() || alias_config .and_then(|alias| alias.graph.as_deref()) .is_some() || config.cli_graph_name().is_some(); let (legacy_uri, alias_args) = normalize_legacy_alias_uri(legacy_uri, target_available, alias_name, alias_args); let uri = uri.or(legacy_uri); let target_name = target .as_deref() .or_else(|| alias_config.and_then(|alias| alias.graph.as_deref())); let bearer_token = resolve_remote_bearer_token(&config, uri.as_deref(), target_name)?; let graph = resolve_cli_graph(&config, uri, target_name)?; let uri = graph.uri.clone(); let query_source = resolve_query_source( &config, query.as_ref(), query_string.as_deref(), alias_config.map(|a| a.query.as_str()), )?; let params_json = merged_params_json( alias_name, alias_config .map(|alias| alias.args.as_slice()) .unwrap_or(&[]), &alias_args, load_params_json(¶ms)?, )?; let branch = resolve_branch( &config, branch, alias_config.and_then(|alias| alias.branch.clone()), "main", ); let query_name = name.or_else(|| alias_config.and_then(|alias| alias.name.clone())); let output = if graph.is_remote { execute_change_remote( &http_client, &uri, &query_source, query_name.as_deref(), &branch, params_json.as_ref(), bearer_token.as_deref(), ) .await? } else { execute_change( &graph, &query_source, query_name.as_deref(), &branch, params_json.as_ref(), &config, cli.as_actor.as_deref(), ) .await? }; if json { print_json(&output)?; } else { print_change_human(&output); } } Command::Policy { command } => match command { PolicyCommand::Validate { config } => { let config = load_cli_config(config.as_ref())?; let context = resolve_policy_context(&config)?; let engine = resolve_policy_engine(&context)?; println!( "policy valid: {} [{} actors]", context.policy_file.display(), engine.known_actor_count() ); } PolicyCommand::Test { config } => { let config = load_cli_config(config.as_ref())?; let context = resolve_policy_context(&config)?; let engine = resolve_policy_engine(&context)?; let tests_path = resolve_policy_tests_path(&context); let tests = PolicyTestConfig::load(&tests_path)?; engine.run_tests(&tests)?; println!("policy tests passed: {} cases", tests.cases.len()); } PolicyCommand::Explain { config, actor, action, branch, target_branch, } => { let config = load_cli_config(config.as_ref())?; let context = resolve_policy_context(&config)?; let engine = resolve_policy_engine(&context)?; let request = PolicyRequest { action, branch, target_branch, }; let decision = engine.authorize(&actor, &request)?; print_policy_explain(&decision, &actor, &request); } }, Command::Optimize { uri, target, config, json, } => { let config = load_cli_config(config.as_ref())?; let uri = resolve_uri(&config, uri, target.as_deref())?; let db = Omnigraph::open(&uri).await?; let stats = db.optimize().await?; if json { let value = serde_json::json!({ "uri": uri, "tables": stats.iter().map(|s| serde_json::json!({ "table_key": s.table_key, "fragments_removed": s.fragments_removed, "fragments_added": s.fragments_added, "committed": s.committed, "skipped": s.skipped.map(|r| r.as_str()), "manifest_version": s.manifest_version, "lance_head_version": s.lance_head_version, })).collect::>(), }); print_json(&value)?; } else { println!("optimize {} — {} tables", uri, stats.len()); for s in &stats { if let Some(reason) = s.skipped { println!(" {:<40} skipped ({reason})", s.table_key); } else if s.committed { println!( " {:<40} frags {} → {} ✓", s.table_key, s.fragments_removed, s.fragments_added ); } else { println!(" {:<40} no-op", s.table_key); } } } } Command::Repair { uri, target, config, confirm, force, json, } => { let config = load_cli_config(config.as_ref())?; let uri = resolve_uri(&config, uri, target.as_deref())?; let db = Omnigraph::open(&uri).await?; let stats = db .repair(omnigraph::db::RepairOptions { confirm, force }) .await?; let refused_count = stats .tables .iter() .filter(|s| matches!(s.action, omnigraph::db::RepairAction::Refused)) .count(); if json { let value = serde_json::json!({ "uri": uri, "confirm": confirm, "force": force, "manifest_version": stats.manifest_version, "tables": stats.tables.iter().map(|s| serde_json::json!({ "table_key": s.table_key, "manifest_version": s.manifest_version, "lance_head_version": s.lance_head_version, "classification": s.classification.as_str(), "action": s.action.as_str(), "operations": s.operations, "error": s.error, })).collect::>(), }); print_json(&value)?; } else { let mode = if confirm { "confirm" } else { "preview" }; println!( "repair {} — {} mode, {} tables", uri, mode, stats.tables.len() ); for s in &stats.tables { let drift = if s.manifest_version == s.lance_head_version { format!("{}", s.manifest_version) } else { format!("{} → {}", s.manifest_version, s.lance_head_version) }; let ops = if s.operations.is_empty() { String::new() } else { format!(" [{}]", s.operations.join(", ")) }; let err = s .error .as_ref() .map(|err| format!(" ({err})")) .unwrap_or_default(); println!( " {:<40} {:<12} {:<22} {}{}{}", s.table_key, s.action.as_str(), s.classification.as_str(), drift, ops, err ); } if !confirm { println!("rerun with --confirm to publish verified maintenance drift"); } } if refused_count > 0 { bail!( "repair refused {} suspicious or unverifiable table(s); review the preview \ output and rerun with --force --confirm only if publishing that drift is \ intentional", refused_count ); } } Command::Cleanup { uri, target, config, keep, older_than, confirm, json, } => { let config = load_cli_config(config.as_ref())?; let uri = resolve_uri(&config, uri, target.as_deref())?; let older_than_dur = older_than.as_deref().map(parse_duration_arg).transpose()?; if keep.is_none() && older_than_dur.is_none() { bail!("cleanup requires at least one of --keep or --older-than"); } let policy_desc = match (keep, older_than_dur) { (Some(k), Some(d)) => { format!("keep {} versions, remove anything older than {:?}", k, d) } (Some(k), None) => format!("keep {} versions", k), (None, Some(d)) => format!("remove anything older than {:?}", d), _ => unreachable!(), }; if !confirm { eprintln!( "cleanup is destructive — rerun with --confirm. Policy for {}: {}", uri, policy_desc ); return Ok(()); } let options = omnigraph::db::CleanupPolicyOptions { keep_versions: keep, older_than: older_than_dur, }; let mut db = Omnigraph::open(&uri).await?; let stats = db.cleanup(options).await?; if json { let value = serde_json::json!({ "uri": uri, "keep_versions": keep, "older_than_secs": older_than_dur.map(|d| d.as_secs()), "tables": stats.iter().map(|s| serde_json::json!({ "table_key": s.table_key, "bytes_removed": s.bytes_removed, "old_versions_removed": s.old_versions_removed, "error": s.error, })).collect::>(), }); print_json(&value)?; } else { let total_bytes: u64 = stats.iter().map(|s| s.bytes_removed).sum(); let total_versions: u64 = stats.iter().map(|s| s.old_versions_removed).sum(); let failed: Vec<&str> = stats .iter() .filter(|s| s.error.is_some()) .map(|s| s.table_key.as_str()) .collect(); println!( "cleanup {} ({}) — removed {} versions ({} bytes) across {} tables", uri, policy_desc, total_versions, total_bytes, stats.len() - failed.len() ); if !failed.is_empty() { println!( " {} table(s) failed and will be retried on the next cleanup: {}", failed.len(), failed.join(", ") ); } } } Command::Cluster { command } => match command { ClusterCommand::Validate { config, json } => { let output = validate_config_dir(config); finish_cluster_validate(&output, json)?; } ClusterCommand::Plan { config, json } => { let output = plan_config_dir(config); finish_cluster_plan(&output, json)?; } ClusterCommand::Apply { config, json } => { let output = apply_config_dir(config); finish_cluster_apply(&output, json)?; } ClusterCommand::Status { config, json } => { let output = status_config_dir(config); finish_cluster_status(&output, json)?; } ClusterCommand::Refresh { config, json } => { let output = refresh_config_dir(config).await; finish_cluster_state_sync(&output, json)?; } ClusterCommand::Import { config, json } => { let output = import_config_dir(config).await; finish_cluster_state_sync(&output, json)?; } ClusterCommand::ForceUnlock { lock_id, config, json, } => { let output = force_unlock_config_dir(config, lock_id); finish_cluster_force_unlock(&output, json)?; } }, Command::Graphs { command } => match command { GraphsCommand::List { uri, target, config, json, } => { let config = load_cli_config(config.as_ref())?; let bearer_token = resolve_remote_bearer_token(&config, uri.as_deref(), target.as_deref())?; let uri = resolve_uri(&config, uri, target.as_deref())?; if !is_remote_uri(&uri) { bail!( "`omnigraph graphs list` requires a remote multi-graph server URL \ (http:// or https://). To enumerate local graphs, read `omnigraph.yaml` \ directly." ); } let payload = remote_json::( &http_client, Method::GET, remote_url(&uri, "/graphs"), None, bearer_token.as_deref(), ) .await?; if json { print_json(&payload)?; } else { for entry in payload.graphs { println!("{}\t{}", entry.graph_id, entry.uri); } } } }, } Ok(()) } #[cfg(test)] mod tests { use std::fs; use super::{ DEFAULT_BEARER_TOKEN_ENV, apply_bearer_token, bearer_token_from_env_file, legacy_change_request_body, load_cli_config, load_env_file_into_process, normalize_bearer_token, parse_env_assignment, resolve_cli_graph, resolve_policy_context, resolve_remote_bearer_token, }; use omnigraph_server::load_config; use reqwest::header::AUTHORIZATION; use serde_json::json; use tempfile::tempdir; #[test] fn legacy_change_request_body_uses_legacy_field_names() { // `execute_change_remote` hits `POST /change`, which old // `omnigraph-server` builds deserialize as `ChangeRequest` with // **required** `query_source` and optional `query_name` keys. // Newer servers accept both spellings via serde alias, but a // newer CLI must still emit the legacy keys on the wire so it // can talk to an old server during a rolling upgrade. let body = legacy_change_request_body( "query insert_person($n: String) { insert Person { name: $n } }", Some("insert_person"), "main", Some(&json!({ "n": "Alice" })), ); assert_eq!( body["query_source"].as_str(), Some("query insert_person($n: String) { insert Person { name: $n } }"), ); assert_eq!(body["query_name"].as_str(), Some("insert_person")); assert_eq!(body["branch"].as_str(), Some("main")); assert_eq!(body["params"]["n"].as_str(), Some("Alice")); // Crucially, the **new** field names must NOT appear -- old // servers would silently treat them as unknown fields and then // fail on missing required `query_source`. assert!( body.get("query").is_none(), "legacy /change body must not carry the renamed `query` key; got {body}" ); assert!( body.get("name").is_none(), "legacy /change body must not carry the renamed `name` key; got {body}" ); } #[test] fn legacy_change_request_body_omits_optional_fields_when_unset() { let body = legacy_change_request_body( "query find() { match { $p: Person } return { $p.name } }", None, "main", None, ); assert_eq!(body["branch"].as_str(), Some("main")); assert!(body.get("query_name").is_none()); assert!(body.get("params").is_none()); } #[test] fn apply_bearer_token_adds_header_when_configured() { let client = reqwest::Client::new(); let request = apply_bearer_token(client.get("http://example.com"), Some("demo-token")) .build() .unwrap(); assert_eq!( request .headers() .get(AUTHORIZATION) .and_then(|value| value.to_str().ok()), Some("Bearer demo-token") ); } #[test] fn apply_bearer_token_leaves_request_unchanged_when_not_configured() { let client = reqwest::Client::new(); let request = apply_bearer_token(client.get("http://example.com"), None) .build() .unwrap(); assert!(request.headers().get(AUTHORIZATION).is_none()); } #[test] fn normalize_bearer_token_trims_and_filters_blank_values() { assert_eq!(normalize_bearer_token(None), None); assert_eq!(normalize_bearer_token(Some(" ".to_string())), None); assert_eq!( normalize_bearer_token(Some(" demo-token ".to_string())).as_deref(), Some("demo-token") ); } #[test] fn parse_env_assignment_supports_plain_and_exported_values() { assert_eq!( parse_env_assignment("DEMO_TOKEN=demo-token"), Some(("DEMO_TOKEN".to_string(), "demo-token".to_string())) ); assert_eq!( parse_env_assignment("export DEMO_TOKEN=\"quoted-token\""), Some(("DEMO_TOKEN".to_string(), "quoted-token".to_string())) ); assert_eq!(parse_env_assignment("# comment"), None); assert_eq!(parse_env_assignment(" "), None); } #[test] fn bearer_token_from_env_file_reads_named_value() { let temp = tempdir().unwrap(); let env_file = temp.path().join(".env.omni"); fs::write( &env_file, "FIRST=ignore\nexport DEMO_TOKEN=\" demo-token \"\n", ) .unwrap(); assert_eq!( bearer_token_from_env_file(&env_file, "DEMO_TOKEN") .unwrap() .as_deref(), Some("demo-token") ); assert_eq!( bearer_token_from_env_file(&env_file, "MISSING").unwrap(), None ); } #[test] fn load_env_file_into_process_sets_missing_values_without_overriding_existing_ones() { let temp = tempdir().unwrap(); let env_file = temp.path().join(".env.omni"); fs::write( &env_file, "AUTOLOAD_ONLY=from-file\nAUTOLOAD_PRESET=from-file\n", ) .unwrap(); let missing_key = "AUTOLOAD_ONLY"; let preset_key = "AUTOLOAD_PRESET"; let previous_missing = std::env::var_os(missing_key); let previous_preset = std::env::var_os(preset_key); unsafe { std::env::remove_var(missing_key); std::env::set_var(preset_key, "from-env"); } load_env_file_into_process(&env_file).unwrap(); assert_eq!(std::env::var(missing_key).unwrap(), "from-file"); assert_eq!(std::env::var(preset_key).unwrap(), "from-env"); unsafe { if let Some(value) = previous_missing { std::env::set_var(missing_key, value); } else { std::env::remove_var(missing_key); } if let Some(value) = previous_preset { std::env::set_var(preset_key, value); } else { std::env::remove_var(preset_key); } } } #[test] fn resolve_remote_bearer_token_uses_scoped_env_file_with_global_fallback() { let temp = tempdir().unwrap(); fs::write( temp.path().join("omnigraph.yaml"), r#" graphs: demo: uri: https://example.com bearer_token_env: DEMO_TOKEN auth: env_file: .env.omni cli: graph: demo "#, ) .unwrap(); fs::write( temp.path().join(".env.omni"), "DEMO_TOKEN=scoped-token\nOMNIGRAPH_BEARER_TOKEN=global-token\n", ) .unwrap(); let previous = std::env::var_os(DEFAULT_BEARER_TOKEN_ENV); unsafe { std::env::remove_var(DEFAULT_BEARER_TOKEN_ENV); } let config_path = temp.path().join("omnigraph.yaml"); let config = load_config(Some(&config_path)).unwrap(); assert_eq!( resolve_remote_bearer_token(&config, None, Some("demo")) .unwrap() .as_deref(), Some("scoped-token") ); assert_eq!( resolve_remote_bearer_token(&config, Some("https://override.example.com"), None) .unwrap() .as_deref(), Some("global-token") ); unsafe { if let Some(value) = previous { std::env::set_var(DEFAULT_BEARER_TOKEN_ENV, value); } else { std::env::remove_var(DEFAULT_BEARER_TOKEN_ENV); } } } #[test] fn load_cli_config_autoloads_env_file_into_process() { let temp = tempdir().unwrap(); fs::write( temp.path().join("omnigraph.yaml"), r#" auth: env_file: .env.omni graphs: demo: uri: s3://bucket/prefix "#, ) .unwrap(); fs::write( temp.path().join(".env.omni"), "AUTOLOAD_FROM_CONFIG=loaded\n", ) .unwrap(); let key = "AUTOLOAD_FROM_CONFIG"; let previous = std::env::var_os(key); unsafe { std::env::remove_var(key); } let config_path = temp.path().join("omnigraph.yaml"); let config = load_cli_config(Some(&config_path)).unwrap(); assert_eq!( config.resolve_target_uri(None, Some("demo"), None).unwrap(), "s3://bucket/prefix" ); assert_eq!(std::env::var(key).unwrap(), "loaded"); unsafe { if let Some(value) = previous { std::env::set_var(key, value); } else { std::env::remove_var(key); } } } #[test] fn graph_identity_resolve_policy_context_named_cli_graph_uses_graph_key_not_project_name_or_uri() { let temp = tempdir().unwrap(); let config_path = temp.path().join("omnigraph.yaml"); fs::write( &config_path, r#" project: name: misleading-project graphs: local: uri: /tmp/local-policy-graph.omni policy: file: ./policy.yaml cli: graph: local "#, ) .unwrap(); let config = load_config(Some(&config_path)).unwrap(); let context = resolve_policy_context(&config).unwrap(); assert_eq!(context.graph_id, "local"); } #[test] fn graph_identity_resolve_policy_context_server_graph_uses_graph_key_when_cli_graph_absent() { let temp = tempdir().unwrap(); let config_path = temp.path().join("omnigraph.yaml"); fs::write( &config_path, r#" project: name: misleading-project graphs: local: uri: /tmp/local-policy-graph.omni policy: file: ./server-policy.yaml server: graph: local "#, ) .unwrap(); let config = load_config(Some(&config_path)).unwrap(); let context = resolve_policy_context(&config).unwrap(); assert_eq!(context.graph_id, "local"); assert!(context.policy_file.ends_with("server-policy.yaml")); } #[test] fn graph_identity_resolve_policy_context_anonymous_uses_top_level_default_identity() { let temp = tempdir().unwrap(); let config_path = temp.path().join("omnigraph.yaml"); fs::write( &config_path, r#" project: name: misleading-project graphs: local: uri: /tmp/local-policy-graph.omni policy: file: ./top-policy.yaml "#, ) .unwrap(); let config = load_config(Some(&config_path)).unwrap(); let context = resolve_policy_context(&config).unwrap(); assert_eq!(context.graph_id, "default"); assert!(context.policy_file.ends_with("top-policy.yaml")); } #[test] fn graph_identity_resolve_cli_graph_named_target_uses_graph_key_not_project_name_or_uri() { let temp = tempdir().unwrap(); let config_path = temp.path().join("omnigraph.yaml"); fs::write( &config_path, r#" project: name: misleading-project graphs: prod: uri: s3://bucket/prod-graph/ policy: file: ./prod-policy.yaml "#, ) .unwrap(); let config = load_config(Some(&config_path)).unwrap(); let graph = resolve_cli_graph(&config, None, Some("prod")).unwrap(); assert_eq!(graph.selected(), Some("prod")); assert_eq!(graph.graph_id, "prod"); assert_eq!(graph.uri, "s3://bucket/prod-graph/"); } #[test] fn graph_identity_resolve_cli_graph_positional_uri_uses_anonymous_normalized_uri() { let temp = tempdir().unwrap(); let config_path = temp.path().join("omnigraph.yaml"); fs::write( &config_path, r#" project: name: misleading-project graphs: local: uri: /tmp/configured-graph.omni policy: file: ./policy.yaml cli: graph: local "#, ) .unwrap(); let config = load_config(Some(&config_path)).unwrap(); let local_graph_path = temp.path().join("explicit-graph.omni"); let local_graph = resolve_cli_graph( &config, Some(format!("file://{}", local_graph_path.display())), None, ) .unwrap(); assert_eq!(local_graph.selected(), None); assert_eq!( local_graph.graph_id, local_graph_path.to_string_lossy().as_ref() ); assert_eq!(local_graph.policy_file, None); let s3_graph = resolve_cli_graph( &config, Some("s3://bucket/anonymous-graph/".to_string()), None, ) .unwrap(); assert_eq!(s3_graph.selected(), None); assert_eq!(s3_graph.graph_id, "s3://bucket/anonymous-graph"); assert_eq!(s3_graph.policy_file, None); } }