Merge pull request #184 from ModernRelay/refactor/load-ingest-unification

refactor: unify load/ingest — load survives, ingest deprecated
This commit is contained in:
Andrew Altshuler 2026-06-11 04:43:36 +03:00 committed by GitHub
commit 328bfef6fb
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
19 changed files with 645 additions and 175 deletions

View file

@ -200,9 +200,8 @@ omnigraph init --schema ./schema.pg s3://my-bucket/graph.omni
# Bulk load
omnigraph load --data ./seed.jsonl --mode overwrite s3://my-bucket/graph.omni
# Branch + ingest a review batch
omnigraph branch create --from main review/2026-04-25 s3://my-bucket/graph.omni
omnigraph ingest --branch review/2026-04-25 --data ./batch.jsonl s3://my-bucket/graph.omni
# Load a review batch onto its own branch (--from forks it if missing)
omnigraph load --branch review/2026-04-25 --from main --mode merge --data ./batch.jsonl s3://my-bucket/graph.omni
# Run a hybrid (vector + BM25) query
omnigraph read --query ./queries.gq --name find_similar \
@ -258,7 +257,7 @@ omnigraph policy explain --actor act-alice --action change --branch main
| Per-query atomic writes | — | In-memory `MutationStaging.pending` accumulator + `stage_*` / `commit_staged` per touched table at end-of-query + publisher CAS via `commit_with_expected` (single manifest commit per `mutate_as` / `load`); D₂ parse-time rule keeps inserts/updates and deletes from mixing |
| Three-way row-level merge | — | `OrderedTableCursor` + `StagedTableWriter`, structured `MergeConflictKind` |
| Change feeds | — | `diff_between` / `diff_commits` with manifest fast path + ID streaming |
| Cedar policy | — | Per-graph actions plus server-scoped actions (see [docs/user/policy.md](docs/user/policy.md) for the current list), branch / target_branch / protected scopes, validate/test/explain CLI. **Engine-wide enforcement** (MR-722): every `_as` writer (`apply_schema_as`, `mutate_as`, `load_as`, `ingest_as`, `branch_create_as` / `branch_create_from_as`, `branch_delete_as`, `branch_merge_as`) calls `Omnigraph::enforce(action, scope, actor)` — HTTP, CLI, embedded SDK all hit the same gate. |
| Cedar policy | — | Per-graph actions plus server-scoped actions (see [docs/user/policy.md](docs/user/policy.md) for the current list), branch / target_branch / protected scopes, validate/test/explain CLI. **Engine-wide enforcement** (MR-722): every `_as` writer (`apply_schema_as`, `mutate_as`, `load_as` — the deprecated `ingest_as` shims route through it — `branch_create_as` / `branch_create_from_as`, `branch_delete_as`, `branch_merge_as`) calls `Omnigraph::enforce(action, scope, actor)` — HTTP, CLI, embedded SDK all hit the same gate. |
| HTTP server | — | Axum, OpenAPI via utoipa, bearer auth (SHA-256, AWS Secrets Manager option), `authorize_request` at the HTTP boundary (resolves bearer→actor, applies admission control), NDJSON streaming export, **multi-graph mode (v0.6.0+) with cluster routes + read-only `GET /graphs` enumeration + per-graph + server-level Cedar policies. Add/remove graphs by editing `omnigraph.yaml` and restarting.** |
| CLI with config | — | `omnigraph.yaml`, aliases, multi-format output (json/jsonl/csv/kv/table) |
| Audit / actor tracking | — | `_as` write APIs + actor map in commit graph |

View file

@ -88,7 +88,7 @@ omnigraph branch create --from main feature-x ./graph.omni
omnigraph branch merge feature-x --into main ./graph.omni
```
See [docs/user/cli.md](docs/user/cli.md) for schema apply, snapshots, ingest, commits, and policy commands.
See [docs/user/cli.md](docs/user/cli.md) for schema apply, snapshots, data loading, commits, and policy commands.
## Clients
@ -132,7 +132,7 @@ Notes:
- `crates/omnigraph-compiler`: shared schema/query parser, typechecker, catalog, and IR lowering
- `crates/omnigraph`: storage/runtime, branching, merge, change detection, and query execution
- `crates/omnigraph-cli`: CLI for graph lifecycle (init/load/ingest), query/mutate, branch/commit/merge, schema/lint, snapshot/export, policy, and maintenance (optimize/cleanup)
- `crates/omnigraph-cli`: CLI for graph lifecycle (init/load), query/mutate, branch/commit/merge, schema/lint, snapshot/export, policy, and maintenance (optimize/cleanup)
- `crates/omnigraph-server`: Axum HTTP server for remote reads, changes, ingest, export, branches, and commits
## Contributing

View file

@ -89,7 +89,7 @@ enum Command {
#[arg(long)]
force: bool,
},
/// Load data into a graph
/// Load data into a graph (local or remote)
Load {
/// Graph URI
uri: Option<String>,
@ -99,14 +99,21 @@ enum Command {
config: Option<PathBuf>,
#[arg(long)]
data: PathBuf,
/// Target branch (defaults to main). Without --from it must exist.
#[arg(long)]
branch: Option<String>,
#[arg(long, default_value = "overwrite")]
/// Base branch to fork --branch from when it doesn't exist yet.
/// Without this flag a missing branch is an error, never a fork.
#[arg(long)]
from: Option<String>,
/// How existing rows are handled: overwrite | append | merge.
/// Required — overwrite is destructive, so there is no default.
#[arg(long)]
mode: CliLoadMode,
#[arg(long)]
json: bool,
},
/// Ingest data into a reviewable named branch
/// Deprecated alias of `load --from <base>` (defaults: --mode merge, --from main)
Ingest {
/// Graph URI
uri: Option<String>,
@ -686,16 +693,55 @@ impl CliLoadMode {
}
#[derive(Debug, Serialize)]
struct LoadOutput<'a> {
uri: &'a str,
branch: &'a str,
mode: &'a str,
struct LoadOutput {
uri: String,
branch: String,
mode: &'static str,
/// Present only when `--from` was given; echoes the requested base.
#[serde(skip_serializing_if = "Option::is_none")]
base_branch: Option<String>,
branch_created: bool,
nodes_loaded: usize,
edges_loaded: usize,
node_types_loaded: usize,
edge_types_loaded: usize,
}
/// Map a remote `/ingest` response onto the CLI's load output. Table keys
/// carry `node:`/`edge:` prefixes, so the per-kind sums are derivable
/// client-side without the catalog.
fn load_output_from_tables(
uri: &str,
branch: &str,
mode: CliLoadMode,
output: &IngestOutput,
) -> LoadOutput {
let mut nodes_loaded = 0;
let mut edges_loaded = 0;
let mut node_types_loaded = 0;
let mut edge_types_loaded = 0;
for table in &output.tables {
if table.table_key.starts_with("node:") {
nodes_loaded += table.rows_loaded;
node_types_loaded += 1;
} else if table.table_key.starts_with("edge:") {
edges_loaded += table.rows_loaded;
edge_types_loaded += 1;
}
}
LoadOutput {
uri: uri.to_string(),
branch: branch.to_string(),
mode: mode.as_str(),
base_branch: output.base_branch.clone(),
branch_created: output.branch_created,
nodes_loaded,
edges_loaded,
node_types_loaded,
edge_types_loaded,
}
}
#[derive(Debug, Serialize)]
struct SchemaPlanOutput<'a> {
uri: &'a str,
@ -1561,25 +1607,22 @@ fn merged_params_json(
}
}
fn print_load_human(
uri: &str,
branch: &str,
mode: CliLoadMode,
nodes_loaded: usize,
edges_loaded: usize,
node_types_loaded: usize,
edge_types_loaded: usize,
) {
fn print_load_human(payload: &LoadOutput) {
println!(
"loaded {} on branch {} with {}: {} nodes across {} node types, {} edges across {} edge types",
uri,
branch,
mode.as_str(),
nodes_loaded,
node_types_loaded,
edges_loaded,
edge_types_loaded
payload.uri,
payload.branch,
payload.mode,
payload.nodes_loaded,
payload.node_types_loaded,
payload.edges_loaded,
payload.edge_types_loaded
);
if payload.branch_created {
if let Some(base) = &payload.base_branch {
println!("branch {} created from {}", payload.branch, base);
}
}
}
fn print_ingest_human(output: &IngestOutput) {
@ -1587,7 +1630,7 @@ fn print_ingest_human(output: &IngestOutput) {
"ingested {} into branch {} from {} with {} ({})",
output.uri,
output.branch,
output.base_branch,
output.base_branch.as_deref().unwrap_or("main"),
output.mode.as_str(),
if output.branch_created {
"branch created"
@ -2659,39 +2702,60 @@ async fn main() -> Result<()> {
config,
data,
branch,
from,
mode,
json,
} => {
let config = load_cli_config(config.as_ref())?;
let graph = resolve_local_graph(&config, uri, target.as_deref(), "load")?;
let bearer_token =
resolve_remote_bearer_token(&config, uri.as_deref(), target.as_deref())?;
let graph = resolve_cli_graph(&config, uri, target.as_deref())?;
let uri = graph.uri.clone();
let branch = resolve_branch(&config, branch, None, "main");
let db = open_local_db_with_policy(&graph).await?;
let actor = resolve_cli_actor(cli.as_actor.as_deref(), &config);
let result = db
.load_file_as(&branch, &data.to_string_lossy(), mode.into(), actor)
let payload = if graph.is_remote {
let data = fs::read_to_string(&data)?;
let output = remote_json::<IngestOutput>(
&http_client,
Method::POST,
remote_url(&uri, "/ingest"),
Some(serde_json::to_value(IngestRequest {
branch: Some(branch.clone()),
from: from.clone(),
mode: Some(mode.into()),
data,
})?),
bearer_token.as_deref(),
)
.await?;
let payload = LoadOutput {
uri: &uri,
branch: &branch,
mode: mode.as_str(),
nodes_loaded: result.nodes_loaded.values().sum(),
edges_loaded: result.edges_loaded.values().sum(),
node_types_loaded: result.nodes_loaded.len(),
edge_types_loaded: result.edges_loaded.len(),
load_output_from_tables(&uri, &branch, mode, &output)
} else {
let db = open_local_db_with_policy(&graph).await?;
let actor = resolve_cli_actor(cli.as_actor.as_deref(), &config);
let result = db
.load_file_as(
&branch,
from.as_deref(),
&data.to_string_lossy(),
mode.into(),
actor,
)
.await?;
LoadOutput {
uri: uri.clone(),
branch: branch.clone(),
mode: mode.as_str(),
base_branch: result.base_branch.clone(),
branch_created: result.branch_created,
nodes_loaded: result.nodes_loaded.values().sum(),
edges_loaded: result.edges_loaded.values().sum(),
node_types_loaded: result.nodes_loaded.len(),
edge_types_loaded: result.edges_loaded.len(),
}
};
if json {
print_json(&payload)?;
} else {
print_load_human(
&uri,
&branch,
mode,
payload.nodes_loaded,
payload.edges_loaded,
payload.node_types_loaded,
payload.edge_types_loaded,
);
print_load_human(&payload);
}
}
Command::Ingest {
@ -2704,6 +2768,11 @@ async fn main() -> Result<()> {
mode,
json,
} => {
// stderr so `--json` consumers reading stdout are unaffected.
eprintln!(
"warning: `omnigraph ingest` is deprecated and will be removed in a future release; \
use `omnigraph load --from <base> --mode <mode>` (ingest defaults: --from main --mode merge)"
);
let config = load_cli_config(config.as_ref())?;
let bearer_token =
resolve_remote_bearer_token(&config, uri.as_deref(), target.as_deref())?;
@ -2730,7 +2799,7 @@ async fn main() -> Result<()> {
let db = open_local_db_with_policy(&graph).await?;
let actor = resolve_cli_actor(cli.as_actor.as_deref(), &config);
let result = db
.ingest_file_as(
.load_file_as(
&branch,
Some(&from),
&data.to_string_lossy(),
@ -2738,7 +2807,7 @@ async fn main() -> Result<()> {
actor,
)
.await?;
ingest_output(&uri, &result, None)
ingest_output(&uri, &result, mode.into(), None)
};
if json {
print_json(&payload)?;

View file

@ -2650,6 +2650,8 @@ fn load_json_outputs_summary_for_main_branch() {
let output = output_success(
cli()
.arg("load")
.arg("--mode")
.arg("overwrite")
.arg("--data")
.arg(&data)
.arg("--json")
@ -2984,7 +2986,15 @@ fn read_alias_uses_alias_target_without_cli_default_and_accepts_url_like_arg() {
&data,
r#"{"type":"Person","data":{"name":"https://example.com","age":30}}"#,
);
output_success(cli().arg("load").arg("--data").arg(&data).arg(&graph));
output_success(
cli()
.arg("load")
.arg("--mode")
.arg("overwrite")
.arg("--data")
.arg(&data)
.arg(&graph),
);
write_query_file(
&query,
&std::fs::read_to_string(fixture("test.gq")).unwrap(),
@ -3748,6 +3758,8 @@ fn cli_fails_for_missing_schema_or_data_file() {
let load_output = output_failure(
cli()
.arg("load")
.arg("--mode")
.arg("overwrite")
.arg("--data")
.arg(&missing_data)
.arg(&graph),

View file

@ -93,7 +93,15 @@ pub fn init_graph(graph: &Path) {
pub fn load_fixture(graph: &Path) {
let data = fixture("test.jsonl");
output_success(cli().arg("load").arg("--data").arg(&data).arg(graph));
output_success(
cli()
.arg("load")
.arg("--mode")
.arg("overwrite")
.arg("--data")
.arg(&data)
.arg(graph),
);
}
pub fn write_jsonl(path: &Path, rows: &str) {

View file

@ -221,6 +221,8 @@ fn local_cli_end_to_end_init_load_read_change_read_flow() {
output_success(
cli()
.arg("load")
.arg("--mode")
.arg("overwrite")
.arg("--data")
.arg(fixture("test.jsonl"))
.arg(graph.path()),
@ -397,7 +399,7 @@ fn local_cli_ingest_creates_review_branch_and_keeps_it_readable() {
{"type":"Person","data":{"name":"Bob","age":26}}"#,
);
let ingest_payload = parse_stdout_json(&output_success(
let ingest_output = output_success(
cli()
.arg("ingest")
.arg("--data")
@ -406,7 +408,13 @@ fn local_cli_ingest_creates_review_branch_and_keeps_it_readable() {
.arg("feature-ingest")
.arg(graph.path())
.arg("--json"),
));
);
// The deprecation warning goes to stderr so --json stdout stays clean.
assert!(
String::from_utf8_lossy(&ingest_output.stderr).contains("deprecated"),
"ingest must warn about its deprecation on stderr"
);
let ingest_payload = parse_stdout_json(&ingest_output);
assert_eq!(ingest_payload["branch"], "feature-ingest");
assert_eq!(ingest_payload["base_branch"], "main");
assert_eq!(ingest_payload["branch_created"], true);
@ -459,6 +467,88 @@ fn local_cli_ingest_creates_review_branch_and_keeps_it_readable() {
assert_eq!(bob["rows"][0]["p.age"], 26);
}
/// The unified `load` subsumes ingest: `--from` opts into fork-if-missing,
/// while without it a missing branch is an error — never an implicit fork.
#[test]
fn local_cli_load_from_forks_branch_and_missing_branch_errors_without_from() {
let graph = SystemGraph::loaded();
let extra = graph.write_jsonl(
"system-local-load-from.jsonl",
r#"{"type":"Person","data":{"name":"Zoe","age":33}}"#,
);
// Without --from, a missing branch must fail and create nothing.
let failure = output_failure(
cli()
.arg("load")
.arg("--mode")
.arg("merge")
.arg("--data")
.arg(&extra)
.arg("--branch")
.arg("feature-load")
.arg(graph.path()),
);
assert!(
String::from_utf8_lossy(&failure.stderr).contains("feature-load"),
"error should name the missing branch"
);
// With --from, the branch is forked and the load lands on it.
let payload = parse_stdout_json(&output_success(
cli()
.arg("load")
.arg("--mode")
.arg("merge")
.arg("--data")
.arg(&extra)
.arg("--branch")
.arg("feature-load")
.arg("--from")
.arg("main")
.arg(graph.path())
.arg("--json"),
));
assert_eq!(payload["branch"], "feature-load");
assert_eq!(payload["base_branch"], "main");
assert_eq!(payload["branch_created"], true);
assert_eq!(payload["mode"], "merge");
assert_eq!(payload["nodes_loaded"], 1);
let snapshot = parse_stdout_json(&output_success(
cli()
.arg("snapshot")
.arg(graph.path())
.arg("--branch")
.arg("feature-load")
.arg("--json"),
));
assert_eq!(snapshot["branch"], "feature-load");
}
/// `--mode` is required: overwrite is destructive, so the unified `load`
/// has no implicit default.
#[test]
fn local_cli_load_requires_mode_flag() {
let graph = SystemGraph::loaded();
let extra = graph.write_jsonl(
"system-local-load-no-mode.jsonl",
r#"{"type":"Person","data":{"name":"Zoe","age":33}}"#,
);
let failure = output_failure(
cli()
.arg("load")
.arg("--data")
.arg(&extra)
.arg(graph.path()),
);
assert!(
String::from_utf8_lossy(&failure.stderr).contains("--mode"),
"clap should demand the missing --mode flag"
);
}
#[test]
fn local_cli_export_round_trips_full_branch_graph() {
let graph = SystemGraph::loaded();
@ -512,6 +602,8 @@ fn local_cli_export_round_trips_full_branch_graph() {
output_success(
cli()
.arg("load")
.arg("--mode")
.arg("overwrite")
.arg("--data")
.arg(&export_path)
.arg(&imported_graph),
@ -610,6 +702,8 @@ policy: {{}}
cli()
.current_dir(query_root)
.arg("load")
.arg("--mode")
.arg("overwrite")
.arg("--data")
.arg(fixture("test.jsonl"))
.arg(&graph_uri),
@ -867,7 +961,15 @@ query get_task($slug: String) {
);
output_success(cli().arg("init").arg("--schema").arg(&schema).arg(&graph));
output_success(cli().arg("load").arg("--data").arg(&data).arg(&graph));
output_success(
cli()
.arg("load")
.arg("--mode")
.arg("overwrite")
.arg("--data")
.arg(&data)
.arg(&graph),
);
let filtered = parse_stdout_json(&output_success(
cli()
@ -997,7 +1099,15 @@ query vector_search($q: String) {
);
output_success(cli().arg("init").arg("--schema").arg(&schema).arg(&graph));
output_success(cli().arg("load").arg("--data").arg(&data).arg(&graph));
output_success(
cli()
.arg("load")
.arg("--mode")
.arg("overwrite")
.arg("--data")
.arg(&data)
.arg(&graph),
);
let result = parse_stdout_json(&output_success(
cli()
@ -1221,6 +1331,8 @@ fn local_cli_load_enforces_engine_layer_policy() {
.arg("--as")
.arg("act-bruno")
.arg("load")
.arg("--mode")
.arg("overwrite")
.arg("--config")
.arg(&config)
.arg("--data")
@ -1239,6 +1351,8 @@ fn local_cli_load_enforces_engine_layer_policy() {
.arg("--as")
.arg("act-ragnor")
.arg("load")
.arg("--mode")
.arg("overwrite")
.arg("--config")
.arg(&config)
.arg("--data")
@ -1684,6 +1798,8 @@ graphs:
std::fs::write(&data, "{\"type\":\"Person\",\"data\":{\"name\":\"Ada\"}}\n").unwrap();
let output = cli()
.arg("load")
.arg("--mode")
.arg("overwrite")
.arg("--data")
.arg(&data)
.arg(temp.path().join("graphs/knowledge.omni"))
@ -1796,6 +1912,8 @@ fn seed_graph(dir: &std::path::Path, graph: &str, row: &str) {
std::fs::write(&data, row).unwrap();
let output = cli()
.arg("load")
.arg("--mode")
.arg("overwrite")
.arg("--data")
.arg(&data)
.arg(dir.join(format!("graphs/{graph}.omni")))

View file

@ -652,6 +652,8 @@ query add_friend($from: String, $to: String) {
output_success(
cli()
.arg("load")
.arg("--mode")
.arg("overwrite")
.arg("--data")
.arg(&export_path)
.arg(&imported_graph),
@ -755,6 +757,71 @@ fn remote_ingest_creates_review_branch_and_keeps_it_readable() {
assert_eq!(zoe["rows"][0]["p.name"], "Zoe");
}
/// The unified `load` works against remote graphs through the server's
/// `/ingest` endpoint: without `--from` a missing branch is a hard error
/// (no implicit fork), with `--from` it forks like ingest did.
#[test]
#[ignore = "requires loopback socket permissions in sandboxed runners"]
fn remote_load_round_trips_and_requires_from_for_new_branches() {
let graph = SystemGraph::loaded();
let server = graph.spawn_server();
let config = graph.write_config("omnigraph.yaml", &remote_yaml_config(&server.base_url));
let extra = graph.write_jsonl(
"system-remote-load.jsonl",
r#"{"type":"Person","data":{"name":"Zoe","age":33}}"#,
);
// Missing branch without --from: refused remotely, nothing created.
let failure = output_failure(
cli()
.arg("load")
.arg("--config")
.arg(&config)
.arg("--mode")
.arg("merge")
.arg("--data")
.arg(&extra)
.arg("--branch")
.arg("feature-load"),
);
assert!(
String::from_utf8_lossy(&failure.stderr).contains("feature-load"),
"error should name the missing branch"
);
// With --from, the remote load forks and lands the rows.
let payload = parse_stdout_json(&output_success(
cli()
.arg("load")
.arg("--config")
.arg(&config)
.arg("--mode")
.arg("merge")
.arg("--data")
.arg(&extra)
.arg("--branch")
.arg("feature-load")
.arg("--from")
.arg("main")
.arg("--json"),
));
assert_eq!(payload["branch"], "feature-load");
assert_eq!(payload["base_branch"], "main");
assert_eq!(payload["branch_created"], true);
assert_eq!(payload["nodes_loaded"], 1);
let snapshot = parse_stdout_json(&output_success(
cli()
.arg("snapshot")
.arg("--config")
.arg(&config)
.arg("--branch")
.arg("feature-load")
.arg("--json"),
));
assert_eq!(snapshot["branch"], "feature-load");
}
#[test]
#[ignore = "requires loopback socket permissions in sandboxed runners"]
fn remote_ingest_reuses_existing_branch_and_merges_updates() {

View file

@ -1,6 +1,6 @@
use omnigraph::db::{GraphCommit, MergeOutcome, ReadTarget, SchemaApplyResult, Snapshot};
use omnigraph::error::{MergeConflict, MergeConflictKind};
use omnigraph::loader::{IngestResult, LoadMode};
use omnigraph::loader::{LoadMode, LoadResult};
use crate::queries::StoredQuery;
use omnigraph_compiler::SchemaMigrationStep;
use omnigraph_compiler::query::ast::Param;
@ -208,7 +208,9 @@ pub struct IngestTableOutput {
pub struct IngestOutput {
pub uri: String,
pub branch: String,
pub base_branch: String,
/// Base branch a fork was requested from (the request's `from`), echoed
/// even when the branch already existed. `null` when `from` was absent.
pub base_branch: Option<String>,
pub branch_created: bool,
#[schema(value_type = LoadModeSchema)]
pub mode: LoadMode,
@ -493,9 +495,12 @@ pub struct SchemaOutput {
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
pub struct IngestRequest {
/// Target branch. Created from `from` if it does not yet exist. Defaults to `main`.
/// Target branch. Defaults to `main`. Without `from`, the branch must
/// already exist — a missing branch is a 404, never an implicit fork.
pub branch: Option<String>,
/// Parent branch used to create `branch` if it does not exist. Defaults to `main`.
/// Parent branch used to create `branch` if it does not exist. Branch
/// creation is opt-in by presence of this field; omit it to require an
/// existing branch.
pub from: Option<String>,
/// How existing rows are handled. Defaults to `merge`.
#[schema(value_type = Option<LoadModeSchema>)]
@ -642,18 +647,23 @@ pub fn read_output(query_name: String, target: &ReadTarget, result: QueryResult)
}
}
pub fn ingest_output(uri: &str, result: &IngestResult, actor_id: Option<String>) -> IngestOutput {
pub fn ingest_output(
uri: &str,
result: &LoadResult,
mode: LoadMode,
actor_id: Option<String>,
) -> IngestOutput {
IngestOutput {
uri: uri.to_string(),
branch: result.branch.clone(),
base_branch: result.base_branch.clone(),
branch_created: result.branch_created,
mode: result.mode,
mode,
tables: result
.tables
.iter()
.to_ingest_tables()
.into_iter()
.map(|table| IngestTableOutput {
table_key: table.table_key.clone(),
table_key: table.table_key,
rows_loaded: table.rows_loaded,
})
.collect(),

View file

@ -2663,13 +2663,15 @@ async fn server_schema_apply(
),
security(("bearer_token" = [])),
)]
/// Bulk-ingest NDJSON data into a branch.
/// Bulk-load NDJSON data into a branch.
///
/// `data` is NDJSON with one record per line. `mode` controls behavior on
/// existing rows: `merge` upserts by id (default), `append` blindly inserts,
/// `overwrite` replaces table contents. If `branch` does not exist it is
/// created from `from` (defaults to `main`). **Destructive** when `mode` is
/// `overwrite` or when ingest produces conflicting writes.
/// `overwrite` replaces table contents. Branch creation is opt-in by
/// presence of `from`: with `from` set, a missing `branch` is created from
/// it; without `from`, `branch` must already exist — a missing branch is a
/// 404, never an implicit fork. **Destructive** when `mode` is `overwrite`
/// or when the load produces conflicting writes.
async fn server_ingest(
State(state): State<AppState>,
Extension(handle): Extension<Arc<GraphHandle>>,
@ -2677,7 +2679,7 @@ async fn server_ingest(
Json(request): Json<IngestRequest>,
) -> std::result::Result<Json<IngestOutput>, ApiError> {
let branch = request.branch.unwrap_or_else(|| "main".to_string());
let from = request.from.unwrap_or_else(|| "main".to_string());
let from = request.from;
let mode = request.mode.unwrap_or(omnigraph::loader::LoadMode::Merge);
let actor_arc = actor
.as_ref()
@ -2697,15 +2699,25 @@ async fn server_ingest(
};
if !branch_exists {
authorize_request(
actor.as_ref().map(|Extension(actor)| actor),
handle.policy.as_deref(),
PolicyRequest {
action: PolicyAction::BranchCreate,
branch: Some(from.clone()),
target_branch: Some(branch.clone()),
},
)?;
match from.as_deref() {
// Fork-if-missing is opt-in by presence of `from`; without it a
// typo'd branch name must surface as an error, not silently
// create a fork and land the data there.
None => {
return Err(ApiError::not_found(format!(
"branch '{branch}' not found; pass `from` to create it"
)));
}
Some(from) => authorize_request(
actor.as_ref().map(|Extension(actor)| actor),
handle.policy.as_deref(),
PolicyRequest {
action: PolicyAction::BranchCreate,
branch: Some(from.to_string()),
target_branch: Some(branch.clone()),
},
)?,
}
}
authorize_request(
actor.as_ref().map(|Extension(actor)| actor),
@ -2724,7 +2736,7 @@ async fn server_ingest(
let result = {
let db = &handle.engine;
db.ingest_as(&branch, Some(&from), &request.data, mode, actor_id)
db.load_as(&branch, from.as_deref(), &request.data, mode, actor_id)
.await
.map_err(ApiError::from_omni)?
};
@ -2732,6 +2744,7 @@ async fn server_ingest(
Ok(Json(ingest_output(
handle.uri.as_str(),
&result,
mode,
actor_id.map(str::to_string),
)))
}

View file

@ -2265,6 +2265,77 @@ async fn ingest_existing_branch_skips_branch_create_policy_check() {
assert_eq!(body["base_branch"], "other-base");
}
/// Regression: branch creation is opt-in by presence of `from`. A request
/// without `from` against a branch that doesn't exist must 404 — not
/// silently fork `main` and land the data on the typo'd branch.
#[tokio::test(flavor = "multi_thread")]
async fn ingest_without_from_returns_404_for_missing_branch_and_creates_nothing() {
let (temp, app) = app_for_loaded_graph().await;
let graph = graph_path(temp.path());
let ingest = IngestRequest {
branch: Some("feature-typo".to_string()),
from: None,
mode: Some(LoadMode::Merge),
data: r#"{"type":"Person","data":{"name":"Zoe","age":33}}"#.to_string(),
};
let (status, body) = json_response(
&app,
Request::builder()
.uri("/ingest")
.method(Method::POST)
.header("content-type", "application/json")
.body(Body::from(serde_json::to_vec(&ingest).unwrap()))
.unwrap(),
)
.await;
assert_eq!(status, StatusCode::NOT_FOUND);
let error: ErrorOutput = serde_json::from_value(body).unwrap();
assert_eq!(error.code, Some(omnigraph_server::api::ErrorCode::NotFound));
let db = Omnigraph::open(graph.to_str().unwrap()).await.unwrap();
assert!(
!db.branch_list()
.await
.unwrap()
.contains(&"feature-typo".to_string()),
"a 404'd ingest must not create the branch"
);
}
#[tokio::test(flavor = "multi_thread")]
async fn ingest_without_from_loads_into_existing_branch() {
let (temp, app) = app_for_loaded_graph().await;
let graph = graph_path(temp.path());
{
let db = Omnigraph::open(graph.to_str().unwrap()).await.unwrap();
db.branch_create_from(ReadTarget::branch("main"), "feature")
.await
.unwrap();
}
let ingest = IngestRequest {
branch: Some("feature".to_string()),
from: None,
mode: Some(LoadMode::Merge),
data: r#"{"type":"Person","data":{"name":"Zoe","age":33}}"#.to_string(),
};
let (status, body) = json_response(
&app,
Request::builder()
.uri("/ingest")
.method(Method::POST)
.header("content-type", "application/json")
.body(Body::from(serde_json::to_vec(&ingest).unwrap()))
.unwrap(),
)
.await;
assert_eq!(status, StatusCode::OK);
assert_eq!(body["branch"], "feature");
assert_eq!(body["branch_created"], false);
assert_eq!(body["base_branch"], serde_json::Value::Null);
}
#[tokio::test(flavor = "multi_thread")]
async fn ingest_denies_missing_branch_without_branch_create_permission() {
let (_temp, app) = app_for_loaded_graph_with_auth_tokens_and_policy(
@ -4731,6 +4802,7 @@ async fn build_parity_graph() -> (tempfile::TempDir, PathBuf, PathBuf) {
.unwrap();
db.load_as(
"feature",
None,
r#"{"type":"Person","data":{"name":"ParityEve","age":29}}"#,
LoadMode::Append,
None,

View file

@ -26,6 +26,14 @@ use crate::exec::staging::{MutationStaging, PendingMode};
/// Result of a load operation.
#[derive(Debug, Clone, Default)]
pub struct LoadResult {
/// Branch the load landed on (`"main"` when no branch was given).
pub branch: String,
/// Base branch a fork was requested from (the `base` parameter of
/// `load_as`), recorded verbatim even when the target branch already
/// existed and no fork happened.
pub base_branch: Option<String>,
/// True when this load created `branch` by forking it from `base_branch`.
pub branch_created: bool,
pub nodes_loaded: HashMap<String, usize>,
pub edges_loaded: HashMap<String, usize>,
}
@ -57,21 +65,27 @@ pub enum LoadMode {
Merge,
}
/// Load JSONL data into an Omnigraph database.
pub async fn load_jsonl(db: &mut Omnigraph, data: &str, mode: LoadMode) -> Result<LoadResult> {
/// Convenience: load JSONL data onto the database handle's *active branch*
/// (`main` when unbound). Equivalent to `db.load(active_branch, data, mode)`;
/// use `Omnigraph::load`/`load_as` directly when targeting an explicit branch
/// or when fork-from-base semantics are needed.
pub async fn load_jsonl(db: &Omnigraph, data: &str, mode: LoadMode) -> Result<LoadResult> {
let current_branch = db.active_branch().await;
let branch = current_branch.as_deref().unwrap_or("main");
db.load(branch, data, mode).await
}
/// Load JSONL data from a file path.
pub async fn load_jsonl_file(db: &mut Omnigraph, path: &str, mode: LoadMode) -> Result<LoadResult> {
/// Convenience: like [`load_jsonl`] but reading from a file path.
pub async fn load_jsonl_file(db: &Omnigraph, path: &str, mode: LoadMode) -> Result<LoadResult> {
let current_branch = db.active_branch().await;
let branch = current_branch.as_deref().unwrap_or("main");
db.load_file(branch, path, mode).await
}
impl Omnigraph {
#[deprecated(
note = "use `load_as` with an explicit `base` instead; the ingest family will be removed in a future release"
)]
pub async fn ingest(
&self,
branch: &str,
@ -79,9 +93,17 @@ impl Omnigraph {
data: &str,
mode: LoadMode,
) -> Result<IngestResult> {
#[allow(deprecated)]
self.ingest_as(branch, from, data, mode, None).await
}
/// Deprecated shim over the unified `load_as`. Preserves the historical
/// ingest contract exactly: `from: None` means fork from `main`, and the
/// base branch is recorded in the result even when the target branch
/// already existed (no fork happened).
#[deprecated(
note = "use `load_as` with an explicit `base` instead; the ingest family will be removed in a future release"
)]
pub async fn ingest_as(
&self,
branch: &str,
@ -90,22 +112,24 @@ impl Omnigraph {
mode: LoadMode,
actor_id: Option<&str>,
) -> Result<IngestResult> {
// Engine-layer policy gate (MR-722 fan-out / PR #3). Scope is
// `Branch(branch)` for the data-write portion. If ingest creates
// a new branch as a side-effect (target branch doesn't exist),
// the inner `branch_create_from_as` call below additionally
// checks `BranchCreate` — both authorities are genuinely needed
// for "ingest into a fresh branch", so the layered check is
// correct, not redundant.
self.enforce(
omnigraph_policy::PolicyAction::Change,
&omnigraph_policy::ResourceScope::Branch(branch.to_string()),
actor_id,
)?;
self.ingest_with_current_actor(branch, from, data, mode, actor_id)
.await
let result = self
.load_as(branch, Some(from.unwrap_or("main")), data, mode, actor_id)
.await?;
Ok(IngestResult {
branch: result.branch.clone(),
base_branch: result
.base_branch
.clone()
.unwrap_or_else(|| "main".to_string()),
branch_created: result.branch_created,
mode,
tables: result.to_ingest_tables(),
})
}
#[deprecated(
note = "use `load_file_as` with an explicit `base` instead; the ingest family will be removed in a future release"
)]
pub async fn ingest_file(
&self,
branch: &str,
@ -113,9 +137,13 @@ impl Omnigraph {
path: &str,
mode: LoadMode,
) -> Result<IngestResult> {
#[allow(deprecated)]
self.ingest_file_as(branch, from, path, mode, None).await
}
#[deprecated(
note = "use `load_file_as` with an explicit `base` instead; the ingest family will be removed in a future release"
)]
pub async fn ingest_file_as(
&self,
branch: &str,
@ -125,69 +153,35 @@ impl Omnigraph {
actor_id: Option<&str>,
) -> Result<IngestResult> {
let data = std::fs::read_to_string(path).map_err(OmniError::Io)?;
#[allow(deprecated)]
self.ingest_as(branch, from, &data, mode, actor_id).await
}
async fn ingest_with_current_actor(
&self,
branch: &str,
from: Option<&str>,
data: &str,
mode: LoadMode,
actor_id: Option<&str>,
) -> Result<IngestResult> {
self.ensure_schema_state_valid().await?;
let target_branch =
Self::normalize_branch_name(branch)?.unwrap_or_else(|| "main".to_string());
let base_branch = Self::normalize_branch_name(from.unwrap_or("main"))?
.unwrap_or_else(|| "main".to_string());
let branch_created = !self
.branch_list()
.await?
.iter()
.any(|name| name == &target_branch);
if branch_created {
// Thread the actor through to the implicit BranchCreate so
// policy decisions match what an explicit `branch_create_from_as`
// call would see. Calling the no-actor variant here would
// bypass BranchCreate enforcement when policy is installed —
// the footgun guard catches that case too, but threading is
// the correct fix.
self.branch_create_from_as(
crate::db::ReadTarget::branch(&base_branch),
&target_branch,
actor_id,
)
.await?;
}
let result = self.load_as(&target_branch, data, mode, actor_id).await?;
Ok(IngestResult {
branch: target_branch,
base_branch,
branch_created,
mode,
tables: result.to_ingest_tables(),
})
}
pub async fn load(&self, branch: &str, data: &str, mode: LoadMode) -> Result<LoadResult> {
self.load_as(branch, data, mode, None).await
self.load_as(branch, None, data, mode, None).await
}
/// Load JSONL data onto `branch`.
///
/// `base` selects the branch-creation behavior: with `Some(base)`, a
/// missing target branch is forked from `base` first (the former
/// `ingest` semantics); with `None`, the target branch must already
/// exist — staging fails on an unknown branch when it resolves the
/// manifest snapshot, so a typo'd branch name can never create one.
pub async fn load_as(
&self,
branch: &str,
base: Option<&str>,
data: &str,
mode: LoadMode,
actor_id: Option<&str>,
) -> Result<LoadResult> {
// Engine-layer policy gate (MR-722 fan-out / PR #3). Scope is
// `Branch(branch)` to match the HTTP-layer Change convention.
// `ingest_as` also calls `load_as` after enforcing its own
// Change gate — that double-check is fine because both gates
// resolve to identical Cedar decisions for the same actor +
// branch (the second check is a structurally-correct no-op).
// When a fork happens below, `branch_create_from_as` additionally
// checks `BranchCreate` — both authorities are genuinely needed
// for "load into a fresh branch", so the layered check is
// correct, not redundant.
self.enforce(
omnigraph_policy::PolicyAction::Change,
&omnigraph_policy::ResourceScope::Branch(branch.to_string()),
@ -205,15 +199,47 @@ impl Omnigraph {
// `commit_prepared_updates_on_branch_with_expected`) and leave
// `self.coordinator` with a stale manifest snapshot.
let requested = Self::normalize_branch_name(branch)?;
let base_branch = match base {
Some(base) => {
Some(Self::normalize_branch_name(base)?.unwrap_or_else(|| "main".to_string()))
}
None => None,
};
// Fork-if-missing only when a base branch was explicitly given.
// `requested == None` is `main`, which always exists.
let mut branch_created = false;
if let (Some(target), Some(base_name)) = (requested.as_deref(), base_branch.as_deref()) {
let exists = self.branch_list().await?.iter().any(|name| name == target);
if !exists {
// Thread the actor through to the implicit BranchCreate so
// policy decisions match what an explicit `branch_create_from_as`
// call would see. Calling the no-actor variant here would
// bypass BranchCreate enforcement when policy is installed —
// the footgun guard catches that case too, but threading is
// the correct fix.
self.branch_create_from_as(
crate::db::ReadTarget::branch(base_name),
target,
actor_id,
)
.await?;
branch_created = true;
}
}
// Direct-to-target writes: no Run state machine, no `__run__` staging
// branch. Cross-table OCC is enforced by the publisher's
// `expected_table_versions` CAS inside `load_jsonl_reader`.
self.load_direct_on_branch(requested.as_deref(), data, mode, actor_id)
.await
let mut result = self
.load_direct_on_branch(requested.as_deref(), data, mode, actor_id)
.await?;
result.branch = requested.unwrap_or_else(|| "main".to_string());
result.base_branch = base_branch;
result.branch_created = branch_created;
Ok(result)
}
pub async fn load_file(&self, branch: &str, path: &str, mode: LoadMode) -> Result<LoadResult> {
self.load_file_as(branch, path, mode, None).await
self.load_file_as(branch, None, path, mode, None).await
}
/// Read a file into memory and delegate to `load_as`. Used by the
@ -222,12 +248,13 @@ impl Omnigraph {
pub async fn load_file_as(
&self,
branch: &str,
base: Option<&str>,
path: &str,
mode: LoadMode,
actor_id: Option<&str>,
) -> Result<LoadResult> {
let data = std::fs::read_to_string(path).map_err(|e| OmniError::Io(e))?;
self.load_as(branch, &data, mode, actor_id).await
let data = std::fs::read_to_string(path).map_err(OmniError::Io)?;
self.load_as(branch, base, &data, mode, actor_id).await
}
async fn load_direct_on_branch(
@ -1824,6 +1851,7 @@ edge WorksAt: Person -> Company
}
#[tokio::test]
#[allow(deprecated)]
async fn test_ingest_creates_branch_and_reports_tables() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
@ -1868,6 +1896,7 @@ edge WorksAt: Person -> Company
}
#[tokio::test]
#[allow(deprecated)]
async fn test_ingest_existing_branch_ignores_from_and_merges_data() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
@ -1942,6 +1971,7 @@ edge WorksAt: Person -> Company
}
#[tokio::test]
#[allow(deprecated)]
async fn test_ingest_as_stamps_actor_on_branch_head_commit() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
@ -1967,6 +1997,68 @@ edge WorksAt: Person -> Company
assert_eq!(head.actor_id.as_deref(), Some("act-andrew"));
}
#[tokio::test]
async fn test_load_as_with_base_forks_missing_branch_and_stamps_metadata() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
let result = db
.load_as("feature", Some("main"), TEST_DATA, LoadMode::Merge, None)
.await
.unwrap();
assert_eq!(result.branch, "feature");
assert_eq!(result.base_branch.as_deref(), Some("main"));
assert!(result.branch_created);
assert!(
db.branch_list()
.await
.unwrap()
.contains(&"feature".to_string())
);
// Re-loading onto the now-existing branch records the base but
// performs no fork.
let again = db
.load_as(
"feature",
Some("main"),
r#"{"type":"Person","data":{"name":"Bob","age":26}}"#,
LoadMode::Merge,
None,
)
.await
.unwrap();
assert!(!again.branch_created);
assert_eq!(again.base_branch.as_deref(), Some("main"));
}
#[tokio::test]
async fn test_load_as_without_base_errors_on_missing_branch() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
let result = db
.load_as("nonexistent", None, TEST_DATA, LoadMode::Merge, None)
.await;
assert!(result.is_err(), "load without base must not create branches");
assert!(
!db.branch_list()
.await
.unwrap()
.contains(&"nonexistent".to_string()),
"failed load must not leave a branch behind"
);
// Loads to main carry the default branch metadata.
let main_load = db.load("main", TEST_DATA, LoadMode::Overwrite).await.unwrap();
assert_eq!(main_load.branch, "main");
assert_eq!(main_load.base_branch, None);
assert!(!main_load.branch_created);
}
#[test]
fn test_range_constraint_rejects_nan() {
use arrow_array::{Float64Array, RecordBatch, StringArray};

View file

@ -243,6 +243,7 @@ async fn load_as_denies_when_policy_rejects_actor() {
let result = db
.load_as(
"main",
None,
ONE_PERSON_JSONL,
LoadMode::Merge,
Some("act-denied"),
@ -258,6 +259,7 @@ async fn load_as_allows_when_policy_permits_actor() {
db.load_as(
"main",
None,
ONE_PERSON_JSONL,
LoadMode::Merge,
Some("act-allowed"),
@ -281,6 +283,7 @@ async fn load_file_as_denies_when_policy_rejects_actor() {
let result = db
.load_file_as(
"main",
None,
data_path.to_str().unwrap(),
LoadMode::Merge,
Some("act-denied"),
@ -298,6 +301,7 @@ async fn load_file_as_allows_when_policy_permits_actor() {
db.load_file_as(
"main",
None,
data_path.to_str().unwrap(),
LoadMode::Merge,
Some("act-allowed"),
@ -307,6 +311,7 @@ async fn load_file_as_allows_when_policy_permits_actor() {
}
#[tokio::test]
#[allow(deprecated)]
async fn ingest_as_denies_when_policy_rejects_actor() {
let dir = tempfile::tempdir().unwrap();
let (db, _engine) = init_with_policy(&dir).await;
@ -324,6 +329,7 @@ async fn ingest_as_denies_when_policy_rejects_actor() {
}
#[tokio::test]
#[allow(deprecated)]
async fn ingest_as_allows_when_policy_permits_actor() {
let dir = tempfile::tempdir().unwrap();
let (db, _engine) = init_with_policy(&dir).await;

View file

@ -162,18 +162,18 @@ Atomicity guarantee for multi-statement mutations: a mid-query failure leaves La
| Mode | Semantics | Path (post-MR-794) |
|---|---|---|
| `Overwrite` | Replace all data in the target tables on the branch | Inline-commit per type, then publisher CAS at end-of-load. Truncate-then-append doesn't fit the staged shape; documented residual. |
| `Overwrite` | Replace all data in the target tables on the branch | Same accumulator; one `stage_overwrite` + `commit_staged` per touched table at end-of-load (a staged Lance `Operation::Overwrite` transaction — HEAD does not advance until commit; MR-793 Phase 2); publisher CAS. |
| `Append` | Strict insert; duplicates error | In-memory `MutationStaging` accumulator; one `stage_append` + `commit_staged` per touched table at end-of-load; publisher CAS. |
| `Merge` | Upsert by `id` (`merge_insert`) | Same accumulator; one `stage_merge_insert` per touched table at end-of-load (Merge mode dedupes by `id`, last-write-wins); publisher CAS. |
For Append/Merge, a mid-load failure (RI / cardinality violation, validation error) leaves Lance HEAD untouched on the staged tables — the next load on the same tables proceeds normally with no `ExpectedVersionMismatch`. For Overwrite, a mid-load failure can still leave Lance HEAD on a partially-truncated table; the next overwrite replaces it.
For all three modes, a mid-load failure (RI / cardinality violation, validation error) leaves Lance HEAD untouched on the staged tables — the next load on the same tables proceeds normally with no `ExpectedVersionMismatch`.
## `load` vs `ingest`
## `load` and the deprecated `ingest` shims
- `load(branch, data, mode)` — direct load to a branch (single publisher commit per call).
- `ingest(branch, from, data, mode)` — branch-creating wrapper: if `branch` doesn't exist, fork it from `from` (default `main`) via `branch_create_from`, then call `load(branch, data, mode)`.
- Returns `IngestResult { branch, base_branch, branch_created, mode, tables[] }`.
- `ingest_as(actor_id)` records the actor on the resulting commit.
- `load_as(branch, base, data, mode, actor)` — the unified entry (single publisher commit per call). `base: Some(b)` forks a missing `branch` from `b` first (via `branch_create_from_as`, which enforces `BranchCreate`); `base: None` requires the branch to exist — staging fails on an unknown branch, so a typo'd name can never create one.
- `load(branch, data, mode)` — convenience wrapper with `base: None` and no actor.
- Returns `LoadResult { branch, base_branch, branch_created, nodes_loaded, edges_loaded }`.
- `ingest{,_as,_file,_file_as}` are `#[deprecated]` shims over `load_as` preserving the historical contract (`from: None` forks from `main`; returns `IngestResult`); they are slated for removal. The CLI `ingest` command is a deprecated alias of `load --from <base>`.
## Embeddings during load

View file

@ -1,7 +1,7 @@
# Audit / Actor tracking
- `Omnigraph::audit_actor_id: Option<String>` is the actor in effect.
- `_as` variants of every write API let callers override the actor: `mutate_as`, `ingest_as`, `branch_merge_as`, `apply_schema_as`, etc.
- `_as` variants of every write API let callers override the actor: `mutate_as`, `load_as`, `branch_merge_as`, `apply_schema_as`, etc.
- Actor IDs are persisted on `GraphCommit.actor_id` with split storage in `_graph_commit_actors.lance` (the commit graph is split into `_graph_commits.lance` for the linkage and `_graph_commit_actors.lance` for the actor map).
- HTTP server uses the bearer-token actor automatically; CLI uses the local user / explicit env (no implicit actor).
- Pre-v0.4.0 graphs also stored actor IDs on `RunRecord.actor_id` in `_graph_runs.lance` / `_graph_run_actors.lance`. The Run state machine was removed in MR-771; those files are inert post-v0.4.0. The v2→v3 manifest migration sweeps any stale `__run__*` branches on first write-open (MR-770); the inert dataset bytes remain until a `delete_prefix` primitive lands.

View file

@ -9,8 +9,8 @@ Top-level command families and subcommands. Graph-targeting commands accept eith
| Command | Purpose |
|---|---|
| `init` | `--schema <pg>` → initialize a graph (also scaffolds `omnigraph.yaml` if missing) |
| `load` | bulk load a branch (`--mode overwrite\|append\|merge`) |
| `ingest` | branch-creating transactional load (`--from <base>`) |
| `load` | bulk load a branch, local or remote (`--mode overwrite\|append\|merge` is **required** — overwrite is destructive, so there is no default). Without `--from` the target branch must exist; `--from <base>` forks a missing `--branch` from `<base>` first |
| `ingest` | deprecated alias of `load --from <base>` (defaults: `--from main --mode merge`); prints a one-line warning to stderr |
| `query` (alias: `read`) | run named read query; source via `--query <path>`, `-e`/`--query-string <GQ>`, or `--alias <name>` (exactly one). `read` is the deprecated previous name and prints a one-line warning to stderr |
| `mutate` (alias: `change`) | run mutation query; same `--query` / `-e` / `--alias` mutual-exclusion as `query`. `change` is the deprecated previous name and prints a one-line warning to stderr |
| `snapshot` | print current snapshot (per-table version + row count) |

View file

@ -42,7 +42,7 @@ omnigraph branch create --uri graph.omni --from main feature-x
omnigraph branch list --uri graph.omni
omnigraph branch merge --uri graph.omni feature-x --into main
omnigraph ingest --data batch.jsonl --branch review/import-2026-04-09 graph.omni
omnigraph load --data batch.jsonl --branch review/import-2026-04-09 --from main --mode merge graph.omni
omnigraph export graph.omni --branch main --type Person > people.jsonl
omnigraph commit list graph.omni --branch main --json
omnigraph commit show --uri graph.omni <commit-id> --json

View file

@ -105,12 +105,13 @@ is validated/tested/explained as the anonymous policy.
- `omnigraph policy validate` — parse + count actors, exit 1 on parse error.
- `omnigraph policy test` — run cases in `policy.tests.yaml`, exit 1 on any expectation mismatch.
- `omnigraph policy explain --actor … --action … [--branch …] [--target-branch …]` — show decision and matched rule.
- `omnigraph --as <ACTOR> <subcommand>` — set the actor for the duration of one invocation. Effective for `change`, `load`, `ingest`, `branch create|delete|merge`, and `schema apply` against local URIs. No-op against remote HTTP URIs (actor is bearer-token-resolved server-side).
- `omnigraph --as <ACTOR> <subcommand>` — set the actor for the duration of one invocation. Effective for `change`, `load` (and its deprecated `ingest` alias), `branch create|delete|merge`, and `schema apply` against local URIs. No-op against remote HTTP URIs (actor is bearer-token-resolved server-side).
## Enforcement
Policy is a property of the **engine**, not the transport. Every mutating
write — `mutate_as`, `load_as`, `ingest_as`, `apply_schema_as`,
write — `mutate_as`, `load_as` (the deprecated `ingest_as` shims route
through it), `apply_schema_as`,
`branch_create_as`, `branch_create_from_as`, `branch_delete_as`,
`branch_merge_as` — calls `Omnigraph::enforce(action, scope, actor)` at
the head of the method. The gate fires identically whether the call

View file

@ -56,7 +56,7 @@ Per-graph endpoints — same body shape across modes; URLs differ:
| POST | `/queries/{name}` | `/graphs/{id}/queries/{name}` | bearer + `invoke_query` (+ `change` for a stored mutation) | invoke a named query from the `queries:` registry; deny == 404 | `server_invoke_query` |
| GET | `/schema` | `/graphs/{id}/schema` | bearer + `read` | get current `.pg` source | `server_schema_get` |
| POST | `/schema/apply` | `/graphs/{id}/schema/apply` | bearer + `schema_apply` (target=`main`) | migrate | `server_schema_apply` |
| POST | `/ingest` | `/graphs/{id}/ingest` | bearer + `branch_create` (if new) + `change` | bulk load | `server_ingest` (32 MB body limit) |
| POST | `/ingest` | `/graphs/{id}/ingest` | bearer + `branch_create` (only when `from` is set and the branch is created) + `change` | bulk load; branch creation is opt-in via `from` — without it a missing `branch` is a 404, never an implicit fork | `server_ingest` (32 MB body limit) |
| GET | `/branches` | `/graphs/{id}/branches` | bearer + `read` | list branches | `server_branch_list` |
| POST | `/branches` | `/graphs/{id}/branches` | bearer + `branch_create` | create | `server_branch_create` |
| DELETE | `/branches/{branch}` | `/graphs/{id}/branches/{branch}` | bearer + `branch_delete` | delete | `server_branch_delete` |

View file

@ -670,8 +670,8 @@
"tags": [
"mutations"
],
"summary": "Bulk-ingest NDJSON data into a branch.",
"description": "`data` is NDJSON with one record per line. `mode` controls behavior on\nexisting rows: `merge` upserts by id (default), `append` blindly inserts,\n`overwrite` replaces table contents. If `branch` does not exist it is\ncreated from `from` (defaults to `main`). **Destructive** when `mode` is\n`overwrite` or when ingest produces conflicting writes.",
"summary": "Bulk-load NDJSON data into a branch.",
"description": "`data` is NDJSON with one record per line. `mode` controls behavior on\nexisting rows: `merge` upserts by id (default), `append` blindly inserts,\n`overwrite` replaces table contents. Branch creation is opt-in by\npresence of `from`: with `from` set, a missing `branch` is created from\nit; without `from`, `branch` must already exist — a missing branch is a\n404, never an implicit fork. **Destructive** when `mode` is `overwrite`\nor when the load produces conflicting writes.",
"operationId": "ingest",
"requestBody": {
"content": {
@ -1710,7 +1710,6 @@
"required": [
"uri",
"branch",
"base_branch",
"branch_created",
"mode",
"tables"
@ -1723,7 +1722,11 @@
]
},
"base_branch": {
"type": "string"
"type": [
"string",
"null"
],
"description": "Base branch a fork was requested from (the request's `from`), echoed\neven when the branch already existed. `null` when `from` was absent."
},
"branch": {
"type": "string"
@ -1756,7 +1759,7 @@
"string",
"null"
],
"description": "Target branch. Created from `from` if it does not yet exist. Defaults to `main`."
"description": "Target branch. Defaults to `main`. Without `from`, the branch must\nalready exist — a missing branch is a 404, never an implicit fork."
},
"data": {
"type": "string",
@ -1768,7 +1771,7 @@
"string",
"null"
],
"description": "Parent branch used to create `branch` if it does not exist. Defaults to `main`."
"description": "Parent branch used to create `branch` if it does not exist. Branch\ncreation is opt-in by presence of this field; omit it to require an\nexisting branch."
},
"mode": {
"oneOf": [