diff --git a/crates/omnigraph-cli/src/main.rs b/crates/omnigraph-cli/src/main.rs index 62fff60..d1fbb99 100644 --- a/crates/omnigraph-cli/src/main.rs +++ b/crates/omnigraph-cli/src/main.rs @@ -2669,7 +2669,7 @@ async fn main() -> Result<()> { let db = open_local_db_with_policy(&graph).await?; let actor = resolve_cli_actor(cli.as_actor.as_deref(), &config); let result = db - .load_file_as(&branch, &data.to_string_lossy(), mode.into(), actor) + .load_file_as(&branch, None, &data.to_string_lossy(), mode.into(), actor) .await?; let payload = LoadOutput { uri: &uri, @@ -2729,6 +2729,9 @@ async fn main() -> Result<()> { } else { let db = open_local_db_with_policy(&graph).await?; let actor = resolve_cli_actor(cli.as_actor.as_deref(), &config); + // Deprecated shim retained until the CLI ingest command + // becomes an alias of the unified `load` handler. + #[allow(deprecated)] let result = db .ingest_file_as( &branch, diff --git a/crates/omnigraph-server/src/lib.rs b/crates/omnigraph-server/src/lib.rs index 3b9ff1d..26e4837 100644 --- a/crates/omnigraph-server/src/lib.rs +++ b/crates/omnigraph-server/src/lib.rs @@ -2722,6 +2722,9 @@ async fn server_ingest( .try_admit(&actor_arc, est_bytes) .map_err(ApiError::from_workload_reject)?; + // Deprecated shim retained until the from-absent semantics change + // lands; the handler then calls `load_as` directly. + #[allow(deprecated)] let result = { let db = &handle.engine; db.ingest_as(&branch, Some(&from), &request.data, mode, actor_id) diff --git a/crates/omnigraph-server/tests/server.rs b/crates/omnigraph-server/tests/server.rs index bf99b8d..fd214bb 100644 --- a/crates/omnigraph-server/tests/server.rs +++ b/crates/omnigraph-server/tests/server.rs @@ -4731,6 +4731,7 @@ async fn build_parity_graph() -> (tempfile::TempDir, PathBuf, PathBuf) { .unwrap(); db.load_as( "feature", + None, r#"{"type":"Person","data":{"name":"ParityEve","age":29}}"#, LoadMode::Append, None, diff --git a/crates/omnigraph/src/loader/mod.rs b/crates/omnigraph/src/loader/mod.rs index febbabd..0fabaea 100644 --- a/crates/omnigraph/src/loader/mod.rs +++ b/crates/omnigraph/src/loader/mod.rs @@ -26,6 +26,14 @@ use crate::exec::staging::{MutationStaging, PendingMode}; /// Result of a load operation. #[derive(Debug, Clone, Default)] pub struct LoadResult { + /// Branch the load landed on (`"main"` when no branch was given). + pub branch: String, + /// Base branch a fork was requested from (the `base` parameter of + /// `load_as`), recorded verbatim even when the target branch already + /// existed and no fork happened. + pub base_branch: Option, + /// True when this load created `branch` by forking it from `base_branch`. + pub branch_created: bool, pub nodes_loaded: HashMap, pub edges_loaded: HashMap, } @@ -72,6 +80,9 @@ pub async fn load_jsonl_file(db: &mut Omnigraph, path: &str, mode: LoadMode) -> } impl Omnigraph { + #[deprecated( + note = "use `load_as` with an explicit `base` instead; the ingest family will be removed in a future release" + )] pub async fn ingest( &self, branch: &str, @@ -79,9 +90,17 @@ impl Omnigraph { data: &str, mode: LoadMode, ) -> Result { + #[allow(deprecated)] self.ingest_as(branch, from, data, mode, None).await } + /// Deprecated shim over the unified `load_as`. Preserves the historical + /// ingest contract exactly: `from: None` means fork from `main`, and the + /// base branch is recorded in the result even when the target branch + /// already existed (no fork happened). + #[deprecated( + note = "use `load_as` with an explicit `base` instead; the ingest family will be removed in a future release" + )] pub async fn ingest_as( &self, branch: &str, @@ -90,22 +109,24 @@ impl Omnigraph { mode: LoadMode, actor_id: Option<&str>, ) -> Result { - // Engine-layer policy gate (MR-722 fan-out / PR #3). Scope is - // `Branch(branch)` for the data-write portion. If ingest creates - // a new branch as a side-effect (target branch doesn't exist), - // the inner `branch_create_from_as` call below additionally - // checks `BranchCreate` — both authorities are genuinely needed - // for "ingest into a fresh branch", so the layered check is - // correct, not redundant. - self.enforce( - omnigraph_policy::PolicyAction::Change, - &omnigraph_policy::ResourceScope::Branch(branch.to_string()), - actor_id, - )?; - self.ingest_with_current_actor(branch, from, data, mode, actor_id) - .await + let result = self + .load_as(branch, Some(from.unwrap_or("main")), data, mode, actor_id) + .await?; + Ok(IngestResult { + branch: result.branch.clone(), + base_branch: result + .base_branch + .clone() + .unwrap_or_else(|| "main".to_string()), + branch_created: result.branch_created, + mode, + tables: result.to_ingest_tables(), + }) } + #[deprecated( + note = "use `load_file_as` with an explicit `base` instead; the ingest family will be removed in a future release" + )] pub async fn ingest_file( &self, branch: &str, @@ -113,9 +134,13 @@ impl Omnigraph { path: &str, mode: LoadMode, ) -> Result { + #[allow(deprecated)] self.ingest_file_as(branch, from, path, mode, None).await } + #[deprecated( + note = "use `load_file_as` with an explicit `base` instead; the ingest family will be removed in a future release" + )] pub async fn ingest_file_as( &self, branch: &str, @@ -125,69 +150,35 @@ impl Omnigraph { actor_id: Option<&str>, ) -> Result { let data = std::fs::read_to_string(path).map_err(OmniError::Io)?; + #[allow(deprecated)] self.ingest_as(branch, from, &data, mode, actor_id).await } - async fn ingest_with_current_actor( - &self, - branch: &str, - from: Option<&str>, - data: &str, - mode: LoadMode, - actor_id: Option<&str>, - ) -> Result { - self.ensure_schema_state_valid().await?; - let target_branch = - Self::normalize_branch_name(branch)?.unwrap_or_else(|| "main".to_string()); - let base_branch = Self::normalize_branch_name(from.unwrap_or("main"))? - .unwrap_or_else(|| "main".to_string()); - let branch_created = !self - .branch_list() - .await? - .iter() - .any(|name| name == &target_branch); - if branch_created { - // Thread the actor through to the implicit BranchCreate so - // policy decisions match what an explicit `branch_create_from_as` - // call would see. Calling the no-actor variant here would - // bypass BranchCreate enforcement when policy is installed — - // the footgun guard catches that case too, but threading is - // the correct fix. - self.branch_create_from_as( - crate::db::ReadTarget::branch(&base_branch), - &target_branch, - actor_id, - ) - .await?; - } - - let result = self.load_as(&target_branch, data, mode, actor_id).await?; - Ok(IngestResult { - branch: target_branch, - base_branch, - branch_created, - mode, - tables: result.to_ingest_tables(), - }) - } - pub async fn load(&self, branch: &str, data: &str, mode: LoadMode) -> Result { - self.load_as(branch, data, mode, None).await + self.load_as(branch, None, data, mode, None).await } + /// Load JSONL data onto `branch`. + /// + /// `base` selects the branch-creation behavior: with `Some(base)`, a + /// missing target branch is forked from `base` first (the former + /// `ingest` semantics); with `None`, the target branch must already + /// exist — staging fails on an unknown branch when it resolves the + /// manifest snapshot, so a typo'd branch name can never create one. pub async fn load_as( &self, branch: &str, + base: Option<&str>, data: &str, mode: LoadMode, actor_id: Option<&str>, ) -> Result { // Engine-layer policy gate (MR-722 fan-out / PR #3). Scope is // `Branch(branch)` to match the HTTP-layer Change convention. - // `ingest_as` also calls `load_as` after enforcing its own - // Change gate — that double-check is fine because both gates - // resolve to identical Cedar decisions for the same actor + - // branch (the second check is a structurally-correct no-op). + // When a fork happens below, `branch_create_from_as` additionally + // checks `BranchCreate` — both authorities are genuinely needed + // for "load into a fresh branch", so the layered check is + // correct, not redundant. self.enforce( omnigraph_policy::PolicyAction::Change, &omnigraph_policy::ResourceScope::Branch(branch.to_string()), @@ -205,15 +196,47 @@ impl Omnigraph { // `commit_prepared_updates_on_branch_with_expected`) and leave // `self.coordinator` with a stale manifest snapshot. let requested = Self::normalize_branch_name(branch)?; + let base_branch = match base { + Some(base) => { + Some(Self::normalize_branch_name(base)?.unwrap_or_else(|| "main".to_string())) + } + None => None, + }; + // Fork-if-missing only when a base branch was explicitly given. + // `requested == None` is `main`, which always exists. + let mut branch_created = false; + if let (Some(target), Some(base_name)) = (requested.as_deref(), base_branch.as_deref()) { + let exists = self.branch_list().await?.iter().any(|name| name == target); + if !exists { + // Thread the actor through to the implicit BranchCreate so + // policy decisions match what an explicit `branch_create_from_as` + // call would see. Calling the no-actor variant here would + // bypass BranchCreate enforcement when policy is installed — + // the footgun guard catches that case too, but threading is + // the correct fix. + self.branch_create_from_as( + crate::db::ReadTarget::branch(base_name), + target, + actor_id, + ) + .await?; + branch_created = true; + } + } // Direct-to-target writes: no Run state machine, no `__run__` staging // branch. Cross-table OCC is enforced by the publisher's // `expected_table_versions` CAS inside `load_jsonl_reader`. - self.load_direct_on_branch(requested.as_deref(), data, mode, actor_id) - .await + let mut result = self + .load_direct_on_branch(requested.as_deref(), data, mode, actor_id) + .await?; + result.branch = requested.unwrap_or_else(|| "main".to_string()); + result.base_branch = base_branch; + result.branch_created = branch_created; + Ok(result) } pub async fn load_file(&self, branch: &str, path: &str, mode: LoadMode) -> Result { - self.load_file_as(branch, path, mode, None).await + self.load_file_as(branch, None, path, mode, None).await } /// Read a file into memory and delegate to `load_as`. Used by the @@ -222,12 +245,13 @@ impl Omnigraph { pub async fn load_file_as( &self, branch: &str, + base: Option<&str>, path: &str, mode: LoadMode, actor_id: Option<&str>, ) -> Result { - let data = std::fs::read_to_string(path).map_err(|e| OmniError::Io(e))?; - self.load_as(branch, &data, mode, actor_id).await + let data = std::fs::read_to_string(path).map_err(OmniError::Io)?; + self.load_as(branch, base, &data, mode, actor_id).await } async fn load_direct_on_branch( @@ -1824,6 +1848,7 @@ edge WorksAt: Person -> Company } #[tokio::test] + #[allow(deprecated)] async fn test_ingest_creates_branch_and_reports_tables() { let dir = tempfile::tempdir().unwrap(); let uri = dir.path().to_str().unwrap(); @@ -1868,6 +1893,7 @@ edge WorksAt: Person -> Company } #[tokio::test] + #[allow(deprecated)] async fn test_ingest_existing_branch_ignores_from_and_merges_data() { let dir = tempfile::tempdir().unwrap(); let uri = dir.path().to_str().unwrap(); @@ -1942,6 +1968,7 @@ edge WorksAt: Person -> Company } #[tokio::test] + #[allow(deprecated)] async fn test_ingest_as_stamps_actor_on_branch_head_commit() { let dir = tempfile::tempdir().unwrap(); let uri = dir.path().to_str().unwrap(); @@ -1967,6 +1994,68 @@ edge WorksAt: Person -> Company assert_eq!(head.actor_id.as_deref(), Some("act-andrew")); } + #[tokio::test] + async fn test_load_as_with_base_forks_missing_branch_and_stamps_metadata() { + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap(); + let db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap(); + + let result = db + .load_as("feature", Some("main"), TEST_DATA, LoadMode::Merge, None) + .await + .unwrap(); + + assert_eq!(result.branch, "feature"); + assert_eq!(result.base_branch.as_deref(), Some("main")); + assert!(result.branch_created); + assert!( + db.branch_list() + .await + .unwrap() + .contains(&"feature".to_string()) + ); + + // Re-loading onto the now-existing branch records the base but + // performs no fork. + let again = db + .load_as( + "feature", + Some("main"), + r#"{"type":"Person","data":{"name":"Bob","age":26}}"#, + LoadMode::Merge, + None, + ) + .await + .unwrap(); + assert!(!again.branch_created); + assert_eq!(again.base_branch.as_deref(), Some("main")); + } + + #[tokio::test] + async fn test_load_as_without_base_errors_on_missing_branch() { + let dir = tempfile::tempdir().unwrap(); + let uri = dir.path().to_str().unwrap(); + let db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap(); + + let result = db + .load_as("nonexistent", None, TEST_DATA, LoadMode::Merge, None) + .await; + assert!(result.is_err(), "load without base must not create branches"); + assert!( + !db.branch_list() + .await + .unwrap() + .contains(&"nonexistent".to_string()), + "failed load must not leave a branch behind" + ); + + // Loads to main carry the default branch metadata. + let main_load = db.load("main", TEST_DATA, LoadMode::Overwrite).await.unwrap(); + assert_eq!(main_load.branch, "main"); + assert_eq!(main_load.base_branch, None); + assert!(!main_load.branch_created); + } + #[test] fn test_range_constraint_rejects_nan() { use arrow_array::{Float64Array, RecordBatch, StringArray}; diff --git a/crates/omnigraph/tests/policy_engine_chassis.rs b/crates/omnigraph/tests/policy_engine_chassis.rs index def5349..8443940 100644 --- a/crates/omnigraph/tests/policy_engine_chassis.rs +++ b/crates/omnigraph/tests/policy_engine_chassis.rs @@ -243,6 +243,7 @@ async fn load_as_denies_when_policy_rejects_actor() { let result = db .load_as( "main", + None, ONE_PERSON_JSONL, LoadMode::Merge, Some("act-denied"), @@ -258,6 +259,7 @@ async fn load_as_allows_when_policy_permits_actor() { db.load_as( "main", + None, ONE_PERSON_JSONL, LoadMode::Merge, Some("act-allowed"), @@ -281,6 +283,7 @@ async fn load_file_as_denies_when_policy_rejects_actor() { let result = db .load_file_as( "main", + None, data_path.to_str().unwrap(), LoadMode::Merge, Some("act-denied"), @@ -298,6 +301,7 @@ async fn load_file_as_allows_when_policy_permits_actor() { db.load_file_as( "main", + None, data_path.to_str().unwrap(), LoadMode::Merge, Some("act-allowed"), @@ -307,6 +311,7 @@ async fn load_file_as_allows_when_policy_permits_actor() { } #[tokio::test] +#[allow(deprecated)] async fn ingest_as_denies_when_policy_rejects_actor() { let dir = tempfile::tempdir().unwrap(); let (db, _engine) = init_with_policy(&dir).await; @@ -324,6 +329,7 @@ async fn ingest_as_denies_when_policy_rejects_actor() { } #[tokio::test] +#[allow(deprecated)] async fn ingest_as_allows_when_policy_permits_actor() { let dir = tempfile::tempdir().unwrap(); let (db, _engine) = init_with_policy(&dir).await;