refactor(cli): collapse write/query forks onto GraphClient (RFC-009 Phase 3b) (#211)

Phase 3a put the GraphClient enum in place and collapsed the five uniform
read forks. 3b folds the remaining data-plane forks onto the same enum:
load, ingest, mutate, query, branch create/delete/merge, and schema apply.

The wrinkle 3a deferred was the local policy attachment. Reads and query
open the local engine without a policy; writes open through
open_local_db_with_policy and attribute a resolved actor. So the Embedded
variant grows an optional policy context (graph/actor) filled by a second
factory, resolve_with_policy; resolve() leaves it empty. open_embedded
picks the open path from whether the context is present, preserving both
of today's behaviors exactly. query still uses resolve() (no policy), as
the read path did.

apply_schema takes the catalog-validator closure as impl FnOnce(&Catalog)
— the embedded arm runs it inside apply_schema_as_with_catalog_check, the
remote arm ignores it (the server runs its own check). That non-object-safe
closure is why GraphClient is an enum, not a trait. The stored-query
registry is still built caller-side and only for the local path.

load and ingest stay separate methods: same operation, but load surfaces
the CLI LoadOutput (two distinct per-arm mappings preserved) while ingest
surfaces the wire IngestOutput. The now-fully-dead execute_read/
execute_read_remote and execute_change/execute_change_remote pairs are
retired (legacy_change_request_body stays — client.rs uses it); the export
pair remains for 3c.

The Phase-1 parity matrix is unchanged and green; full workspace tests pass.

Co-authored-by: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
Andrew Altshuler 2026-06-13 19:25:57 +03:00 committed by GitHub
parent 81b66f9427
commit d32c1ac191
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 541 additions and 368 deletions

View file

@ -23,11 +23,8 @@ use omnigraph_compiler::{
json_params_to_param_map, lint_query_file,
};
use omnigraph_api_types::{
BranchCreateOutput, BranchCreateRequest, BranchDeleteOutput,
BranchMergeOutput, BranchMergeRequest, ChangeOutput, CommitOutput,
ErrorOutput, ExportRequest, GraphListResponse, IngestOutput, IngestRequest, ReadOutput,
ReadRequest, SchemaApplyOutput, SchemaApplyRequest,
SnapshotTableOutput, ingest_output, read_output, schema_apply_output,
ChangeOutput, CommitOutput, ErrorOutput, ExportRequest, GraphListResponse, IngestOutput,
ReadOutput, SchemaApplyOutput, SnapshotTableOutput,
};
use omnigraph_server::queries::{QueryRegistry, check, format_check_breakages};
use omnigraph_server::{
@ -166,44 +163,18 @@ async fn main() -> Result<()> {
json,
} => {
let config = load_cli_config(config.as_ref())?;
let uri =
apply_server_flag(cli.server.as_deref(), cli.graph.as_deref(), uri, target.as_deref())?;
let bearer_token =
resolve_remote_bearer_token(&config, uri.as_deref(), target.as_deref())?;
let graph = resolve_cli_graph(&config, uri, target.as_deref())?;
let uri = graph.uri.clone();
let client = client::GraphClient::resolve_with_policy(
&config,
cli.server.as_deref(),
cli.graph.as_deref(),
uri,
target.as_deref(),
cli.as_actor.as_deref(),
)?;
let branch = resolve_branch(&config, branch, None, "main");
let payload = if graph.is_remote {
let data = fs::read_to_string(&data)?;
let output = remote_json::<IngestOutput>(
&http_client,
Method::POST,
remote_url(&uri, "/ingest"),
Some(serde_json::to_value(IngestRequest {
branch: Some(branch.clone()),
from: from.clone(),
mode: Some(mode.into()),
data,
})?),
bearer_token.as_deref(),
)
let payload = client
.load(&branch, from.as_deref(), &data.to_string_lossy(), mode)
.await?;
load_output_from_tables(&uri, &branch, mode.as_str(), &output)
} else {
let db = open_local_db_with_policy(&graph).await?;
let actor = resolve_cli_actor(cli.as_actor.as_deref(), &config)?;
let actor = actor.as_deref();
let result = db
.load_file_as(
&branch,
from.as_deref(),
&data.to_string_lossy(),
mode.into(),
actor,
)
.await?;
load_output_from_result(&uri, &branch, mode.as_str(), &result)
};
if json {
print_json(&payload)?;
} else {
@ -226,44 +197,19 @@ async fn main() -> Result<()> {
use `omnigraph load --from <base> --mode <mode>` (ingest defaults: --from main --mode merge)"
);
let config = load_cli_config(config.as_ref())?;
let uri =
apply_server_flag(cli.server.as_deref(), cli.graph.as_deref(), uri, target.as_deref())?;
let bearer_token =
resolve_remote_bearer_token(&config, uri.as_deref(), target.as_deref())?;
let graph = resolve_cli_graph(&config, uri, target.as_deref())?;
let uri = graph.uri.clone();
let client = client::GraphClient::resolve_with_policy(
&config,
cli.server.as_deref(),
cli.graph.as_deref(),
uri,
target.as_deref(),
cli.as_actor.as_deref(),
)?;
let branch = resolve_branch(&config, branch, None, "main");
let from = resolve_branch(&config, from, None, "main");
let payload = if graph.is_remote {
let data = fs::read_to_string(&data)?;
remote_json::<IngestOutput>(
&http_client,
Method::POST,
remote_url(&uri, "/ingest"),
Some(serde_json::to_value(IngestRequest {
branch: Some(branch.clone()),
from: Some(from.clone()),
mode: Some(mode.into()),
data,
})?),
bearer_token.as_deref(),
)
.await?
} else {
let db = open_local_db_with_policy(&graph).await?;
let actor = resolve_cli_actor(cli.as_actor.as_deref(), &config)?;
let actor = actor.as_deref();
let result = db
.load_file_as(
&branch,
Some(&from),
&data.to_string_lossy(),
mode.into(),
actor,
)
.await?;
ingest_output(&uri, &result, mode.into(), None)
};
let payload = client
.ingest(&branch, &from, &data.to_string_lossy(), mode)
.await?;
if json {
print_json(&payload)?;
} else {
@ -280,38 +226,16 @@ async fn main() -> Result<()> {
json,
} => {
let config = load_cli_config(config.as_ref())?;
let uri =
apply_server_flag(cli.server.as_deref(), cli.graph.as_deref(), uri, target.as_deref())?;
let bearer_token =
resolve_remote_bearer_token(&config, uri.as_deref(), target.as_deref())?;
let graph = resolve_cli_graph(&config, uri, target.as_deref())?;
let uri = graph.uri.clone();
let client = client::GraphClient::resolve_with_policy(
&config,
cli.server.as_deref(),
cli.graph.as_deref(),
uri,
target.as_deref(),
cli.as_actor.as_deref(),
)?;
let from = resolve_branch(&config, from, None, "main");
let payload = if graph.is_remote {
remote_json::<BranchCreateOutput>(
&http_client,
Method::POST,
remote_url(&uri, "/branches"),
Some(serde_json::to_value(BranchCreateRequest {
from: Some(from.clone()),
name: name.clone(),
})?),
bearer_token.as_deref(),
)
.await?
} else {
let db = open_local_db_with_policy(&graph).await?;
let actor = resolve_cli_actor(cli.as_actor.as_deref(), &config)?;
let actor = actor.as_deref();
db.branch_create_from_as(ReadTarget::branch(&from), &name, actor)
.await?;
BranchCreateOutput {
uri: uri.clone(),
from: from.clone(),
name: name.clone(),
actor_id: actor.map(String::from),
}
};
let payload = client.branch_create_from(&from, &name).await?;
if json {
print_json(&payload)?;
} else {
@ -349,32 +273,15 @@ async fn main() -> Result<()> {
json,
} => {
let config = load_cli_config(config.as_ref())?;
let uri =
apply_server_flag(cli.server.as_deref(), cli.graph.as_deref(), uri, target.as_deref())?;
let bearer_token =
resolve_remote_bearer_token(&config, uri.as_deref(), target.as_deref())?;
let graph = resolve_cli_graph(&config, uri, target.as_deref())?;
let uri = graph.uri.clone();
let payload = if graph.is_remote {
remote_json::<BranchDeleteOutput>(
&http_client,
Method::DELETE,
remote_branch_url(&uri, &name)?,
None,
bearer_token.as_deref(),
)
.await?
} else {
let db = open_local_db_with_policy(&graph).await?;
let actor = resolve_cli_actor(cli.as_actor.as_deref(), &config)?;
let actor = actor.as_deref();
db.branch_delete_as(&name, actor).await?;
BranchDeleteOutput {
uri: uri.clone(),
name: name.clone(),
actor_id: actor.map(String::from),
}
};
let client = client::GraphClient::resolve_with_policy(
&config,
cli.server.as_deref(),
cli.graph.as_deref(),
uri,
target.as_deref(),
cli.as_actor.as_deref(),
)?;
let payload = client.branch_delete(&name).await?;
if json {
print_json(&payload)?;
} else {
@ -390,37 +297,16 @@ async fn main() -> Result<()> {
json,
} => {
let config = load_cli_config(config.as_ref())?;
let uri =
apply_server_flag(cli.server.as_deref(), cli.graph.as_deref(), uri, target.as_deref())?;
let bearer_token =
resolve_remote_bearer_token(&config, uri.as_deref(), target.as_deref())?;
let graph = resolve_cli_graph(&config, uri, target.as_deref())?;
let uri = graph.uri.clone();
let client = client::GraphClient::resolve_with_policy(
&config,
cli.server.as_deref(),
cli.graph.as_deref(),
uri,
target.as_deref(),
cli.as_actor.as_deref(),
)?;
let into = resolve_branch(&config, into, None, "main");
let payload = if graph.is_remote {
remote_json::<BranchMergeOutput>(
&http_client,
Method::POST,
remote_url(&uri, "/branches/merge"),
Some(serde_json::to_value(BranchMergeRequest {
source: source.clone(),
target: Some(into.clone()),
})?),
bearer_token.as_deref(),
)
.await?
} else {
let db = open_local_db_with_policy(&graph).await?;
let actor = resolve_cli_actor(cli.as_actor.as_deref(), &config)?;
let actor = actor.as_deref();
let outcome = db.branch_merge_as(&source, &into, actor).await?;
BranchMergeOutput {
source: source.clone(),
target: into.clone(),
outcome: outcome.into(),
actor_id: actor.map(String::from),
}
};
let payload = client.branch_merge(&source, &into).await?;
if json {
print_json(&payload)?;
} else {
@ -519,52 +405,34 @@ async fn main() -> Result<()> {
allow_data_loss,
} => {
let config = load_cli_config(config.as_ref())?;
let uri =
apply_server_flag(cli.server.as_deref(), cli.graph.as_deref(), uri, target.as_deref())?;
let bearer_token =
resolve_remote_bearer_token(&config, uri.as_deref(), target.as_deref())?;
let graph = resolve_cli_graph(&config, uri, target.as_deref())?;
let uri = graph.uri.clone();
let client = client::GraphClient::resolve_with_policy(
&config,
cli.server.as_deref(),
cli.graph.as_deref(),
uri,
target.as_deref(),
cli.as_actor.as_deref(),
)?;
let schema_source = fs::read_to_string(&schema)?;
let output = if graph.is_remote {
// MR-694 PR B: SchemaApplyRequest gained an
// allow_data_loss field so Hard-mode drops are no
// longer CLI-only. The previous bail is gone; the
// field is forwarded into the JSON payload, and
// the server's `server_schema_apply` honors it.
remote_json::<SchemaApplyOutput>(
&http_client,
Method::POST,
remote_url(&uri, "/schema/apply"),
Some(serde_json::to_value(SchemaApplyRequest {
schema_source: schema_source.clone(),
allow_data_loss,
})?),
bearer_token.as_deref(),
)
.await?
// The stored-query registry check is an embedded-only concern
// (the remote arm ignores the validator — the server runs its
// own check); build it only for the local path so the remote
// path keeps its no-registry-load behavior.
let registry = if client.is_remote() {
None
} else {
let db = open_local_db_with_policy(&graph).await?;
let actor = resolve_cli_actor(cli.as_actor.as_deref(), &config)?;
let actor = actor.as_deref();
let registry = load_registry_or_report(&config, graph.selected())?;
let registry = (!registry.is_empty()).then_some(registry);
let label = graph.selected().unwrap_or(&uri).to_string();
let result = db
.apply_schema_as_with_catalog_check(
&schema_source,
omnigraph::db::SchemaApplyOptions { allow_data_loss },
actor,
|catalog| {
if let Some(registry) = registry.as_ref() {
validate_registry_for_catalog(registry, catalog, &label)?;
}
Ok(())
},
)
.await?;
schema_apply_output(&uri, result)
let registry = load_registry_or_report(&config, client.selected())?;
(!registry.is_empty()).then_some(registry)
};
let label = client.selected().unwrap_or(client.uri()).to_string();
let output = client
.apply_schema(&schema_source, allow_data_loss, |catalog| {
if let Some(registry) = registry.as_ref() {
validate_registry_for_catalog(registry, catalog, &label)?;
}
Ok(())
})
.await?;
if json {
print_json(&output)?;
} else {
@ -757,10 +625,13 @@ async fn main() -> Result<()> {
let target_name = target
.as_deref()
.or_else(|| alias_config.and_then(|alias| alias.graph.as_deref()));
let uri = apply_server_flag(cli.server.as_deref(), cli.graph.as_deref(), uri, target_name)?;
let bearer_token = resolve_remote_bearer_token(&config, uri.as_deref(), target_name)?;
let graph = resolve_cli_graph(&config, uri, target_name)?;
let uri = graph.uri.clone();
let client = client::GraphClient::resolve(
&config,
cli.server.as_deref(),
cli.graph.as_deref(),
uri,
target_name,
)?;
let query_source = resolve_query_source(
&config,
query.as_ref(),
@ -782,27 +653,14 @@ async fn main() -> Result<()> {
alias_config.and_then(|alias| alias.branch.clone()),
)?;
let query_name = name.or_else(|| alias_config.and_then(|alias| alias.name.clone()));
let output = if graph.is_remote {
execute_read_remote(
&http_client,
&uri,
let output = client
.query(
target,
&query_source,
query_name.as_deref(),
target,
params_json.as_ref(),
bearer_token.as_deref(),
)
.await?
} else {
execute_read(
&uri,
&query_source,
query_name.as_deref(),
target,
params_json.as_ref(),
)
.await?
};
.await?;
let format = resolve_read_format(
&config,
format,
@ -844,10 +702,14 @@ async fn main() -> Result<()> {
let target_name = target
.as_deref()
.or_else(|| alias_config.and_then(|alias| alias.graph.as_deref()));
let uri = apply_server_flag(cli.server.as_deref(), cli.graph.as_deref(), uri, target_name)?;
let bearer_token = resolve_remote_bearer_token(&config, uri.as_deref(), target_name)?;
let graph = resolve_cli_graph(&config, uri, target_name)?;
let uri = graph.uri.clone();
let client = client::GraphClient::resolve_with_policy(
&config,
cli.server.as_deref(),
cli.graph.as_deref(),
uri,
target_name,
cli.as_actor.as_deref(),
)?;
let query_source = resolve_query_source(
&config,
query.as_ref(),
@ -869,29 +731,14 @@ async fn main() -> Result<()> {
"main",
);
let query_name = name.or_else(|| alias_config.and_then(|alias| alias.name.clone()));
let output = if graph.is_remote {
execute_change_remote(
&http_client,
&uri,
let output = client
.mutate(
&branch,
&query_source,
query_name.as_deref(),
&branch,
params_json.as_ref(),
bearer_token.as_deref(),
)
.await?
} else {
execute_change(
&graph,
&query_source,
query_name.as_deref(),
&branch,
params_json.as_ref(),
&config,
cli.as_actor.as_deref(),
)
.await?
};
.await?;
if json {
print_json(&output)?;
} else {