use std::fs; use std::io::{self, Write}; use std::path::Path; use std::path::PathBuf; use std::sync::Arc; use clap::{Arg, ArgAction, Args, CommandFactory, FromArgMatches, Parser, Subcommand, ValueEnum}; use color_eyre::eyre::{Result, bail}; use omnigraph::db::{Omnigraph, ReadTarget, SnapshotId}; use omnigraph::loader::LoadMode; use omnigraph_compiler::query::parser::parse_query; use omnigraph_compiler::schema::parser::parse_schema; use omnigraph_compiler::{ JsonParamMode, ParamMap, QueryLintOutput, QueryLintQueryKind, QueryLintSchemaSource, QueryLintSeverity, QueryLintStatus, SchemaMigrationPlan, SchemaMigrationStep, build_catalog, json_params_to_param_map, lint_query_file, }; use omnigraph_server::api::{ BranchCreateOutput, BranchCreateRequest, BranchDeleteOutput, BranchListOutput, BranchMergeOutput, BranchMergeRequest, ChangeOutput, ChangeRequest, CommitListOutput, CommitOutput, ErrorOutput, ExportRequest, GraphListResponse, IngestOutput, IngestRequest, ReadOutput, ReadRequest, SchemaApplyOutput, SchemaApplyRequest, SchemaOutput, SnapshotOutput, SnapshotTableOutput, commit_output, ingest_output, read_output, schema_apply_output, snapshot_payload, }; use omnigraph_server::{ AliasCommand, OmnigraphConfig, PolicyAction, PolicyDecision, PolicyEngine, PolicyRequest, PolicyTestConfig, ReadOutputFormat, load_config, }; use reqwest::Method; use reqwest::header::AUTHORIZATION; use serde::Serialize; use serde::de::DeserializeOwned; use serde_json::Value; mod embed; mod read_format; use embed::{EmbedArgs, EmbedOutput, execute_embed}; use read_format::{ReadRenderOptions, render_read}; const DEFAULT_BEARER_TOKEN_ENV: &str = "OMNIGRAPH_BEARER_TOKEN"; #[derive(Debug, Parser)] #[command(name = "omnigraph")] #[command(about = "Omnigraph graph database CLI")] #[command(version = env!("CARGO_PKG_VERSION"), disable_version_flag = true)] struct Cli { /// Actor identity for direct-engine writes (MR-722). Overrides /// `cli.actor` from `omnigraph.yaml`. When the configured policy /// is in effect, Cedar evaluates this actor against the requested /// action and scope; with policy configured but neither this flag /// nor `cli.actor` set, the engine-layer footgun guard fires and /// the write is denied (no silent bypass). Has no effect on remote /// HTTP writes — those resolve their actor server-side from the /// bearer token. #[arg(long = "as", global = true, value_name = "ACTOR")] as_actor: Option, #[command(subcommand)] command: Command, } #[derive(Debug, Subcommand)] enum Command { /// Print the CLI version Version, /// Generate, clean, or refresh explicit seed embeddings Embed(EmbedArgs), /// Initialize a new graph from a schema Init { #[arg(long)] schema: PathBuf, /// Graph URI (local path or s3://) uri: String, /// Overwrite existing schema artifacts at the URI. Without /// this flag, init refuses to touch a URI that already holds /// `_schema.pg`, `_schema.ir.json`, or `__schema_state.json` /// — closes the re-init footgun (MR-668 follow-up). With the /// flag, the operator opts in to destructive semantics. #[arg(long)] force: bool, }, /// Load data into a graph Load { /// Graph URI uri: Option, #[arg(long)] target: Option, #[arg(long)] config: Option, #[arg(long)] data: PathBuf, #[arg(long)] branch: Option, #[arg(long, default_value = "overwrite")] mode: CliLoadMode, #[arg(long)] json: bool, }, /// Ingest data into a reviewable named branch Ingest { /// Graph URI uri: Option, #[arg(long)] target: Option, #[arg(long)] config: Option, #[arg(long)] data: PathBuf, #[arg(long)] branch: Option, #[arg(long)] from: Option, #[arg(long, default_value = "merge")] mode: CliLoadMode, #[arg(long)] json: bool, }, /// Branch operations Branch { #[command(subcommand)] command: BranchCommand, }, /// Schema planning operations Schema { #[command(subcommand)] command: SchemaCommand, }, /// Query validation and linting Query { #[command(subcommand)] command: QueryCommand, }, /// Show graph snapshot Snapshot { /// Graph URI uri: Option, #[arg(long)] target: Option, #[arg(long)] config: Option, #[arg(long)] branch: Option, #[arg(long)] json: bool, }, /// Export a full graph snapshot as JSONL Export { /// Graph URI uri: Option, #[arg(long)] target: Option, #[arg(long)] config: Option, #[arg(long)] branch: Option, #[arg(long, hide = true)] jsonl: bool, #[arg(long = "type")] type_names: Vec, #[arg(long = "table")] table_keys: Vec, }, /// Commit history operations Commit { #[command(subcommand)] command: CommitCommand, }, /// Execute a read query against a branch or snapshot Read { /// Graph URI #[arg(long)] uri: Option, #[arg(hide = true)] legacy_uri: Option, #[arg(long)] target: Option, #[arg(long)] config: Option, #[arg(long)] alias: Option, #[arg(long)] query: Option, #[arg(long)] name: Option, #[command(flatten)] params: ParamsArgs, #[arg(long, conflicts_with = "snapshot")] branch: Option, #[arg(long, conflicts_with = "branch")] snapshot: Option, #[arg(long, conflicts_with = "json")] format: Option, #[arg(long, conflicts_with = "format")] json: bool, #[arg()] alias_args: Vec, }, /// Execute a graph change query against a branch Change { /// Graph URI #[arg(long)] uri: Option, #[arg(hide = true)] legacy_uri: Option, #[arg(long)] target: Option, #[arg(long)] config: Option, #[arg(long)] alias: Option, #[arg(long)] query: Option, #[arg(long)] name: Option, #[command(flatten)] params: ParamsArgs, #[arg(long)] branch: Option, #[arg(long)] json: bool, #[arg()] alias_args: Vec, }, /// Policy administration and diagnostics Policy { #[command(subcommand)] command: PolicyCommand, }, /// Compact small Lance fragments in every table of the graph Optimize { /// Graph URI uri: Option, #[arg(long)] target: Option, #[arg(long)] config: Option, #[arg(long)] json: bool, }, /// Remove old Lance versions from every table of the graph (destructive) Cleanup { /// Graph URI uri: Option, #[arg(long)] target: Option, #[arg(long)] config: Option, /// Number of recent versions to keep per table. Either `--keep` or /// `--older-than` (or both) must be set. #[arg(long)] keep: Option, /// Only remove versions older than this duration. Accepts Go-style /// durations: `7d`, `24h`, `90m`. At least one of --keep / --older-than. #[arg(long)] older_than: Option, /// Required to actually run; without it, prints what would be removed #[arg(long)] confirm: bool, #[arg(long)] json: bool, }, /// Manage graphs on a multi-graph server (MR-668) Graphs { #[command(subcommand)] command: GraphsCommand, }, } /// Operations on the graph registry of a multi-graph server (MR-668). /// /// All operations target a remote multi-graph server URL (http:// or /// https://). Local-URI invocations return a clear error. To add or /// remove graphs, operators edit `omnigraph.yaml` directly and restart /// the server — runtime mutation is not exposed in v0.7.0. #[derive(Debug, Subcommand)] enum GraphsCommand { /// List every graph registered with the multi-graph server. List { /// Remote server URL (e.g. `https://server.example.com`). #[arg(long)] uri: Option, #[arg(long)] target: Option, #[arg(long)] config: Option, #[arg(long)] json: bool, }, } #[derive(Debug, Subcommand)] enum BranchCommand { /// Create a new branch Create { /// Graph URI #[arg(long)] uri: Option, #[arg(long)] target: Option, #[arg(long)] config: Option, #[arg(long)] from: Option, name: String, #[arg(long)] json: bool, }, /// List branches List { /// Graph URI #[arg(long)] uri: Option, #[arg(long)] target: Option, #[arg(long)] config: Option, #[arg(long)] json: bool, }, /// Delete a branch Delete { /// Graph URI #[arg(long)] uri: Option, #[arg(long)] target: Option, #[arg(long)] config: Option, name: String, #[arg(long)] json: bool, }, /// Merge a source branch into a target branch Merge { /// Graph URI #[arg(long)] uri: Option, #[arg(long)] target: Option, #[arg(long)] config: Option, source: String, #[arg(long)] into: Option, #[arg(long)] json: bool, }, } #[derive(Debug, Subcommand)] enum SchemaCommand { /// Plan a schema migration against the accepted persisted schema Plan { /// Graph URI uri: Option, #[arg(long)] target: Option, #[arg(long)] config: Option, #[arg(long)] schema: PathBuf, #[arg(long)] json: bool, /// Show the plan as it would execute with `--allow-data-loss`. /// Promotes every `DropMode::Soft` step to `DropMode::Hard` /// so the plan output reflects the destructive intent. #[arg(long, default_value_t = false)] allow_data_loss: bool, }, /// Apply a supported schema migration Apply { /// Graph URI uri: Option, #[arg(long)] target: Option, #[arg(long)] config: Option, #[arg(long)] schema: PathBuf, #[arg(long)] json: bool, /// Allow destructive (data-loss) schema changes. /// /// Without this flag, drops are "soft": the column or table /// is removed from the current manifest version but prior /// versions are retained, so `snapshot_at_version(pre_drop)` /// can still read the dropped data until `omnigraph cleanup` /// runs. With this flag, drops are "hard": `cleanup_old_versions` /// runs on the affected datasets immediately after the apply, /// making the prior data unreachable. #[arg(long, default_value_t = false)] allow_data_loss: bool, }, /// Show the current accepted schema source #[command(alias = "get")] Show { /// Graph URI uri: Option, #[arg(long)] target: Option, #[arg(long)] config: Option, #[arg(long)] json: bool, }, } #[derive(Debug, Subcommand)] enum QueryCommand { /// Validate queries and report higher-level drift warnings #[command(visible_alias = "check")] Lint { /// Graph URI uri: Option, #[arg(long)] target: Option, #[arg(long)] config: Option, #[arg(long)] query: PathBuf, #[arg(long)] schema: Option, #[arg(long)] json: bool, }, } #[derive(Debug, Subcommand)] enum CommitCommand { /// List graph commits List { /// Graph URI uri: Option, #[arg(long)] target: Option, #[arg(long)] config: Option, #[arg(long)] branch: Option, #[arg(long)] json: bool, }, /// Show a graph commit Show { /// Graph URI #[arg(long)] uri: Option, #[arg(long)] target: Option, #[arg(long)] config: Option, commit_id: String, #[arg(long)] json: bool, }, } #[derive(Debug, Subcommand)] enum PolicyCommand { /// Validate policy YAML and compiled Cedar policy state Validate { #[arg(long)] config: Option, }, /// Run declarative policy tests from policy.tests.yaml Test { #[arg(long)] config: Option, }, /// Explain one policy decision locally Explain { #[arg(long)] config: Option, #[arg(long)] actor: String, #[arg(long)] action: PolicyAction, #[arg(long)] branch: Option, #[arg(long = "target-branch")] target_branch: Option, }, } #[derive(Debug, Args, Clone)] struct ParamsArgs { #[arg(long, conflicts_with = "params_file")] params: Option, #[arg(long, conflicts_with = "params")] params_file: Option, } #[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, ValueEnum)] #[serde(rename_all = "snake_case")] enum CliLoadMode { Overwrite, Append, Merge, } impl From for LoadMode { fn from(value: CliLoadMode) -> Self { match value { CliLoadMode::Overwrite => LoadMode::Overwrite, CliLoadMode::Append => LoadMode::Append, CliLoadMode::Merge => LoadMode::Merge, } } } impl CliLoadMode { fn as_str(self) -> &'static str { match self { CliLoadMode::Overwrite => "overwrite", CliLoadMode::Append => "append", CliLoadMode::Merge => "merge", } } } #[derive(Debug, Serialize)] struct LoadOutput<'a> { uri: &'a str, branch: &'a str, mode: &'a str, nodes_loaded: usize, edges_loaded: usize, node_types_loaded: usize, edge_types_loaded: usize, } #[derive(Debug, Serialize)] struct SchemaPlanOutput<'a> { uri: &'a str, supported: bool, step_count: usize, steps: &'a [SchemaMigrationStep], } fn print_schema_apply_human(output: &SchemaApplyOutput) { println!("schema apply for {}", output.uri); println!("supported: {}", if output.supported { "yes" } else { "no" }); println!("applied: {}", if output.applied { "yes" } else { "no" }); println!("manifest_version: {}", output.manifest_version); if output.steps.is_empty() { println!("no schema changes"); return; } for step in &output.steps { println!("- {}", render_schema_plan_step(step)); } } fn query_kind_label(kind: QueryLintQueryKind) -> &'static str { match kind { QueryLintQueryKind::Read => "read", QueryLintQueryKind::Mutation => "mutation", } } fn severity_label(severity: QueryLintSeverity) -> &'static str { match severity { QueryLintSeverity::Error => "ERROR", QueryLintSeverity::Warning => "WARN ", QueryLintSeverity::Info => "INFO ", } } fn print_query_lint_human(output: &QueryLintOutput) { for result in &output.results { match result.status { QueryLintStatus::Ok => { println!( "OK query `{}` ({})", result.name, query_kind_label(result.kind) ); } QueryLintStatus::Error => { println!( "ERROR query `{}`: {}", result.name, result.error.as_deref().unwrap_or("unknown error") ); } } for warning in &result.warnings { println!("WARN query `{}`: {}", result.name, warning); } } for finding in &output.findings { println!("{} {}", severity_label(finding.severity), finding.message); } println!( "INFO Lint complete: {} queries processed ({} error(s), {} warning(s), {} info item(s))", output.queries_processed, output.errors, output.warnings, output.infos ); } fn finish_query_lint(output: &QueryLintOutput, json: bool) -> Result<()> { if json { print_json(output)?; } else { print_query_lint_human(output); } if output.status == QueryLintStatus::Error { io::stdout().flush()?; std::process::exit(1); } Ok(()) } fn ensure_local_graph_parent(uri: &str) -> Result<()> { if !uri.contains("://") { fs::create_dir_all(uri)?; } Ok(()) } fn print_json(value: &T) -> Result<()> { println!("{}", serde_json::to_string_pretty(value)?); Ok(()) } fn is_remote_uri(uri: &str) -> bool { uri.starts_with("http://") || uri.starts_with("https://") } fn remote_url(base: &str, path: &str) -> String { format!("{}{}", base.trim_end_matches('/'), path) } fn remote_branch_url(base: &str, branch: &str) -> Result { let mut url = reqwest::Url::parse(&format!("{}/", base.trim_end_matches('/')))?; url.path_segments_mut() .map_err(|_| color_eyre::eyre::eyre!("invalid remote base url"))? .extend(["branches", branch]); Ok(url.to_string()) } fn normalize_bearer_token(value: Option) -> Option { value .map(|value| value.trim().to_string()) .filter(|value| !value.is_empty()) } fn bearer_token_from_env(var_name: &str) -> Option { normalize_bearer_token(std::env::var(var_name).ok()) } fn parse_env_assignment(line: &str) -> Option<(String, String)> { let line = line.trim(); if line.is_empty() || line.starts_with('#') { return None; } let line = line.strip_prefix("export ").unwrap_or(line).trim(); let (name, value) = line.split_once('=')?; let name = name.trim(); if name.is_empty() { return None; } let value = value.trim(); let value = if value.len() >= 2 && ((value.starts_with('"') && value.ends_with('"')) || (value.starts_with('\'') && value.ends_with('\''))) { &value[1..value.len() - 1] } else { value }; Some((name.to_string(), value.to_string())) } fn bearer_token_from_env_file(path: &Path, var_name: &str) -> Result> { if !path.exists() { return Ok(None); } for line in fs::read_to_string(path)?.lines() { let Some((name, value)) = parse_env_assignment(line) else { continue; }; if name == var_name { return Ok(normalize_bearer_token(Some(value))); } } Ok(None) } fn load_env_file_into_process(path: &Path) -> Result<()> { if !path.exists() { return Ok(()); } for line in fs::read_to_string(path)?.lines() { let Some((name, value)) = parse_env_assignment(line) else { continue; }; if std::env::var_os(&name).is_none() { unsafe { std::env::set_var(name, value); } } } Ok(()) } fn load_cli_config(config_path: Option<&PathBuf>) -> Result { let config = load_config(config_path)?; if let Some(path) = config.resolve_auth_env_file() { load_env_file_into_process(&path)?; } Ok(config) } fn resolve_policy_engine(config: &OmnigraphConfig) -> Result { let policy_file = config .resolve_policy_file() .ok_or_else(|| color_eyre::eyre::eyre!("policy.file must be set in omnigraph.yaml"))?; PolicyEngine::load_graph(&policy_file, &policy_graph_id(config)) } /// Open a local-URI graph and, when `policy.file` is configured in /// `omnigraph.yaml`, install the resolved `PolicyEngine` on the engine /// handle so every direct-engine write goes through /// `Omnigraph::enforce(...)` (MR-722). Without a configured policy this /// is identical to a bare `Omnigraph::open`. /// /// Returns owned `Omnigraph`; chained on top of `Omnigraph::open(...)`'s /// existing future to keep call sites narrow. async fn open_local_db_with_policy(uri: &str, config: &OmnigraphConfig) -> Result { let db = Omnigraph::open(uri).await?; if config.resolve_policy_file().is_some() { let engine = Arc::new(resolve_policy_engine(config)?); Ok(db.with_policy(engine as Arc)) } else { Ok(db) } } /// Resolve the CLI's effective actor identity for engine-layer policy /// (MR-722). Precedence: `--as ` (top-level flag) overrides /// `cli.actor` from `omnigraph.yaml`; both unset returns `None`. When /// policy is configured and this returns `None`, the engine-layer /// footgun guard intentionally denies — silent bypass via "I forgot the /// actor" is what the guard prevents. fn resolve_cli_actor<'a>(cli_as: Option<&'a str>, config: &'a OmnigraphConfig) -> Option<&'a str> { cli_as.or(config.cli.actor.as_deref()) } fn resolve_policy_tests_path(config: &OmnigraphConfig) -> Result { config.resolve_policy_tests_file().ok_or_else(|| { color_eyre::eyre::eyre!( "policy.tests.yaml requires policy.file to be set in omnigraph.yaml" ) }) } fn policy_graph_id(config: &OmnigraphConfig) -> String { if let Some(name) = &config.project.name { return name.clone(); } config .resolve_target_uri(None, None, config.server_graph_name()) .or_else(|_| config.resolve_target_uri(None, None, config.cli_graph_name())) .unwrap_or_else(|_| "default".to_string()) } fn resolve_remote_bearer_token( config: &OmnigraphConfig, explicit_uri: Option<&str>, explicit_target: Option<&str>, ) -> Result> { let scoped_env = config.graph_bearer_token_env(explicit_uri, explicit_target, config.cli_graph_name()); let mut env_names = Vec::new(); if let Some(name) = scoped_env { env_names.push(name.to_string()); } if env_names .iter() .all(|name| name != DEFAULT_BEARER_TOKEN_ENV) { env_names.push(DEFAULT_BEARER_TOKEN_ENV.to_string()); } let env_file = config.resolve_auth_env_file(); for env_name in env_names { if let Some(token) = bearer_token_from_env(&env_name) { return Ok(Some(token)); } if let Some(path) = env_file.as_ref() { if let Some(token) = bearer_token_from_env_file(path, &env_name)? { return Ok(Some(token)); } } } Ok(None) } fn build_http_client() -> Result { Ok(reqwest::Client::new()) } fn apply_bearer_token( request: reqwest::RequestBuilder, token: Option<&str>, ) -> reqwest::RequestBuilder { if let Some(token) = token { request.header(AUTHORIZATION, format!("Bearer {}", token)) } else { request } } async fn remote_json( client: &reqwest::Client, method: Method, url: String, body: Option, bearer_token: Option<&str>, ) -> Result { let request = apply_bearer_token(client.request(method, url), bearer_token); let request = if let Some(body) = body { request.json(&body) } else { request }; let response = request.send().await?; let status = response.status(); let text = response.text().await?; if !status.is_success() { if let Ok(error) = serde_json::from_str::(&text) { bail!(error.error); } bail!("server returned {}: {}", status, text); } Ok(serde_json::from_str(&text)?) } fn resolve_uri( config: &OmnigraphConfig, cli_uri: Option, cli_target: Option<&str>, ) -> Result { config.resolve_target_uri(cli_uri, cli_target, config.cli_graph_name()) } /// Parse a Go-style compact duration: `7d`, `24h`, `30m`, `90s`, or a plain /// integer as seconds. Used by the `cleanup --older-than` flag. fn parse_duration_arg(s: &str) -> Result { let s = s.trim(); if s.is_empty() { bail!("duration is empty"); } let (num_part, unit) = match s .char_indices() .rev() .find(|(_, c)| c.is_ascii_alphabetic()) { Some((i, _)) => ( &s[..i + 1 - s[i..].chars().next().unwrap().len_utf8()], &s[i..], ), None => (s, ""), }; let n: u64 = num_part .parse() .map_err(|e| color_eyre::eyre::eyre!("invalid duration '{}': {}", s, e))?; let secs = match unit { "" | "s" => n, "m" => n * 60, "h" => n * 60 * 60, "d" => n * 60 * 60 * 24, "w" => n * 60 * 60 * 24 * 7, _ => bail!("unknown duration unit '{}'. Supported: s, m, h, d, w", unit), }; Ok(std::time::Duration::from_secs(secs)) } fn resolve_local_uri( config: &OmnigraphConfig, cli_uri: Option, cli_target: Option<&str>, operation: &str, ) -> Result { let uri = resolve_uri(config, cli_uri, cli_target)?; if is_remote_uri(&uri) { bail!( "{} is only supported against local graph URIs in this milestone", operation ); } Ok(uri) } fn resolve_branch( config: &OmnigraphConfig, cli_branch: Option, alias_branch: Option, default_branch: &str, ) -> String { cli_branch .or(alias_branch) .or_else(|| config.cli.branch.clone()) .unwrap_or_else(|| default_branch.to_string()) } fn resolve_read_target( config: &OmnigraphConfig, cli_branch: Option, cli_snapshot: Option, alias_branch: Option, ) -> Result { if cli_branch.is_some() && cli_snapshot.is_some() { bail!("read target may specify branch or snapshot, not both"); } Ok(read_target_from_cli( cli_branch .or(alias_branch) .or_else(|| config.cli.branch.clone()), cli_snapshot, )) } fn resolve_query_path( config: &OmnigraphConfig, explicit_query: Option<&PathBuf>, alias_query: Option<&str>, ) -> Result { explicit_query .map(PathBuf::from) .or_else(|| alias_query.map(PathBuf::from)) .ok_or_else(|| { color_eyre::eyre::eyre!("exactly one of --query or --alias must be provided") }) .and_then(|query_path| config.resolve_query_path(&query_path)) } fn resolve_query_source( config: &OmnigraphConfig, explicit_query: Option<&PathBuf>, alias_query: Option<&str>, ) -> Result { Ok(fs::read_to_string(resolve_query_path( config, explicit_query, alias_query, )?)?) } fn parse_alias_value(value: &str) -> Value { serde_json::from_str(value).unwrap_or_else(|_| Value::String(value.to_string())) } fn merged_params_json( alias_name: Option<&str>, alias_arg_names: &[String], alias_arg_values: &[String], explicit: Option, ) -> Result> { if alias_arg_values.len() > alias_arg_names.len() { let alias = alias_name.unwrap_or(""); bail!( "alias '{}' expects at most {} args but got {}", alias, alias_arg_names.len(), alias_arg_values.len() ); } let mut merged = serde_json::Map::new(); for (arg_name, arg_value) in alias_arg_names.iter().zip(alias_arg_values.iter()) { merged.insert(arg_name.clone(), parse_alias_value(arg_value)); } match explicit { Some(Value::Object(object)) => { for (key, value) in object { merged.insert(key, value); } } Some(_) => bail!("params JSON must be an object"), None => {} } if merged.is_empty() { Ok(None) } else { Ok(Some(Value::Object(merged))) } } fn print_load_human( uri: &str, branch: &str, mode: CliLoadMode, nodes_loaded: usize, edges_loaded: usize, node_types_loaded: usize, edge_types_loaded: usize, ) { println!( "loaded {} on branch {} with {}: {} nodes across {} node types, {} edges across {} edge types", uri, branch, mode.as_str(), nodes_loaded, node_types_loaded, edges_loaded, edge_types_loaded ); } fn print_ingest_human(output: &IngestOutput) { println!( "ingested {} into branch {} from {} with {} ({})", output.uri, output.branch, output.base_branch, output.mode.as_str(), if output.branch_created { "branch created" } else { "branch exists" } ); for table in &output.tables { println!("{} rows_loaded={}", table.table_key, table.rows_loaded); } if let Some(actor_id) = &output.actor_id { println!("actor_id: {}", actor_id); } } fn print_schema_plan_human(uri: &str, plan: &SchemaMigrationPlan) { println!("schema plan for {}", uri); println!("supported: {}", if plan.supported { "yes" } else { "no" }); if plan.steps.is_empty() { println!("no schema changes"); return; } for step in &plan.steps { println!("- {}", render_schema_plan_step(step)); } } fn render_schema_plan_step(step: &SchemaMigrationStep) -> String { match step { SchemaMigrationStep::AddType { type_kind, name } => { format!("add {} type '{}'", schema_type_kind_label(*type_kind), name) } SchemaMigrationStep::RenameType { type_kind, from, to, } => format!( "rename {} type '{}' -> '{}'", schema_type_kind_label(*type_kind), from, to ), SchemaMigrationStep::AddProperty { type_kind, type_name, property_name, property_type, } => format!( "add property '{}.{}' ({}) on {} '{}'", type_name, property_name, render_prop_type(property_type), schema_type_kind_label(*type_kind), type_name ), SchemaMigrationStep::RenameProperty { type_kind, type_name, from, to, } => format!( "rename property '{}.{}' -> '{}.{}' on {} '{}'", type_name, from, type_name, to, schema_type_kind_label(*type_kind), type_name ), SchemaMigrationStep::AddConstraint { type_kind, type_name, constraint, } => format!( "add constraint {} on {} '{}'", render_constraint(constraint), schema_type_kind_label(*type_kind), type_name ), SchemaMigrationStep::UpdateTypeMetadata { type_kind, name, annotations, } => format!( "update metadata on {} '{}' ({})", schema_type_kind_label(*type_kind), name, render_annotations(annotations) ), SchemaMigrationStep::UpdatePropertyMetadata { type_kind, type_name, property_name, annotations, } => format!( "update metadata on property '{}.{}' of {} '{}' ({})", type_name, property_name, schema_type_kind_label(*type_kind), type_name, render_annotations(annotations) ), SchemaMigrationStep::DropType { type_kind, name, mode, } => format!( "drop {} type '{}' ({} mode)", schema_type_kind_label(*type_kind), name, drop_mode_label(*mode), ), SchemaMigrationStep::DropProperty { type_kind, type_name, property_name, mode, } => format!( "drop property '{}.{}' of {} '{}' ({} mode)", type_name, property_name, schema_type_kind_label(*type_kind), type_name, drop_mode_label(*mode), ), SchemaMigrationStep::UnsupportedChange { entity, reason, .. } => { // When a schema-lint code is attached, render code + tier // so operators see at-a-glance the kind of risk (destructive // / validated / safe) — not just the rule identifier. // Reach the diagnostic via the `diagnostic()` helper so the // CLI doesn't need to know how the lookup works. match step.diagnostic() { Some(diag) => format!( "unsupported change on {} [{}, {}]: {}", entity, diag.code, schema_lint_tier_label(diag.tier), reason, ), None => format!("unsupported change on {}: {}", entity, reason), } } } } fn schema_type_kind_label(kind: omnigraph_compiler::SchemaTypeKind) -> &'static str { match kind { omnigraph_compiler::SchemaTypeKind::Interface => "interface", omnigraph_compiler::SchemaTypeKind::Node => "node", omnigraph_compiler::SchemaTypeKind::Edge => "edge", } } fn schema_lint_tier_label(tier: omnigraph_compiler::SafetyTier) -> &'static str { match tier { omnigraph_compiler::SafetyTier::Safe => "safe", omnigraph_compiler::SafetyTier::Validated => "validated", omnigraph_compiler::SafetyTier::Destructive => "destructive", } } fn drop_mode_label(mode: omnigraph_compiler::DropMode) -> &'static str { match mode { omnigraph_compiler::DropMode::Soft => "soft", omnigraph_compiler::DropMode::Hard => "hard", } } fn render_prop_type(prop_type: &omnigraph_compiler::PropType) -> String { let base = if let Some(values) = &prop_type.enum_values { format!("Enum({})", values.join("|")) } else { prop_type.scalar.to_string() }; let base = if prop_type.list { format!("[{}]", base) } else { base }; if prop_type.nullable { format!("{}?", base) } else { base } } fn render_constraint(constraint: &omnigraph_compiler::schema::ast::Constraint) -> String { match constraint { omnigraph_compiler::schema::ast::Constraint::Key(columns) => { format!("@key({})", columns.join(", ")) } omnigraph_compiler::schema::ast::Constraint::Unique(columns) => { format!("@unique({})", columns.join(", ")) } omnigraph_compiler::schema::ast::Constraint::Index(columns) => { format!("@index({})", columns.join(", ")) } omnigraph_compiler::schema::ast::Constraint::Range { property, min, max } => { format!("@range({}, {:?}, {:?})", property, min, max) } omnigraph_compiler::schema::ast::Constraint::Check { property, pattern } => { format!("@check({}, {:?})", property, pattern) } } } fn render_annotations(annotations: &[omnigraph_compiler::schema::ast::Annotation]) -> String { annotations .iter() .map(|annotation| match &annotation.value { Some(value) => format!("@{}({})", annotation.name, value), None => format!("@{}", annotation.name), }) .collect::>() .join(", ") } fn print_embed_human(output: &EmbedOutput) { println!( "embedded {} rows (selected {}, cleaned {}) from {} -> {} [{} {}d]", output.embedded_rows, output.selected_rows, output.cleaned_rows, output.input, output.output, output.mode, output.dimension ); } fn print_snapshot_human(branch: &str, manifest_version: u64, entries: &[SnapshotTableOutput]) { println!("branch: {}", branch); println!("manifest_version: {}", manifest_version); for entry in entries { println!( "{} v{} branch={} rows={}", entry.table_key, entry.table_version, entry.table_branch.as_deref().unwrap_or("main"), entry.row_count ); } } fn print_read_output( output: &ReadOutput, format: ReadOutputFormat, config: &OmnigraphConfig, ) -> Result<()> { println!( "{}", render_read( output, format, &ReadRenderOptions { max_column_width: config.table_max_column_width(), cell_layout: config.table_cell_layout(), }, )? ); Ok(()) } fn print_change_human(output: &ChangeOutput) { println!( "changed {} via {}: {} nodes, {} edges", output.branch, output.query_name, output.affected_nodes, output.affected_edges ); if let Some(actor_id) = &output.actor_id { println!("actor_id: {}", actor_id); } } fn print_commit_list_human(commits: &[CommitOutput]) { for commit in commits { let branch = commit.manifest_branch.as_deref().unwrap_or("main"); println!( "{} branch={} version={}{}", commit.graph_commit_id, branch, commit.manifest_version, commit .actor_id .as_deref() .map(|actor| format!(" actor={}", actor)) .unwrap_or_default() ); } } fn print_commit_human(commit: &CommitOutput) { println!("graph_commit_id: {}", commit.graph_commit_id); println!( "manifest_branch: {}", commit.manifest_branch.as_deref().unwrap_or("main") ); println!("manifest_version: {}", commit.manifest_version); if let Some(parent_commit_id) = &commit.parent_commit_id { println!("parent_commit_id: {}", parent_commit_id); } if let Some(merged_parent_commit_id) = &commit.merged_parent_commit_id { println!("merged_parent_commit_id: {}", merged_parent_commit_id); } if let Some(actor_id) = &commit.actor_id { println!("actor_id: {}", actor_id); } println!("created_at: {}", commit.created_at); } fn print_policy_explain(decision: &PolicyDecision, actor_id: &str, request: &PolicyRequest) { println!( "decision: {}", if decision.allowed { "allow" } else { "deny" } ); println!("actor: {}", actor_id); println!("action: {}", request.action); if let Some(branch) = &request.branch { println!("branch: {}", branch); } if let Some(target_branch) = &request.target_branch { println!("target_branch: {}", target_branch); } if let Some(rule_id) = &decision.matched_rule_id { println!("matched_rule: {}", rule_id); } println!("message: {}", decision.message); } fn resolve_read_format( config: &OmnigraphConfig, cli_format: Option, json: bool, alias_format: Option, ) -> ReadOutputFormat { if json { ReadOutputFormat::Json } else { cli_format .or(alias_format) .unwrap_or_else(|| config.cli_output_format()) } } fn resolve_alias<'a>( config: &'a OmnigraphConfig, alias_name: Option<&'a str>, expected: AliasCommand, ) -> Result> { let Some(alias_name) = alias_name else { return Ok(None); }; let alias = config.alias(alias_name)?; if alias.command != expected { bail!( "alias '{}' is a {:?} alias, not a {:?} alias", alias_name, alias.command, expected ); } Ok(Some((alias_name, alias))) } fn normalize_legacy_alias_uri( uri: Option, target_available: bool, alias_name: Option<&str>, mut alias_args: Vec, ) -> (Option, Vec) { let Some(candidate) = uri else { return (None, alias_args); }; if alias_name.is_some() && target_available { alias_args.insert(0, candidate); return (None, alias_args); } (Some(candidate), alias_args) } fn scaffold_config_if_missing(uri: &str) -> Result<()> { let path = inferred_config_path(uri)?; if path.exists() { return Ok(()); } fs::write( path, format!( "\ project: name: Omnigraph Project graphs: local: uri: {} # bearer_token_env: OMNIGRAPH_BEARER_TOKEN server: graph: local bind: 127.0.0.1:8080 cli: graph: local branch: main output_format: table table_max_column_width: 80 table_cell_layout: truncate query: roots: - queries - . aliases: # owner: # command: read # query: context.gq # name: decision_owner # args: [slug] # graph: local # branch: main # format: kv # # attach_trace: # command: change # query: mutations.gq # name: attach_trace # args: [decision_slug, trace_slug] # graph: local # branch: main # auth: # env_file: ./.env.omni # # policy: # file: ./policy.yaml ", yaml_string(uri), ), )?; Ok(()) } fn yaml_string(value: &str) -> String { format!("'{}'", value.replace('\'', "''")) } fn inferred_config_path(uri: &str) -> Result { if uri.contains("://") { return Ok(omnigraph_server::config::default_config_path()); } let path = Path::new(uri); let base = if path.is_absolute() { path.parent() .map(Path::to_path_buf) .unwrap_or(std::env::current_dir()?) } else { std::env::current_dir()?.join(path.parent().unwrap_or_else(|| Path::new("."))) }; Ok(base.join(omnigraph_server::config::DEFAULT_CONFIG_FILE)) } fn read_target_from_cli(branch: Option, snapshot: Option) -> ReadTarget { if let Some(snapshot) = snapshot { ReadTarget::snapshot(SnapshotId::new(snapshot)) } else { ReadTarget::branch(branch.unwrap_or_else(|| "main".to_string())) } } fn load_params_json(params: &ParamsArgs) -> Result> { match (¶ms.params, ¶ms.params_file) { (Some(inline), None) => Ok(Some(serde_json::from_str(inline)?)), (None, Some(path)) => Ok(Some(serde_json::from_str(&fs::read_to_string(path)?)?)), (None, None) => Ok(None), (Some(_), Some(_)) => bail!("only one of --params or --params-file may be provided"), } } fn select_named_query( query_source: &str, requested_name: Option<&str>, ) -> Result<(String, Vec)> { let parsed = parse_query(query_source)?; let query = if let Some(name) = requested_name { parsed .queries .into_iter() .find(|query| query.name == name) .ok_or_else(|| color_eyre::eyre::eyre!("query '{}' not found", name))? } else if parsed.queries.len() == 1 { parsed.queries.into_iter().next().unwrap() } else { bail!("query file contains multiple queries; pass --name"); }; Ok((query.name, query.params)) } fn query_params_from_json( query_params: &[omnigraph_compiler::query::ast::Param], params_json: Option<&Value>, ) -> Result { json_params_to_param_map(params_json, query_params, JsonParamMode::Standard) .map_err(|err| color_eyre::eyre::eyre!(err.to_string())) } async fn execute_query_lint( config: &OmnigraphConfig, cli_uri: Option, cli_target: Option<&str>, schema_path: Option<&PathBuf>, query_path: &PathBuf, ) -> Result { let resolved_query_path = resolve_query_path(config, Some(query_path), None)?; let query_source = fs::read_to_string(&resolved_query_path)?; let query_path = resolved_query_path.to_string_lossy().into_owned(); if let Some(schema_path) = schema_path { let schema_source = fs::read_to_string(schema_path)?; let schema = parse_schema(&schema_source).map_err(|err| color_eyre::eyre::eyre!(err.to_string()))?; let catalog = build_catalog(&schema).map_err(|err| color_eyre::eyre::eyre!(err.to_string()))?; return Ok(lint_query_file( &catalog, &query_source, query_path, QueryLintSchemaSource::file(schema_path.to_string_lossy().into_owned()), )); } let has_graph_target = cli_uri.is_some() || cli_target.is_some() || config.cli_graph_name().is_some(); if !has_graph_target { bail!("query lint requires --schema or a resolvable graph target"); } let uri = resolve_local_uri(config, cli_uri, cli_target, "query lint")?; let db = Omnigraph::open(&uri).await?; Ok(lint_query_file( &db.catalog(), &query_source, query_path, QueryLintSchemaSource::graph(uri), )) } async fn execute_read( uri: &str, query_source: &str, query_name: Option<&str>, target: ReadTarget, params_json: Option<&Value>, ) -> Result { let (selected_name, query_params) = select_named_query(query_source, query_name)?; let params = query_params_from_json(&query_params, params_json)?; let db = Omnigraph::open(uri).await?; let result = db .query(target.clone(), query_source, &selected_name, ¶ms) .await?; Ok(read_output(selected_name, &target, result)) } async fn execute_read_remote( client: &reqwest::Client, uri: &str, query_source: &str, query_name: Option<&str>, target: ReadTarget, params_json: Option<&Value>, bearer_token: Option<&str>, ) -> Result { let (branch, snapshot) = match &target { ReadTarget::Branch(branch) => (Some(branch.clone()), None), ReadTarget::Snapshot(snapshot) => (None, Some(snapshot.as_str().to_string())), }; remote_json( client, Method::POST, remote_url(uri, "/read"), Some(serde_json::to_value(ReadRequest { query_source: query_source.to_string(), query_name: query_name.map(ToOwned::to_owned), params: params_json.cloned(), branch, snapshot, })?), bearer_token, ) .await } async fn execute_change( uri: &str, query_source: &str, query_name: Option<&str>, branch: &str, params_json: Option<&Value>, config: &OmnigraphConfig, cli_as_actor: Option<&str>, ) -> Result { let (selected_name, query_params) = select_named_query(query_source, query_name)?; let params = query_params_from_json(&query_params, params_json)?; let db = open_local_db_with_policy(uri, config).await?; let actor = resolve_cli_actor(cli_as_actor, config); let result = db .mutate_as(branch, query_source, &selected_name, ¶ms, actor) .await?; Ok(ChangeOutput { branch: branch.to_string(), query_name: selected_name, affected_nodes: result.affected_nodes, affected_edges: result.affected_edges, actor_id: actor.map(String::from), }) } async fn execute_change_remote( client: &reqwest::Client, uri: &str, query_source: &str, query_name: Option<&str>, branch: &str, params_json: Option<&Value>, bearer_token: Option<&str>, ) -> Result { remote_json( client, Method::POST, remote_url(uri, "/change"), Some(serde_json::to_value(ChangeRequest { query_source: query_source.to_string(), query_name: query_name.map(ToOwned::to_owned), params: params_json.cloned(), branch: Some(branch.to_string()), })?), bearer_token, ) .await } async fn execute_export_to_writer( uri: &str, branch: &str, type_names: &[String], table_keys: &[String], writer: &mut W, ) -> Result<()> { let db = Omnigraph::open(uri).await?; db.export_jsonl_to_writer(branch, type_names, table_keys, writer) .await?; writer.flush()?; Ok(()) } async fn execute_export_remote_to_writer( client: &reqwest::Client, uri: &str, branch: &str, type_names: &[String], table_keys: &[String], bearer_token: Option<&str>, writer: &mut W, ) -> Result<()> { let request = apply_bearer_token( client.request(Method::POST, remote_url(uri, "/export")), bearer_token, ) .json(&ExportRequest { branch: Some(branch.to_string()), type_names: type_names.to_vec(), table_keys: table_keys.to_vec(), }); let mut response = request.send().await?; let status = response.status(); if !status.is_success() { let text = response.text().await?; if let Ok(error) = serde_json::from_str::(&text) { bail!(error.error); } bail!("server returned {}: {}", status, text); } while let Some(chunk) = response.chunk().await? { writer.write_all(&chunk)?; } writer.flush()?; Ok(()) } #[tokio::main] async fn main() -> Result<()> { color_eyre::install()?; let cli = { let matches = Cli::command() .arg( Arg::new("version") .short('v') .long("version") .action(ArgAction::Version) .help("Print version"), ) .get_matches(); Cli::from_arg_matches(&matches)? }; let http_client = build_http_client()?; match cli.command { Command::Version => { println!("omnigraph {}", env!("CARGO_PKG_VERSION")); } Command::Embed(args) => { let output = execute_embed(&args).await?; if args.json { print_json(&output)?; } else { print_embed_human(&output); } } Command::Init { schema, uri, force } => { let schema_source = fs::read_to_string(&schema)?; ensure_local_graph_parent(&uri)?; Omnigraph::init_with_options( &uri, &schema_source, omnigraph::db::InitOptions { force }, ) .await?; scaffold_config_if_missing(&uri)?; println!("initialized {}", uri); } Command::Load { uri, target, config, data, branch, mode, json, } => { let config = load_cli_config(config.as_ref())?; let uri = resolve_local_uri(&config, uri, target.as_deref(), "load")?; let branch = resolve_branch(&config, branch, None, "main"); let db = open_local_db_with_policy(&uri, &config).await?; let actor = resolve_cli_actor(cli.as_actor.as_deref(), &config); let result = db .load_file_as(&branch, &data.to_string_lossy(), mode.into(), actor) .await?; let payload = LoadOutput { uri: &uri, branch: &branch, mode: mode.as_str(), nodes_loaded: result.nodes_loaded.values().sum(), edges_loaded: result.edges_loaded.values().sum(), node_types_loaded: result.nodes_loaded.len(), edge_types_loaded: result.edges_loaded.len(), }; if json { print_json(&payload)?; } else { print_load_human( &uri, &branch, mode, payload.nodes_loaded, payload.edges_loaded, payload.node_types_loaded, payload.edge_types_loaded, ); } } Command::Ingest { uri, target, config, data, branch, from, mode, json, } => { let config = load_cli_config(config.as_ref())?; let bearer_token = resolve_remote_bearer_token(&config, uri.as_deref(), target.as_deref())?; let uri = resolve_uri(&config, uri, target.as_deref())?; let branch = resolve_branch(&config, branch, None, "main"); let from = resolve_branch(&config, from, None, "main"); let payload = if is_remote_uri(&uri) { let data = fs::read_to_string(&data)?; remote_json::( &http_client, Method::POST, remote_url(&uri, "/ingest"), Some(serde_json::to_value(IngestRequest { branch: Some(branch.clone()), from: Some(from.clone()), mode: Some(mode.into()), data, })?), bearer_token.as_deref(), ) .await? } else { let db = open_local_db_with_policy(&uri, &config).await?; let actor = resolve_cli_actor(cli.as_actor.as_deref(), &config); let result = db .ingest_file_as( &branch, Some(&from), &data.to_string_lossy(), mode.into(), actor, ) .await?; ingest_output(&uri, &result, None) }; if json { print_json(&payload)?; } else { print_ingest_human(&payload); } } Command::Branch { command } => match command { BranchCommand::Create { uri, target, config, from, name, json, } => { let config = load_cli_config(config.as_ref())?; let bearer_token = resolve_remote_bearer_token(&config, uri.as_deref(), target.as_deref())?; let uri = resolve_uri(&config, uri, target.as_deref())?; let from = resolve_branch(&config, from, None, "main"); let payload = if is_remote_uri(&uri) { remote_json::( &http_client, Method::POST, remote_url(&uri, "/branches"), Some(serde_json::to_value(BranchCreateRequest { from: Some(from.clone()), name: name.clone(), })?), bearer_token.as_deref(), ) .await? } else { let db = open_local_db_with_policy(&uri, &config).await?; let actor = resolve_cli_actor(cli.as_actor.as_deref(), &config); db.branch_create_from_as(ReadTarget::branch(&from), &name, actor) .await?; BranchCreateOutput { uri: uri.clone(), from: from.clone(), name: name.clone(), actor_id: actor.map(String::from), } }; if json { print_json(&payload)?; } else { println!("created branch {} from {}", payload.name, payload.from); } } BranchCommand::List { uri, target, config, json, } => { let config = load_cli_config(config.as_ref())?; let bearer_token = resolve_remote_bearer_token(&config, uri.as_deref(), target.as_deref())?; let uri = resolve_uri(&config, uri, target.as_deref())?; let payload = if is_remote_uri(&uri) { remote_json::( &http_client, Method::GET, remote_url(&uri, "/branches"), None, bearer_token.as_deref(), ) .await? } else { let db = Omnigraph::open(&uri).await?; let mut branches = db.branch_list().await?; branches.sort(); BranchListOutput { branches } }; if json { print_json(&payload)?; } else { for branch in payload.branches { println!("{}", branch); } } } BranchCommand::Delete { uri, target, config, name, json, } => { let config = load_cli_config(config.as_ref())?; let bearer_token = resolve_remote_bearer_token(&config, uri.as_deref(), target.as_deref())?; let uri = resolve_uri(&config, uri, target.as_deref())?; let payload = if is_remote_uri(&uri) { remote_json::( &http_client, Method::DELETE, remote_branch_url(&uri, &name)?, None, bearer_token.as_deref(), ) .await? } else { let db = open_local_db_with_policy(&uri, &config).await?; let actor = resolve_cli_actor(cli.as_actor.as_deref(), &config); db.branch_delete_as(&name, actor).await?; BranchDeleteOutput { uri: uri.clone(), name: name.clone(), actor_id: actor.map(String::from), } }; if json { print_json(&payload)?; } else { println!("deleted branch {}", payload.name); } } BranchCommand::Merge { uri, target, config, source, into, json, } => { let config = load_cli_config(config.as_ref())?; let bearer_token = resolve_remote_bearer_token(&config, uri.as_deref(), target.as_deref())?; let uri = resolve_uri(&config, uri, target.as_deref())?; let into = resolve_branch(&config, into, None, "main"); let payload = if is_remote_uri(&uri) { remote_json::( &http_client, Method::POST, remote_url(&uri, "/branches/merge"), Some(serde_json::to_value(BranchMergeRequest { source: source.clone(), target: Some(into.clone()), })?), bearer_token.as_deref(), ) .await? } else { let db = open_local_db_with_policy(&uri, &config).await?; let actor = resolve_cli_actor(cli.as_actor.as_deref(), &config); let outcome = db.branch_merge_as(&source, &into, actor).await?; BranchMergeOutput { source: source.clone(), target: into.clone(), outcome: outcome.into(), actor_id: actor.map(String::from), } }; if json { print_json(&payload)?; } else { println!( "merged {} into {}: {}", payload.source, payload.target, payload.outcome.as_str() ); } } }, Command::Commit { command } => match command { CommitCommand::List { uri, target, config, branch, json, } => { let config = load_cli_config(config.as_ref())?; let bearer_token = resolve_remote_bearer_token(&config, uri.as_deref(), target.as_deref())?; let uri = resolve_uri(&config, uri, target.as_deref())?; let commits = if is_remote_uri(&uri) { remote_json::( &http_client, Method::GET, if let Some(branch) = branch.as_deref() { format!("{}?branch={}", remote_url(&uri, "/commits"), branch) } else { remote_url(&uri, "/commits") }, None, bearer_token.as_deref(), ) .await? .commits } else { let db = Omnigraph::open(&uri).await?; db.list_commits(branch.as_deref()) .await? .iter() .map(commit_output) .collect::>() }; if json { print_json(&CommitListOutput { commits })?; } else { print_commit_list_human(&commits); } } CommitCommand::Show { uri, target, config, commit_id, json, } => { let config = load_cli_config(config.as_ref())?; let bearer_token = resolve_remote_bearer_token(&config, uri.as_deref(), target.as_deref())?; let uri = resolve_uri(&config, uri, target.as_deref())?; let commit = if is_remote_uri(&uri) { remote_json::( &http_client, Method::GET, remote_url(&uri, &format!("/commits/{}", commit_id)), None, bearer_token.as_deref(), ) .await? } else { let db = Omnigraph::open(&uri).await?; commit_output(&db.get_commit(&commit_id).await?) }; if json { print_json(&commit)?; } else { print_commit_human(&commit); } } }, Command::Schema { command } => match command { SchemaCommand::Plan { uri, target, config, schema, json, allow_data_loss, } => { let config = load_cli_config(config.as_ref())?; let uri = resolve_local_uri(&config, uri, target.as_deref(), "schema plan")?; let schema_source = fs::read_to_string(&schema)?; let db = Omnigraph::open(&uri).await?; let plan = db .plan_schema_with_options( &schema_source, omnigraph::db::SchemaApplyOptions { allow_data_loss }, ) .await?; let output = SchemaPlanOutput { uri: &uri, supported: plan.supported, step_count: plan.steps.len(), steps: &plan.steps, }; if json { print_json(&output)?; } else { print_schema_plan_human(&uri, &plan); } } SchemaCommand::Apply { uri, target, config, schema, json, allow_data_loss, } => { let config = load_cli_config(config.as_ref())?; let bearer_token = resolve_remote_bearer_token(&config, uri.as_deref(), target.as_deref())?; let uri = resolve_uri(&config, uri, target.as_deref())?; let schema_source = fs::read_to_string(&schema)?; let output = if is_remote_uri(&uri) { // MR-694 PR B: SchemaApplyRequest gained an // allow_data_loss field so Hard-mode drops are no // longer CLI-only. The previous bail is gone; the // field is forwarded into the JSON payload, and // the server's `server_schema_apply` honors it. remote_json::( &http_client, Method::POST, remote_url(&uri, "/schema/apply"), Some(serde_json::to_value(SchemaApplyRequest { schema_source: schema_source.clone(), allow_data_loss, })?), bearer_token.as_deref(), ) .await? } else { let db = open_local_db_with_policy(&uri, &config).await?; let actor = resolve_cli_actor(cli.as_actor.as_deref(), &config); let result = db .apply_schema_as( &schema_source, omnigraph::db::SchemaApplyOptions { allow_data_loss }, actor, ) .await?; schema_apply_output(&uri, result) }; if json { print_json(&output)?; } else { print_schema_apply_human(&output); } } SchemaCommand::Show { uri, target, config, json, } => { let config = load_cli_config(config.as_ref())?; let bearer_token = resolve_remote_bearer_token(&config, uri.as_deref(), target.as_deref())?; let uri = resolve_uri(&config, uri, target.as_deref())?; let output = if is_remote_uri(&uri) { remote_json::( &http_client, Method::GET, remote_url(&uri, "/schema"), None, bearer_token.as_deref(), ) .await? } else { let db = Omnigraph::open(&uri).await?; SchemaOutput { schema_source: db.schema_source().to_string(), } }; if json { print_json(&output)?; } else { println!("{}", output.schema_source); } } }, Command::Query { command } => match command { QueryCommand::Lint { uri, target, config, query, schema, json, } => { let config = load_cli_config(config.as_ref())?; let output = execute_query_lint(&config, uri, target.as_deref(), schema.as_ref(), &query) .await?; finish_query_lint(&output, json)?; } }, Command::Snapshot { uri, target, config, branch, json, } => { let config = load_cli_config(config.as_ref())?; let bearer_token = resolve_remote_bearer_token(&config, uri.as_deref(), target.as_deref())?; let uri = resolve_uri(&config, uri, target.as_deref())?; let branch = resolve_branch(&config, branch, None, "main"); let payload = if is_remote_uri(&uri) { remote_json::( &http_client, Method::GET, format!("{}?branch={}", remote_url(&uri, "/snapshot"), branch), None, bearer_token.as_deref(), ) .await? } else { let db = Omnigraph::open(&uri).await?; let snapshot = db.snapshot_of(ReadTarget::branch(branch.as_str())).await?; snapshot_payload(&branch, &snapshot) }; if json { print_json(&payload)?; } else { print_snapshot_human(&payload.branch, payload.manifest_version, &payload.tables); } } Command::Export { uri, target, config, branch, jsonl, type_names, table_keys, } => { let config = load_cli_config(config.as_ref())?; let bearer_token = resolve_remote_bearer_token(&config, uri.as_deref(), target.as_deref())?; let uri = resolve_uri(&config, uri, target.as_deref())?; let branch = resolve_branch(&config, branch, None, "main"); if jsonl { eprintln!("warning: --jsonl is deprecated; `omnigraph export` always emits JSONL"); } let stdout = io::stdout(); let mut stdout = stdout.lock(); if is_remote_uri(&uri) { execute_export_remote_to_writer( &http_client, &uri, &branch, &type_names, &table_keys, bearer_token.as_deref(), &mut stdout, ) .await?; } else { execute_export_to_writer(&uri, &branch, &type_names, &table_keys, &mut stdout) .await?; } } Command::Read { uri, legacy_uri, target, config, alias, query, name, params, branch, snapshot, format, json, alias_args, } => { if alias.is_some() == query.is_some() { bail!("exactly one of --alias or --query must be provided"); } let config = load_cli_config(config.as_ref())?; let alias = resolve_alias(&config, alias.as_deref(), AliasCommand::Read)?; let alias_name = alias.as_ref().map(|(name, _)| *name); let alias_config = alias.as_ref().map(|(_, alias)| *alias); let target_available = target.is_some() || alias_config .and_then(|alias| alias.graph.as_deref()) .is_some() || config.cli_graph_name().is_some(); let (legacy_uri, alias_args) = normalize_legacy_alias_uri(legacy_uri, target_available, alias_name, alias_args); let uri = uri.or(legacy_uri); let target_name = target .as_deref() .or_else(|| alias_config.and_then(|alias| alias.graph.as_deref())); let bearer_token = resolve_remote_bearer_token(&config, uri.as_deref(), target_name)?; let uri = resolve_uri(&config, uri, target_name)?; let query_source = resolve_query_source( &config, query.as_ref(), alias_config.map(|a| a.query.as_str()), )?; let params_json = merged_params_json( alias_name, alias_config .map(|alias| alias.args.as_slice()) .unwrap_or(&[]), &alias_args, load_params_json(¶ms)?, )?; let target = resolve_read_target( &config, branch, snapshot, alias_config.and_then(|alias| alias.branch.clone()), )?; let query_name = name.or_else(|| alias_config.and_then(|alias| alias.name.clone())); let output = if is_remote_uri(&uri) { execute_read_remote( &http_client, &uri, &query_source, query_name.as_deref(), target, params_json.as_ref(), bearer_token.as_deref(), ) .await? } else { execute_read( &uri, &query_source, query_name.as_deref(), target, params_json.as_ref(), ) .await? }; let format = resolve_read_format( &config, format, json, alias_config.and_then(|alias| alias.format), ); print_read_output(&output, format, &config)?; } Command::Change { uri, legacy_uri, target, config, alias, query, name, params, branch, json, alias_args, } => { if alias.is_some() == query.is_some() { bail!("exactly one of --alias or --query must be provided"); } let config = load_cli_config(config.as_ref())?; let alias = resolve_alias(&config, alias.as_deref(), AliasCommand::Change)?; let alias_name = alias.as_ref().map(|(name, _)| *name); let alias_config = alias.as_ref().map(|(_, alias)| *alias); let target_available = target.is_some() || alias_config .and_then(|alias| alias.graph.as_deref()) .is_some() || config.cli_graph_name().is_some(); let (legacy_uri, alias_args) = normalize_legacy_alias_uri(legacy_uri, target_available, alias_name, alias_args); let uri = uri.or(legacy_uri); let target_name = target .as_deref() .or_else(|| alias_config.and_then(|alias| alias.graph.as_deref())); let bearer_token = resolve_remote_bearer_token(&config, uri.as_deref(), target_name)?; let uri = resolve_uri(&config, uri, target_name)?; let query_source = resolve_query_source( &config, query.as_ref(), alias_config.map(|a| a.query.as_str()), )?; let params_json = merged_params_json( alias_name, alias_config .map(|alias| alias.args.as_slice()) .unwrap_or(&[]), &alias_args, load_params_json(¶ms)?, )?; let branch = resolve_branch( &config, branch, alias_config.and_then(|alias| alias.branch.clone()), "main", ); let query_name = name.or_else(|| alias_config.and_then(|alias| alias.name.clone())); let output = if is_remote_uri(&uri) { execute_change_remote( &http_client, &uri, &query_source, query_name.as_deref(), &branch, params_json.as_ref(), bearer_token.as_deref(), ) .await? } else { execute_change( &uri, &query_source, query_name.as_deref(), &branch, params_json.as_ref(), &config, cli.as_actor.as_deref(), ) .await? }; if json { print_json(&output)?; } else { print_change_human(&output); } } Command::Policy { command } => match command { PolicyCommand::Validate { config } => { let config = load_cli_config(config.as_ref())?; let engine = resolve_policy_engine(&config)?; let policy_file = config .resolve_policy_file() .expect("policy file should exist after resolve_policy_engine"); println!( "policy valid: {} [{} actors]", policy_file.display(), engine.known_actor_count() ); } PolicyCommand::Test { config } => { let config = load_cli_config(config.as_ref())?; let engine = resolve_policy_engine(&config)?; let tests_path = resolve_policy_tests_path(&config)?; let tests = PolicyTestConfig::load(&tests_path)?; engine.run_tests(&tests)?; println!("policy tests passed: {} cases", tests.cases.len()); } PolicyCommand::Explain { config, actor, action, branch, target_branch, } => { let config = load_cli_config(config.as_ref())?; let engine = resolve_policy_engine(&config)?; let request = PolicyRequest { action, branch, target_branch, }; let decision = engine.authorize(&actor, &request)?; print_policy_explain(&decision, &actor, &request); } }, Command::Optimize { uri, target, config, json, } => { let config = load_cli_config(config.as_ref())?; let uri = resolve_uri(&config, uri, target.as_deref())?; let db = Omnigraph::open(&uri).await?; let stats = db.optimize().await?; if json { let value = serde_json::json!({ "uri": uri, "tables": stats.iter().map(|s| serde_json::json!({ "table_key": s.table_key, "fragments_removed": s.fragments_removed, "fragments_added": s.fragments_added, "committed": s.committed, })).collect::>(), }); print_json(&value)?; } else { println!("optimize {} — {} tables", uri, stats.len()); for s in &stats { if s.committed { println!( " {:<40} frags {} → {} ✓", s.table_key, s.fragments_removed + s.fragments_added - s.fragments_added, s.fragments_added ); } else { println!(" {:<40} no-op", s.table_key); } } } } Command::Cleanup { uri, target, config, keep, older_than, confirm, json, } => { let config = load_cli_config(config.as_ref())?; let uri = resolve_uri(&config, uri, target.as_deref())?; let older_than_dur = older_than.as_deref().map(parse_duration_arg).transpose()?; if keep.is_none() && older_than_dur.is_none() { bail!("cleanup requires at least one of --keep or --older-than"); } let policy_desc = match (keep, older_than_dur) { (Some(k), Some(d)) => { format!("keep {} versions, remove anything older than {:?}", k, d) } (Some(k), None) => format!("keep {} versions", k), (None, Some(d)) => format!("remove anything older than {:?}", d), _ => unreachable!(), }; if !confirm { eprintln!( "cleanup is destructive — rerun with --confirm. Policy for {}: {}", uri, policy_desc ); return Ok(()); } let options = omnigraph::db::CleanupPolicyOptions { keep_versions: keep, older_than: older_than_dur, }; let mut db = Omnigraph::open(&uri).await?; let stats = db.cleanup(options).await?; if json { let value = serde_json::json!({ "uri": uri, "keep_versions": keep, "older_than_secs": older_than_dur.map(|d| d.as_secs()), "tables": stats.iter().map(|s| serde_json::json!({ "table_key": s.table_key, "bytes_removed": s.bytes_removed, "old_versions_removed": s.old_versions_removed, })).collect::>(), }); print_json(&value)?; } else { let total_bytes: u64 = stats.iter().map(|s| s.bytes_removed).sum(); let total_versions: u64 = stats.iter().map(|s| s.old_versions_removed).sum(); println!( "cleanup {} ({}) — removed {} versions ({} bytes) across {} tables", uri, policy_desc, total_versions, total_bytes, stats.len() ); } } Command::Graphs { command } => match command { GraphsCommand::List { uri, target, config, json, } => { let config = load_cli_config(config.as_ref())?; let bearer_token = resolve_remote_bearer_token(&config, uri.as_deref(), target.as_deref())?; let uri = resolve_uri(&config, uri, target.as_deref())?; if !is_remote_uri(&uri) { bail!( "`omnigraph graphs list` requires a remote multi-graph server URL \ (http:// or https://). To enumerate local graphs, read `omnigraph.yaml` \ directly." ); } let payload = remote_json::( &http_client, Method::GET, remote_url(&uri, "/graphs"), None, bearer_token.as_deref(), ) .await?; if json { print_json(&payload)?; } else { for entry in payload.graphs { println!("{}\t{}", entry.graph_id, entry.uri); } } } }, } Ok(()) } #[cfg(test)] mod tests { use std::fs; use super::{ DEFAULT_BEARER_TOKEN_ENV, apply_bearer_token, bearer_token_from_env_file, load_cli_config, load_env_file_into_process, normalize_bearer_token, parse_env_assignment, resolve_remote_bearer_token, }; use omnigraph_server::load_config; use reqwest::header::AUTHORIZATION; use tempfile::tempdir; #[test] fn apply_bearer_token_adds_header_when_configured() { let client = reqwest::Client::new(); let request = apply_bearer_token(client.get("http://example.com"), Some("demo-token")) .build() .unwrap(); assert_eq!( request .headers() .get(AUTHORIZATION) .and_then(|value| value.to_str().ok()), Some("Bearer demo-token") ); } #[test] fn apply_bearer_token_leaves_request_unchanged_when_not_configured() { let client = reqwest::Client::new(); let request = apply_bearer_token(client.get("http://example.com"), None) .build() .unwrap(); assert!(request.headers().get(AUTHORIZATION).is_none()); } #[test] fn normalize_bearer_token_trims_and_filters_blank_values() { assert_eq!(normalize_bearer_token(None), None); assert_eq!(normalize_bearer_token(Some(" ".to_string())), None); assert_eq!( normalize_bearer_token(Some(" demo-token ".to_string())).as_deref(), Some("demo-token") ); } #[test] fn parse_env_assignment_supports_plain_and_exported_values() { assert_eq!( parse_env_assignment("DEMO_TOKEN=demo-token"), Some(("DEMO_TOKEN".to_string(), "demo-token".to_string())) ); assert_eq!( parse_env_assignment("export DEMO_TOKEN=\"quoted-token\""), Some(("DEMO_TOKEN".to_string(), "quoted-token".to_string())) ); assert_eq!(parse_env_assignment("# comment"), None); assert_eq!(parse_env_assignment(" "), None); } #[test] fn bearer_token_from_env_file_reads_named_value() { let temp = tempdir().unwrap(); let env_file = temp.path().join(".env.omni"); fs::write( &env_file, "FIRST=ignore\nexport DEMO_TOKEN=\" demo-token \"\n", ) .unwrap(); assert_eq!( bearer_token_from_env_file(&env_file, "DEMO_TOKEN") .unwrap() .as_deref(), Some("demo-token") ); assert_eq!( bearer_token_from_env_file(&env_file, "MISSING").unwrap(), None ); } #[test] fn load_env_file_into_process_sets_missing_values_without_overriding_existing_ones() { let temp = tempdir().unwrap(); let env_file = temp.path().join(".env.omni"); fs::write( &env_file, "AUTOLOAD_ONLY=from-file\nAUTOLOAD_PRESET=from-file\n", ) .unwrap(); let missing_key = "AUTOLOAD_ONLY"; let preset_key = "AUTOLOAD_PRESET"; let previous_missing = std::env::var_os(missing_key); let previous_preset = std::env::var_os(preset_key); unsafe { std::env::remove_var(missing_key); std::env::set_var(preset_key, "from-env"); } load_env_file_into_process(&env_file).unwrap(); assert_eq!(std::env::var(missing_key).unwrap(), "from-file"); assert_eq!(std::env::var(preset_key).unwrap(), "from-env"); unsafe { if let Some(value) = previous_missing { std::env::set_var(missing_key, value); } else { std::env::remove_var(missing_key); } if let Some(value) = previous_preset { std::env::set_var(preset_key, value); } else { std::env::remove_var(preset_key); } } } #[test] fn resolve_remote_bearer_token_uses_scoped_env_file_with_global_fallback() { let temp = tempdir().unwrap(); fs::write( temp.path().join("omnigraph.yaml"), r#" graphs: demo: uri: https://example.com bearer_token_env: DEMO_TOKEN auth: env_file: .env.omni cli: graph: demo "#, ) .unwrap(); fs::write( temp.path().join(".env.omni"), "DEMO_TOKEN=scoped-token\nOMNIGRAPH_BEARER_TOKEN=global-token\n", ) .unwrap(); let previous = std::env::var_os(DEFAULT_BEARER_TOKEN_ENV); unsafe { std::env::remove_var(DEFAULT_BEARER_TOKEN_ENV); } let config_path = temp.path().join("omnigraph.yaml"); let config = load_config(Some(&config_path)).unwrap(); assert_eq!( resolve_remote_bearer_token(&config, None, Some("demo")) .unwrap() .as_deref(), Some("scoped-token") ); assert_eq!( resolve_remote_bearer_token(&config, Some("https://override.example.com"), None) .unwrap() .as_deref(), Some("global-token") ); unsafe { if let Some(value) = previous { std::env::set_var(DEFAULT_BEARER_TOKEN_ENV, value); } else { std::env::remove_var(DEFAULT_BEARER_TOKEN_ENV); } } } #[test] fn load_cli_config_autoloads_env_file_into_process() { let temp = tempdir().unwrap(); fs::write( temp.path().join("omnigraph.yaml"), r#" auth: env_file: .env.omni graphs: demo: uri: s3://bucket/prefix "#, ) .unwrap(); fs::write( temp.path().join(".env.omni"), "AUTOLOAD_FROM_CONFIG=loaded\n", ) .unwrap(); let key = "AUTOLOAD_FROM_CONFIG"; let previous = std::env::var_os(key); unsafe { std::env::remove_var(key); } let config_path = temp.path().join("omnigraph.yaml"); let config = load_cli_config(Some(&config_path)).unwrap(); assert_eq!( config.resolve_target_uri(None, Some("demo"), None).unwrap(), "s3://bucket/prefix" ); assert_eq!(std::env::var(key).unwrap(), "loaded"); unsafe { if let Some(value) = previous { std::env::set_var(key, value); } else { std::env::remove_var(key); } } } }