refactor(cli): collapse export + graphs-list onto GraphClient (RFC-009 Phase 3c) (#213)

The last two embedded-vs-remote forks move onto the enum, so every such
`if` in the CLI now lives in client.rs — the point of the refactor.

- `export<W: Write>`: the streaming verb 3b deferred (writes to a writer,
  chunks the HTTP response body, rather than returning a DTO). Embedded
  calls db.export_jsonl_to_writer; Remote streams the chunked body through.
  Opens WITHOUT policy (like reads), so it routes via resolve().
- `list_graphs`: remote-only by design (no local enumeration endpoint), so
  the Embedded arm keeps the loud "requires a remote multi-graph server"
  bail verbatim. Routing it through the enum still buys the shared
  resolve() addressing/token preamble the arm hand-rolled.

Retire the now-orphaned execute_export_to_writer /
execute_export_remote_to_writer pair, and sweep two pre-existing dead fns
while in the files: inferred_config_path (helpers.rs) and yaml_string
(output.rs, shadowed by test-local copies).

parity_matrix gains one row, parity_export — the single intended matrix
change in this phase. Export is a JSONL stream, not a single --json doc,
so it compares the two arms' output line-wise (sorted; twin graphs are
byte-copies so rows need no scrubbing). graphs-list gets no row: its
remote-only behavior is a documented exclusion, not an equality case.

Full workspace tests pass; all 12 parity rows green.

Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Andrew Altshuler 2026-06-13 21:03:45 +03:00 committed by GitHub
parent d32c1ac191
commit 45500a690a
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 137 additions and 114 deletions

View file

@ -21,14 +21,17 @@
//! `apply_schema` catalog-validator closure that is not object-safe.
//! Same one-body-two-impls collapse, less ceremony.
use std::io::Write;
use color_eyre::Result;
use color_eyre::eyre::bail;
use omnigraph::db::{Omnigraph, ReadTarget};
use omnigraph_api_types::{
BranchCreateOutput, BranchCreateRequest, BranchDeleteOutput, BranchListOutput,
BranchMergeOutput, BranchMergeRequest, ChangeOutput, CommitListOutput, CommitOutput,
IngestOutput, IngestRequest, ReadOutput, ReadRequest, SchemaApplyOutput, SchemaApplyRequest,
SchemaOutput, SnapshotOutput, commit_output, ingest_output, read_output, schema_apply_output,
snapshot_payload,
ErrorOutput, ExportRequest, GraphListResponse, IngestOutput, IngestRequest, ReadOutput,
ReadRequest, SchemaApplyOutput, SchemaApplyRequest, SchemaOutput, SnapshotOutput, commit_output,
ingest_output, read_output, schema_apply_output, snapshot_payload,
};
use omnigraph_compiler::catalog::Catalog;
use reqwest::Method;
@ -36,7 +39,7 @@ use serde_json::Value;
use crate::cli::CliLoadMode;
use crate::helpers::{
ResolvedCliGraph, apply_server_flag, build_http_client, is_remote_uri,
ResolvedCliGraph, apply_bearer_token, apply_server_flag, build_http_client, is_remote_uri,
legacy_change_request_body, open_local_db_with_policy, query_params_from_json, remote_branch_url,
remote_json, remote_url, resolve_cli_actor, resolve_cli_graph, resolve_remote_bearer_token,
select_named_query,
@ -615,4 +618,86 @@ impl GraphClient {
}
}
}
/// `export` — stream the branch as JSONL into `writer`. The streaming
/// shape (a `W: Write`, not a returned DTO) is why this lands in 3c
/// rather than 3b. Opens WITHOUT policy (like reads), so it is reached
/// via `resolve()`; the Embedded arm opens bare. The Remote arm streams
/// the chunked response body straight through (no buffering the whole
/// export in memory).
pub(crate) async fn export<W: Write>(
&self,
branch: &str,
type_names: &[String],
table_keys: &[String],
writer: &mut W,
) -> Result<()> {
match self {
GraphClient::Remote {
http,
base_url,
token,
} => {
let request = apply_bearer_token(
http.request(Method::POST, remote_url(base_url, "/export")),
token.as_deref(),
)
.json(&ExportRequest {
branch: Some(branch.to_string()),
type_names: type_names.to_vec(),
table_keys: table_keys.to_vec(),
});
let mut response = request.send().await?;
let status = response.status();
if !status.is_success() {
let text = response.text().await?;
if let Ok(error) = serde_json::from_str::<ErrorOutput>(&text) {
bail!(error.error);
}
bail!("server returned {}: {}", status, text);
}
while let Some(chunk) = response.chunk().await? {
writer.write_all(&chunk)?;
}
writer.flush()?;
Ok(())
}
GraphClient::Embedded { uri, .. } => {
let db = Omnigraph::open(uri).await?;
db.export_jsonl_to_writer(branch, type_names, table_keys, writer)
.await?;
writer.flush()?;
Ok(())
}
}
}
/// `graphs list` — enumerate the graphs a remote multi-graph server
/// serves (`GET /graphs`). Remote-only by design: there is no local
/// enumeration endpoint, so the Embedded arm fails loudly pointing the
/// operator at `omnigraph.yaml`. Routing it through the enum still buys
/// the shared `resolve()` addressing/token preamble.
pub(crate) async fn list_graphs(&self) -> Result<GraphListResponse> {
match self {
GraphClient::Remote {
http,
base_url,
token,
} => {
remote_json(
http,
Method::GET,
remote_url(base_url, "/graphs"),
None,
token.as_deref(),
)
.await
}
GraphClient::Embedded { .. } => bail!(
"`omnigraph graphs list` requires a remote multi-graph server URL \
(http:// or https://). To enumerate local graphs, read `omnigraph.yaml` \
directly."
),
}
}
}

View file

@ -678,22 +678,6 @@ pub(crate) fn normalize_legacy_alias_uri(
}
pub(crate) fn inferred_config_path(uri: &str) -> Result<PathBuf> {
if uri.contains("://") {
return Ok(omnigraph_server::config::default_config_path());
}
let path = Path::new(uri);
let base = if path.is_absolute() {
path.parent()
.map(Path::to_path_buf)
.unwrap_or(std::env::current_dir()?)
} else {
std::env::current_dir()?.join(path.parent().unwrap_or_else(|| Path::new(".")))
};
Ok(base.join(omnigraph_server::config::DEFAULT_CONFIG_FILE))
}
pub(crate) fn read_target_from_cli(branch: Option<String>, snapshot: Option<String>) -> ReadTarget {
if let Some(snapshot) = snapshot {
ReadTarget::snapshot(SnapshotId::new(snapshot))
@ -998,55 +982,6 @@ pub(crate) fn legacy_change_request_body(
body
}
pub(crate) async fn execute_export_to_writer<W: Write>(
uri: &str,
branch: &str,
type_names: &[String],
table_keys: &[String],
writer: &mut W,
) -> Result<()> {
let db = Omnigraph::open(uri).await?;
db.export_jsonl_to_writer(branch, type_names, table_keys, writer)
.await?;
writer.flush()?;
Ok(())
}
pub(crate) async fn execute_export_remote_to_writer<W: Write>(
client: &reqwest::Client,
uri: &str,
branch: &str,
type_names: &[String],
table_keys: &[String],
bearer_token: Option<&str>,
writer: &mut W,
) -> Result<()> {
let request = apply_bearer_token(
client.request(Method::POST, remote_url(uri, "/export")),
bearer_token,
)
.json(&ExportRequest {
branch: Some(branch.to_string()),
type_names: type_names.to_vec(),
table_keys: table_keys.to_vec(),
});
let mut response = request.send().await?;
let status = response.status();
if !status.is_success() {
let text = response.text().await?;
if let Ok(error) = serde_json::from_str::<ErrorOutput>(&text) {
bail!(error.error);
}
bail!("server returned {}: {}", status, text);
}
while let Some(chunk) = response.chunk().await? {
writer.write_all(&chunk)?;
}
writer.flush()?;
Ok(())
}
pub(crate) fn rewrite_deprecated_argv(args: Vec<OsString>) -> Vec<OsString> {
if args.len() >= 3 {
let sub = args[1].to_str();

View file

@ -23,8 +23,8 @@ use omnigraph_compiler::{
json_params_to_param_map, lint_query_file,
};
use omnigraph_api_types::{
ChangeOutput, CommitOutput, ErrorOutput, ExportRequest, GraphListResponse, IngestOutput,
ReadOutput, SchemaApplyOutput, SnapshotTableOutput,
ChangeOutput, CommitOutput, ErrorOutput, IngestOutput, ReadOutput, SchemaApplyOutput,
SnapshotTableOutput,
};
use omnigraph_server::queries::{QueryRegistry, check, format_check_breakages};
use omnigraph_server::{
@ -525,11 +525,13 @@ async fn main() -> Result<()> {
table_keys,
} => {
let config = load_cli_config(config.as_ref())?;
let uri =
apply_server_flag(cli.server.as_deref(), cli.graph.as_deref(), uri, target.as_deref())?;
let bearer_token =
resolve_remote_bearer_token(&config, uri.as_deref(), target.as_deref())?;
let uri = resolve_uri(&config, uri, target.as_deref())?;
let client = client::GraphClient::resolve(
&config,
cli.server.as_deref(),
cli.graph.as_deref(),
uri,
target.as_deref(),
)?;
let branch = resolve_branch(&config, branch, None, "main");
if jsonl {
eprintln!("warning: --jsonl is deprecated; `omnigraph export` always emits JSONL");
@ -537,21 +539,9 @@ async fn main() -> Result<()> {
let stdout = io::stdout();
let mut stdout = stdout.lock();
if is_remote_uri(&uri) {
execute_export_remote_to_writer(
&http_client,
&uri,
&branch,
&type_names,
&table_keys,
bearer_token.as_deref(),
&mut stdout,
)
client
.export(&branch, &type_names, &table_keys, &mut stdout)
.await?;
} else {
execute_export_to_writer(&uri, &branch, &type_names, &table_keys, &mut stdout)
.await?;
}
}
Command::Query {
uri,
@ -1047,26 +1037,14 @@ async fn main() -> Result<()> {
json,
} => {
let config = load_cli_config(config.as_ref())?;
let uri =
apply_server_flag(cli.server.as_deref(), cli.graph.as_deref(), uri, target.as_deref())?;
let bearer_token =
resolve_remote_bearer_token(&config, uri.as_deref(), target.as_deref())?;
let uri = resolve_uri(&config, uri, target.as_deref())?;
if !is_remote_uri(&uri) {
bail!(
"`omnigraph graphs list` requires a remote multi-graph server URL \
(http:// or https://). To enumerate local graphs, read `omnigraph.yaml` \
directly."
);
}
let payload = remote_json::<GraphListResponse>(
&http_client,
Method::GET,
remote_url(&uri, "/graphs"),
None,
bearer_token.as_deref(),
)
.await?;
let client = client::GraphClient::resolve(
&config,
cli.server.as_deref(),
cli.graph.as_deref(),
uri,
target.as_deref(),
)?;
let payload = client.list_graphs().await?;
if json {
print_json(&payload)?;
} else {

View file

@ -812,10 +812,6 @@ pub(crate) fn print_policy_explain(decision: &PolicyDecision, actor_id: &str, re
println!("message: {}", decision.message);
}
pub(crate) fn yaml_string(value: &str) -> String {
format!("'{}'", value.replace('\'', "''"))
}
#[derive(serde::Serialize)]
pub(crate) struct QueriesIssue {
pub(crate) query: String,