From d32c1ac191fbf88ef0fc249b1c6aa393cbd8cef1 Mon Sep 17 00:00:00 2001 From: Andrew Altshuler Date: Sat, 13 Jun 2026 19:25:57 +0300 Subject: [PATCH] refactor(cli): collapse write/query forks onto GraphClient (RFC-009 Phase 3b) (#211) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Phase 3a put the GraphClient enum in place and collapsed the five uniform read forks. 3b folds the remaining data-plane forks onto the same enum: load, ingest, mutate, query, branch create/delete/merge, and schema apply. The wrinkle 3a deferred was the local policy attachment. Reads and query open the local engine without a policy; writes open through open_local_db_with_policy and attribute a resolved actor. So the Embedded variant grows an optional policy context (graph/actor) filled by a second factory, resolve_with_policy; resolve() leaves it empty. open_embedded picks the open path from whether the context is present, preserving both of today's behaviors exactly. query still uses resolve() (no policy), as the read path did. apply_schema takes the catalog-validator closure as impl FnOnce(&Catalog) — the embedded arm runs it inside apply_schema_as_with_catalog_check, the remote arm ignores it (the server runs its own check). That non-object-safe closure is why GraphClient is an enum, not a trait. The stored-query registry is still built caller-side and only for the local path. load and ingest stay separate methods: same operation, but load surfaces the CLI LoadOutput (two distinct per-arm mappings preserved) while ingest surfaces the wire IngestOutput. The now-fully-dead execute_read/ execute_read_remote and execute_change/execute_change_remote pairs are retired (legacy_change_request_body stays — client.rs uses it); the export pair remains for 3c. The Phase-1 parity matrix is unchanged and green; full workspace tests pass. Co-authored-by: Claude Fable 5 --- crates/omnigraph-cli/src/client.rs | 465 ++++++++++++++++++++++++++-- crates/omnigraph-cli/src/helpers.rs | 95 ------ crates/omnigraph-cli/src/main.rs | 349 ++++++--------------- 3 files changed, 541 insertions(+), 368 deletions(-) diff --git a/crates/omnigraph-cli/src/client.rs b/crates/omnigraph-cli/src/client.rs index feeaf16..02daae4 100644 --- a/crates/omnigraph-cli/src/client.rs +++ b/crates/omnigraph-cli/src/client.rs @@ -4,38 +4,56 @@ //! 15 per-command `if graph.is_remote { … } else { … }` forks collapse //! into two arms here. //! -//! Phase 3a scope: the factory + the uniform read verbs (snapshot, -//! schema show, branch list, commit list/show — all of which open the -//! local engine WITHOUT policy today, preserved exactly). Write verbs -//! and the policy-bearing `query`/`mutate` arrive in 3b (the Embedded -//! variant will grow the policy context then); export + graphs-list in -//! 3c. Behavior is unchanged per verb — the Phase-1 parity matrix is the -//! referee and stays textually unchanged. +//! Phase 3a put the factory + the uniform read verbs in place. Phase 3b +//! adds the data-plane writes (`load`/`ingest`/`mutate`/`branch_*`/ +//! `apply_schema`) and `query`. The wrinkle 3a deferred: writes open the +//! local engine WITH policy (`open_local_db_with_policy`) and carry a +//! resolved actor, while reads/`query` open WITHOUT policy. So the +//! `Embedded` variant grows an optional policy context (`graph`/`actor`) +//! and a second factory (`resolve_with_policy`) fills it; `resolve()` +//! leaves it empty. The open path picks itself from whether `graph` is +//! set, preserving today's two behaviors exactly. Export + graphs-list +//! land in 3c. Behavior is unchanged per verb — the Phase-1 parity matrix +//! is the referee and stays textually unchanged. //! //! Enum, not a trait (RFC sketch said "trait"): only two variants ever, //! and inherent async methods sidestep `async_trait` boxing plus the //! `apply_schema` catalog-validator closure that is not object-safe. //! Same one-body-two-impls collapse, less ceremony. -use reqwest::Method; use color_eyre::Result; use omnigraph::db::{Omnigraph, ReadTarget}; use omnigraph_api_types::{ - BranchListOutput, CommitListOutput, CommitOutput, SchemaOutput, SnapshotOutput, commit_output, + BranchCreateOutput, BranchCreateRequest, BranchDeleteOutput, BranchListOutput, + BranchMergeOutput, BranchMergeRequest, ChangeOutput, CommitListOutput, CommitOutput, + IngestOutput, IngestRequest, ReadOutput, ReadRequest, SchemaApplyOutput, SchemaApplyRequest, + SchemaOutput, SnapshotOutput, commit_output, ingest_output, read_output, schema_apply_output, snapshot_payload, }; +use omnigraph_compiler::catalog::Catalog; +use reqwest::Method; +use serde_json::Value; +use crate::cli::CliLoadMode; use crate::helpers::{ - apply_server_flag, build_http_client, is_remote_uri, remote_json, remote_url, - resolve_remote_bearer_token, + ResolvedCliGraph, apply_server_flag, build_http_client, is_remote_uri, + legacy_change_request_body, open_local_db_with_policy, query_params_from_json, remote_branch_url, + remote_json, remote_url, resolve_cli_actor, resolve_cli_graph, resolve_remote_bearer_token, + select_named_query, }; +use crate::output::{LoadOutput, load_output_from_result, load_output_from_tables}; use omnigraph_server::config::OmnigraphConfig; pub(crate) enum GraphClient { - /// Local engine at `uri`. Reads open the dataset per call (no policy - /// attached — matches today's read behavior; the write verbs in 3b - /// add a policy-bearing context). - Embedded { uri: String }, + /// Local engine at `uri`. Reads (`resolve()`) leave `graph`/`actor` + /// empty and open without policy; writes (`resolve_with_policy()`) + /// fill them, opening through `open_local_db_with_policy` and + /// attributing the resolved actor. + Embedded { + uri: String, + graph: Option, + actor: Option, + }, /// Remote HTTP server. The actor is resolved server-side from the /// token; the client never sets identity. Remote { @@ -50,7 +68,8 @@ impl GraphClient { /// and credential once, then pick the variant by URI scheme — the /// single branch point that replaces every per-command `is_remote` /// fork. Mirrors the read verbs' current preamble (`resolve_uri` - /// path, not the policy-bearing `resolve_cli_graph`). + /// path, not the policy-bearing `resolve_cli_graph`). Used by reads + /// and `query` (which opens without policy, like the reads). pub(crate) fn resolve( config: &OmnigraphConfig, server: Option<&str>, @@ -68,7 +87,76 @@ impl GraphClient { token, }) } else { - Ok(GraphClient::Embedded { uri }) + Ok(GraphClient::Embedded { + uri, + graph: None, + actor: None, + }) + } + } + + /// Write-path factory: the same addressing/credential resolution as + /// `resolve()`, but through the stricter `resolve_cli_graph` (which + /// carries `policy_file`/`graph_id`/`selected`), and with the actor + /// resolved up front. The embedded arm then opens WITH policy. The + /// resolution order matches the write arms exactly: server flag → + /// bearer token → graph. + pub(crate) fn resolve_with_policy( + config: &OmnigraphConfig, + server: Option<&str>, + graph: Option<&str>, + uri: Option, + target: Option<&str>, + cli_as: Option<&str>, + ) -> Result { + let uri = apply_server_flag(server, graph, uri, target)?; + let token = resolve_remote_bearer_token(config, uri.as_deref(), target)?; + let resolved = resolve_cli_graph(config, uri, target)?; + if resolved.is_remote { + Ok(GraphClient::Remote { + http: build_http_client()?, + base_url: resolved.uri, + token, + }) + } else { + let actor = resolve_cli_actor(cli_as, config)?; + Ok(GraphClient::Embedded { + uri: resolved.uri.clone(), + graph: Some(resolved), + actor, + }) + } + } + + /// The graph URI (local path / remote base URL) this client addresses. + pub(crate) fn uri(&self) -> &str { + match self { + GraphClient::Embedded { uri, .. } => uri, + GraphClient::Remote { base_url, .. } => base_url, + } + } + + /// The selected graph name, when a policy-bearing embedded client was + /// resolved against a named graph. `None` for remote and for reads. + pub(crate) fn selected(&self) -> Option<&str> { + match self { + GraphClient::Embedded { graph, .. } => graph.as_ref().and_then(ResolvedCliGraph::selected), + GraphClient::Remote { .. } => None, + } + } + + pub(crate) fn is_remote(&self) -> bool { + matches!(self, GraphClient::Remote { .. }) + } + + /// Open the local engine the way the resolved client demands: with + /// policy when a `graph` context is present (write path), bare + /// otherwise (read/`query` path). Captures today's two open paths in + /// one place so each verb stays a single match arm. + async fn open_embedded(uri: &str, graph: &Option) -> Result { + match graph { + Some(graph) => open_local_db_with_policy(graph).await, + None => Ok(Omnigraph::open(uri).await?), } } @@ -88,7 +176,7 @@ impl GraphClient { ) .await } - GraphClient::Embedded { uri } => { + GraphClient::Embedded { uri, .. } => { let db = Omnigraph::open(uri).await?; let mut branches = db.branch_list().await?; branches.sort(); @@ -113,7 +201,7 @@ impl GraphClient { ) .await } - GraphClient::Embedded { uri } => { + GraphClient::Embedded { uri, .. } => { let db = Omnigraph::open(uri).await?; let snapshot = db.snapshot_of(ReadTarget::branch(branch)).await?; Ok(snapshot_payload(branch, &snapshot)) @@ -137,7 +225,7 @@ impl GraphClient { ) .await } - GraphClient::Embedded { uri } => { + GraphClient::Embedded { uri, .. } => { let db = Omnigraph::open(uri).await?; Ok(SchemaOutput { schema_source: db.schema_source().to_string(), @@ -159,7 +247,7 @@ impl GraphClient { }; remote_json(http, Method::GET, url, None, token.as_deref()).await } - GraphClient::Embedded { uri } => { + GraphClient::Embedded { uri, .. } => { let db = Omnigraph::open(uri).await?; let commits = db .list_commits(branch) @@ -188,10 +276,343 @@ impl GraphClient { ) .await } - GraphClient::Embedded { uri } => { + GraphClient::Embedded { uri, .. } => { let db = Omnigraph::open(uri).await?; Ok(commit_output(&db.get_commit(commit_id).await?)) } } } + + /// `load` — bulk-load `data` (a file path) onto `branch`, forking from + /// `from` if missing. Returns the CLI `LoadOutput`; each arm keeps its + /// own mapping (remote sums the wire `IngestOutput.tables`, embedded + /// reads the richer `LoadResult` directly) — preserved exactly. + pub(crate) async fn load( + &self, + branch: &str, + from: Option<&str>, + data: &str, + mode: CliLoadMode, + ) -> Result { + match self { + GraphClient::Remote { + http, + base_url, + token, + } => { + let data = std::fs::read_to_string(data)?; + let output = remote_json::( + http, + Method::POST, + remote_url(base_url, "/ingest"), + Some(serde_json::to_value(IngestRequest { + branch: Some(branch.to_string()), + from: from.map(ToOwned::to_owned), + mode: Some(mode.into()), + data, + })?), + token.as_deref(), + ) + .await?; + Ok(load_output_from_tables(base_url, branch, mode.as_str(), &output)) + } + GraphClient::Embedded { uri, graph, actor } => { + let db = Self::open_embedded(uri, graph).await?; + let result = db + .load_file_as(branch, from, data, mode.into(), actor.as_deref()) + .await?; + Ok(load_output_from_result(uri, branch, mode.as_str(), &result)) + } + } + } + + /// `ingest` — the deprecated alias of `load`. Same operation, but the + /// surfaced shape is the wire `IngestOutput` (printed by + /// `print_ingest_human`), so it is its own method. The embedded arm + /// echoes `actor_id: None` in the output exactly as the legacy arm did + /// (the actor is still attributed on the commit via `load_file_as`). + pub(crate) async fn ingest( + &self, + branch: &str, + from: &str, + data: &str, + mode: CliLoadMode, + ) -> Result { + match self { + GraphClient::Remote { + http, + base_url, + token, + } => { + let data = std::fs::read_to_string(data)?; + remote_json( + http, + Method::POST, + remote_url(base_url, "/ingest"), + Some(serde_json::to_value(IngestRequest { + branch: Some(branch.to_string()), + from: Some(from.to_string()), + mode: Some(mode.into()), + data, + })?), + token.as_deref(), + ) + .await + } + GraphClient::Embedded { uri, graph, actor } => { + let db = Self::open_embedded(uri, graph).await?; + let result = db + .load_file_as(branch, Some(from), data, mode.into(), actor.as_deref()) + .await?; + Ok(ingest_output(uri, &result, mode.into(), None)) + } + } + } + + /// `mutate` — run a change query against `branch`. Folds + /// `execute_change` / `execute_change_remote` + the legacy request body. + pub(crate) async fn mutate( + &self, + branch: &str, + query_source: &str, + query_name: Option<&str>, + params_json: Option<&Value>, + ) -> Result { + match self { + GraphClient::Remote { + http, + base_url, + token, + } => { + remote_json( + http, + Method::POST, + remote_url(base_url, "/change"), + Some(legacy_change_request_body( + query_source, + query_name, + branch, + params_json, + )), + token.as_deref(), + ) + .await + } + GraphClient::Embedded { uri, graph, actor } => { + let (selected_name, query_params) = select_named_query(query_source, query_name)?; + let params = query_params_from_json(&query_params, params_json)?; + let db = Self::open_embedded(uri, graph).await?; + let actor = actor.as_deref(); + let result = db + .mutate_as(branch, query_source, &selected_name, ¶ms, actor) + .await?; + Ok(ChangeOutput { + branch: branch.to_string(), + query_name: selected_name, + affected_nodes: result.affected_nodes, + affected_edges: result.affected_edges, + actor_id: actor.map(String::from), + }) + } + } + } + + /// `query` — run a read query against `target`. Folds `execute_read` / + /// `execute_read_remote`; the embedded arm opens WITHOUT policy (reads + /// never attach one), so this verb resolves via `resolve()`. + pub(crate) async fn query( + &self, + target: ReadTarget, + query_source: &str, + query_name: Option<&str>, + params_json: Option<&Value>, + ) -> Result { + match self { + GraphClient::Remote { + http, + base_url, + token, + } => { + let (branch, snapshot) = match &target { + ReadTarget::Branch(branch) => (Some(branch.clone()), None), + ReadTarget::Snapshot(snapshot) => (None, Some(snapshot.as_str().to_string())), + }; + remote_json( + http, + Method::POST, + remote_url(base_url, "/read"), + Some(serde_json::to_value(ReadRequest { + query_source: query_source.to_string(), + query_name: query_name.map(ToOwned::to_owned), + params: params_json.cloned(), + branch, + snapshot, + })?), + token.as_deref(), + ) + .await + } + GraphClient::Embedded { uri, graph, .. } => { + let (selected_name, query_params) = select_named_query(query_source, query_name)?; + let params = query_params_from_json(&query_params, params_json)?; + let db = Self::open_embedded(uri, graph).await?; + let result = db + .query(target.clone(), query_source, &selected_name, ¶ms) + .await?; + Ok(read_output(selected_name, &target, result)) + } + } + } + + pub(crate) async fn branch_create_from( + &self, + from: &str, + name: &str, + ) -> Result { + match self { + GraphClient::Remote { + http, + base_url, + token, + } => { + remote_json( + http, + Method::POST, + remote_url(base_url, "/branches"), + Some(serde_json::to_value(BranchCreateRequest { + from: Some(from.to_string()), + name: name.to_string(), + })?), + token.as_deref(), + ) + .await + } + GraphClient::Embedded { uri, graph, actor } => { + let db = Self::open_embedded(uri, graph).await?; + let actor = actor.as_deref(); + db.branch_create_from_as(ReadTarget::branch(from), name, actor) + .await?; + Ok(BranchCreateOutput { + uri: uri.clone(), + from: from.to_string(), + name: name.to_string(), + actor_id: actor.map(String::from), + }) + } + } + } + + pub(crate) async fn branch_delete(&self, name: &str) -> Result { + match self { + GraphClient::Remote { + http, + base_url, + token, + } => { + remote_json( + http, + Method::DELETE, + remote_branch_url(base_url, name)?, + None, + token.as_deref(), + ) + .await + } + GraphClient::Embedded { uri, graph, actor } => { + let db = Self::open_embedded(uri, graph).await?; + let actor = actor.as_deref(); + db.branch_delete_as(name, actor).await?; + Ok(BranchDeleteOutput { + uri: uri.clone(), + name: name.to_string(), + actor_id: actor.map(String::from), + }) + } + } + } + + pub(crate) async fn branch_merge(&self, source: &str, into: &str) -> Result { + match self { + GraphClient::Remote { + http, + base_url, + token, + } => { + remote_json( + http, + Method::POST, + remote_url(base_url, "/branches/merge"), + Some(serde_json::to_value(BranchMergeRequest { + source: source.to_string(), + target: Some(into.to_string()), + })?), + token.as_deref(), + ) + .await + } + GraphClient::Embedded { uri, graph, actor } => { + let db = Self::open_embedded(uri, graph).await?; + let actor = actor.as_deref(); + let outcome = db.branch_merge_as(source, into, actor).await?; + Ok(BranchMergeOutput { + source: source.to_string(), + target: into.to_string(), + outcome: outcome.into(), + actor_id: actor.map(String::from), + }) + } + } + } + + /// `apply_schema` — apply `schema_source`. The embedded arm runs the + /// caller's catalog validator (stored-query registry check) inside the + /// engine's `apply_schema_as_with_catalog_check`; the remote arm runs + /// the server's own check and IGNORES `validate`. The `impl FnOnce` + /// validator is exactly why this is an enum, not a trait (non-object- + /// safe). + pub(crate) async fn apply_schema( + &self, + schema_source: &str, + allow_data_loss: bool, + validate: F, + ) -> Result + where + F: FnOnce(&Catalog) -> omnigraph::error::Result<()>, + { + match self { + GraphClient::Remote { + http, + base_url, + token, + } => { + // MR-694 PR B: SchemaApplyRequest carries allow_data_loss so + // Hard-mode drops are no longer CLI-only; the server's + // `server_schema_apply` honors it (and runs its own catalog + // check, so `validate` does not apply here). + remote_json::( + http, + Method::POST, + remote_url(base_url, "/schema/apply"), + Some(serde_json::to_value(SchemaApplyRequest { + schema_source: schema_source.to_string(), + allow_data_loss, + })?), + token.as_deref(), + ) + .await + } + GraphClient::Embedded { uri, graph, actor } => { + let db = Self::open_embedded(uri, graph).await?; + let result = db + .apply_schema_as_with_catalog_check( + schema_source, + omnigraph::db::SchemaApplyOptions { allow_data_loss }, + actor.as_deref(), + validate, + ) + .await?; + Ok(schema_apply_output(uri, result)) + } + } + } } diff --git a/crates/omnigraph-cli/src/helpers.rs b/crates/omnigraph-cli/src/helpers.rs index 67fb6ea..e9dfcc1 100644 --- a/crates/omnigraph-cli/src/helpers.rs +++ b/crates/omnigraph-cli/src/helpers.rs @@ -979,77 +979,6 @@ pub(crate) fn execute_queries_list( Ok(()) } -pub(crate) async fn execute_read( - uri: &str, - query_source: &str, - query_name: Option<&str>, - target: ReadTarget, - params_json: Option<&Value>, -) -> Result { - let (selected_name, query_params) = select_named_query(query_source, query_name)?; - let params = query_params_from_json(&query_params, params_json)?; - let db = Omnigraph::open(uri).await?; - let result = db - .query(target.clone(), query_source, &selected_name, ¶ms) - .await?; - Ok(read_output(selected_name, &target, result)) -} - -pub(crate) async fn execute_read_remote( - client: &reqwest::Client, - uri: &str, - query_source: &str, - query_name: Option<&str>, - target: ReadTarget, - params_json: Option<&Value>, - bearer_token: Option<&str>, -) -> Result { - let (branch, snapshot) = match &target { - ReadTarget::Branch(branch) => (Some(branch.clone()), None), - ReadTarget::Snapshot(snapshot) => (None, Some(snapshot.as_str().to_string())), - }; - remote_json( - client, - Method::POST, - remote_url(uri, "/read"), - Some(serde_json::to_value(ReadRequest { - query_source: query_source.to_string(), - query_name: query_name.map(ToOwned::to_owned), - params: params_json.cloned(), - branch, - snapshot, - })?), - bearer_token, - ) - .await -} - -pub(crate) async fn execute_change( - graph: &ResolvedCliGraph, - query_source: &str, - query_name: Option<&str>, - branch: &str, - params_json: Option<&Value>, - config: &OmnigraphConfig, - cli_as_actor: Option<&str>, -) -> Result { - let (selected_name, query_params) = select_named_query(query_source, query_name)?; - let params = query_params_from_json(&query_params, params_json)?; - let db = open_local_db_with_policy(graph).await?; - let actor = resolve_cli_actor(cli_as_actor, config)?; - let actor = actor.as_deref(); - let result = db - .mutate_as(branch, query_source, &selected_name, ¶ms, actor) - .await?; - Ok(ChangeOutput { - branch: branch.to_string(), - query_name: selected_name, - affected_nodes: result.affected_nodes, - affected_edges: result.affected_edges, - actor_id: actor.map(String::from), - }) -} - pub(crate) fn legacy_change_request_body( query_source: &str, query_name: Option<&str>, @@ -1069,30 +998,6 @@ pub(crate) fn legacy_change_request_body( body } -pub(crate) async fn execute_change_remote( - client: &reqwest::Client, - uri: &str, - query_source: &str, - query_name: Option<&str>, - branch: &str, - params_json: Option<&Value>, - bearer_token: Option<&str>, -) -> Result { - remote_json( - client, - Method::POST, - remote_url(uri, "/change"), - Some(legacy_change_request_body( - query_source, - query_name, - branch, - params_json, - )), - bearer_token, - ) - .await -} - pub(crate) async fn execute_export_to_writer( uri: &str, branch: &str, diff --git a/crates/omnigraph-cli/src/main.rs b/crates/omnigraph-cli/src/main.rs index e979622..53eb4c7 100644 --- a/crates/omnigraph-cli/src/main.rs +++ b/crates/omnigraph-cli/src/main.rs @@ -23,11 +23,8 @@ use omnigraph_compiler::{ json_params_to_param_map, lint_query_file, }; use omnigraph_api_types::{ - BranchCreateOutput, BranchCreateRequest, BranchDeleteOutput, - BranchMergeOutput, BranchMergeRequest, ChangeOutput, CommitOutput, - ErrorOutput, ExportRequest, GraphListResponse, IngestOutput, IngestRequest, ReadOutput, - ReadRequest, SchemaApplyOutput, SchemaApplyRequest, - SnapshotTableOutput, ingest_output, read_output, schema_apply_output, + ChangeOutput, CommitOutput, ErrorOutput, ExportRequest, GraphListResponse, IngestOutput, + ReadOutput, SchemaApplyOutput, SnapshotTableOutput, }; use omnigraph_server::queries::{QueryRegistry, check, format_check_breakages}; use omnigraph_server::{ @@ -166,44 +163,18 @@ async fn main() -> Result<()> { json, } => { let config = load_cli_config(config.as_ref())?; - let uri = - apply_server_flag(cli.server.as_deref(), cli.graph.as_deref(), uri, target.as_deref())?; - let bearer_token = - resolve_remote_bearer_token(&config, uri.as_deref(), target.as_deref())?; - let graph = resolve_cli_graph(&config, uri, target.as_deref())?; - let uri = graph.uri.clone(); + let client = client::GraphClient::resolve_with_policy( + &config, + cli.server.as_deref(), + cli.graph.as_deref(), + uri, + target.as_deref(), + cli.as_actor.as_deref(), + )?; let branch = resolve_branch(&config, branch, None, "main"); - let payload = if graph.is_remote { - let data = fs::read_to_string(&data)?; - let output = remote_json::( - &http_client, - Method::POST, - remote_url(&uri, "/ingest"), - Some(serde_json::to_value(IngestRequest { - branch: Some(branch.clone()), - from: from.clone(), - mode: Some(mode.into()), - data, - })?), - bearer_token.as_deref(), - ) + let payload = client + .load(&branch, from.as_deref(), &data.to_string_lossy(), mode) .await?; - load_output_from_tables(&uri, &branch, mode.as_str(), &output) - } else { - let db = open_local_db_with_policy(&graph).await?; - let actor = resolve_cli_actor(cli.as_actor.as_deref(), &config)?; - let actor = actor.as_deref(); - let result = db - .load_file_as( - &branch, - from.as_deref(), - &data.to_string_lossy(), - mode.into(), - actor, - ) - .await?; - load_output_from_result(&uri, &branch, mode.as_str(), &result) - }; if json { print_json(&payload)?; } else { @@ -226,44 +197,19 @@ async fn main() -> Result<()> { use `omnigraph load --from --mode ` (ingest defaults: --from main --mode merge)" ); let config = load_cli_config(config.as_ref())?; - let uri = - apply_server_flag(cli.server.as_deref(), cli.graph.as_deref(), uri, target.as_deref())?; - let bearer_token = - resolve_remote_bearer_token(&config, uri.as_deref(), target.as_deref())?; - let graph = resolve_cli_graph(&config, uri, target.as_deref())?; - let uri = graph.uri.clone(); + let client = client::GraphClient::resolve_with_policy( + &config, + cli.server.as_deref(), + cli.graph.as_deref(), + uri, + target.as_deref(), + cli.as_actor.as_deref(), + )?; let branch = resolve_branch(&config, branch, None, "main"); let from = resolve_branch(&config, from, None, "main"); - let payload = if graph.is_remote { - let data = fs::read_to_string(&data)?; - remote_json::( - &http_client, - Method::POST, - remote_url(&uri, "/ingest"), - Some(serde_json::to_value(IngestRequest { - branch: Some(branch.clone()), - from: Some(from.clone()), - mode: Some(mode.into()), - data, - })?), - bearer_token.as_deref(), - ) - .await? - } else { - let db = open_local_db_with_policy(&graph).await?; - let actor = resolve_cli_actor(cli.as_actor.as_deref(), &config)?; - let actor = actor.as_deref(); - let result = db - .load_file_as( - &branch, - Some(&from), - &data.to_string_lossy(), - mode.into(), - actor, - ) - .await?; - ingest_output(&uri, &result, mode.into(), None) - }; + let payload = client + .ingest(&branch, &from, &data.to_string_lossy(), mode) + .await?; if json { print_json(&payload)?; } else { @@ -280,38 +226,16 @@ async fn main() -> Result<()> { json, } => { let config = load_cli_config(config.as_ref())?; - let uri = - apply_server_flag(cli.server.as_deref(), cli.graph.as_deref(), uri, target.as_deref())?; - let bearer_token = - resolve_remote_bearer_token(&config, uri.as_deref(), target.as_deref())?; - let graph = resolve_cli_graph(&config, uri, target.as_deref())?; - let uri = graph.uri.clone(); + let client = client::GraphClient::resolve_with_policy( + &config, + cli.server.as_deref(), + cli.graph.as_deref(), + uri, + target.as_deref(), + cli.as_actor.as_deref(), + )?; let from = resolve_branch(&config, from, None, "main"); - let payload = if graph.is_remote { - remote_json::( - &http_client, - Method::POST, - remote_url(&uri, "/branches"), - Some(serde_json::to_value(BranchCreateRequest { - from: Some(from.clone()), - name: name.clone(), - })?), - bearer_token.as_deref(), - ) - .await? - } else { - let db = open_local_db_with_policy(&graph).await?; - let actor = resolve_cli_actor(cli.as_actor.as_deref(), &config)?; - let actor = actor.as_deref(); - db.branch_create_from_as(ReadTarget::branch(&from), &name, actor) - .await?; - BranchCreateOutput { - uri: uri.clone(), - from: from.clone(), - name: name.clone(), - actor_id: actor.map(String::from), - } - }; + let payload = client.branch_create_from(&from, &name).await?; if json { print_json(&payload)?; } else { @@ -349,32 +273,15 @@ async fn main() -> Result<()> { json, } => { let config = load_cli_config(config.as_ref())?; - let uri = - apply_server_flag(cli.server.as_deref(), cli.graph.as_deref(), uri, target.as_deref())?; - let bearer_token = - resolve_remote_bearer_token(&config, uri.as_deref(), target.as_deref())?; - let graph = resolve_cli_graph(&config, uri, target.as_deref())?; - let uri = graph.uri.clone(); - let payload = if graph.is_remote { - remote_json::( - &http_client, - Method::DELETE, - remote_branch_url(&uri, &name)?, - None, - bearer_token.as_deref(), - ) - .await? - } else { - let db = open_local_db_with_policy(&graph).await?; - let actor = resolve_cli_actor(cli.as_actor.as_deref(), &config)?; - let actor = actor.as_deref(); - db.branch_delete_as(&name, actor).await?; - BranchDeleteOutput { - uri: uri.clone(), - name: name.clone(), - actor_id: actor.map(String::from), - } - }; + let client = client::GraphClient::resolve_with_policy( + &config, + cli.server.as_deref(), + cli.graph.as_deref(), + uri, + target.as_deref(), + cli.as_actor.as_deref(), + )?; + let payload = client.branch_delete(&name).await?; if json { print_json(&payload)?; } else { @@ -390,37 +297,16 @@ async fn main() -> Result<()> { json, } => { let config = load_cli_config(config.as_ref())?; - let uri = - apply_server_flag(cli.server.as_deref(), cli.graph.as_deref(), uri, target.as_deref())?; - let bearer_token = - resolve_remote_bearer_token(&config, uri.as_deref(), target.as_deref())?; - let graph = resolve_cli_graph(&config, uri, target.as_deref())?; - let uri = graph.uri.clone(); + let client = client::GraphClient::resolve_with_policy( + &config, + cli.server.as_deref(), + cli.graph.as_deref(), + uri, + target.as_deref(), + cli.as_actor.as_deref(), + )?; let into = resolve_branch(&config, into, None, "main"); - let payload = if graph.is_remote { - remote_json::( - &http_client, - Method::POST, - remote_url(&uri, "/branches/merge"), - Some(serde_json::to_value(BranchMergeRequest { - source: source.clone(), - target: Some(into.clone()), - })?), - bearer_token.as_deref(), - ) - .await? - } else { - let db = open_local_db_with_policy(&graph).await?; - let actor = resolve_cli_actor(cli.as_actor.as_deref(), &config)?; - let actor = actor.as_deref(); - let outcome = db.branch_merge_as(&source, &into, actor).await?; - BranchMergeOutput { - source: source.clone(), - target: into.clone(), - outcome: outcome.into(), - actor_id: actor.map(String::from), - } - }; + let payload = client.branch_merge(&source, &into).await?; if json { print_json(&payload)?; } else { @@ -519,52 +405,34 @@ async fn main() -> Result<()> { allow_data_loss, } => { let config = load_cli_config(config.as_ref())?; - let uri = - apply_server_flag(cli.server.as_deref(), cli.graph.as_deref(), uri, target.as_deref())?; - let bearer_token = - resolve_remote_bearer_token(&config, uri.as_deref(), target.as_deref())?; - let graph = resolve_cli_graph(&config, uri, target.as_deref())?; - let uri = graph.uri.clone(); + let client = client::GraphClient::resolve_with_policy( + &config, + cli.server.as_deref(), + cli.graph.as_deref(), + uri, + target.as_deref(), + cli.as_actor.as_deref(), + )?; let schema_source = fs::read_to_string(&schema)?; - let output = if graph.is_remote { - // MR-694 PR B: SchemaApplyRequest gained an - // allow_data_loss field so Hard-mode drops are no - // longer CLI-only. The previous bail is gone; the - // field is forwarded into the JSON payload, and - // the server's `server_schema_apply` honors it. - remote_json::( - &http_client, - Method::POST, - remote_url(&uri, "/schema/apply"), - Some(serde_json::to_value(SchemaApplyRequest { - schema_source: schema_source.clone(), - allow_data_loss, - })?), - bearer_token.as_deref(), - ) - .await? + // The stored-query registry check is an embedded-only concern + // (the remote arm ignores the validator — the server runs its + // own check); build it only for the local path so the remote + // path keeps its no-registry-load behavior. + let registry = if client.is_remote() { + None } else { - let db = open_local_db_with_policy(&graph).await?; - let actor = resolve_cli_actor(cli.as_actor.as_deref(), &config)?; - let actor = actor.as_deref(); - let registry = load_registry_or_report(&config, graph.selected())?; - let registry = (!registry.is_empty()).then_some(registry); - let label = graph.selected().unwrap_or(&uri).to_string(); - let result = db - .apply_schema_as_with_catalog_check( - &schema_source, - omnigraph::db::SchemaApplyOptions { allow_data_loss }, - actor, - |catalog| { - if let Some(registry) = registry.as_ref() { - validate_registry_for_catalog(registry, catalog, &label)?; - } - Ok(()) - }, - ) - .await?; - schema_apply_output(&uri, result) + let registry = load_registry_or_report(&config, client.selected())?; + (!registry.is_empty()).then_some(registry) }; + let label = client.selected().unwrap_or(client.uri()).to_string(); + let output = client + .apply_schema(&schema_source, allow_data_loss, |catalog| { + if let Some(registry) = registry.as_ref() { + validate_registry_for_catalog(registry, catalog, &label)?; + } + Ok(()) + }) + .await?; if json { print_json(&output)?; } else { @@ -757,10 +625,13 @@ async fn main() -> Result<()> { let target_name = target .as_deref() .or_else(|| alias_config.and_then(|alias| alias.graph.as_deref())); - let uri = apply_server_flag(cli.server.as_deref(), cli.graph.as_deref(), uri, target_name)?; - let bearer_token = resolve_remote_bearer_token(&config, uri.as_deref(), target_name)?; - let graph = resolve_cli_graph(&config, uri, target_name)?; - let uri = graph.uri.clone(); + let client = client::GraphClient::resolve( + &config, + cli.server.as_deref(), + cli.graph.as_deref(), + uri, + target_name, + )?; let query_source = resolve_query_source( &config, query.as_ref(), @@ -782,27 +653,14 @@ async fn main() -> Result<()> { alias_config.and_then(|alias| alias.branch.clone()), )?; let query_name = name.or_else(|| alias_config.and_then(|alias| alias.name.clone())); - let output = if graph.is_remote { - execute_read_remote( - &http_client, - &uri, + let output = client + .query( + target, &query_source, query_name.as_deref(), - target, - params_json.as_ref(), - bearer_token.as_deref(), - ) - .await? - } else { - execute_read( - &uri, - &query_source, - query_name.as_deref(), - target, params_json.as_ref(), ) - .await? - }; + .await?; let format = resolve_read_format( &config, format, @@ -844,10 +702,14 @@ async fn main() -> Result<()> { let target_name = target .as_deref() .or_else(|| alias_config.and_then(|alias| alias.graph.as_deref())); - let uri = apply_server_flag(cli.server.as_deref(), cli.graph.as_deref(), uri, target_name)?; - let bearer_token = resolve_remote_bearer_token(&config, uri.as_deref(), target_name)?; - let graph = resolve_cli_graph(&config, uri, target_name)?; - let uri = graph.uri.clone(); + let client = client::GraphClient::resolve_with_policy( + &config, + cli.server.as_deref(), + cli.graph.as_deref(), + uri, + target_name, + cli.as_actor.as_deref(), + )?; let query_source = resolve_query_source( &config, query.as_ref(), @@ -869,29 +731,14 @@ async fn main() -> Result<()> { "main", ); let query_name = name.or_else(|| alias_config.and_then(|alias| alias.name.clone())); - let output = if graph.is_remote { - execute_change_remote( - &http_client, - &uri, + let output = client + .mutate( + &branch, &query_source, query_name.as_deref(), - &branch, params_json.as_ref(), - bearer_token.as_deref(), ) - .await? - } else { - execute_change( - &graph, - &query_source, - query_name.as_deref(), - &branch, - params_json.as_ref(), - &config, - cli.as_actor.as_deref(), - ) - .await? - }; + .await?; if json { print_json(&output)?; } else {