feat(cli)!: unified load command; deprecate ingest as an alias

omnigraph load is now the single data-write command:
- works against remote graphs (POSTs the server's /ingest endpoint with the
  same bearer/actor resolution as other remote commands) — previously load
  was the only data command forced to open Lance storage directly
- --from <base> opts into fork-if-missing for --branch (the former ingest
  semantics); without --from a missing branch is an error, never a fork
- --mode is now required: overwrite is destructive, so there is no implicit
  default (the old silent default was overwrite)
- output gains base_branch/branch_created (remote loads derive the node/edge
  totals by summing the per-table counts in the /ingest response)

omnigraph ingest stays as a deprecated alias (defaults preserved: --from
main --mode merge) that prints a one-line warning to stderr, matching the
read/change deprecation convention; removal in a later release.

Docs updated in the same change: cli.md, cli-reference.md, policy.md,
audit.md, execution.md (unified load section), AGENTS.md quick-flow,
README.md.

BREAKING CHANGE: scripts running omnigraph load without --mode must now
pass it explicitly (previously defaulted to the destructive overwrite).

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
aaltshuler 2026-06-11 04:18:00 +03:00
parent 90676ef52f
commit fa6af775c1
12 changed files with 342 additions and 68 deletions

View file

@ -89,7 +89,7 @@ enum Command {
#[arg(long)]
force: bool,
},
/// Load data into a graph
/// Load data into a graph (local or remote)
Load {
/// Graph URI
uri: Option<String>,
@ -99,14 +99,21 @@ enum Command {
config: Option<PathBuf>,
#[arg(long)]
data: PathBuf,
/// Target branch (defaults to main). Without --from it must exist.
#[arg(long)]
branch: Option<String>,
#[arg(long, default_value = "overwrite")]
/// Base branch to fork --branch from when it doesn't exist yet.
/// Without this flag a missing branch is an error, never a fork.
#[arg(long)]
from: Option<String>,
/// How existing rows are handled: overwrite | append | merge.
/// Required — overwrite is destructive, so there is no default.
#[arg(long)]
mode: CliLoadMode,
#[arg(long)]
json: bool,
},
/// Ingest data into a reviewable named branch
/// Deprecated alias of `load --from <base>` (defaults: --mode merge, --from main)
Ingest {
/// Graph URI
uri: Option<String>,
@ -686,16 +693,55 @@ impl CliLoadMode {
}
#[derive(Debug, Serialize)]
struct LoadOutput<'a> {
uri: &'a str,
branch: &'a str,
mode: &'a str,
struct LoadOutput {
uri: String,
branch: String,
mode: &'static str,
/// Present only when `--from` was given; echoes the requested base.
#[serde(skip_serializing_if = "Option::is_none")]
base_branch: Option<String>,
branch_created: bool,
nodes_loaded: usize,
edges_loaded: usize,
node_types_loaded: usize,
edge_types_loaded: usize,
}
/// Build the CLI's `LoadOutput` from the response of a remote `/ingest` call.
///
/// Every entry in `output.tables` is classified by the part of its
/// `table_key` before the first `:` — `node` or `edge` — so the per-kind row
/// totals and table counts are computed here, client-side. Entries whose key
/// carries neither prefix are left out of all four counters.
///
/// `uri` and `branch` are copied into the payload, `mode` is rendered through
/// `CliLoadMode::as_str`, and `base_branch` / `branch_created` are taken
/// verbatim from the response.
fn load_output_from_tables(
    uri: &str,
    branch: &str,
    mode: CliLoadMode,
    output: &IngestOutput,
) -> LoadOutput {
    let mut node_rows: usize = 0;
    let mut node_kinds: usize = 0;
    let mut edge_rows: usize = 0;
    let mut edge_kinds: usize = 0;

    for table in &output.tables {
        // Select the pair of counters this table contributes to; a key with
        // an unrecognised (or missing) prefix contributes to none.
        let (rows, kinds) = match table.table_key.split_once(':') {
            Some(("node", _)) => (&mut node_rows, &mut node_kinds),
            Some(("edge", _)) => (&mut edge_rows, &mut edge_kinds),
            _ => continue,
        };
        *rows += table.rows_loaded;
        *kinds += 1;
    }

    LoadOutput {
        uri: uri.to_string(),
        branch: branch.to_string(),
        mode: mode.as_str(),
        base_branch: output.base_branch.clone(),
        branch_created: output.branch_created,
        nodes_loaded: node_rows,
        edges_loaded: edge_rows,
        node_types_loaded: node_kinds,
        edge_types_loaded: edge_kinds,
    }
}
#[derive(Debug, Serialize)]
struct SchemaPlanOutput<'a> {
uri: &'a str,
@ -1561,25 +1607,22 @@ fn merged_params_json(
}
}
fn print_load_human(
uri: &str,
branch: &str,
mode: CliLoadMode,
nodes_loaded: usize,
edges_loaded: usize,
node_types_loaded: usize,
edge_types_loaded: usize,
) {
fn print_load_human(payload: &LoadOutput) {
println!(
"loaded {} on branch {} with {}: {} nodes across {} node types, {} edges across {} edge types",
uri,
branch,
mode.as_str(),
nodes_loaded,
node_types_loaded,
edges_loaded,
edge_types_loaded
payload.uri,
payload.branch,
payload.mode,
payload.nodes_loaded,
payload.node_types_loaded,
payload.edges_loaded,
payload.edge_types_loaded
);
if payload.branch_created {
if let Some(base) = &payload.base_branch {
println!("branch {} created from {}", payload.branch, base);
}
}
}
fn print_ingest_human(output: &IngestOutput) {
@ -2659,39 +2702,60 @@ async fn main() -> Result<()> {
config,
data,
branch,
from,
mode,
json,
} => {
let config = load_cli_config(config.as_ref())?;
let graph = resolve_local_graph(&config, uri, target.as_deref(), "load")?;
let bearer_token =
resolve_remote_bearer_token(&config, uri.as_deref(), target.as_deref())?;
let graph = resolve_cli_graph(&config, uri, target.as_deref())?;
let uri = graph.uri.clone();
let branch = resolve_branch(&config, branch, None, "main");
let db = open_local_db_with_policy(&graph).await?;
let actor = resolve_cli_actor(cli.as_actor.as_deref(), &config);
let result = db
.load_file_as(&branch, None, &data.to_string_lossy(), mode.into(), actor)
let payload = if graph.is_remote {
let data = fs::read_to_string(&data)?;
let output = remote_json::<IngestOutput>(
&http_client,
Method::POST,
remote_url(&uri, "/ingest"),
Some(serde_json::to_value(IngestRequest {
branch: Some(branch.clone()),
from: from.clone(),
mode: Some(mode.into()),
data,
})?),
bearer_token.as_deref(),
)
.await?;
let payload = LoadOutput {
uri: &uri,
branch: &branch,
mode: mode.as_str(),
nodes_loaded: result.nodes_loaded.values().sum(),
edges_loaded: result.edges_loaded.values().sum(),
node_types_loaded: result.nodes_loaded.len(),
edge_types_loaded: result.edges_loaded.len(),
load_output_from_tables(&uri, &branch, mode, &output)
} else {
let db = open_local_db_with_policy(&graph).await?;
let actor = resolve_cli_actor(cli.as_actor.as_deref(), &config);
let result = db
.load_file_as(
&branch,
from.as_deref(),
&data.to_string_lossy(),
mode.into(),
actor,
)
.await?;
LoadOutput {
uri: uri.clone(),
branch: branch.clone(),
mode: mode.as_str(),
base_branch: result.base_branch.clone(),
branch_created: result.branch_created,
nodes_loaded: result.nodes_loaded.values().sum(),
edges_loaded: result.edges_loaded.values().sum(),
node_types_loaded: result.nodes_loaded.len(),
edge_types_loaded: result.edges_loaded.len(),
}
};
if json {
print_json(&payload)?;
} else {
print_load_human(
&uri,
&branch,
mode,
payload.nodes_loaded,
payload.edges_loaded,
payload.node_types_loaded,
payload.edge_types_loaded,
);
print_load_human(&payload);
}
}
Command::Ingest {
@ -2704,6 +2768,11 @@ async fn main() -> Result<()> {
mode,
json,
} => {
// stderr so `--json` consumers reading stdout are unaffected.
eprintln!(
"warning: `omnigraph ingest` is deprecated and will be removed in a future release; \
use `omnigraph load --from <base> --mode <mode>` (ingest defaults: --from main --mode merge)"
);
let config = load_cli_config(config.as_ref())?;
let bearer_token =
resolve_remote_bearer_token(&config, uri.as_deref(), target.as_deref())?;

View file

@ -2650,6 +2650,8 @@ fn load_json_outputs_summary_for_main_branch() {
let output = output_success(
cli()
.arg("load")
.arg("--mode")
.arg("overwrite")
.arg("--data")
.arg(&data)
.arg("--json")
@ -2984,7 +2986,15 @@ fn read_alias_uses_alias_target_without_cli_default_and_accepts_url_like_arg() {
&data,
r#"{"type":"Person","data":{"name":"https://example.com","age":30}}"#,
);
output_success(cli().arg("load").arg("--data").arg(&data).arg(&graph));
output_success(
cli()
.arg("load")
.arg("--mode")
.arg("overwrite")
.arg("--data")
.arg(&data)
.arg(&graph),
);
write_query_file(
&query,
&std::fs::read_to_string(fixture("test.gq")).unwrap(),
@ -3748,6 +3758,8 @@ fn cli_fails_for_missing_schema_or_data_file() {
let load_output = output_failure(
cli()
.arg("load")
.arg("--mode")
.arg("overwrite")
.arg("--data")
.arg(&missing_data)
.arg(&graph),

View file

@ -93,7 +93,15 @@ pub fn init_graph(graph: &Path) {
pub fn load_fixture(graph: &Path) {
let data = fixture("test.jsonl");
output_success(cli().arg("load").arg("--data").arg(&data).arg(graph));
output_success(
cli()
.arg("load")
.arg("--mode")
.arg("overwrite")
.arg("--data")
.arg(&data)
.arg(graph),
);
}
pub fn write_jsonl(path: &Path, rows: &str) {

View file

@ -221,6 +221,8 @@ fn local_cli_end_to_end_init_load_read_change_read_flow() {
output_success(
cli()
.arg("load")
.arg("--mode")
.arg("overwrite")
.arg("--data")
.arg(fixture("test.jsonl"))
.arg(graph.path()),
@ -397,7 +399,7 @@ fn local_cli_ingest_creates_review_branch_and_keeps_it_readable() {
{"type":"Person","data":{"name":"Bob","age":26}}"#,
);
let ingest_payload = parse_stdout_json(&output_success(
let ingest_output = output_success(
cli()
.arg("ingest")
.arg("--data")
@ -406,7 +408,13 @@ fn local_cli_ingest_creates_review_branch_and_keeps_it_readable() {
.arg("feature-ingest")
.arg(graph.path())
.arg("--json"),
));
);
// The deprecation warning goes to stderr so --json stdout stays clean.
assert!(
String::from_utf8_lossy(&ingest_output.stderr).contains("deprecated"),
"ingest must warn about its deprecation on stderr"
);
let ingest_payload = parse_stdout_json(&ingest_output);
assert_eq!(ingest_payload["branch"], "feature-ingest");
assert_eq!(ingest_payload["base_branch"], "main");
assert_eq!(ingest_payload["branch_created"], true);
@ -459,6 +467,88 @@ fn local_cli_ingest_creates_review_branch_and_keeps_it_readable() {
assert_eq!(bob["rows"][0]["p.age"], 26);
}
/// `load` covers what `ingest` used to do: forking a branch that does not
/// exist yet is opt-in through `--from`. Without that flag, loading onto a
/// missing branch fails instead of forking.
#[test]
fn local_cli_load_from_forks_branch_and_missing_branch_errors_without_from() {
    let graph = SystemGraph::loaded();
    let extra = graph.write_jsonl(
        "system-local-load-from.jsonl",
        r#"{"type":"Person","data":{"name":"Zoe","age":33}}"#,
    );

    // First attempt: no --from, so the not-yet-existing branch is rejected.
    let rejected = output_failure(
        cli()
            .arg("load")
            .args(["--mode", "merge"])
            .arg("--data")
            .arg(&extra)
            .args(["--branch", "feature-load"])
            .arg(graph.path()),
    );
    let rejected_stderr = String::from_utf8_lossy(&rejected.stderr);
    assert!(
        rejected_stderr.contains("feature-load"),
        "error should name the missing branch"
    );

    // Second attempt: --from main forks the branch and the row lands on it.
    let forked = output_success(
        cli()
            .arg("load")
            .args(["--mode", "merge"])
            .arg("--data")
            .arg(&extra)
            .args(["--branch", "feature-load"])
            .args(["--from", "main"])
            .arg(graph.path())
            .arg("--json"),
    );
    let payload = parse_stdout_json(&forked);
    assert_eq!(payload["branch"], "feature-load");
    assert_eq!(payload["base_branch"], "main");
    assert_eq!(payload["branch_created"], true);
    assert_eq!(payload["mode"], "merge");
    assert_eq!(payload["nodes_loaded"], 1);

    // The freshly forked branch can be snapshotted by name.
    let snapshot_run = output_success(
        cli()
            .arg("snapshot")
            .arg(graph.path())
            .args(["--branch", "feature-load"])
            .arg("--json"),
    );
    let snapshot = parse_stdout_json(&snapshot_run);
    assert_eq!(snapshot["branch"], "feature-load");
}
/// `load` has no implicit `--mode`: because overwrite is destructive, the
/// flag must be spelled out, and omitting it makes the command fail with an
/// error that mentions `--mode`.
#[test]
fn local_cli_load_requires_mode_flag() {
    let graph = SystemGraph::loaded();
    let extra = graph.write_jsonl(
        "system-local-load-no-mode.jsonl",
        r#"{"type":"Person","data":{"name":"Zoe","age":33}}"#,
    );

    // Deliberately omit --mode; everything else is a valid invocation.
    let rejected = output_failure(cli().arg("load").arg("--data").arg(&extra).arg(graph.path()));

    let rejected_stderr = String::from_utf8_lossy(&rejected.stderr);
    assert!(
        rejected_stderr.contains("--mode"),
        "clap should demand the missing --mode flag"
    );
}
#[test]
fn local_cli_export_round_trips_full_branch_graph() {
let graph = SystemGraph::loaded();
@ -512,6 +602,8 @@ fn local_cli_export_round_trips_full_branch_graph() {
output_success(
cli()
.arg("load")
.arg("--mode")
.arg("overwrite")
.arg("--data")
.arg(&export_path)
.arg(&imported_graph),
@ -610,6 +702,8 @@ policy: {{}}
cli()
.current_dir(query_root)
.arg("load")
.arg("--mode")
.arg("overwrite")
.arg("--data")
.arg(fixture("test.jsonl"))
.arg(&graph_uri),
@ -867,7 +961,15 @@ query get_task($slug: String) {
);
output_success(cli().arg("init").arg("--schema").arg(&schema).arg(&graph));
output_success(cli().arg("load").arg("--data").arg(&data).arg(&graph));
output_success(
cli()
.arg("load")
.arg("--mode")
.arg("overwrite")
.arg("--data")
.arg(&data)
.arg(&graph),
);
let filtered = parse_stdout_json(&output_success(
cli()
@ -997,7 +1099,15 @@ query vector_search($q: String) {
);
output_success(cli().arg("init").arg("--schema").arg(&schema).arg(&graph));
output_success(cli().arg("load").arg("--data").arg(&data).arg(&graph));
output_success(
cli()
.arg("load")
.arg("--mode")
.arg("overwrite")
.arg("--data")
.arg(&data)
.arg(&graph),
);
let result = parse_stdout_json(&output_success(
cli()
@ -1221,6 +1331,8 @@ fn local_cli_load_enforces_engine_layer_policy() {
.arg("--as")
.arg("act-bruno")
.arg("load")
.arg("--mode")
.arg("overwrite")
.arg("--config")
.arg(&config)
.arg("--data")
@ -1239,6 +1351,8 @@ fn local_cli_load_enforces_engine_layer_policy() {
.arg("--as")
.arg("act-ragnor")
.arg("load")
.arg("--mode")
.arg("overwrite")
.arg("--config")
.arg(&config)
.arg("--data")
@ -1684,6 +1798,8 @@ graphs:
std::fs::write(&data, "{\"type\":\"Person\",\"data\":{\"name\":\"Ada\"}}\n").unwrap();
let output = cli()
.arg("load")
.arg("--mode")
.arg("overwrite")
.arg("--data")
.arg(&data)
.arg(temp.path().join("graphs/knowledge.omni"))
@ -1796,6 +1912,8 @@ fn seed_graph(dir: &std::path::Path, graph: &str, row: &str) {
std::fs::write(&data, row).unwrap();
let output = cli()
.arg("load")
.arg("--mode")
.arg("overwrite")
.arg("--data")
.arg(&data)
.arg(dir.join(format!("graphs/{graph}.omni")))

View file

@ -652,6 +652,8 @@ query add_friend($from: String, $to: String) {
output_success(
cli()
.arg("load")
.arg("--mode")
.arg("overwrite")
.arg("--data")
.arg(&export_path)
.arg(&imported_graph),
@ -755,6 +757,71 @@ fn remote_ingest_creates_review_branch_and_keeps_it_readable() {
assert_eq!(zoe["rows"][0]["p.name"], "Zoe");
}
/// Remote counterpart of the local `load --from` test: the graph is reached
/// through a spawned server and a config file rather than a path. Loading
/// onto a missing branch fails without `--from`, and succeeds — forking the
/// branch — once `--from main` is supplied.
#[test]
#[ignore = "requires loopback socket permissions in sandboxed runners"]
fn remote_load_round_trips_and_requires_from_for_new_branches() {
    let graph = SystemGraph::loaded();
    let server = graph.spawn_server();
    let config = graph.write_config("omnigraph.yaml", &remote_yaml_config(&server.base_url));
    let extra = graph.write_jsonl(
        "system-remote-load.jsonl",
        r#"{"type":"Person","data":{"name":"Zoe","age":33}}"#,
    );

    // No --from: the load onto the not-yet-existing branch must be refused.
    let rejected = output_failure(
        cli()
            .arg("load")
            .arg("--config")
            .arg(&config)
            .args(["--mode", "merge"])
            .arg("--data")
            .arg(&extra)
            .args(["--branch", "feature-load"]),
    );
    let rejected_stderr = String::from_utf8_lossy(&rejected.stderr);
    assert!(
        rejected_stderr.contains("feature-load"),
        "error should name the missing branch"
    );

    // --from main: the remote load forks the branch and reports the new row.
    let forked = output_success(
        cli()
            .arg("load")
            .arg("--config")
            .arg(&config)
            .args(["--mode", "merge"])
            .arg("--data")
            .arg(&extra)
            .args(["--branch", "feature-load"])
            .args(["--from", "main"])
            .arg("--json"),
    );
    let payload = parse_stdout_json(&forked);
    assert_eq!(payload["branch"], "feature-load");
    assert_eq!(payload["base_branch"], "main");
    assert_eq!(payload["branch_created"], true);
    assert_eq!(payload["nodes_loaded"], 1);

    // The forked branch is visible to a follow-up snapshot via the same config.
    let snapshot_run = output_success(
        cli()
            .arg("snapshot")
            .arg("--config")
            .arg(&config)
            .args(["--branch", "feature-load"])
            .arg("--json"),
    );
    let snapshot = parse_stdout_json(&snapshot_run);
    assert_eq!(snapshot["branch"], "feature-load");
}
#[test]
#[ignore = "requires loopback socket permissions in sandboxed runners"]
fn remote_ingest_reuses_existing_branch_and_merges_updates() {