From 25d74d689d0801962f8897d9d05d4303974fc932 Mon Sep 17 00:00:00 2001 From: aaltshuler Date: Sat, 13 Jun 2026 17:44:23 +0300 Subject: [PATCH 1/3] refactor(cli): GraphClient enum + read verbs (RFC-009 Phase 3a) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The embedded-vs-remote split gets one home: a GraphClient enum (Embedded { uri } | Remote { http, base_url, token }) with a resolve() factory that absorbs the shared preamble (apply_server_flag -> token -> URI/remoteness) and a verb method per command. The five uniform read forks — branch list, commit list, commit show, schema show, snapshot — collapse from per-command if-graph-is-remote else to one line each (main.rs: -113/+47). Behavior identical per verb (local reads still open WITHOUT policy, as today); the Phase-1 parity matrix is the referee and passes textually unchanged. Enum, not the RFC trait: only two variants ever, and inherent async methods avoid async_trait boxing and the apply_schema closure that is not object-safe (3b) — same one-body-two-impls collapse, less ceremony. Scope: the uniform reads only. The query verb (policy-open + operator- alias early-return + param merge) joins the write verbs in 3b; export/streaming and graphs-list in 3c, where the now-shared execute_*_remote/execute_* pairs get retired. Co-Authored-By: Claude Fable 5 --- crates/omnigraph-cli/src/client.rs | 197 +++++++++++++++++++++++++++++ crates/omnigraph-cli/src/main.rs | 160 +++++++---------------- 2 files changed, 244 insertions(+), 113 deletions(-) create mode 100644 crates/omnigraph-cli/src/client.rs diff --git a/crates/omnigraph-cli/src/client.rs b/crates/omnigraph-cli/src/client.rs new file mode 100644 index 0000000..feeaf16 --- /dev/null +++ b/crates/omnigraph-cli/src/client.rs @@ -0,0 +1,197 @@ +//! `GraphClient` — the one place the embedded-vs-remote split lives +//! (RFC-009 Phase 3). A CLI command body calls a verb method; the +//! enum routes to the engine (local URI) or HTTP (remote URI). The +//! 15 per-command `if graph.is_remote { … } else { … }` forks collapse +//! into two arms here. +//! +//! Phase 3a scope: the factory + the uniform read verbs (snapshot, +//! schema show, branch list, commit list/show — all of which open the +//! local engine WITHOUT policy today, preserved exactly). Write verbs +//! and the policy-bearing `query`/`mutate` arrive in 3b (the Embedded +//! variant will grow the policy context then); export + graphs-list in +//! 3c. Behavior is unchanged per verb — the Phase-1 parity matrix is the +//! referee and stays textually unchanged. +//! +//! Enum, not a trait (RFC sketch said "trait"): only two variants ever, +//! and inherent async methods sidestep `async_trait` boxing plus the +//! `apply_schema` catalog-validator closure that is not object-safe. +//! Same one-body-two-impls collapse, less ceremony. + +use reqwest::Method; +use color_eyre::Result; +use omnigraph::db::{Omnigraph, ReadTarget}; +use omnigraph_api_types::{ + BranchListOutput, CommitListOutput, CommitOutput, SchemaOutput, SnapshotOutput, commit_output, + snapshot_payload, +}; + +use crate::helpers::{ + apply_server_flag, build_http_client, is_remote_uri, remote_json, remote_url, + resolve_remote_bearer_token, +}; +use omnigraph_server::config::OmnigraphConfig; + +pub(crate) enum GraphClient { + /// Local engine at `uri`. Reads open the dataset per call (no policy + /// attached — matches today's read behavior; the write verbs in 3b + /// add a policy-bearing context). + Embedded { uri: String }, + /// Remote HTTP server. The actor is resolved server-side from the + /// token; the client never sets identity. + Remote { + http: reqwest::Client, + base_url: String, + token: Option, + }, +} + +impl GraphClient { + /// Resolve the addressing (positional URI / `--target` / `--server`) + /// and credential once, then pick the variant by URI scheme — the + /// single branch point that replaces every per-command `is_remote` + /// fork. Mirrors the read verbs' current preamble (`resolve_uri` + /// path, not the policy-bearing `resolve_cli_graph`). + pub(crate) fn resolve( + config: &OmnigraphConfig, + server: Option<&str>, + graph: Option<&str>, + uri: Option, + target: Option<&str>, + ) -> Result { + let uri = apply_server_flag(server, graph, uri, target)?; + let token = resolve_remote_bearer_token(config, uri.as_deref(), target)?; + let uri = crate::helpers::resolve_uri(config, uri, target)?; + if is_remote_uri(&uri) { + Ok(GraphClient::Remote { + http: build_http_client()?, + base_url: uri, + token, + }) + } else { + Ok(GraphClient::Embedded { uri }) + } + } + + pub(crate) async fn branch_list(&self) -> Result { + match self { + GraphClient::Remote { + http, + base_url, + token, + } => { + remote_json( + http, + Method::GET, + remote_url(base_url, "/branches"), + None, + token.as_deref(), + ) + .await + } + GraphClient::Embedded { uri } => { + let db = Omnigraph::open(uri).await?; + let mut branches = db.branch_list().await?; + branches.sort(); + Ok(BranchListOutput { branches }) + } + } + } + + pub(crate) async fn snapshot(&self, branch: &str) -> Result { + match self { + GraphClient::Remote { + http, + base_url, + token, + } => { + remote_json( + http, + Method::GET, + format!("{}?branch={}", remote_url(base_url, "/snapshot"), branch), + None, + token.as_deref(), + ) + .await + } + GraphClient::Embedded { uri } => { + let db = Omnigraph::open(uri).await?; + let snapshot = db.snapshot_of(ReadTarget::branch(branch)).await?; + Ok(snapshot_payload(branch, &snapshot)) + } + } + } + + pub(crate) async fn schema_source(&self) -> Result { + match self { + GraphClient::Remote { + http, + base_url, + token, + } => { + remote_json( + http, + Method::GET, + remote_url(base_url, "/schema"), + None, + token.as_deref(), + ) + .await + } + GraphClient::Embedded { uri } => { + let db = Omnigraph::open(uri).await?; + Ok(SchemaOutput { + schema_source: db.schema_source().to_string(), + }) + } + } + } + + pub(crate) async fn list_commits(&self, branch: Option<&str>) -> Result { + match self { + GraphClient::Remote { + http, + base_url, + token, + } => { + let url = match branch { + Some(branch) => format!("{}?branch={}", remote_url(base_url, "/commits"), branch), + None => remote_url(base_url, "/commits"), + }; + remote_json(http, Method::GET, url, None, token.as_deref()).await + } + GraphClient::Embedded { uri } => { + let db = Omnigraph::open(uri).await?; + let commits = db + .list_commits(branch) + .await? + .iter() + .map(commit_output) + .collect::>(); + Ok(CommitListOutput { commits }) + } + } + } + + pub(crate) async fn get_commit(&self, commit_id: &str) -> Result { + match self { + GraphClient::Remote { + http, + base_url, + token, + } => { + remote_json( + http, + Method::GET, + remote_url(base_url, &format!("/commits/{commit_id}")), + None, + token.as_deref(), + ) + .await + } + GraphClient::Embedded { uri } => { + let db = Omnigraph::open(uri).await?; + Ok(commit_output(&db.get_commit(commit_id).await?)) + } + } + } +} diff --git a/crates/omnigraph-cli/src/main.rs b/crates/omnigraph-cli/src/main.rs index 0b518eb..e979622 100644 --- a/crates/omnigraph-cli/src/main.rs +++ b/crates/omnigraph-cli/src/main.rs @@ -23,12 +23,11 @@ use omnigraph_compiler::{ json_params_to_param_map, lint_query_file, }; use omnigraph_api_types::{ - BranchCreateOutput, BranchCreateRequest, BranchDeleteOutput, BranchListOutput, - BranchMergeOutput, BranchMergeRequest, ChangeOutput, CommitListOutput, CommitOutput, + BranchCreateOutput, BranchCreateRequest, BranchDeleteOutput, + BranchMergeOutput, BranchMergeRequest, ChangeOutput, CommitOutput, ErrorOutput, ExportRequest, GraphListResponse, IngestOutput, IngestRequest, ReadOutput, - ReadRequest, SchemaApplyOutput, SchemaApplyRequest, SchemaOutput, SnapshotOutput, - SnapshotTableOutput, commit_output, ingest_output, read_output, schema_apply_output, - snapshot_payload, + ReadRequest, SchemaApplyOutput, SchemaApplyRequest, + SnapshotTableOutput, ingest_output, read_output, schema_apply_output, }; use omnigraph_server::queries::{QueryRegistry, check, format_check_breakages}; use omnigraph_server::{ @@ -50,6 +49,7 @@ use embed::{EmbedArgs, EmbedOutput, execute_embed}; use read_format::{ReadRenderOptions, render_read}; mod cli; +mod client; mod helpers; mod output; use cli::*; @@ -325,27 +325,14 @@ async fn main() -> Result<()> { json, } => { let config = load_cli_config(config.as_ref())?; - let uri = - apply_server_flag(cli.server.as_deref(), cli.graph.as_deref(), uri, target.as_deref())?; - let bearer_token = - resolve_remote_bearer_token(&config, uri.as_deref(), target.as_deref())?; - let graph = resolve_cli_graph(&config, uri, target.as_deref())?; - let uri = graph.uri.clone(); - let payload = if graph.is_remote { - remote_json::( - &http_client, - Method::GET, - remote_url(&uri, "/branches"), - None, - bearer_token.as_deref(), - ) - .await? - } else { - let db = Omnigraph::open(&uri).await?; - let mut branches = db.branch_list().await?; - branches.sort(); - BranchListOutput { branches } - }; + let client = client::GraphClient::resolve( + &config, + cli.server.as_deref(), + cli.graph.as_deref(), + uri, + target.as_deref(), + )?; + let payload = client.branch_list().await?; if json { print_json(&payload)?; } else { @@ -455,37 +442,18 @@ async fn main() -> Result<()> { json, } => { let config = load_cli_config(config.as_ref())?; - let uri = - apply_server_flag(cli.server.as_deref(), cli.graph.as_deref(), uri, target.as_deref())?; - let bearer_token = - resolve_remote_bearer_token(&config, uri.as_deref(), target.as_deref())?; - let uri = resolve_uri(&config, uri, target.as_deref())?; - let commits = if is_remote_uri(&uri) { - remote_json::( - &http_client, - Method::GET, - if let Some(branch) = branch.as_deref() { - format!("{}?branch={}", remote_url(&uri, "/commits"), branch) - } else { - remote_url(&uri, "/commits") - }, - None, - bearer_token.as_deref(), - ) - .await? - .commits - } else { - let db = Omnigraph::open(&uri).await?; - db.list_commits(branch.as_deref()) - .await? - .iter() - .map(commit_output) - .collect::>() - }; + let client = client::GraphClient::resolve( + &config, + cli.server.as_deref(), + cli.graph.as_deref(), + uri, + target.as_deref(), + )?; + let payload = client.list_commits(branch.as_deref()).await?; if json { - print_json(&CommitListOutput { commits })?; + print_json(&payload)?; } else { - print_commit_list_human(&commits); + print_commit_list_human(&payload.commits); } } CommitCommand::Show { @@ -496,24 +464,14 @@ async fn main() -> Result<()> { json, } => { let config = load_cli_config(config.as_ref())?; - let uri = - apply_server_flag(cli.server.as_deref(), cli.graph.as_deref(), uri, target.as_deref())?; - let bearer_token = - resolve_remote_bearer_token(&config, uri.as_deref(), target.as_deref())?; - let uri = resolve_uri(&config, uri, target.as_deref())?; - let commit = if is_remote_uri(&uri) { - remote_json::( - &http_client, - Method::GET, - remote_url(&uri, &format!("/commits/{}", commit_id)), - None, - bearer_token.as_deref(), - ) - .await? - } else { - let db = Omnigraph::open(&uri).await?; - commit_output(&db.get_commit(&commit_id).await?) - }; + let client = client::GraphClient::resolve( + &config, + cli.server.as_deref(), + cli.graph.as_deref(), + uri, + target.as_deref(), + )?; + let commit = client.get_commit(&commit_id).await?; if json { print_json(&commit)?; } else { @@ -620,26 +578,14 @@ async fn main() -> Result<()> { json, } => { let config = load_cli_config(config.as_ref())?; - let uri = - apply_server_flag(cli.server.as_deref(), cli.graph.as_deref(), uri, target.as_deref())?; - let bearer_token = - resolve_remote_bearer_token(&config, uri.as_deref(), target.as_deref())?; - let uri = resolve_uri(&config, uri, target.as_deref())?; - let output = if is_remote_uri(&uri) { - remote_json::( - &http_client, - Method::GET, - remote_url(&uri, "/schema"), - None, - bearer_token.as_deref(), - ) - .await? - } else { - let db = Omnigraph::open(&uri).await?; - SchemaOutput { - schema_source: db.schema_source().to_string(), - } - }; + let client = client::GraphClient::resolve( + &config, + cli.server.as_deref(), + cli.graph.as_deref(), + uri, + target.as_deref(), + )?; + let output = client.schema_source().await?; if json { print_json(&output)?; } else { @@ -686,27 +632,15 @@ async fn main() -> Result<()> { json, } => { let config = load_cli_config(config.as_ref())?; - let uri = - apply_server_flag(cli.server.as_deref(), cli.graph.as_deref(), uri, target.as_deref())?; - let bearer_token = - resolve_remote_bearer_token(&config, uri.as_deref(), target.as_deref())?; - let uri = resolve_uri(&config, uri, target.as_deref())?; + let client = client::GraphClient::resolve( + &config, + cli.server.as_deref(), + cli.graph.as_deref(), + uri, + target.as_deref(), + )?; let branch = resolve_branch(&config, branch, None, "main"); - let payload = if is_remote_uri(&uri) { - remote_json::( - &http_client, - Method::GET, - format!("{}?branch={}", remote_url(&uri, "/snapshot"), branch), - None, - bearer_token.as_deref(), - ) - .await? - } else { - let db = Omnigraph::open(&uri).await?; - let snapshot = db.snapshot_of(ReadTarget::branch(branch.as_str())).await?; - snapshot_payload(&branch, &snapshot) - }; - + let payload = client.snapshot(&branch).await?; if json { print_json(&payload)?; } else { From 81b66f9427953f4fc736cead9d4986421aad91c5 Mon Sep 17 00:00:00 2001 From: Andrew Altshuler Date: Sat, 13 Jun 2026 19:23:41 +0300 Subject: [PATCH 2/3] ci: run Test Workspace only on main, not on pull requests (#212) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The full workspace + failpoints suite was the slowest PR gate (~15min warm, up to the 75min cold ceiling) and dominated PR turnaround. Gate the `test` job with `if: github.event_name != 'pull_request'` so it runs only on push to `main` (post-merge), on `v*` tags, and on manual `workflow_dispatch`. `RustFS S3 Integration` needs `test`, so it becomes push-/dispatch-only by the same cascade. Drop `Test Workspace` from the required-check list in branch-protection.json: a required context that never reports on PRs (the job no longer runs there) would leave every PR permanently pending — the job-never-reports trap the policy already documents. Trade-off accepted deliberately (chosen by the maintainer): a regression the suite would catch now lands on `main` and reddens the post-merge run instead of being blocked pre-merge, so `main` can briefly break. Mitigations documented in ci.md: run `cargo test --workspace --locked` locally before merging non-trivial changes (or trigger the workflow on your branch via workflow_dispatch), and regenerate openapi.json locally for server/API changes (the auto-regen step lived in the now-PR-skipped test job). The fast PR gates remain: Classify Changes, Check AGENTS.md Links, the AWS-feature build/test, and the two CODEOWNERS checks. NOTE: an admin must run ./scripts/apply-branch-protection.sh after this merges, or GitHub keeps requiring the now-unreported Test Workspace context. Co-authored-by: Claude Opus 4.8 --- .github/branch-protection.json | 1 - .github/workflows/ci.yml | 20 ++++++++++++++++++++ docs/dev/branch-protection.md | 2 +- docs/dev/ci.md | 3 +++ 4 files changed, 24 insertions(+), 2 deletions(-) diff --git a/.github/branch-protection.json b/.github/branch-protection.json index c039e32..aa1ab19 100644 --- a/.github/branch-protection.json +++ b/.github/branch-protection.json @@ -5,7 +5,6 @@ "contexts": [ "Classify Changes", "Check AGENTS.md Links", - "Test Workspace", "Test omnigraph-server --features aws", "CODEOWNERS matches source", "CODEOWNERS not hand-edited" diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 56ef3e3..fca08da 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -128,6 +128,23 @@ jobs: test: name: Test Workspace needs: classify_changes + # PR latency: the full workspace + failpoints build/test is the slowest + # gate (~15min warm, up to the 75min ceiling cold) and dominated PR + # turnaround. It now runs only on push to `main` (post-merge), on tags, + # and on manual `workflow_dispatch` — NOT on pull_request. Trade-off + # accepted deliberately: a regression is caught on the `main` run after + # merge rather than before it, so `main` can briefly go red. Mitigations: + # (1) `Test Workspace` is removed from required PR checks in + # `.github/branch-protection.json` (a required check that never + # reports would leave every PR permanently pending); + # (2) run the full suite locally before merging risky changes + # (`cargo test --workspace --locked`), or trigger this workflow via + # the Actions "Run workflow" button (workflow_dispatch) on your branch; + # (3) openapi.json is no longer auto-regenerated on PRs (that step lived + # here) — regenerate it locally for server/API changes + # (`OMNIGRAPH_UPDATE_OPENAPI=1 cargo test -p omnigraph-server --test openapi`) + # or the strict drift check fails the post-merge `main` run. + if: github.event_name != 'pull_request' runs-on: ubuntu-latest # 75, not 45: a cold rust-cache (every Cargo.lock change) costs a full # workspace + failpoints-feature build on a 2-core runner, which now @@ -274,6 +291,9 @@ jobs: rustfs_integration: name: RustFS S3 Integration + # `needs: test` means this is push-/dispatch-only too: on pull_request the + # `test` job is skipped, so this dependent is skipped with it. S3 + # integration runs post-merge on `main`, alongside the workspace suite. needs: - classify_changes - test diff --git a/docs/dev/branch-protection.md b/docs/dev/branch-protection.md index 2b6cc37..1d1c094 100644 --- a/docs/dev/branch-protection.md +++ b/docs/dev/branch-protection.md @@ -8,7 +8,7 @@ This page explains what the policy says and how to change it. | Setting | Value | Why | |---|---|---| -| **Required status checks (strict)** | `Classify Changes`, `Check AGENTS.md Links`, `Test Workspace`, `Test omnigraph-server --features aws`, `CODEOWNERS matches source`, `CODEOWNERS not hand-edited` | Every PR must pass workspace tests, AGENTS.md link integrity, and the CODEOWNERS hygiene checks. The two CODEOWNERS contexts must equal the job `name:` values in `.github/workflows/codeowners.yml` **verbatim** — a context naming a job that never reports (the old `CODEOWNERS / drift` used the job *id*, and the job was path-filtered) leaves every PR permanently pending and forces admin overrides. `strict: true` requires the branch to be up-to-date with `main` before merge. | +| **Required status checks (strict)** | `Classify Changes`, `Check AGENTS.md Links`, `Test omnigraph-server --features aws`, `CODEOWNERS matches source`, `CODEOWNERS not hand-edited` | Every PR must pass the AWS-feature build/test, AGENTS.md link integrity, and the CODEOWNERS hygiene checks. **`Test Workspace` is deliberately NOT required** — it runs only on push to `main` (post-merge), tags, and manual `workflow_dispatch`, to keep PR turnaround fast (it was the ~15min+ slow gate). It is therefore *not* listed here: a required check that never reports on PRs (the `test` job is `if: github.event_name != 'pull_request'`) would leave every PR permanently pending — the same job-never-reports trap the CODEOWNERS contexts call out below. The trade-off (a regression lands on `main` and is caught by the post-merge run, so `main` can briefly go red) and its mitigations are documented in [ci.md](ci.md). The two CODEOWNERS contexts must equal the job `name:` values in `.github/workflows/codeowners.yml` **verbatim** — a context naming a job that never reports (the old `CODEOWNERS / drift` used the job *id*, and the job was path-filtered) leaves every PR permanently pending and forces admin overrides. `strict: true` requires the branch to be up-to-date with `main` before merge. | | **Required approving reviews** | `1` | At least one reviewer. With a 2-person team, going higher would block all merges when one person is unavailable. | | **Require code-owner reviews** | `true` | The reviewer must be a code owner per `.github/CODEOWNERS`. This is what makes the codeowners chassis enforced. | | **Dismiss stale reviews on new commits** | `true` | A push after approval invalidates the prior review. Prevents the "approve, then sneak in unreviewed changes" pattern. | diff --git a/docs/dev/ci.md b/docs/dev/ci.md index 1124cb4..2e80f40 100644 --- a/docs/dev/ci.md +++ b/docs/dev/ci.md @@ -3,6 +3,9 @@ `.github/workflows/`: - **ci.yml**: text-only changes skip; otherwise `cargo test --workspace --locked` on ubuntu-latest with protobuf compiler. OpenAPI-drift check that auto-commits the regenerated `openapi.json` for same-repository PRs. Also runs the AGENTS.md cross-link integrity check (`scripts/check-agents-md.sh`). + - **`Test Workspace` does not run on pull requests.** The job is gated `if: github.event_name != 'pull_request'`, so the full workspace + failpoints suite runs only on push to `main` (post-merge), on `v*` tags, and on manual `workflow_dispatch`. This was a deliberate PR-latency trade-off — it was the slowest gate (~15min warm, up to the 75min cold ceiling). `RustFS S3 Integration` `needs: test`, so it is push-/dispatch-only for the same reason. The fast PR gates remain: `Classify Changes`, `Check AGENTS.md Links`, `Test omnigraph-server --features aws`, and the two CODEOWNERS checks. `Test Workspace` is correspondingly **not** in the required-check list (`.github/branch-protection.json`); see [branch-protection.md](branch-protection.md). + - **Consequences to internalize:** (1) a regression that the suite would catch now lands on `main` and turns the post-merge run red, rather than being blocked pre-merge — `main` can briefly break, so run `cargo test --workspace --locked` locally before merging anything non-trivial, or trigger this workflow on your branch via the Actions "Run workflow" button. (2) `openapi.json` is no longer auto-regenerated on PRs (that step is inside the `test` job); for server/API changes, regenerate it locally with `OMNIGRAPH_UPDATE_OPENAPI=1 cargo test -p omnigraph-server --test openapi` and commit it, or the strict drift check fails the post-merge `main` run. + - **Applying this policy:** removing `Test Workspace` from the JSON is inert until an admin runs `./scripts/apply-branch-protection.sh`. **Run it immediately after this change merges** — until then GitHub still requires a `Test Workspace` context that no longer reports on PRs, which leaves every open PR permanently pending (the job-never-reports trap). - **AWS feature build job**: `cargo build/test -p omnigraph-server --features aws` on ubuntu-latest. - **Windows binary build job**: `cargo build --release --locked -p omnigraph-cli -p omnigraph-server` on windows-latest with smoke checks for `omnigraph.exe version`, `omnigraph-server.exe --help`, and PowerShell installer syntax. - **RustFS S3 integration**: spins up RustFS in Docker, runs `s3_storage`, `server_opens_s3_graph_directly_and_serves_snapshot_and_read`, and `local_cli_s3_end_to_end_init_load_read_flow`. From d32c1ac191fbf88ef0fc249b1c6aa393cbd8cef1 Mon Sep 17 00:00:00 2001 From: Andrew Altshuler Date: Sat, 13 Jun 2026 19:25:57 +0300 Subject: [PATCH 3/3] refactor(cli): collapse write/query forks onto GraphClient (RFC-009 Phase 3b) (#211) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Phase 3a put the GraphClient enum in place and collapsed the five uniform read forks. 3b folds the remaining data-plane forks onto the same enum: load, ingest, mutate, query, branch create/delete/merge, and schema apply. The wrinkle 3a deferred was the local policy attachment. Reads and query open the local engine without a policy; writes open through open_local_db_with_policy and attribute a resolved actor. So the Embedded variant grows an optional policy context (graph/actor) filled by a second factory, resolve_with_policy; resolve() leaves it empty. open_embedded picks the open path from whether the context is present, preserving both of today's behaviors exactly. query still uses resolve() (no policy), as the read path did. apply_schema takes the catalog-validator closure as impl FnOnce(&Catalog) — the embedded arm runs it inside apply_schema_as_with_catalog_check, the remote arm ignores it (the server runs its own check). That non-object-safe closure is why GraphClient is an enum, not a trait. The stored-query registry is still built caller-side and only for the local path. load and ingest stay separate methods: same operation, but load surfaces the CLI LoadOutput (two distinct per-arm mappings preserved) while ingest surfaces the wire IngestOutput. The now-fully-dead execute_read/ execute_read_remote and execute_change/execute_change_remote pairs are retired (legacy_change_request_body stays — client.rs uses it); the export pair remains for 3c. The Phase-1 parity matrix is unchanged and green; full workspace tests pass. Co-authored-by: Claude Fable 5 --- crates/omnigraph-cli/src/client.rs | 465 ++++++++++++++++++++++++++-- crates/omnigraph-cli/src/helpers.rs | 95 ------ crates/omnigraph-cli/src/main.rs | 349 ++++++--------------- 3 files changed, 541 insertions(+), 368 deletions(-) diff --git a/crates/omnigraph-cli/src/client.rs b/crates/omnigraph-cli/src/client.rs index feeaf16..02daae4 100644 --- a/crates/omnigraph-cli/src/client.rs +++ b/crates/omnigraph-cli/src/client.rs @@ -4,38 +4,56 @@ //! 15 per-command `if graph.is_remote { … } else { … }` forks collapse //! into two arms here. //! -//! Phase 3a scope: the factory + the uniform read verbs (snapshot, -//! schema show, branch list, commit list/show — all of which open the -//! local engine WITHOUT policy today, preserved exactly). Write verbs -//! and the policy-bearing `query`/`mutate` arrive in 3b (the Embedded -//! variant will grow the policy context then); export + graphs-list in -//! 3c. Behavior is unchanged per verb — the Phase-1 parity matrix is the -//! referee and stays textually unchanged. +//! Phase 3a put the factory + the uniform read verbs in place. Phase 3b +//! adds the data-plane writes (`load`/`ingest`/`mutate`/`branch_*`/ +//! `apply_schema`) and `query`. The wrinkle 3a deferred: writes open the +//! local engine WITH policy (`open_local_db_with_policy`) and carry a +//! resolved actor, while reads/`query` open WITHOUT policy. So the +//! `Embedded` variant grows an optional policy context (`graph`/`actor`) +//! and a second factory (`resolve_with_policy`) fills it; `resolve()` +//! leaves it empty. The open path picks itself from whether `graph` is +//! set, preserving today's two behaviors exactly. Export + graphs-list +//! land in 3c. Behavior is unchanged per verb — the Phase-1 parity matrix +//! is the referee and stays textually unchanged. //! //! Enum, not a trait (RFC sketch said "trait"): only two variants ever, //! and inherent async methods sidestep `async_trait` boxing plus the //! `apply_schema` catalog-validator closure that is not object-safe. //! Same one-body-two-impls collapse, less ceremony. -use reqwest::Method; use color_eyre::Result; use omnigraph::db::{Omnigraph, ReadTarget}; use omnigraph_api_types::{ - BranchListOutput, CommitListOutput, CommitOutput, SchemaOutput, SnapshotOutput, commit_output, + BranchCreateOutput, BranchCreateRequest, BranchDeleteOutput, BranchListOutput, + BranchMergeOutput, BranchMergeRequest, ChangeOutput, CommitListOutput, CommitOutput, + IngestOutput, IngestRequest, ReadOutput, ReadRequest, SchemaApplyOutput, SchemaApplyRequest, + SchemaOutput, SnapshotOutput, commit_output, ingest_output, read_output, schema_apply_output, snapshot_payload, }; +use omnigraph_compiler::catalog::Catalog; +use reqwest::Method; +use serde_json::Value; +use crate::cli::CliLoadMode; use crate::helpers::{ - apply_server_flag, build_http_client, is_remote_uri, remote_json, remote_url, - resolve_remote_bearer_token, + ResolvedCliGraph, apply_server_flag, build_http_client, is_remote_uri, + legacy_change_request_body, open_local_db_with_policy, query_params_from_json, remote_branch_url, + remote_json, remote_url, resolve_cli_actor, resolve_cli_graph, resolve_remote_bearer_token, + select_named_query, }; +use crate::output::{LoadOutput, load_output_from_result, load_output_from_tables}; use omnigraph_server::config::OmnigraphConfig; pub(crate) enum GraphClient { - /// Local engine at `uri`. Reads open the dataset per call (no policy - /// attached — matches today's read behavior; the write verbs in 3b - /// add a policy-bearing context). - Embedded { uri: String }, + /// Local engine at `uri`. Reads (`resolve()`) leave `graph`/`actor` + /// empty and open without policy; writes (`resolve_with_policy()`) + /// fill them, opening through `open_local_db_with_policy` and + /// attributing the resolved actor. + Embedded { + uri: String, + graph: Option, + actor: Option, + }, /// Remote HTTP server. The actor is resolved server-side from the /// token; the client never sets identity. Remote { @@ -50,7 +68,8 @@ impl GraphClient { /// and credential once, then pick the variant by URI scheme — the /// single branch point that replaces every per-command `is_remote` /// fork. Mirrors the read verbs' current preamble (`resolve_uri` - /// path, not the policy-bearing `resolve_cli_graph`). + /// path, not the policy-bearing `resolve_cli_graph`). Used by reads + /// and `query` (which opens without policy, like the reads). pub(crate) fn resolve( config: &OmnigraphConfig, server: Option<&str>, @@ -68,7 +87,76 @@ impl GraphClient { token, }) } else { - Ok(GraphClient::Embedded { uri }) + Ok(GraphClient::Embedded { + uri, + graph: None, + actor: None, + }) + } + } + + /// Write-path factory: the same addressing/credential resolution as + /// `resolve()`, but through the stricter `resolve_cli_graph` (which + /// carries `policy_file`/`graph_id`/`selected`), and with the actor + /// resolved up front. The embedded arm then opens WITH policy. The + /// resolution order matches the write arms exactly: server flag → + /// bearer token → graph. + pub(crate) fn resolve_with_policy( + config: &OmnigraphConfig, + server: Option<&str>, + graph: Option<&str>, + uri: Option, + target: Option<&str>, + cli_as: Option<&str>, + ) -> Result { + let uri = apply_server_flag(server, graph, uri, target)?; + let token = resolve_remote_bearer_token(config, uri.as_deref(), target)?; + let resolved = resolve_cli_graph(config, uri, target)?; + if resolved.is_remote { + Ok(GraphClient::Remote { + http: build_http_client()?, + base_url: resolved.uri, + token, + }) + } else { + let actor = resolve_cli_actor(cli_as, config)?; + Ok(GraphClient::Embedded { + uri: resolved.uri.clone(), + graph: Some(resolved), + actor, + }) + } + } + + /// The graph URI (local path / remote base URL) this client addresses. + pub(crate) fn uri(&self) -> &str { + match self { + GraphClient::Embedded { uri, .. } => uri, + GraphClient::Remote { base_url, .. } => base_url, + } + } + + /// The selected graph name, when a policy-bearing embedded client was + /// resolved against a named graph. `None` for remote and for reads. + pub(crate) fn selected(&self) -> Option<&str> { + match self { + GraphClient::Embedded { graph, .. } => graph.as_ref().and_then(ResolvedCliGraph::selected), + GraphClient::Remote { .. } => None, + } + } + + pub(crate) fn is_remote(&self) -> bool { + matches!(self, GraphClient::Remote { .. }) + } + + /// Open the local engine the way the resolved client demands: with + /// policy when a `graph` context is present (write path), bare + /// otherwise (read/`query` path). Captures today's two open paths in + /// one place so each verb stays a single match arm. + async fn open_embedded(uri: &str, graph: &Option) -> Result { + match graph { + Some(graph) => open_local_db_with_policy(graph).await, + None => Ok(Omnigraph::open(uri).await?), } } @@ -88,7 +176,7 @@ impl GraphClient { ) .await } - GraphClient::Embedded { uri } => { + GraphClient::Embedded { uri, .. } => { let db = Omnigraph::open(uri).await?; let mut branches = db.branch_list().await?; branches.sort(); @@ -113,7 +201,7 @@ impl GraphClient { ) .await } - GraphClient::Embedded { uri } => { + GraphClient::Embedded { uri, .. } => { let db = Omnigraph::open(uri).await?; let snapshot = db.snapshot_of(ReadTarget::branch(branch)).await?; Ok(snapshot_payload(branch, &snapshot)) @@ -137,7 +225,7 @@ impl GraphClient { ) .await } - GraphClient::Embedded { uri } => { + GraphClient::Embedded { uri, .. } => { let db = Omnigraph::open(uri).await?; Ok(SchemaOutput { schema_source: db.schema_source().to_string(), @@ -159,7 +247,7 @@ impl GraphClient { }; remote_json(http, Method::GET, url, None, token.as_deref()).await } - GraphClient::Embedded { uri } => { + GraphClient::Embedded { uri, .. } => { let db = Omnigraph::open(uri).await?; let commits = db .list_commits(branch) @@ -188,10 +276,343 @@ impl GraphClient { ) .await } - GraphClient::Embedded { uri } => { + GraphClient::Embedded { uri, .. } => { let db = Omnigraph::open(uri).await?; Ok(commit_output(&db.get_commit(commit_id).await?)) } } } + + /// `load` — bulk-load `data` (a file path) onto `branch`, forking from + /// `from` if missing. Returns the CLI `LoadOutput`; each arm keeps its + /// own mapping (remote sums the wire `IngestOutput.tables`, embedded + /// reads the richer `LoadResult` directly) — preserved exactly. + pub(crate) async fn load( + &self, + branch: &str, + from: Option<&str>, + data: &str, + mode: CliLoadMode, + ) -> Result { + match self { + GraphClient::Remote { + http, + base_url, + token, + } => { + let data = std::fs::read_to_string(data)?; + let output = remote_json::( + http, + Method::POST, + remote_url(base_url, "/ingest"), + Some(serde_json::to_value(IngestRequest { + branch: Some(branch.to_string()), + from: from.map(ToOwned::to_owned), + mode: Some(mode.into()), + data, + })?), + token.as_deref(), + ) + .await?; + Ok(load_output_from_tables(base_url, branch, mode.as_str(), &output)) + } + GraphClient::Embedded { uri, graph, actor } => { + let db = Self::open_embedded(uri, graph).await?; + let result = db + .load_file_as(branch, from, data, mode.into(), actor.as_deref()) + .await?; + Ok(load_output_from_result(uri, branch, mode.as_str(), &result)) + } + } + } + + /// `ingest` — the deprecated alias of `load`. Same operation, but the + /// surfaced shape is the wire `IngestOutput` (printed by + /// `print_ingest_human`), so it is its own method. The embedded arm + /// echoes `actor_id: None` in the output exactly as the legacy arm did + /// (the actor is still attributed on the commit via `load_file_as`). + pub(crate) async fn ingest( + &self, + branch: &str, + from: &str, + data: &str, + mode: CliLoadMode, + ) -> Result { + match self { + GraphClient::Remote { + http, + base_url, + token, + } => { + let data = std::fs::read_to_string(data)?; + remote_json( + http, + Method::POST, + remote_url(base_url, "/ingest"), + Some(serde_json::to_value(IngestRequest { + branch: Some(branch.to_string()), + from: Some(from.to_string()), + mode: Some(mode.into()), + data, + })?), + token.as_deref(), + ) + .await + } + GraphClient::Embedded { uri, graph, actor } => { + let db = Self::open_embedded(uri, graph).await?; + let result = db + .load_file_as(branch, Some(from), data, mode.into(), actor.as_deref()) + .await?; + Ok(ingest_output(uri, &result, mode.into(), None)) + } + } + } + + /// `mutate` — run a change query against `branch`. Folds + /// `execute_change` / `execute_change_remote` + the legacy request body. + pub(crate) async fn mutate( + &self, + branch: &str, + query_source: &str, + query_name: Option<&str>, + params_json: Option<&Value>, + ) -> Result { + match self { + GraphClient::Remote { + http, + base_url, + token, + } => { + remote_json( + http, + Method::POST, + remote_url(base_url, "/change"), + Some(legacy_change_request_body( + query_source, + query_name, + branch, + params_json, + )), + token.as_deref(), + ) + .await + } + GraphClient::Embedded { uri, graph, actor } => { + let (selected_name, query_params) = select_named_query(query_source, query_name)?; + let params = query_params_from_json(&query_params, params_json)?; + let db = Self::open_embedded(uri, graph).await?; + let actor = actor.as_deref(); + let result = db + .mutate_as(branch, query_source, &selected_name, ¶ms, actor) + .await?; + Ok(ChangeOutput { + branch: branch.to_string(), + query_name: selected_name, + affected_nodes: result.affected_nodes, + affected_edges: result.affected_edges, + actor_id: actor.map(String::from), + }) + } + } + } + + /// `query` — run a read query against `target`. Folds `execute_read` / + /// `execute_read_remote`; the embedded arm opens WITHOUT policy (reads + /// never attach one), so this verb resolves via `resolve()`. + pub(crate) async fn query( + &self, + target: ReadTarget, + query_source: &str, + query_name: Option<&str>, + params_json: Option<&Value>, + ) -> Result { + match self { + GraphClient::Remote { + http, + base_url, + token, + } => { + let (branch, snapshot) = match &target { + ReadTarget::Branch(branch) => (Some(branch.clone()), None), + ReadTarget::Snapshot(snapshot) => (None, Some(snapshot.as_str().to_string())), + }; + remote_json( + http, + Method::POST, + remote_url(base_url, "/read"), + Some(serde_json::to_value(ReadRequest { + query_source: query_source.to_string(), + query_name: query_name.map(ToOwned::to_owned), + params: params_json.cloned(), + branch, + snapshot, + })?), + token.as_deref(), + ) + .await + } + GraphClient::Embedded { uri, graph, .. } => { + let (selected_name, query_params) = select_named_query(query_source, query_name)?; + let params = query_params_from_json(&query_params, params_json)?; + let db = Self::open_embedded(uri, graph).await?; + let result = db + .query(target.clone(), query_source, &selected_name, ¶ms) + .await?; + Ok(read_output(selected_name, &target, result)) + } + } + } + + pub(crate) async fn branch_create_from( + &self, + from: &str, + name: &str, + ) -> Result { + match self { + GraphClient::Remote { + http, + base_url, + token, + } => { + remote_json( + http, + Method::POST, + remote_url(base_url, "/branches"), + Some(serde_json::to_value(BranchCreateRequest { + from: Some(from.to_string()), + name: name.to_string(), + })?), + token.as_deref(), + ) + .await + } + GraphClient::Embedded { uri, graph, actor } => { + let db = Self::open_embedded(uri, graph).await?; + let actor = actor.as_deref(); + db.branch_create_from_as(ReadTarget::branch(from), name, actor) + .await?; + Ok(BranchCreateOutput { + uri: uri.clone(), + from: from.to_string(), + name: name.to_string(), + actor_id: actor.map(String::from), + }) + } + } + } + + pub(crate) async fn branch_delete(&self, name: &str) -> Result { + match self { + GraphClient::Remote { + http, + base_url, + token, + } => { + remote_json( + http, + Method::DELETE, + remote_branch_url(base_url, name)?, + None, + token.as_deref(), + ) + .await + } + GraphClient::Embedded { uri, graph, actor } => { + let db = Self::open_embedded(uri, graph).await?; + let actor = actor.as_deref(); + db.branch_delete_as(name, actor).await?; + Ok(BranchDeleteOutput { + uri: uri.clone(), + name: name.to_string(), + actor_id: actor.map(String::from), + }) + } + } + } + + pub(crate) async fn branch_merge(&self, source: &str, into: &str) -> Result { + match self { + GraphClient::Remote { + http, + base_url, + token, + } => { + remote_json( + http, + Method::POST, + remote_url(base_url, "/branches/merge"), + Some(serde_json::to_value(BranchMergeRequest { + source: source.to_string(), + target: Some(into.to_string()), + })?), + token.as_deref(), + ) + .await + } + GraphClient::Embedded { uri, graph, actor } => { + let db = Self::open_embedded(uri, graph).await?; + let actor = actor.as_deref(); + let outcome = db.branch_merge_as(source, into, actor).await?; + Ok(BranchMergeOutput { + source: source.to_string(), + target: into.to_string(), + outcome: outcome.into(), + actor_id: actor.map(String::from), + }) + } + } + } + + /// `apply_schema` — apply `schema_source`. The embedded arm runs the + /// caller's catalog validator (stored-query registry check) inside the + /// engine's `apply_schema_as_with_catalog_check`; the remote arm runs + /// the server's own check and IGNORES `validate`. The `impl FnOnce` + /// validator is exactly why this is an enum, not a trait (non-object- + /// safe). + pub(crate) async fn apply_schema( + &self, + schema_source: &str, + allow_data_loss: bool, + validate: F, + ) -> Result + where + F: FnOnce(&Catalog) -> omnigraph::error::Result<()>, + { + match self { + GraphClient::Remote { + http, + base_url, + token, + } => { + // MR-694 PR B: SchemaApplyRequest carries allow_data_loss so + // Hard-mode drops are no longer CLI-only; the server's + // `server_schema_apply` honors it (and runs its own catalog + // check, so `validate` does not apply here). + remote_json::( + http, + Method::POST, + remote_url(base_url, "/schema/apply"), + Some(serde_json::to_value(SchemaApplyRequest { + schema_source: schema_source.to_string(), + allow_data_loss, + })?), + token.as_deref(), + ) + .await + } + GraphClient::Embedded { uri, graph, actor } => { + let db = Self::open_embedded(uri, graph).await?; + let result = db + .apply_schema_as_with_catalog_check( + schema_source, + omnigraph::db::SchemaApplyOptions { allow_data_loss }, + actor.as_deref(), + validate, + ) + .await?; + Ok(schema_apply_output(uri, result)) + } + } + } } diff --git a/crates/omnigraph-cli/src/helpers.rs b/crates/omnigraph-cli/src/helpers.rs index 67fb6ea..e9dfcc1 100644 --- a/crates/omnigraph-cli/src/helpers.rs +++ b/crates/omnigraph-cli/src/helpers.rs @@ -979,77 +979,6 @@ pub(crate) fn execute_queries_list( Ok(()) } -pub(crate) async fn execute_read( - uri: &str, - query_source: &str, - query_name: Option<&str>, - target: ReadTarget, - params_json: Option<&Value>, -) -> Result { - let (selected_name, query_params) = select_named_query(query_source, query_name)?; - let params = query_params_from_json(&query_params, params_json)?; - let db = Omnigraph::open(uri).await?; - let result = db - .query(target.clone(), query_source, &selected_name, ¶ms) - .await?; - Ok(read_output(selected_name, &target, result)) -} - -pub(crate) async fn execute_read_remote( - client: &reqwest::Client, - uri: &str, - query_source: &str, - query_name: Option<&str>, - target: ReadTarget, - params_json: Option<&Value>, - bearer_token: Option<&str>, -) -> Result { - let (branch, snapshot) = match &target { - ReadTarget::Branch(branch) => (Some(branch.clone()), None), - ReadTarget::Snapshot(snapshot) => (None, Some(snapshot.as_str().to_string())), - }; - remote_json( - client, - Method::POST, - remote_url(uri, "/read"), - Some(serde_json::to_value(ReadRequest { - query_source: query_source.to_string(), - query_name: query_name.map(ToOwned::to_owned), - params: params_json.cloned(), - branch, - snapshot, - })?), - bearer_token, - ) - .await -} - -pub(crate) async fn execute_change( - graph: &ResolvedCliGraph, - query_source: &str, - query_name: Option<&str>, - branch: &str, - params_json: Option<&Value>, - config: &OmnigraphConfig, - cli_as_actor: Option<&str>, -) -> Result { - let (selected_name, query_params) = select_named_query(query_source, query_name)?; - let params = query_params_from_json(&query_params, params_json)?; - let db = open_local_db_with_policy(graph).await?; - let actor = resolve_cli_actor(cli_as_actor, config)?; - let actor = actor.as_deref(); - let result = db - .mutate_as(branch, query_source, &selected_name, ¶ms, actor) - .await?; - Ok(ChangeOutput { - branch: branch.to_string(), - query_name: selected_name, - affected_nodes: result.affected_nodes, - affected_edges: result.affected_edges, - actor_id: actor.map(String::from), - }) -} - pub(crate) fn legacy_change_request_body( query_source: &str, query_name: Option<&str>, @@ -1069,30 +998,6 @@ pub(crate) fn legacy_change_request_body( body } -pub(crate) async fn execute_change_remote( - client: &reqwest::Client, - uri: &str, - query_source: &str, - query_name: Option<&str>, - branch: &str, - params_json: Option<&Value>, - bearer_token: Option<&str>, -) -> Result { - remote_json( - client, - Method::POST, - remote_url(uri, "/change"), - Some(legacy_change_request_body( - query_source, - query_name, - branch, - params_json, - )), - bearer_token, - ) - .await -} - pub(crate) async fn execute_export_to_writer( uri: &str, branch: &str, diff --git a/crates/omnigraph-cli/src/main.rs b/crates/omnigraph-cli/src/main.rs index e979622..53eb4c7 100644 --- a/crates/omnigraph-cli/src/main.rs +++ b/crates/omnigraph-cli/src/main.rs @@ -23,11 +23,8 @@ use omnigraph_compiler::{ json_params_to_param_map, lint_query_file, }; use omnigraph_api_types::{ - BranchCreateOutput, BranchCreateRequest, BranchDeleteOutput, - BranchMergeOutput, BranchMergeRequest, ChangeOutput, CommitOutput, - ErrorOutput, ExportRequest, GraphListResponse, IngestOutput, IngestRequest, ReadOutput, - ReadRequest, SchemaApplyOutput, SchemaApplyRequest, - SnapshotTableOutput, ingest_output, read_output, schema_apply_output, + ChangeOutput, CommitOutput, ErrorOutput, ExportRequest, GraphListResponse, IngestOutput, + ReadOutput, SchemaApplyOutput, SnapshotTableOutput, }; use omnigraph_server::queries::{QueryRegistry, check, format_check_breakages}; use omnigraph_server::{ @@ -166,44 +163,18 @@ async fn main() -> Result<()> { json, } => { let config = load_cli_config(config.as_ref())?; - let uri = - apply_server_flag(cli.server.as_deref(), cli.graph.as_deref(), uri, target.as_deref())?; - let bearer_token = - resolve_remote_bearer_token(&config, uri.as_deref(), target.as_deref())?; - let graph = resolve_cli_graph(&config, uri, target.as_deref())?; - let uri = graph.uri.clone(); + let client = client::GraphClient::resolve_with_policy( + &config, + cli.server.as_deref(), + cli.graph.as_deref(), + uri, + target.as_deref(), + cli.as_actor.as_deref(), + )?; let branch = resolve_branch(&config, branch, None, "main"); - let payload = if graph.is_remote { - let data = fs::read_to_string(&data)?; - let output = remote_json::( - &http_client, - Method::POST, - remote_url(&uri, "/ingest"), - Some(serde_json::to_value(IngestRequest { - branch: Some(branch.clone()), - from: from.clone(), - mode: Some(mode.into()), - data, - })?), - bearer_token.as_deref(), - ) + let payload = client + .load(&branch, from.as_deref(), &data.to_string_lossy(), mode) .await?; - load_output_from_tables(&uri, &branch, mode.as_str(), &output) - } else { - let db = open_local_db_with_policy(&graph).await?; - let actor = resolve_cli_actor(cli.as_actor.as_deref(), &config)?; - let actor = actor.as_deref(); - let result = db - .load_file_as( - &branch, - from.as_deref(), - &data.to_string_lossy(), - mode.into(), - actor, - ) - .await?; - load_output_from_result(&uri, &branch, mode.as_str(), &result) - }; if json { print_json(&payload)?; } else { @@ -226,44 +197,19 @@ async fn main() -> Result<()> { use `omnigraph load --from --mode ` (ingest defaults: --from main --mode merge)" ); let config = load_cli_config(config.as_ref())?; - let uri = - apply_server_flag(cli.server.as_deref(), cli.graph.as_deref(), uri, target.as_deref())?; - let bearer_token = - resolve_remote_bearer_token(&config, uri.as_deref(), target.as_deref())?; - let graph = resolve_cli_graph(&config, uri, target.as_deref())?; - let uri = graph.uri.clone(); + let client = client::GraphClient::resolve_with_policy( + &config, + cli.server.as_deref(), + cli.graph.as_deref(), + uri, + target.as_deref(), + cli.as_actor.as_deref(), + )?; let branch = resolve_branch(&config, branch, None, "main"); let from = resolve_branch(&config, from, None, "main"); - let payload = if graph.is_remote { - let data = fs::read_to_string(&data)?; - remote_json::( - &http_client, - Method::POST, - remote_url(&uri, "/ingest"), - Some(serde_json::to_value(IngestRequest { - branch: Some(branch.clone()), - from: Some(from.clone()), - mode: Some(mode.into()), - data, - })?), - bearer_token.as_deref(), - ) - .await? - } else { - let db = open_local_db_with_policy(&graph).await?; - let actor = resolve_cli_actor(cli.as_actor.as_deref(), &config)?; - let actor = actor.as_deref(); - let result = db - .load_file_as( - &branch, - Some(&from), - &data.to_string_lossy(), - mode.into(), - actor, - ) - .await?; - ingest_output(&uri, &result, mode.into(), None) - }; + let payload = client + .ingest(&branch, &from, &data.to_string_lossy(), mode) + .await?; if json { print_json(&payload)?; } else { @@ -280,38 +226,16 @@ async fn main() -> Result<()> { json, } => { let config = load_cli_config(config.as_ref())?; - let uri = - apply_server_flag(cli.server.as_deref(), cli.graph.as_deref(), uri, target.as_deref())?; - let bearer_token = - resolve_remote_bearer_token(&config, uri.as_deref(), target.as_deref())?; - let graph = resolve_cli_graph(&config, uri, target.as_deref())?; - let uri = graph.uri.clone(); + let client = client::GraphClient::resolve_with_policy( + &config, + cli.server.as_deref(), + cli.graph.as_deref(), + uri, + target.as_deref(), + cli.as_actor.as_deref(), + )?; let from = resolve_branch(&config, from, None, "main"); - let payload = if graph.is_remote { - remote_json::( - &http_client, - Method::POST, - remote_url(&uri, "/branches"), - Some(serde_json::to_value(BranchCreateRequest { - from: Some(from.clone()), - name: name.clone(), - })?), - bearer_token.as_deref(), - ) - .await? - } else { - let db = open_local_db_with_policy(&graph).await?; - let actor = resolve_cli_actor(cli.as_actor.as_deref(), &config)?; - let actor = actor.as_deref(); - db.branch_create_from_as(ReadTarget::branch(&from), &name, actor) - .await?; - BranchCreateOutput { - uri: uri.clone(), - from: from.clone(), - name: name.clone(), - actor_id: actor.map(String::from), - } - }; + let payload = client.branch_create_from(&from, &name).await?; if json { print_json(&payload)?; } else { @@ -349,32 +273,15 @@ async fn main() -> Result<()> { json, } => { let config = load_cli_config(config.as_ref())?; - let uri = - apply_server_flag(cli.server.as_deref(), cli.graph.as_deref(), uri, target.as_deref())?; - let bearer_token = - resolve_remote_bearer_token(&config, uri.as_deref(), target.as_deref())?; - let graph = resolve_cli_graph(&config, uri, target.as_deref())?; - let uri = graph.uri.clone(); - let payload = if graph.is_remote { - remote_json::( - &http_client, - Method::DELETE, - remote_branch_url(&uri, &name)?, - None, - bearer_token.as_deref(), - ) - .await? - } else { - let db = open_local_db_with_policy(&graph).await?; - let actor = resolve_cli_actor(cli.as_actor.as_deref(), &config)?; - let actor = actor.as_deref(); - db.branch_delete_as(&name, actor).await?; - BranchDeleteOutput { - uri: uri.clone(), - name: name.clone(), - actor_id: actor.map(String::from), - } - }; + let client = client::GraphClient::resolve_with_policy( + &config, + cli.server.as_deref(), + cli.graph.as_deref(), + uri, + target.as_deref(), + cli.as_actor.as_deref(), + )?; + let payload = client.branch_delete(&name).await?; if json { print_json(&payload)?; } else { @@ -390,37 +297,16 @@ async fn main() -> Result<()> { json, } => { let config = load_cli_config(config.as_ref())?; - let uri = - apply_server_flag(cli.server.as_deref(), cli.graph.as_deref(), uri, target.as_deref())?; - let bearer_token = - resolve_remote_bearer_token(&config, uri.as_deref(), target.as_deref())?; - let graph = resolve_cli_graph(&config, uri, target.as_deref())?; - let uri = graph.uri.clone(); + let client = client::GraphClient::resolve_with_policy( + &config, + cli.server.as_deref(), + cli.graph.as_deref(), + uri, + target.as_deref(), + cli.as_actor.as_deref(), + )?; let into = resolve_branch(&config, into, None, "main"); - let payload = if graph.is_remote { - remote_json::( - &http_client, - Method::POST, - remote_url(&uri, "/branches/merge"), - Some(serde_json::to_value(BranchMergeRequest { - source: source.clone(), - target: Some(into.clone()), - })?), - bearer_token.as_deref(), - ) - .await? - } else { - let db = open_local_db_with_policy(&graph).await?; - let actor = resolve_cli_actor(cli.as_actor.as_deref(), &config)?; - let actor = actor.as_deref(); - let outcome = db.branch_merge_as(&source, &into, actor).await?; - BranchMergeOutput { - source: source.clone(), - target: into.clone(), - outcome: outcome.into(), - actor_id: actor.map(String::from), - } - }; + let payload = client.branch_merge(&source, &into).await?; if json { print_json(&payload)?; } else { @@ -519,52 +405,34 @@ async fn main() -> Result<()> { allow_data_loss, } => { let config = load_cli_config(config.as_ref())?; - let uri = - apply_server_flag(cli.server.as_deref(), cli.graph.as_deref(), uri, target.as_deref())?; - let bearer_token = - resolve_remote_bearer_token(&config, uri.as_deref(), target.as_deref())?; - let graph = resolve_cli_graph(&config, uri, target.as_deref())?; - let uri = graph.uri.clone(); + let client = client::GraphClient::resolve_with_policy( + &config, + cli.server.as_deref(), + cli.graph.as_deref(), + uri, + target.as_deref(), + cli.as_actor.as_deref(), + )?; let schema_source = fs::read_to_string(&schema)?; - let output = if graph.is_remote { - // MR-694 PR B: SchemaApplyRequest gained an - // allow_data_loss field so Hard-mode drops are no - // longer CLI-only. The previous bail is gone; the - // field is forwarded into the JSON payload, and - // the server's `server_schema_apply` honors it. - remote_json::( - &http_client, - Method::POST, - remote_url(&uri, "/schema/apply"), - Some(serde_json::to_value(SchemaApplyRequest { - schema_source: schema_source.clone(), - allow_data_loss, - })?), - bearer_token.as_deref(), - ) - .await? + // The stored-query registry check is an embedded-only concern + // (the remote arm ignores the validator — the server runs its + // own check); build it only for the local path so the remote + // path keeps its no-registry-load behavior. + let registry = if client.is_remote() { + None } else { - let db = open_local_db_with_policy(&graph).await?; - let actor = resolve_cli_actor(cli.as_actor.as_deref(), &config)?; - let actor = actor.as_deref(); - let registry = load_registry_or_report(&config, graph.selected())?; - let registry = (!registry.is_empty()).then_some(registry); - let label = graph.selected().unwrap_or(&uri).to_string(); - let result = db - .apply_schema_as_with_catalog_check( - &schema_source, - omnigraph::db::SchemaApplyOptions { allow_data_loss }, - actor, - |catalog| { - if let Some(registry) = registry.as_ref() { - validate_registry_for_catalog(registry, catalog, &label)?; - } - Ok(()) - }, - ) - .await?; - schema_apply_output(&uri, result) + let registry = load_registry_or_report(&config, client.selected())?; + (!registry.is_empty()).then_some(registry) }; + let label = client.selected().unwrap_or(client.uri()).to_string(); + let output = client + .apply_schema(&schema_source, allow_data_loss, |catalog| { + if let Some(registry) = registry.as_ref() { + validate_registry_for_catalog(registry, catalog, &label)?; + } + Ok(()) + }) + .await?; if json { print_json(&output)?; } else { @@ -757,10 +625,13 @@ async fn main() -> Result<()> { let target_name = target .as_deref() .or_else(|| alias_config.and_then(|alias| alias.graph.as_deref())); - let uri = apply_server_flag(cli.server.as_deref(), cli.graph.as_deref(), uri, target_name)?; - let bearer_token = resolve_remote_bearer_token(&config, uri.as_deref(), target_name)?; - let graph = resolve_cli_graph(&config, uri, target_name)?; - let uri = graph.uri.clone(); + let client = client::GraphClient::resolve( + &config, + cli.server.as_deref(), + cli.graph.as_deref(), + uri, + target_name, + )?; let query_source = resolve_query_source( &config, query.as_ref(), @@ -782,27 +653,14 @@ async fn main() -> Result<()> { alias_config.and_then(|alias| alias.branch.clone()), )?; let query_name = name.or_else(|| alias_config.and_then(|alias| alias.name.clone())); - let output = if graph.is_remote { - execute_read_remote( - &http_client, - &uri, + let output = client + .query( + target, &query_source, query_name.as_deref(), - target, - params_json.as_ref(), - bearer_token.as_deref(), - ) - .await? - } else { - execute_read( - &uri, - &query_source, - query_name.as_deref(), - target, params_json.as_ref(), ) - .await? - }; + .await?; let format = resolve_read_format( &config, format, @@ -844,10 +702,14 @@ async fn main() -> Result<()> { let target_name = target .as_deref() .or_else(|| alias_config.and_then(|alias| alias.graph.as_deref())); - let uri = apply_server_flag(cli.server.as_deref(), cli.graph.as_deref(), uri, target_name)?; - let bearer_token = resolve_remote_bearer_token(&config, uri.as_deref(), target_name)?; - let graph = resolve_cli_graph(&config, uri, target_name)?; - let uri = graph.uri.clone(); + let client = client::GraphClient::resolve_with_policy( + &config, + cli.server.as_deref(), + cli.graph.as_deref(), + uri, + target_name, + cli.as_actor.as_deref(), + )?; let query_source = resolve_query_source( &config, query.as_ref(), @@ -869,29 +731,14 @@ async fn main() -> Result<()> { "main", ); let query_name = name.or_else(|| alias_config.and_then(|alias| alias.name.clone())); - let output = if graph.is_remote { - execute_change_remote( - &http_client, - &uri, + let output = client + .mutate( + &branch, &query_source, query_name.as_deref(), - &branch, params_json.as_ref(), - bearer_token.as_deref(), ) - .await? - } else { - execute_change( - &graph, - &query_source, - query_name.as_deref(), - &branch, - params_json.as_ref(), - &config, - cli.as_actor.as_deref(), - ) - .await? - }; + .await?; if json { print_json(&output)?; } else {