From 43d4e89fdea41ba68d76a0dd2c368c39deafd6b0 Mon Sep 17 00:00:00 2001 From: aaltshuler Date: Thu, 11 Jun 2026 03:44:02 +0300 Subject: [PATCH 1/5] docs(execution): Overwrite loads are staged since MR-793, not inline-commit MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The LoadMode table still described Overwrite as an inline-commit-per-type residual with a partial-truncation failure window. Since MR-793 Phase 2, Overwrite goes through the same MutationStaging accumulator as Append/Merge, staged as a Lance Operation::Overwrite transaction via stage_overwrite (table_store.rs) and committed with commit_staged + publisher CAS — a mid-load failure leaves Lance HEAD untouched in all three modes. Co-Authored-By: Claude Fable 5 --- docs/dev/execution.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/dev/execution.md b/docs/dev/execution.md index a3a9c01..9753696 100644 --- a/docs/dev/execution.md +++ b/docs/dev/execution.md @@ -162,11 +162,11 @@ Atomicity guarantee for multi-statement mutations: a mid-query failure leaves La | Mode | Semantics | Path (post-MR-794) | |---|---|---| -| `Overwrite` | Replace all data in the target tables on the branch | Inline-commit per type, then publisher CAS at end-of-load. Truncate-then-append doesn't fit the staged shape; documented residual. | +| `Overwrite` | Replace all data in the target tables on the branch | Same accumulator; one `stage_overwrite` + `commit_staged` per touched table at end-of-load (a staged Lance `Operation::Overwrite` transaction — HEAD does not advance until commit; MR-793 Phase 2); publisher CAS. | | `Append` | Strict insert; duplicates error | In-memory `MutationStaging` accumulator; one `stage_append` + `commit_staged` per touched table at end-of-load; publisher CAS. 
| | `Merge` | Upsert by `id` (`merge_insert`) | Same accumulator; one `stage_merge_insert` per touched table at end-of-load (Merge mode dedupes by `id`, last-write-wins); publisher CAS. | -For Append/Merge, a mid-load failure (RI / cardinality violation, validation error) leaves Lance HEAD untouched on the staged tables — the next load on the same tables proceeds normally with no `ExpectedVersionMismatch`. For Overwrite, a mid-load failure can still leave Lance HEAD on a partially-truncated table; the next overwrite replaces it. +For all three modes, a mid-load failure (RI / cardinality violation, validation error) leaves Lance HEAD untouched on the staged tables — the next load on the same tables proceeds normally with no `ExpectedVersionMismatch`. ## `load` vs `ingest` From e676c151bbb3674c4dd41b23ddcc7fb1e99cb433 Mon Sep 17 00:00:00 2001 From: aaltshuler Date: Thu, 11 Jun 2026 03:53:22 +0300 Subject: [PATCH 2/5] =?UTF-8?q?feat(engine):=20unify=20load/ingest=20?= =?UTF-8?q?=E2=80=94=20load=5Fas=20gains=20an=20optional=20fork=20base?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit load_as/load_file_as gain a base: Option<&str> parameter: with Some(base) a missing target branch is forked from base first (the former ingest semantics); with None the target branch must exist — staging fails on an unknown branch, so a typo'd name can never create one. LoadResult gains branch/base_branch/branch_created metadata (additive). The ingest family (ingest, ingest_as, ingest_file, ingest_file_as) becomes #[deprecated] shims over load_as that preserve the historical contract exactly (from: None still means fork from main; base recorded even when no fork happened). IngestResult and to_ingest_tables stay for the shims and the server until the removal release. 
The layered policy check is unchanged: Change on the target branch always, BranchCreate additionally when a fork actually happens (enforced inside branch_create_from_as with the actor threaded through). Co-Authored-By: Claude Fable 5 --- crates/omnigraph-cli/src/main.rs | 5 +- crates/omnigraph-server/src/lib.rs | 3 + crates/omnigraph-server/tests/server.rs | 1 + crates/omnigraph/src/loader/mod.rs | 223 ++++++++++++------ .../omnigraph/tests/policy_engine_chassis.rs | 6 + 5 files changed, 170 insertions(+), 68 deletions(-) diff --git a/crates/omnigraph-cli/src/main.rs b/crates/omnigraph-cli/src/main.rs index 62fff60..d1fbb99 100644 --- a/crates/omnigraph-cli/src/main.rs +++ b/crates/omnigraph-cli/src/main.rs @@ -2669,7 +2669,7 @@ async fn main() -> Result<()> { let db = open_local_db_with_policy(&graph).await?; let actor = resolve_cli_actor(cli.as_actor.as_deref(), &config); let result = db - .load_file_as(&branch, &data.to_string_lossy(), mode.into(), actor) + .load_file_as(&branch, None, &data.to_string_lossy(), mode.into(), actor) .await?; let payload = LoadOutput { uri: &uri, @@ -2729,6 +2729,9 @@ async fn main() -> Result<()> { } else { let db = open_local_db_with_policy(&graph).await?; let actor = resolve_cli_actor(cli.as_actor.as_deref(), &config); + // Deprecated shim retained until the CLI ingest command + // becomes an alias of the unified `load` handler. + #[allow(deprecated)] let result = db .ingest_file_as( &branch, diff --git a/crates/omnigraph-server/src/lib.rs b/crates/omnigraph-server/src/lib.rs index 3b9ff1d..26e4837 100644 --- a/crates/omnigraph-server/src/lib.rs +++ b/crates/omnigraph-server/src/lib.rs @@ -2722,6 +2722,9 @@ async fn server_ingest( .try_admit(&actor_arc, est_bytes) .map_err(ApiError::from_workload_reject)?; + // Deprecated shim retained until the from-absent semantics change + // lands; the handler then calls `load_as` directly. 
+ #[allow(deprecated)] let result = { let db = &handle.engine; db.ingest_as(&branch, Some(&from), &request.data, mode, actor_id) diff --git a/crates/omnigraph-server/tests/server.rs b/crates/omnigraph-server/tests/server.rs index bf99b8d..fd214bb 100644 --- a/crates/omnigraph-server/tests/server.rs +++ b/crates/omnigraph-server/tests/server.rs @@ -4731,6 +4731,7 @@ async fn build_parity_graph() -> (tempfile::TempDir, PathBuf, PathBuf) { .unwrap(); db.load_as( "feature", + None, r#"{"type":"Person","data":{"name":"ParityEve","age":29}}"#, LoadMode::Append, None, diff --git a/crates/omnigraph/src/loader/mod.rs b/crates/omnigraph/src/loader/mod.rs index febbabd..0fabaea 100644 --- a/crates/omnigraph/src/loader/mod.rs +++ b/crates/omnigraph/src/loader/mod.rs @@ -26,6 +26,14 @@ use crate::exec::staging::{MutationStaging, PendingMode}; /// Result of a load operation. #[derive(Debug, Clone, Default)] pub struct LoadResult { + /// Branch the load landed on (`"main"` when no branch was given). + pub branch: String, + /// Base branch a fork was requested from (the `base` parameter of + /// `load_as`), recorded verbatim even when the target branch already + /// existed and no fork happened. + pub base_branch: Option, + /// True when this load created `branch` by forking it from `base_branch`. + pub branch_created: bool, pub nodes_loaded: HashMap, pub edges_loaded: HashMap, } @@ -72,6 +80,9 @@ pub async fn load_jsonl_file(db: &mut Omnigraph, path: &str, mode: LoadMode) -> } impl Omnigraph { + #[deprecated( + note = "use `load_as` with an explicit `base` instead; the ingest family will be removed in a future release" + )] pub async fn ingest( &self, branch: &str, @@ -79,9 +90,17 @@ impl Omnigraph { data: &str, mode: LoadMode, ) -> Result { + #[allow(deprecated)] self.ingest_as(branch, from, data, mode, None).await } + /// Deprecated shim over the unified `load_as`. 
Preserves the historical + /// ingest contract exactly: `from: None` means fork from `main`, and the + /// base branch is recorded in the result even when the target branch + /// already existed (no fork happened). + #[deprecated( + note = "use `load_as` with an explicit `base` instead; the ingest family will be removed in a future release" + )] pub async fn ingest_as( &self, branch: &str, @@ -90,22 +109,24 @@ impl Omnigraph { mode: LoadMode, actor_id: Option<&str>, ) -> Result { - // Engine-layer policy gate (MR-722 fan-out / PR #3). Scope is - // `Branch(branch)` for the data-write portion. If ingest creates - // a new branch as a side-effect (target branch doesn't exist), - // the inner `branch_create_from_as` call below additionally - // checks `BranchCreate` — both authorities are genuinely needed - // for "ingest into a fresh branch", so the layered check is - // correct, not redundant. - self.enforce( - omnigraph_policy::PolicyAction::Change, - &omnigraph_policy::ResourceScope::Branch(branch.to_string()), - actor_id, - )?; - self.ingest_with_current_actor(branch, from, data, mode, actor_id) - .await + let result = self + .load_as(branch, Some(from.unwrap_or("main")), data, mode, actor_id) + .await?; + Ok(IngestResult { + branch: result.branch.clone(), + base_branch: result + .base_branch + .clone() + .unwrap_or_else(|| "main".to_string()), + branch_created: result.branch_created, + mode, + tables: result.to_ingest_tables(), + }) } + #[deprecated( + note = "use `load_file_as` with an explicit `base` instead; the ingest family will be removed in a future release" + )] pub async fn ingest_file( &self, branch: &str, @@ -113,9 +134,13 @@ impl Omnigraph { path: &str, mode: LoadMode, ) -> Result { + #[allow(deprecated)] self.ingest_file_as(branch, from, path, mode, None).await } + #[deprecated( + note = "use `load_file_as` with an explicit `base` instead; the ingest family will be removed in a future release" + )] pub async fn ingest_file_as( &self, branch: &str, 
@@ -125,69 +150,35 @@ impl Omnigraph { actor_id: Option<&str>, ) -> Result { let data = std::fs::read_to_string(path).map_err(OmniError::Io)?; + #[allow(deprecated)] self.ingest_as(branch, from, &data, mode, actor_id).await } - async fn ingest_with_current_actor( - &self, - branch: &str, - from: Option<&str>, - data: &str, - mode: LoadMode, - actor_id: Option<&str>, - ) -> Result { - self.ensure_schema_state_valid().await?; - let target_branch = - Self::normalize_branch_name(branch)?.unwrap_or_else(|| "main".to_string()); - let base_branch = Self::normalize_branch_name(from.unwrap_or("main"))? - .unwrap_or_else(|| "main".to_string()); - let branch_created = !self - .branch_list() - .await? - .iter() - .any(|name| name == &target_branch); - if branch_created { - // Thread the actor through to the implicit BranchCreate so - // policy decisions match what an explicit `branch_create_from_as` - // call would see. Calling the no-actor variant here would - // bypass BranchCreate enforcement when policy is installed — - // the footgun guard catches that case too, but threading is - // the correct fix. - self.branch_create_from_as( - crate::db::ReadTarget::branch(&base_branch), - &target_branch, - actor_id, - ) - .await?; - } - - let result = self.load_as(&target_branch, data, mode, actor_id).await?; - Ok(IngestResult { - branch: target_branch, - base_branch, - branch_created, - mode, - tables: result.to_ingest_tables(), - }) - } - pub async fn load(&self, branch: &str, data: &str, mode: LoadMode) -> Result { - self.load_as(branch, data, mode, None).await + self.load_as(branch, None, data, mode, None).await } + /// Load JSONL data onto `branch`. 
+ /// + /// `base` selects the branch-creation behavior: with `Some(base)`, a + /// missing target branch is forked from `base` first (the former + /// `ingest` semantics); with `None`, the target branch must already + /// exist — staging fails on an unknown branch when it resolves the + /// manifest snapshot, so a typo'd branch name can never create one. pub async fn load_as( &self, branch: &str, + base: Option<&str>, data: &str, mode: LoadMode, actor_id: Option<&str>, ) -> Result<LoadResult> { // Engine-layer policy gate (MR-722 fan-out / PR #3). Scope is // `Branch(branch)` to match the HTTP-layer Change convention. - // `ingest_as` also calls `load_as` after enforcing its own - // Change gate — that double-check is fine because both gates - // resolve to identical Cedar decisions for the same actor + - // branch (the second check is a structurally-correct no-op). + // When a fork happens below, `branch_create_from_as` additionally + // checks `BranchCreate` — both authorities are genuinely needed + // for "load into a fresh branch", so the layered check is + // correct, not redundant. self.enforce( omnigraph_policy::PolicyAction::Change, &omnigraph_policy::ResourceScope::Branch(branch.to_string()), @@ -205,15 +196,47 @@ impl Omnigraph { // `commit_prepared_updates_on_branch_with_expected`) and leave // `self.coordinator` with a stale manifest snapshot. let requested = Self::normalize_branch_name(branch)?; + let base_branch = match base { + Some(base) => { + Some(Self::normalize_branch_name(base)?.unwrap_or_else(|| "main".to_string())) + } + None => None, + }; + // Fork-if-missing only when a base branch was explicitly given. + // `requested == None` is `main`, which always exists. 
+ let mut branch_created = false; + if let (Some(target), Some(base_name)) = (requested.as_deref(), base_branch.as_deref()) { + let exists = self.branch_list().await?.iter().any(|name| name == target); + if !exists { + // Thread the actor through to the implicit BranchCreate so + // policy decisions match what an explicit `branch_create_from_as` + // call would see. Calling the no-actor variant here would + // bypass BranchCreate enforcement when policy is installed — + // the footgun guard catches that case too, but threading is + // the correct fix. + self.branch_create_from_as( + crate::db::ReadTarget::branch(base_name), + target, + actor_id, + ) + .await?; + branch_created = true; + } + } // Direct-to-target writes: no Run state machine, no `__run__` staging // branch. Cross-table OCC is enforced by the publisher's // `expected_table_versions` CAS inside `load_jsonl_reader`. - self.load_direct_on_branch(requested.as_deref(), data, mode, actor_id) - .await + let mut result = self + .load_direct_on_branch(requested.as_deref(), data, mode, actor_id) + .await?; + result.branch = requested.unwrap_or_else(|| "main".to_string()); + result.base_branch = base_branch; + result.branch_created = branch_created; + Ok(result) } pub async fn load_file(&self, branch: &str, path: &str, mode: LoadMode) -> Result { - self.load_file_as(branch, path, mode, None).await + self.load_file_as(branch, None, path, mode, None).await } /// Read a file into memory and delegate to `load_as`. 
Used by the @@ -222,12 +245,13 @@ impl Omnigraph { pub async fn load_file_as( &self, branch: &str, + base: Option<&str>, path: &str, mode: LoadMode, actor_id: Option<&str>, ) -> Result { - let data = std::fs::read_to_string(path).map_err(|e| OmniError::Io(e))?; - self.load_as(branch, &data, mode, actor_id).await + let data = std::fs::read_to_string(path).map_err(OmniError::Io)?; + self.load_as(branch, base, &data, mode, actor_id).await } async fn load_direct_on_branch( @@ -1824,6 +1848,7 @@ edge WorksAt: Person -> Company } #[tokio::test] + #[allow(deprecated)] async fn test_ingest_creates_branch_and_reports_tables() { let dir = tempfile::tempdir().unwrap(); let uri = dir.path().to_str().unwrap(); @@ -1868,6 +1893,7 @@ edge WorksAt: Person -> Company } #[tokio::test] + #[allow(deprecated)] async fn test_ingest_existing_branch_ignores_from_and_merges_data() { let dir = tempfile::tempdir().unwrap(); let uri = dir.path().to_str().unwrap(); @@ -1942,6 +1968,7 @@ edge WorksAt: Person -> Company } #[tokio::test] + #[allow(deprecated)] async fn test_ingest_as_stamps_actor_on_branch_head_commit() { let dir = tempfile::tempdir().unwrap(); let uri = dir.path().to_str().unwrap(); @@ -1967,6 +1994,68 @@ edge WorksAt: Person -> Company assert_eq!(head.actor_id.as_deref(), Some("act-andrew")); } + #[tokio::test] + async fn test_load_as_with_base_forks_missing_branch_and_stamps_metadata() { + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap(); + let db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap(); + + let result = db + .load_as("feature", Some("main"), TEST_DATA, LoadMode::Merge, None) + .await + .unwrap(); + + assert_eq!(result.branch, "feature"); + assert_eq!(result.base_branch.as_deref(), Some("main")); + assert!(result.branch_created); + assert!( + db.branch_list() + .await + .unwrap() + .contains(&"feature".to_string()) + ); + + // Re-loading onto the now-existing branch records the base but + // performs no fork. 
+ let again = db + .load_as( + "feature", + Some("main"), + r#"{"type":"Person","data":{"name":"Bob","age":26}}"#, + LoadMode::Merge, + None, + ) + .await + .unwrap(); + assert!(!again.branch_created); + assert_eq!(again.base_branch.as_deref(), Some("main")); + } + + #[tokio::test] + async fn test_load_as_without_base_errors_on_missing_branch() { + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap(); + let db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap(); + + let result = db + .load_as("nonexistent", None, TEST_DATA, LoadMode::Merge, None) + .await; + assert!(result.is_err(), "load without base must not create branches"); + assert!( + !db.branch_list() + .await + .unwrap() + .contains(&"nonexistent".to_string()), + "failed load must not leave a branch behind" + ); + + // Loads to main carry the default branch metadata. + let main_load = db.load("main", TEST_DATA, LoadMode::Overwrite).await.unwrap(); + assert_eq!(main_load.branch, "main"); + assert_eq!(main_load.base_branch, None); + assert!(!main_load.branch_created); + } + #[test] fn test_range_constraint_rejects_nan() { use arrow_array::{Float64Array, RecordBatch, StringArray}; diff --git a/crates/omnigraph/tests/policy_engine_chassis.rs b/crates/omnigraph/tests/policy_engine_chassis.rs index def5349..8443940 100644 --- a/crates/omnigraph/tests/policy_engine_chassis.rs +++ b/crates/omnigraph/tests/policy_engine_chassis.rs @@ -243,6 +243,7 @@ async fn load_as_denies_when_policy_rejects_actor() { let result = db .load_as( "main", + None, ONE_PERSON_JSONL, LoadMode::Merge, Some("act-denied"), @@ -258,6 +259,7 @@ async fn load_as_allows_when_policy_permits_actor() { db.load_as( "main", + None, ONE_PERSON_JSONL, LoadMode::Merge, Some("act-allowed"), @@ -281,6 +283,7 @@ async fn load_file_as_denies_when_policy_rejects_actor() { let result = db .load_file_as( "main", + None, data_path.to_str().unwrap(), LoadMode::Merge, Some("act-denied"), @@ -298,6 +301,7 @@ async fn 
load_file_as_allows_when_policy_permits_actor() { db.load_file_as( "main", + None, data_path.to_str().unwrap(), LoadMode::Merge, Some("act-allowed"), @@ -307,6 +311,7 @@ async fn load_file_as_allows_when_policy_permits_actor() { } #[tokio::test] +#[allow(deprecated)] async fn ingest_as_denies_when_policy_rejects_actor() { let dir = tempfile::tempdir().unwrap(); let (db, _engine) = init_with_policy(&dir).await; @@ -324,6 +329,7 @@ async fn ingest_as_denies_when_policy_rejects_actor() { } #[tokio::test] +#[allow(deprecated)] async fn ingest_as_allows_when_policy_permits_actor() { let dir = tempfile::tempdir().unwrap(); let (db, _engine) = init_with_policy(&dir).await; From c236a4c2df059c4f5ba4c6e4141c2ca1ab48aef1 Mon Sep 17 00:00:00 2001 From: aaltshuler Date: Thu, 11 Jun 2026 03:57:41 +0300 Subject: [PATCH 3/5] refactor(loader): load_jsonl helpers take &Omnigraph and document their role MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The free helpers needlessly demanded &mut Omnigraph (every load API takes &self) and read as leftovers. Rather than rewriting their ~200 call sites across the test suites — which would have to re-derive the active-branch resolution at each site — keep the one convenience and make it honest: borrow immutably (&mut callers coerce, no churn) and document it as the active-branch shorthand over Omnigraph::load. Co-Authored-By: Claude Fable 5 --- crates/omnigraph/src/loader/mod.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/crates/omnigraph/src/loader/mod.rs b/crates/omnigraph/src/loader/mod.rs index 0fabaea..09c2f7c 100644 --- a/crates/omnigraph/src/loader/mod.rs +++ b/crates/omnigraph/src/loader/mod.rs @@ -65,15 +65,18 @@ pub enum LoadMode { Merge, } -/// Load JSONL data into an Omnigraph database. 
-pub async fn load_jsonl(db: &mut Omnigraph, data: &str, mode: LoadMode) -> Result<LoadResult> { +/// Convenience: load JSONL data onto the database handle's *active branch* +/// (`main` when unbound). Equivalent to `db.load(active_branch, data, mode)`; +/// use `Omnigraph::load`/`load_as` directly when targeting an explicit branch +/// or when fork-from-base semantics are needed. +pub async fn load_jsonl(db: &Omnigraph, data: &str, mode: LoadMode) -> Result<LoadResult> { let current_branch = db.active_branch().await; let branch = current_branch.as_deref().unwrap_or("main"); db.load(branch, data, mode).await } -/// Load JSONL data from a file path. -pub async fn load_jsonl_file(db: &mut Omnigraph, path: &str, mode: LoadMode) -> Result<LoadResult> { +/// Convenience: like [`load_jsonl`] but reading from a file path. +pub async fn load_jsonl_file(db: &Omnigraph, path: &str, mode: LoadMode) -> Result<LoadResult> { let current_branch = db.active_branch().await; let branch = current_branch.as_deref().unwrap_or("main"); db.load_file(branch, path, mode).await } From 90676ef52f9332815ffd94bba610992363392be3 Mon Sep 17 00:00:00 2001 From: aaltshuler Date: Thu, 11 Jun 2026 04:05:29 +0300 Subject: [PATCH 4/5] feat(server)!: POST /ingest forks only when 'from' is present MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Branch creation becomes opt-in by presence of the request's 'from' field. Previously the handler defaulted from to 'main' and always auto-created a missing branch — a typo'd branch name silently forked main and landed the data there, with the client none the wiser. Now a request without 'from' against a missing branch returns 404 branch-not-found and creates nothing; with 'from' set, fork-if-missing behaves as before. The BranchCreate authority is only consulted when a fork will actually happen. The handler calls the unified load_as directly (the deprecated ingest_as shim is no longer used in the server). 
IngestOutput.base_branch becomes nullable: it echoes the request's 'from' and is null when absent. OpenAPI regenerated; the CLI's local ingest arm moves to load_file_as + the new converter shape. BREAKING CHANGE: clients that relied on implicit fork-from-main with 'from' omitted must now pass from='main' explicitly. IngestOutput.base_branch is now nullable. Co-Authored-By: Claude Fable 5 --- crates/omnigraph-cli/src/main.rs | 9 ++-- crates/omnigraph-server/src/api.rs | 28 ++++++---- crates/omnigraph-server/src/lib.rs | 46 +++++++++------- crates/omnigraph-server/tests/server.rs | 71 +++++++++++++++++++++++++ docs/user/server.md | 2 +- openapi.json | 15 +++--- 6 files changed, 131 insertions(+), 40 deletions(-) diff --git a/crates/omnigraph-cli/src/main.rs b/crates/omnigraph-cli/src/main.rs index d1fbb99..d8123ce 100644 --- a/crates/omnigraph-cli/src/main.rs +++ b/crates/omnigraph-cli/src/main.rs @@ -1587,7 +1587,7 @@ fn print_ingest_human(output: &IngestOutput) { "ingested {} into branch {} from {} with {} ({})", output.uri, output.branch, - output.base_branch, + output.base_branch.as_deref().unwrap_or("main"), output.mode.as_str(), if output.branch_created { "branch created" @@ -2729,11 +2729,8 @@ async fn main() -> Result<()> { } else { let db = open_local_db_with_policy(&graph).await?; let actor = resolve_cli_actor(cli.as_actor.as_deref(), &config); - // Deprecated shim retained until the CLI ingest command - // becomes an alias of the unified `load` handler. 
- #[allow(deprecated)] let result = db - .ingest_file_as( + .load_file_as( &branch, Some(&from), &data.to_string_lossy(), @@ -2741,7 +2738,7 @@ async fn main() -> Result<()> { actor, ) .await?; - ingest_output(&uri, &result, None) + ingest_output(&uri, &result, mode.into(), None) }; if json { print_json(&payload)?; diff --git a/crates/omnigraph-server/src/api.rs b/crates/omnigraph-server/src/api.rs index 4a6024f..ff3cf67 100644 --- a/crates/omnigraph-server/src/api.rs +++ b/crates/omnigraph-server/src/api.rs @@ -1,6 +1,6 @@ use omnigraph::db::{GraphCommit, MergeOutcome, ReadTarget, SchemaApplyResult, Snapshot}; use omnigraph::error::{MergeConflict, MergeConflictKind}; -use omnigraph::loader::{IngestResult, LoadMode}; +use omnigraph::loader::{LoadMode, LoadResult}; use crate::queries::StoredQuery; use omnigraph_compiler::SchemaMigrationStep; use omnigraph_compiler::query::ast::Param; @@ -208,7 +208,9 @@ pub struct IngestTableOutput { pub struct IngestOutput { pub uri: String, pub branch: String, - pub base_branch: String, + /// Base branch a fork was requested from (the request's `from`), echoed + /// even when the branch already existed. `null` when `from` was absent. + pub base_branch: Option, pub branch_created: bool, #[schema(value_type = LoadModeSchema)] pub mode: LoadMode, @@ -493,9 +495,12 @@ pub struct SchemaOutput { #[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] pub struct IngestRequest { - /// Target branch. Created from `from` if it does not yet exist. Defaults to `main`. + /// Target branch. Defaults to `main`. Without `from`, the branch must + /// already exist — a missing branch is a 404, never an implicit fork. pub branch: Option, - /// Parent branch used to create `branch` if it does not exist. Defaults to `main`. + /// Parent branch used to create `branch` if it does not exist. Branch + /// creation is opt-in by presence of this field; omit it to require an + /// existing branch. pub from: Option, /// How existing rows are handled. 
Defaults to `merge`. #[schema(value_type = Option)] @@ -642,18 +647,23 @@ pub fn read_output(query_name: String, target: &ReadTarget, result: QueryResult) } } -pub fn ingest_output(uri: &str, result: &IngestResult, actor_id: Option) -> IngestOutput { +pub fn ingest_output( + uri: &str, + result: &LoadResult, + mode: LoadMode, + actor_id: Option, +) -> IngestOutput { IngestOutput { uri: uri.to_string(), branch: result.branch.clone(), base_branch: result.base_branch.clone(), branch_created: result.branch_created, - mode: result.mode, + mode, tables: result - .tables - .iter() + .to_ingest_tables() + .into_iter() .map(|table| IngestTableOutput { - table_key: table.table_key.clone(), + table_key: table.table_key, rows_loaded: table.rows_loaded, }) .collect(), diff --git a/crates/omnigraph-server/src/lib.rs b/crates/omnigraph-server/src/lib.rs index 26e4837..0038674 100644 --- a/crates/omnigraph-server/src/lib.rs +++ b/crates/omnigraph-server/src/lib.rs @@ -2663,13 +2663,15 @@ async fn server_schema_apply( ), security(("bearer_token" = [])), )] -/// Bulk-ingest NDJSON data into a branch. +/// Bulk-load NDJSON data into a branch. /// /// `data` is NDJSON with one record per line. `mode` controls behavior on /// existing rows: `merge` upserts by id (default), `append` blindly inserts, -/// `overwrite` replaces table contents. If `branch` does not exist it is -/// created from `from` (defaults to `main`). **Destructive** when `mode` is -/// `overwrite` or when ingest produces conflicting writes. +/// `overwrite` replaces table contents. Branch creation is opt-in by +/// presence of `from`: with `from` set, a missing `branch` is created from +/// it; without `from`, `branch` must already exist — a missing branch is a +/// 404, never an implicit fork. **Destructive** when `mode` is `overwrite` +/// or when the load produces conflicting writes. 
async fn server_ingest( State(state): State, Extension(handle): Extension>, @@ -2677,7 +2679,7 @@ async fn server_ingest( Json(request): Json, ) -> std::result::Result, ApiError> { let branch = request.branch.unwrap_or_else(|| "main".to_string()); - let from = request.from.unwrap_or_else(|| "main".to_string()); + let from = request.from; let mode = request.mode.unwrap_or(omnigraph::loader::LoadMode::Merge); let actor_arc = actor .as_ref() @@ -2697,15 +2699,25 @@ async fn server_ingest( }; if !branch_exists { - authorize_request( - actor.as_ref().map(|Extension(actor)| actor), - handle.policy.as_deref(), - PolicyRequest { - action: PolicyAction::BranchCreate, - branch: Some(from.clone()), - target_branch: Some(branch.clone()), - }, - )?; + match from.as_deref() { + // Fork-if-missing is opt-in by presence of `from`; without it a + // typo'd branch name must surface as an error, not silently + // create a fork and land the data there. + None => { + return Err(ApiError::not_found(format!( + "branch '{branch}' not found; pass `from` to create it" + ))); + } + Some(from) => authorize_request( + actor.as_ref().map(|Extension(actor)| actor), + handle.policy.as_deref(), + PolicyRequest { + action: PolicyAction::BranchCreate, + branch: Some(from.to_string()), + target_branch: Some(branch.clone()), + }, + )?, + } } authorize_request( actor.as_ref().map(|Extension(actor)| actor), @@ -2722,12 +2734,9 @@ async fn server_ingest( .try_admit(&actor_arc, est_bytes) .map_err(ApiError::from_workload_reject)?; - // Deprecated shim retained until the from-absent semantics change - // lands; the handler then calls `load_as` directly. - #[allow(deprecated)] let result = { let db = &handle.engine; - db.ingest_as(&branch, Some(&from), &request.data, mode, actor_id) + db.load_as(&branch, from.as_deref(), &request.data, mode, actor_id) .await .map_err(ApiError::from_omni)? 
}; @@ -2735,6 +2744,7 @@ async fn server_ingest( Ok(Json(ingest_output( handle.uri.as_str(), &result, + mode, actor_id.map(str::to_string), ))) } diff --git a/crates/omnigraph-server/tests/server.rs b/crates/omnigraph-server/tests/server.rs index fd214bb..7858587 100644 --- a/crates/omnigraph-server/tests/server.rs +++ b/crates/omnigraph-server/tests/server.rs @@ -2265,6 +2265,77 @@ async fn ingest_existing_branch_skips_branch_create_policy_check() { assert_eq!(body["base_branch"], "other-base"); } +/// Regression: branch creation is opt-in by presence of `from`. A request +/// without `from` against a branch that doesn't exist must 404 — not +/// silently fork `main` and land the data on the typo'd branch. +#[tokio::test(flavor = "multi_thread")] +async fn ingest_without_from_returns_404_for_missing_branch_and_creates_nothing() { + let (temp, app) = app_for_loaded_graph().await; + let graph = graph_path(temp.path()); + let ingest = IngestRequest { + branch: Some("feature-typo".to_string()), + from: None, + mode: Some(LoadMode::Merge), + data: r#"{"type":"Person","data":{"name":"Zoe","age":33}}"#.to_string(), + }; + + let (status, body) = json_response( + &app, + Request::builder() + .uri("/ingest") + .method(Method::POST) + .header("content-type", "application/json") + .body(Body::from(serde_json::to_vec(&ingest).unwrap())) + .unwrap(), + ) + .await; + assert_eq!(status, StatusCode::NOT_FOUND); + let error: ErrorOutput = serde_json::from_value(body).unwrap(); + assert_eq!(error.code, Some(omnigraph_server::api::ErrorCode::NotFound)); + + let db = Omnigraph::open(graph.to_str().unwrap()).await.unwrap(); + assert!( + !db.branch_list() + .await + .unwrap() + .contains(&"feature-typo".to_string()), + "a 404'd ingest must not create the branch" + ); +} + +#[tokio::test(flavor = "multi_thread")] +async fn ingest_without_from_loads_into_existing_branch() { + let (temp, app) = app_for_loaded_graph().await; + let graph = graph_path(temp.path()); + { + let db = 
Omnigraph::open(graph.to_str().unwrap()).await.unwrap(); + db.branch_create_from(ReadTarget::branch("main"), "feature") + .await + .unwrap(); + } + let ingest = IngestRequest { + branch: Some("feature".to_string()), + from: None, + mode: Some(LoadMode::Merge), + data: r#"{"type":"Person","data":{"name":"Zoe","age":33}}"#.to_string(), + }; + + let (status, body) = json_response( + &app, + Request::builder() + .uri("/ingest") + .method(Method::POST) + .header("content-type", "application/json") + .body(Body::from(serde_json::to_vec(&ingest).unwrap())) + .unwrap(), + ) + .await; + assert_eq!(status, StatusCode::OK); + assert_eq!(body["branch"], "feature"); + assert_eq!(body["branch_created"], false); + assert_eq!(body["base_branch"], serde_json::Value::Null); +} + #[tokio::test(flavor = "multi_thread")] async fn ingest_denies_missing_branch_without_branch_create_permission() { let (_temp, app) = app_for_loaded_graph_with_auth_tokens_and_policy( diff --git a/docs/user/server.md b/docs/user/server.md index 60988ca..0922e74 100644 --- a/docs/user/server.md +++ b/docs/user/server.md @@ -56,7 +56,7 @@ Per-graph endpoints — same body shape across modes; URLs differ: | POST | `/queries/{name}` | `/graphs/{id}/queries/{name}` | bearer + `invoke_query` (+ `change` for a stored mutation) | invoke a named query from the `queries:` registry; deny == 404 | `server_invoke_query` | | GET | `/schema` | `/graphs/{id}/schema` | bearer + `read` | get current `.pg` source | `server_schema_get` | | POST | `/schema/apply` | `/graphs/{id}/schema/apply` | bearer + `schema_apply` (target=`main`) | migrate | `server_schema_apply` | -| POST | `/ingest` | `/graphs/{id}/ingest` | bearer + `branch_create` (if new) + `change` | bulk load | `server_ingest` (32 MB body limit) | +| POST | `/ingest` | `/graphs/{id}/ingest` | bearer + `branch_create` (only when `from` is set and the branch is created) + `change` | bulk load; branch creation is opt-in via `from` — without it a missing `branch` is a 404, 
never an implicit fork | `server_ingest` (32 MB body limit) | | GET | `/branches` | `/graphs/{id}/branches` | bearer + `read` | list branches | `server_branch_list` | | POST | `/branches` | `/graphs/{id}/branches` | bearer + `branch_create` | create | `server_branch_create` | | DELETE | `/branches/{branch}` | `/graphs/{id}/branches/{branch}` | bearer + `branch_delete` | delete | `server_branch_delete` | diff --git a/openapi.json b/openapi.json index 335c0bc..85c5b8d 100644 --- a/openapi.json +++ b/openapi.json @@ -670,8 +670,8 @@ "tags": [ "mutations" ], - "summary": "Bulk-ingest NDJSON data into a branch.", - "description": "`data` is NDJSON with one record per line. `mode` controls behavior on\nexisting rows: `merge` upserts by id (default), `append` blindly inserts,\n`overwrite` replaces table contents. If `branch` does not exist it is\ncreated from `from` (defaults to `main`). **Destructive** when `mode` is\n`overwrite` or when ingest produces conflicting writes.", + "summary": "Bulk-load NDJSON data into a branch.", + "description": "`data` is NDJSON with one record per line. `mode` controls behavior on\nexisting rows: `merge` upserts by id (default), `append` blindly inserts,\n`overwrite` replaces table contents. Branch creation is opt-in by\npresence of `from`: with `from` set, a missing `branch` is created from\nit; without `from`, `branch` must already exist — a missing branch is a\n404, never an implicit fork. **Destructive** when `mode` is `overwrite`\nor when the load produces conflicting writes.", "operationId": "ingest", "requestBody": { "content": { @@ -1710,7 +1710,6 @@ "required": [ "uri", "branch", - "base_branch", "branch_created", "mode", "tables" @@ -1723,7 +1722,11 @@ ] }, "base_branch": { - "type": "string" + "type": [ + "string", + "null" + ], + "description": "Base branch a fork was requested from (the request's `from`), echoed\neven when the branch already existed. `null` when `from` was absent." 
}, "branch": { "type": "string" @@ -1756,7 +1759,7 @@ "string", "null" ], - "description": "Target branch. Created from `from` if it does not yet exist. Defaults to `main`." + "description": "Target branch. Defaults to `main`. Without `from`, the branch must\nalready exist — a missing branch is a 404, never an implicit fork." }, "data": { "type": "string", @@ -1768,7 +1771,7 @@ "string", "null" ], - "description": "Parent branch used to create `branch` if it does not exist. Defaults to `main`." + "description": "Parent branch used to create `branch` if it does not exist. Branch\ncreation is opt-in by presence of this field; omit it to require an\nexisting branch." }, "mode": { "oneOf": [ From fa6af775c1c3d16d04d156e8b2fe3444bdf3fcef Mon Sep 17 00:00:00 2001 From: aaltshuler Date: Thu, 11 Jun 2026 04:18:00 +0300 Subject: [PATCH 5/5] feat(cli)!: unified load command; deprecate ingest as an alias MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit omnigraph load is now the single data-write command: - works against remote graphs (POSTs the server's /ingest endpoint with the same bearer/actor resolution as other remote commands) — previously load was the only data command forced to open Lance storage directly - --from opts into fork-if-missing for --branch (the former ingest semantics); without --from a missing branch is an error, never a fork - --mode is now required: overwrite is destructive, so there is no implicit default (the old silent default was overwrite) - output gains base_branch/branch_created (and table sums on remote loads) omnigraph ingest stays as a deprecated alias (defaults preserved: --from main --mode merge) that prints a one-line warning to stderr, matching the read/change deprecation convention; removal in a later release. Docs updated in the same change: cli.md, cli-reference.md, policy.md, audit.md, execution.md (unified load section), AGENTS.md quick-flow, README.md. 
BREAKING CHANGE: scripts running omnigraph load without --mode must now pass it explicitly (previously defaulted to the destructive overwrite). Co-Authored-By: Claude Fable 5 --- AGENTS.md | 7 +- README.md | 4 +- crates/omnigraph-cli/src/main.rs | 159 ++++++++++++++------ crates/omnigraph-cli/tests/cli.rs | 14 +- crates/omnigraph-cli/tests/support/mod.rs | 10 +- crates/omnigraph-cli/tests/system_local.rs | 126 +++++++++++++++- crates/omnigraph-cli/tests/system_remote.rs | 67 +++++++++ docs/dev/execution.md | 10 +- docs/user/audit.md | 2 +- docs/user/cli-reference.md | 4 +- docs/user/cli.md | 2 +- docs/user/policy.md | 5 +- 12 files changed, 342 insertions(+), 68 deletions(-) diff --git a/AGENTS.md b/AGENTS.md index 60276ad..b335955 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -200,9 +200,8 @@ omnigraph init --schema ./schema.pg s3://my-bucket/graph.omni # Bulk load omnigraph load --data ./seed.jsonl --mode overwrite s3://my-bucket/graph.omni -# Branch + ingest a review batch -omnigraph branch create --from main review/2026-04-25 s3://my-bucket/graph.omni -omnigraph ingest --branch review/2026-04-25 --data ./batch.jsonl s3://my-bucket/graph.omni +# Load a review batch onto its own branch (--from forks it if missing) +omnigraph load --branch review/2026-04-25 --from main --mode merge --data ./batch.jsonl s3://my-bucket/graph.omni # Run a hybrid (vector + BM25) query omnigraph read --query ./queries.gq --name find_similar \ @@ -258,7 +257,7 @@ omnigraph policy explain --actor act-alice --action change --branch main | Per-query atomic writes | — | In-memory `MutationStaging.pending` accumulator + `stage_*` / `commit_staged` per touched table at end-of-query + publisher CAS via `commit_with_expected` (single manifest commit per `mutate_as` / `load`); D₂ parse-time rule keeps inserts/updates and deletes from mixing | | Three-way row-level merge | — | `OrderedTableCursor` + `StagedTableWriter`, structured `MergeConflictKind` | | Change feeds | — | `diff_between` / 
`diff_commits` with manifest fast path + ID streaming | -| Cedar policy | — | Per-graph actions plus server-scoped actions (see [docs/user/policy.md](docs/user/policy.md) for the current list), branch / target_branch / protected scopes, validate/test/explain CLI. **Engine-wide enforcement** (MR-722): every `_as` writer (`apply_schema_as`, `mutate_as`, `load_as`, `ingest_as`, `branch_create_as` / `branch_create_from_as`, `branch_delete_as`, `branch_merge_as`) calls `Omnigraph::enforce(action, scope, actor)` — HTTP, CLI, embedded SDK all hit the same gate. | +| Cedar policy | — | Per-graph actions plus server-scoped actions (see [docs/user/policy.md](docs/user/policy.md) for the current list), branch / target_branch / protected scopes, validate/test/explain CLI. **Engine-wide enforcement** (MR-722): every `_as` writer (`apply_schema_as`, `mutate_as`, `load_as` — the deprecated `ingest_as` shims route through it — `branch_create_as` / `branch_create_from_as`, `branch_delete_as`, `branch_merge_as`) calls `Omnigraph::enforce(action, scope, actor)` — HTTP, CLI, embedded SDK all hit the same gate. | | HTTP server | — | Axum, OpenAPI via utoipa, bearer auth (SHA-256, AWS Secrets Manager option), `authorize_request` at the HTTP boundary (resolves bearer→actor, applies admission control), NDJSON streaming export, **multi-graph mode (v0.6.0+) with cluster routes + read-only `GET /graphs` enumeration + per-graph + server-level Cedar policies. 
Add/remove graphs by editing `omnigraph.yaml` and restarting.** | | CLI with config | — | `omnigraph.yaml`, aliases, multi-format output (json/jsonl/csv/kv/table) | | Audit / actor tracking | — | `_as` write APIs + actor map in commit graph | diff --git a/README.md b/README.md index 0f6ebea..a75a839 100644 --- a/README.md +++ b/README.md @@ -88,7 +88,7 @@ omnigraph branch create --from main feature-x ./graph.omni omnigraph branch merge feature-x --into main ./graph.omni ``` -See [docs/user/cli.md](docs/user/cli.md) for schema apply, snapshots, ingest, commits, and policy commands. +See [docs/user/cli.md](docs/user/cli.md) for schema apply, snapshots, data loading, commits, and policy commands. ## Clients @@ -132,7 +132,7 @@ Notes: - `crates/omnigraph-compiler`: shared schema/query parser, typechecker, catalog, and IR lowering - `crates/omnigraph`: storage/runtime, branching, merge, change detection, and query execution -- `crates/omnigraph-cli`: CLI for graph lifecycle (init/load/ingest), query/mutate, branch/commit/merge, schema/lint, snapshot/export, policy, and maintenance (optimize/cleanup) +- `crates/omnigraph-cli`: CLI for graph lifecycle (init/load), query/mutate, branch/commit/merge, schema/lint, snapshot/export, policy, and maintenance (optimize/cleanup) - `crates/omnigraph-server`: Axum HTTP server for remote reads, changes, ingest, export, branches, and commits ## Contributing diff --git a/crates/omnigraph-cli/src/main.rs b/crates/omnigraph-cli/src/main.rs index d8123ce..da3cc44 100644 --- a/crates/omnigraph-cli/src/main.rs +++ b/crates/omnigraph-cli/src/main.rs @@ -89,7 +89,7 @@ enum Command { #[arg(long)] force: bool, }, - /// Load data into a graph + /// Load data into a graph (local or remote) Load { /// Graph URI uri: Option, @@ -99,14 +99,21 @@ enum Command { config: Option, #[arg(long)] data: PathBuf, + /// Target branch (defaults to main). Without --from it must exist. 
#[arg(long)] branch: Option, - #[arg(long, default_value = "overwrite")] + /// Base branch to fork --branch from when it doesn't exist yet. + /// Without this flag a missing branch is an error, never a fork. + #[arg(long)] + from: Option, + /// How existing rows are handled: overwrite | append | merge. + /// Required — overwrite is destructive, so there is no default. + #[arg(long)] mode: CliLoadMode, #[arg(long)] json: bool, }, - /// Ingest data into a reviewable named branch + /// Deprecated alias of `load --from ` (defaults: --mode merge, --from main) Ingest { /// Graph URI uri: Option, @@ -686,16 +693,55 @@ impl CliLoadMode { } #[derive(Debug, Serialize)] -struct LoadOutput<'a> { - uri: &'a str, - branch: &'a str, - mode: &'a str, +struct LoadOutput { + uri: String, + branch: String, + mode: &'static str, + /// Present only when `--from` was given; echoes the requested base. + #[serde(skip_serializing_if = "Option::is_none")] + base_branch: Option, + branch_created: bool, nodes_loaded: usize, edges_loaded: usize, node_types_loaded: usize, edge_types_loaded: usize, } +/// Map a remote `/ingest` response onto the CLI's load output. Table keys +/// carry `node:`/`edge:` prefixes, so the per-kind sums are derivable +/// client-side without the catalog. 
+fn load_output_from_tables( + uri: &str, + branch: &str, + mode: CliLoadMode, + output: &IngestOutput, +) -> LoadOutput { + let mut nodes_loaded = 0; + let mut edges_loaded = 0; + let mut node_types_loaded = 0; + let mut edge_types_loaded = 0; + for table in &output.tables { + if table.table_key.starts_with("node:") { + nodes_loaded += table.rows_loaded; + node_types_loaded += 1; + } else if table.table_key.starts_with("edge:") { + edges_loaded += table.rows_loaded; + edge_types_loaded += 1; + } + } + LoadOutput { + uri: uri.to_string(), + branch: branch.to_string(), + mode: mode.as_str(), + base_branch: output.base_branch.clone(), + branch_created: output.branch_created, + nodes_loaded, + edges_loaded, + node_types_loaded, + edge_types_loaded, + } +} + #[derive(Debug, Serialize)] struct SchemaPlanOutput<'a> { uri: &'a str, @@ -1561,25 +1607,22 @@ fn merged_params_json( } } -fn print_load_human( - uri: &str, - branch: &str, - mode: CliLoadMode, - nodes_loaded: usize, - edges_loaded: usize, - node_types_loaded: usize, - edge_types_loaded: usize, -) { +fn print_load_human(payload: &LoadOutput) { println!( "loaded {} on branch {} with {}: {} nodes across {} node types, {} edges across {} edge types", - uri, - branch, - mode.as_str(), - nodes_loaded, - node_types_loaded, - edges_loaded, - edge_types_loaded + payload.uri, + payload.branch, + payload.mode, + payload.nodes_loaded, + payload.node_types_loaded, + payload.edges_loaded, + payload.edge_types_loaded ); + if payload.branch_created { + if let Some(base) = &payload.base_branch { + println!("branch {} created from {}", payload.branch, base); + } + } } fn print_ingest_human(output: &IngestOutput) { @@ -2659,39 +2702,60 @@ async fn main() -> Result<()> { config, data, branch, + from, mode, json, } => { let config = load_cli_config(config.as_ref())?; - let graph = resolve_local_graph(&config, uri, target.as_deref(), "load")?; + let bearer_token = + resolve_remote_bearer_token(&config, uri.as_deref(), 
target.as_deref())?; + let graph = resolve_cli_graph(&config, uri, target.as_deref())?; let uri = graph.uri.clone(); let branch = resolve_branch(&config, branch, None, "main"); - let db = open_local_db_with_policy(&graph).await?; - let actor = resolve_cli_actor(cli.as_actor.as_deref(), &config); - let result = db - .load_file_as(&branch, None, &data.to_string_lossy(), mode.into(), actor) + let payload = if graph.is_remote { + let data = fs::read_to_string(&data)?; + let output = remote_json::( + &http_client, + Method::POST, + remote_url(&uri, "/ingest"), + Some(serde_json::to_value(IngestRequest { + branch: Some(branch.clone()), + from: from.clone(), + mode: Some(mode.into()), + data, + })?), + bearer_token.as_deref(), + ) .await?; - let payload = LoadOutput { - uri: &uri, - branch: &branch, - mode: mode.as_str(), - nodes_loaded: result.nodes_loaded.values().sum(), - edges_loaded: result.edges_loaded.values().sum(), - node_types_loaded: result.nodes_loaded.len(), - edge_types_loaded: result.edges_loaded.len(), + load_output_from_tables(&uri, &branch, mode, &output) + } else { + let db = open_local_db_with_policy(&graph).await?; + let actor = resolve_cli_actor(cli.as_actor.as_deref(), &config); + let result = db + .load_file_as( + &branch, + from.as_deref(), + &data.to_string_lossy(), + mode.into(), + actor, + ) + .await?; + LoadOutput { + uri: uri.clone(), + branch: branch.clone(), + mode: mode.as_str(), + base_branch: result.base_branch.clone(), + branch_created: result.branch_created, + nodes_loaded: result.nodes_loaded.values().sum(), + edges_loaded: result.edges_loaded.values().sum(), + node_types_loaded: result.nodes_loaded.len(), + edge_types_loaded: result.edges_loaded.len(), + } }; if json { print_json(&payload)?; } else { - print_load_human( - &uri, - &branch, - mode, - payload.nodes_loaded, - payload.edges_loaded, - payload.node_types_loaded, - payload.edge_types_loaded, - ); + print_load_human(&payload); } } Command::Ingest { @@ -2704,6 +2768,11 @@ 
async fn main() -> Result<()> { mode, json, } => { + // stderr so `--json` consumers reading stdout are unaffected. + eprintln!( + "warning: `omnigraph ingest` is deprecated and will be removed in a future release; \ + use `omnigraph load --from --mode ` (ingest defaults: --from main --mode merge)" + ); let config = load_cli_config(config.as_ref())?; let bearer_token = resolve_remote_bearer_token(&config, uri.as_deref(), target.as_deref())?; diff --git a/crates/omnigraph-cli/tests/cli.rs b/crates/omnigraph-cli/tests/cli.rs index ab3c23b..3f21b8a 100644 --- a/crates/omnigraph-cli/tests/cli.rs +++ b/crates/omnigraph-cli/tests/cli.rs @@ -2650,6 +2650,8 @@ fn load_json_outputs_summary_for_main_branch() { let output = output_success( cli() .arg("load") + .arg("--mode") + .arg("overwrite") .arg("--data") .arg(&data) .arg("--json") @@ -2984,7 +2986,15 @@ fn read_alias_uses_alias_target_without_cli_default_and_accepts_url_like_arg() { &data, r#"{"type":"Person","data":{"name":"https://example.com","age":30}}"#, ); - output_success(cli().arg("load").arg("--data").arg(&data).arg(&graph)); + output_success( + cli() + .arg("load") + .arg("--mode") + .arg("overwrite") + .arg("--data") + .arg(&data) + .arg(&graph), + ); write_query_file( &query, &std::fs::read_to_string(fixture("test.gq")).unwrap(), @@ -3748,6 +3758,8 @@ fn cli_fails_for_missing_schema_or_data_file() { let load_output = output_failure( cli() .arg("load") + .arg("--mode") + .arg("overwrite") .arg("--data") .arg(&missing_data) .arg(&graph), diff --git a/crates/omnigraph-cli/tests/support/mod.rs b/crates/omnigraph-cli/tests/support/mod.rs index c30ed28..653be11 100644 --- a/crates/omnigraph-cli/tests/support/mod.rs +++ b/crates/omnigraph-cli/tests/support/mod.rs @@ -93,7 +93,15 @@ pub fn init_graph(graph: &Path) { pub fn load_fixture(graph: &Path) { let data = fixture("test.jsonl"); - output_success(cli().arg("load").arg("--data").arg(&data).arg(graph)); + output_success( + cli() + .arg("load") + .arg("--mode") + 
.arg("overwrite") + .arg("--data") + .arg(&data) + .arg(graph), + ); } pub fn write_jsonl(path: &Path, rows: &str) { diff --git a/crates/omnigraph-cli/tests/system_local.rs b/crates/omnigraph-cli/tests/system_local.rs index adb5dc8..46f6fcf 100644 --- a/crates/omnigraph-cli/tests/system_local.rs +++ b/crates/omnigraph-cli/tests/system_local.rs @@ -221,6 +221,8 @@ fn local_cli_end_to_end_init_load_read_change_read_flow() { output_success( cli() .arg("load") + .arg("--mode") + .arg("overwrite") .arg("--data") .arg(fixture("test.jsonl")) .arg(graph.path()), @@ -397,7 +399,7 @@ fn local_cli_ingest_creates_review_branch_and_keeps_it_readable() { {"type":"Person","data":{"name":"Bob","age":26}}"#, ); - let ingest_payload = parse_stdout_json(&output_success( + let ingest_output = output_success( cli() .arg("ingest") .arg("--data") @@ -406,7 +408,13 @@ fn local_cli_ingest_creates_review_branch_and_keeps_it_readable() { .arg("feature-ingest") .arg(graph.path()) .arg("--json"), - )); + ); + // The deprecation warning goes to stderr so --json stdout stays clean. + assert!( + String::from_utf8_lossy(&ingest_output.stderr).contains("deprecated"), + "ingest must warn about its deprecation on stderr" + ); + let ingest_payload = parse_stdout_json(&ingest_output); assert_eq!(ingest_payload["branch"], "feature-ingest"); assert_eq!(ingest_payload["base_branch"], "main"); assert_eq!(ingest_payload["branch_created"], true); @@ -459,6 +467,88 @@ fn local_cli_ingest_creates_review_branch_and_keeps_it_readable() { assert_eq!(bob["rows"][0]["p.age"], 26); } +/// The unified `load` subsumes ingest: `--from` opts into fork-if-missing, +/// while without it a missing branch is an error — never an implicit fork. 
+#[test] +fn local_cli_load_from_forks_branch_and_missing_branch_errors_without_from() { + let graph = SystemGraph::loaded(); + let extra = graph.write_jsonl( + "system-local-load-from.jsonl", + r#"{"type":"Person","data":{"name":"Zoe","age":33}}"#, + ); + + // Without --from, a missing branch must fail and create nothing. + let failure = output_failure( + cli() + .arg("load") + .arg("--mode") + .arg("merge") + .arg("--data") + .arg(&extra) + .arg("--branch") + .arg("feature-load") + .arg(graph.path()), + ); + assert!( + String::from_utf8_lossy(&failure.stderr).contains("feature-load"), + "error should name the missing branch" + ); + + // With --from, the branch is forked and the load lands on it. + let payload = parse_stdout_json(&output_success( + cli() + .arg("load") + .arg("--mode") + .arg("merge") + .arg("--data") + .arg(&extra) + .arg("--branch") + .arg("feature-load") + .arg("--from") + .arg("main") + .arg(graph.path()) + .arg("--json"), + )); + assert_eq!(payload["branch"], "feature-load"); + assert_eq!(payload["base_branch"], "main"); + assert_eq!(payload["branch_created"], true); + assert_eq!(payload["mode"], "merge"); + assert_eq!(payload["nodes_loaded"], 1); + + let snapshot = parse_stdout_json(&output_success( + cli() + .arg("snapshot") + .arg(graph.path()) + .arg("--branch") + .arg("feature-load") + .arg("--json"), + )); + assert_eq!(snapshot["branch"], "feature-load"); +} + +/// `--mode` is required: overwrite is destructive, so the unified `load` +/// has no implicit default. 
+#[test] +fn local_cli_load_requires_mode_flag() { + let graph = SystemGraph::loaded(); + let extra = graph.write_jsonl( + "system-local-load-no-mode.jsonl", + r#"{"type":"Person","data":{"name":"Zoe","age":33}}"#, + ); + + let failure = output_failure( + cli() + .arg("load") + .arg("--data") + .arg(&extra) + .arg(graph.path()), + ); + assert!( + String::from_utf8_lossy(&failure.stderr).contains("--mode"), + "clap should demand the missing --mode flag" + ); +} + #[test] fn local_cli_export_round_trips_full_branch_graph() { let graph = SystemGraph::loaded(); @@ -512,6 +602,8 @@ fn local_cli_export_round_trips_full_branch_graph() { output_success( cli() .arg("load") + .arg("--mode") + .arg("overwrite") .arg("--data") .arg(&export_path) .arg(&imported_graph), @@ -610,6 +702,8 @@ policy: {{}} cli() .current_dir(query_root) .arg("load") + .arg("--mode") + .arg("overwrite") .arg("--data") .arg(fixture("test.jsonl")) .arg(&graph_uri), @@ -867,7 +961,15 @@ query get_task($slug: String) { ); output_success(cli().arg("init").arg("--schema").arg(&schema).arg(&graph)); - output_success(cli().arg("load").arg("--data").arg(&data).arg(&graph)); + output_success( + cli() + .arg("load") + .arg("--mode") + .arg("overwrite") + .arg("--data") + .arg(&data) + .arg(&graph), + ); let filtered = parse_stdout_json(&output_success( cli() @@ -997,7 +1099,15 @@ query vector_search($q: String) { ); output_success(cli().arg("init").arg("--schema").arg(&schema).arg(&graph)); - output_success(cli().arg("load").arg("--data").arg(&data).arg(&graph)); + output_success( + cli() + .arg("load") + .arg("--mode") + .arg("overwrite") + .arg("--data") + .arg(&data) + .arg(&graph), + ); let result = parse_stdout_json(&output_success( cli() @@ -1221,6 +1331,8 @@ fn local_cli_load_enforces_engine_layer_policy() { .arg("--as") .arg("act-bruno") .arg("load") + .arg("--mode") + .arg("overwrite") .arg("--config") .arg(&config) .arg("--data") @@ -1239,6 +1351,8 @@ fn local_cli_load_enforces_engine_layer_policy() { 
.arg("--as") .arg("act-ragnor") .arg("load") + .arg("--mode") + .arg("overwrite") .arg("--config") .arg(&config) .arg("--data") @@ -1684,6 +1798,8 @@ graphs: std::fs::write(&data, "{\"type\":\"Person\",\"data\":{\"name\":\"Ada\"}}\n").unwrap(); let output = cli() .arg("load") + .arg("--mode") + .arg("overwrite") .arg("--data") .arg(&data) .arg(temp.path().join("graphs/knowledge.omni")) @@ -1796,6 +1912,8 @@ fn seed_graph(dir: &std::path::Path, graph: &str, row: &str) { std::fs::write(&data, row).unwrap(); let output = cli() .arg("load") + .arg("--mode") + .arg("overwrite") .arg("--data") .arg(&data) .arg(dir.join(format!("graphs/{graph}.omni"))) diff --git a/crates/omnigraph-cli/tests/system_remote.rs b/crates/omnigraph-cli/tests/system_remote.rs index 45bf502..95a53e7 100644 --- a/crates/omnigraph-cli/tests/system_remote.rs +++ b/crates/omnigraph-cli/tests/system_remote.rs @@ -652,6 +652,8 @@ query add_friend($from: String, $to: String) { output_success( cli() .arg("load") + .arg("--mode") + .arg("overwrite") .arg("--data") .arg(&export_path) .arg(&imported_graph), @@ -755,6 +757,71 @@ fn remote_ingest_creates_review_branch_and_keeps_it_readable() { assert_eq!(zoe["rows"][0]["p.name"], "Zoe"); } +/// The unified `load` works against remote graphs through the server's +/// `/ingest` endpoint: without `--from` a missing branch is a hard error +/// (no implicit fork), with `--from` it forks like ingest did. +#[test] +#[ignore = "requires loopback socket permissions in sandboxed runners"] +fn remote_load_round_trips_and_requires_from_for_new_branches() { + let graph = SystemGraph::loaded(); + let server = graph.spawn_server(); + let config = graph.write_config("omnigraph.yaml", &remote_yaml_config(&server.base_url)); + let extra = graph.write_jsonl( + "system-remote-load.jsonl", + r#"{"type":"Person","data":{"name":"Zoe","age":33}}"#, + ); + + // Missing branch without --from: refused remotely, nothing created. 
+ let failure = output_failure( + cli() + .arg("load") + .arg("--config") + .arg(&config) + .arg("--mode") + .arg("merge") + .arg("--data") + .arg(&extra) + .arg("--branch") + .arg("feature-load"), + ); + assert!( + String::from_utf8_lossy(&failure.stderr).contains("feature-load"), + "error should name the missing branch" + ); + + // With --from, the remote load forks and lands the rows. + let payload = parse_stdout_json(&output_success( + cli() + .arg("load") + .arg("--config") + .arg(&config) + .arg("--mode") + .arg("merge") + .arg("--data") + .arg(&extra) + .arg("--branch") + .arg("feature-load") + .arg("--from") + .arg("main") + .arg("--json"), + )); + assert_eq!(payload["branch"], "feature-load"); + assert_eq!(payload["base_branch"], "main"); + assert_eq!(payload["branch_created"], true); + assert_eq!(payload["nodes_loaded"], 1); + + let snapshot = parse_stdout_json(&output_success( + cli() + .arg("snapshot") + .arg("--config") + .arg(&config) + .arg("--branch") + .arg("feature-load") + .arg("--json"), + )); + assert_eq!(snapshot["branch"], "feature-load"); +} + #[test] #[ignore = "requires loopback socket permissions in sandboxed runners"] fn remote_ingest_reuses_existing_branch_and_merges_updates() { diff --git a/docs/dev/execution.md b/docs/dev/execution.md index 9753696..0e8e3fc 100644 --- a/docs/dev/execution.md +++ b/docs/dev/execution.md @@ -168,12 +168,12 @@ Atomicity guarantee for multi-statement mutations: a mid-query failure leaves La For all three modes, a mid-load failure (RI / cardinality violation, validation error) leaves Lance HEAD untouched on the staged tables — the next load on the same tables proceeds normally with no `ExpectedVersionMismatch`. -## `load` vs `ingest` +## `load` and the deprecated `ingest` shims -- `load(branch, data, mode)` — direct load to a branch (single publisher commit per call). 
-- `ingest(branch, from, data, mode)` — branch-creating wrapper: if `branch` doesn't exist, fork it from `from` (default `main`) via `branch_create_from`, then call `load(branch, data, mode)`. -- Returns `IngestResult { branch, base_branch, branch_created, mode, tables[] }`. -- `ingest_as(actor_id)` records the actor on the resulting commit. +- `load_as(branch, base, data, mode, actor)` — the unified entry (single publisher commit per call). `base: Some(b)` forks a missing `branch` from `b` first (via `branch_create_from_as`, which enforces `BranchCreate`); `base: None` requires the branch to exist — staging fails on an unknown branch, so a typo'd name can never create one. +- `load(branch, data, mode)` — convenience wrapper with `base: None` and no actor. +- Returns `LoadResult { branch, base_branch, branch_created, nodes_loaded, edges_loaded }`. +- `ingest{,_as,_file,_file_as}` are `#[deprecated]` shims over `load_as` preserving the historical contract (`from: None` forks from `main`; returns `IngestResult`); they are slated for removal. The CLI `ingest` command is a deprecated alias of `load --from `. ## Embeddings during load diff --git a/docs/user/audit.md b/docs/user/audit.md index ab028ac..52cecde 100644 --- a/docs/user/audit.md +++ b/docs/user/audit.md @@ -1,7 +1,7 @@ # Audit / Actor tracking - `Omnigraph::audit_actor_id: Option` is the actor in effect. -- `_as` variants of every write API let callers override the actor: `mutate_as`, `ingest_as`, `branch_merge_as`, `apply_schema_as`, etc. +- `_as` variants of every write API let callers override the actor: `mutate_as`, `load_as`, `branch_merge_as`, `apply_schema_as`, etc. - Actor IDs are persisted on `GraphCommit.actor_id` with split storage in `_graph_commit_actors.lance` (the commit graph is split into `_graph_commits.lance` for the linkage and `_graph_commit_actors.lance` for the actor map). 
- HTTP server uses the bearer-token actor automatically; CLI uses the local user / explicit env (no implicit actor). - Pre-v0.4.0 graphs also stored actor IDs on `RunRecord.actor_id` in `_graph_runs.lance` / `_graph_run_actors.lance`. The Run state machine was removed in MR-771; those files are inert post-v0.4.0. The v2→v3 manifest migration sweeps any stale `__run__*` branches on first write-open (MR-770); the inert dataset bytes remain until a `delete_prefix` primitive lands. diff --git a/docs/user/cli-reference.md b/docs/user/cli-reference.md index fb12dd8..74d772f 100644 --- a/docs/user/cli-reference.md +++ b/docs/user/cli-reference.md @@ -9,8 +9,8 @@ Top-level command families and subcommands. Graph-targeting commands accept eith | Command | Purpose | |---|---| | `init` | `--schema ` → initialize a graph (also scaffolds `omnigraph.yaml` if missing) | -| `load` | bulk load a branch (`--mode overwrite\|append\|merge`) | -| `ingest` | branch-creating transactional load (`--from `) | +| `load` | bulk load a branch, local or remote (`--mode overwrite\|append\|merge` is **required** — overwrite is destructive, so there is no default). Without `--from` the target branch must exist; `--from <base>` forks a missing `--branch` from `<base>` first | +| `ingest` | deprecated alias of `load --from <base>` (defaults: `--from main --mode merge`); prints a one-line warning to stderr | | `query` (alias: `read`) | run named read query; source via `--query `, `-e`/`--query-string `, or `--alias ` (exactly one). `read` is the deprecated previous name and prints a one-line warning to stderr | | `mutate` (alias: `change`) | run mutation query; same `--query` / `-e` / `--alias` mutual-exclusion as `query`.
`change` is the deprecated previous name and prints a one-line warning to stderr | | `snapshot` | print current snapshot (per-table version + row count) | diff --git a/docs/user/cli.md b/docs/user/cli.md index 5c4297a..a6ce442 100644 --- a/docs/user/cli.md +++ b/docs/user/cli.md @@ -42,7 +42,7 @@ omnigraph branch create --uri graph.omni --from main feature-x omnigraph branch list --uri graph.omni omnigraph branch merge --uri graph.omni feature-x --into main -omnigraph ingest --data batch.jsonl --branch review/import-2026-04-09 graph.omni +omnigraph load --data batch.jsonl --branch review/import-2026-04-09 --from main --mode merge graph.omni omnigraph export graph.omni --branch main --type Person > people.jsonl omnigraph commit list graph.omni --branch main --json omnigraph commit show --uri graph.omni --json diff --git a/docs/user/policy.md b/docs/user/policy.md index 9c484ba..91684d8 100644 --- a/docs/user/policy.md +++ b/docs/user/policy.md @@ -105,12 +105,13 @@ is validated/tested/explained as the anonymous policy. - `omnigraph policy validate` — parse + count actors, exit 1 on parse error. - `omnigraph policy test` — run cases in `policy.tests.yaml`, exit 1 on any expectation mismatch. - `omnigraph policy explain --actor … --action … [--branch …] [--target-branch …]` — show decision and matched rule. -- `omnigraph --as ` — set the actor for the duration of one invocation. Effective for `change`, `load`, `ingest`, `branch create|delete|merge`, and `schema apply` against local URIs. No-op against remote HTTP URIs (actor is bearer-token-resolved server-side). +- `omnigraph --as ` — set the actor for the duration of one invocation. Effective for `change`, `load` (and its deprecated `ingest` alias), `branch create|delete|merge`, and `schema apply` against local URIs. No-op against remote HTTP URIs (actor is bearer-token-resolved server-side). ## Enforcement Policy is a property of the **engine**, not the transport. 
Every mutating -write — `mutate_as`, `load_as`, `ingest_as`, `apply_schema_as`, +write — `mutate_as`, `load_as` (the deprecated `ingest_as` shims route +through it), `apply_schema_as`, `branch_create_as`, `branch_create_from_as`, `branch_delete_as`, `branch_merge_as` — calls `Omnigraph::enforce(action, scope, actor)` at the head of the method. The gate fires identically whether the call