From 90676ef52f9332815ffd94bba610992363392be3 Mon Sep 17 00:00:00 2001
From: aaltshuler
Date: Thu, 11 Jun 2026 04:05:29 +0300
Subject: [PATCH] feat(server)!: POST /ingest forks only when 'from' is present
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Branch creation becomes opt-in by presence of the request's 'from'
field. Previously the handler defaulted 'from' to 'main' and always
auto-created a missing branch — a typo'd branch name silently forked
main and landed the data there, with the client none the wiser. Now a
request without 'from' against a missing branch returns 404
branch-not-found and creates nothing; with 'from' set, fork-if-missing
behaves as before. The BranchCreate authority is only consulted when a
fork will actually happen.

The handler calls the unified load_as directly (the deprecated
ingest_as shim is no longer used in the server).
IngestOutput.base_branch becomes nullable: it echoes the request's
'from' and is null when absent. OpenAPI regenerated; the CLI's local
ingest arm moves to load_file_as + the new converter shape
(ingest_output now takes the load mode as an explicit argument).

BREAKING CHANGE: clients that relied on implicit fork-from-main with
'from' omitted must now pass from='main' explicitly.
IngestOutput.base_branch is now nullable.

Co-Authored-By: Claude Fable 5 --- crates/omnigraph-cli/src/main.rs | 9 ++-- crates/omnigraph-server/src/api.rs | 28 ++++++---- crates/omnigraph-server/src/lib.rs | 46 +++++++++------- crates/omnigraph-server/tests/server.rs | 71 +++++++++++++++++++++++++ docs/user/server.md | 2 +- openapi.json | 15 +++--- 6 files changed, 131 insertions(+), 40 deletions(-) diff --git a/crates/omnigraph-cli/src/main.rs b/crates/omnigraph-cli/src/main.rs index d1fbb99..d8123ce 100644 --- a/crates/omnigraph-cli/src/main.rs +++ b/crates/omnigraph-cli/src/main.rs @@ -1587,7 +1587,7 @@ fn print_ingest_human(output: &IngestOutput) { "ingested {} into branch {} from {} with {} ({})", output.uri, output.branch, - output.base_branch, + output.base_branch.as_deref().unwrap_or("main"), output.mode.as_str(), if output.branch_created { "branch created" @@ -2729,11 +2729,8 @@ async fn main() -> Result<()> { } else { let db = open_local_db_with_policy(&graph).await?; let actor = resolve_cli_actor(cli.as_actor.as_deref(), &config); - // Deprecated shim retained until the CLI ingest command - // becomes an alias of the unified `load` handler. 
- #[allow(deprecated)] let result = db - .ingest_file_as( + .load_file_as( &branch, Some(&from), &data.to_string_lossy(), @@ -2741,7 +2738,7 @@ async fn main() -> Result<()> { actor, ) .await?; - ingest_output(&uri, &result, None) + ingest_output(&uri, &result, mode.into(), None) }; if json { print_json(&payload)?; diff --git a/crates/omnigraph-server/src/api.rs b/crates/omnigraph-server/src/api.rs index 4a6024f..ff3cf67 100644 --- a/crates/omnigraph-server/src/api.rs +++ b/crates/omnigraph-server/src/api.rs @@ -1,6 +1,6 @@ use omnigraph::db::{GraphCommit, MergeOutcome, ReadTarget, SchemaApplyResult, Snapshot}; use omnigraph::error::{MergeConflict, MergeConflictKind}; -use omnigraph::loader::{IngestResult, LoadMode}; +use omnigraph::loader::{LoadMode, LoadResult}; use crate::queries::StoredQuery; use omnigraph_compiler::SchemaMigrationStep; use omnigraph_compiler::query::ast::Param; @@ -208,7 +208,9 @@ pub struct IngestTableOutput { pub struct IngestOutput { pub uri: String, pub branch: String, - pub base_branch: String, + /// Base branch a fork was requested from (the request's `from`), echoed + /// even when the branch already existed. `null` when `from` was absent. + pub base_branch: Option, pub branch_created: bool, #[schema(value_type = LoadModeSchema)] pub mode: LoadMode, @@ -493,9 +495,12 @@ pub struct SchemaOutput { #[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] pub struct IngestRequest { - /// Target branch. Created from `from` if it does not yet exist. Defaults to `main`. + /// Target branch. Defaults to `main`. Without `from`, the branch must + /// already exist — a missing branch is a 404, never an implicit fork. pub branch: Option, - /// Parent branch used to create `branch` if it does not exist. Defaults to `main`. + /// Parent branch used to create `branch` if it does not exist. Branch + /// creation is opt-in by presence of this field; omit it to require an + /// existing branch. pub from: Option, /// How existing rows are handled. 
Defaults to `merge`. #[schema(value_type = Option)] @@ -642,18 +647,23 @@ pub fn read_output(query_name: String, target: &ReadTarget, result: QueryResult) } } -pub fn ingest_output(uri: &str, result: &IngestResult, actor_id: Option) -> IngestOutput { +pub fn ingest_output( + uri: &str, + result: &LoadResult, + mode: LoadMode, + actor_id: Option, +) -> IngestOutput { IngestOutput { uri: uri.to_string(), branch: result.branch.clone(), base_branch: result.base_branch.clone(), branch_created: result.branch_created, - mode: result.mode, + mode, tables: result - .tables - .iter() + .to_ingest_tables() + .into_iter() .map(|table| IngestTableOutput { - table_key: table.table_key.clone(), + table_key: table.table_key, rows_loaded: table.rows_loaded, }) .collect(), diff --git a/crates/omnigraph-server/src/lib.rs b/crates/omnigraph-server/src/lib.rs index 26e4837..0038674 100644 --- a/crates/omnigraph-server/src/lib.rs +++ b/crates/omnigraph-server/src/lib.rs @@ -2663,13 +2663,15 @@ async fn server_schema_apply( ), security(("bearer_token" = [])), )] -/// Bulk-ingest NDJSON data into a branch. +/// Bulk-load NDJSON data into a branch. /// /// `data` is NDJSON with one record per line. `mode` controls behavior on /// existing rows: `merge` upserts by id (default), `append` blindly inserts, -/// `overwrite` replaces table contents. If `branch` does not exist it is -/// created from `from` (defaults to `main`). **Destructive** when `mode` is -/// `overwrite` or when ingest produces conflicting writes. +/// `overwrite` replaces table contents. Branch creation is opt-in by +/// presence of `from`: with `from` set, a missing `branch` is created from +/// it; without `from`, `branch` must already exist — a missing branch is a +/// 404, never an implicit fork. **Destructive** when `mode` is `overwrite` +/// or when the load produces conflicting writes. 
async fn server_ingest( State(state): State, Extension(handle): Extension>, @@ -2677,7 +2679,7 @@ async fn server_ingest( Json(request): Json, ) -> std::result::Result, ApiError> { let branch = request.branch.unwrap_or_else(|| "main".to_string()); - let from = request.from.unwrap_or_else(|| "main".to_string()); + let from = request.from; let mode = request.mode.unwrap_or(omnigraph::loader::LoadMode::Merge); let actor_arc = actor .as_ref() @@ -2697,15 +2699,25 @@ async fn server_ingest( }; if !branch_exists { - authorize_request( - actor.as_ref().map(|Extension(actor)| actor), - handle.policy.as_deref(), - PolicyRequest { - action: PolicyAction::BranchCreate, - branch: Some(from.clone()), - target_branch: Some(branch.clone()), - }, - )?; + match from.as_deref() { + // Fork-if-missing is opt-in by presence of `from`; without it a + // typo'd branch name must surface as an error, not silently + // create a fork and land the data there. + None => { + return Err(ApiError::not_found(format!( + "branch '{branch}' not found; pass `from` to create it" + ))); + } + Some(from) => authorize_request( + actor.as_ref().map(|Extension(actor)| actor), + handle.policy.as_deref(), + PolicyRequest { + action: PolicyAction::BranchCreate, + branch: Some(from.to_string()), + target_branch: Some(branch.clone()), + }, + )?, + } } authorize_request( actor.as_ref().map(|Extension(actor)| actor), @@ -2722,12 +2734,9 @@ async fn server_ingest( .try_admit(&actor_arc, est_bytes) .map_err(ApiError::from_workload_reject)?; - // Deprecated shim retained until the from-absent semantics change - // lands; the handler then calls `load_as` directly. - #[allow(deprecated)] let result = { let db = &handle.engine; - db.ingest_as(&branch, Some(&from), &request.data, mode, actor_id) + db.load_as(&branch, from.as_deref(), &request.data, mode, actor_id) .await .map_err(ApiError::from_omni)? 
}; @@ -2735,6 +2744,7 @@ async fn server_ingest( Ok(Json(ingest_output( handle.uri.as_str(), &result, + mode, actor_id.map(str::to_string), ))) } diff --git a/crates/omnigraph-server/tests/server.rs b/crates/omnigraph-server/tests/server.rs index fd214bb..7858587 100644 --- a/crates/omnigraph-server/tests/server.rs +++ b/crates/omnigraph-server/tests/server.rs @@ -2265,6 +2265,77 @@ async fn ingest_existing_branch_skips_branch_create_policy_check() { assert_eq!(body["base_branch"], "other-base"); } +/// Regression: branch creation is opt-in by presence of `from`. A request +/// without `from` against a branch that doesn't exist must 404 — not +/// silently fork `main` and land the data on the typo'd branch. +#[tokio::test(flavor = "multi_thread")] +async fn ingest_without_from_returns_404_for_missing_branch_and_creates_nothing() { + let (temp, app) = app_for_loaded_graph().await; + let graph = graph_path(temp.path()); + let ingest = IngestRequest { + branch: Some("feature-typo".to_string()), + from: None, + mode: Some(LoadMode::Merge), + data: r#"{"type":"Person","data":{"name":"Zoe","age":33}}"#.to_string(), + }; + + let (status, body) = json_response( + &app, + Request::builder() + .uri("/ingest") + .method(Method::POST) + .header("content-type", "application/json") + .body(Body::from(serde_json::to_vec(&ingest).unwrap())) + .unwrap(), + ) + .await; + assert_eq!(status, StatusCode::NOT_FOUND); + let error: ErrorOutput = serde_json::from_value(body).unwrap(); + assert_eq!(error.code, Some(omnigraph_server::api::ErrorCode::NotFound)); + + let db = Omnigraph::open(graph.to_str().unwrap()).await.unwrap(); + assert!( + !db.branch_list() + .await + .unwrap() + .contains(&"feature-typo".to_string()), + "a 404'd ingest must not create the branch" + ); +} + +#[tokio::test(flavor = "multi_thread")] +async fn ingest_without_from_loads_into_existing_branch() { + let (temp, app) = app_for_loaded_graph().await; + let graph = graph_path(temp.path()); + { + let db = 
Omnigraph::open(graph.to_str().unwrap()).await.unwrap(); + db.branch_create_from(ReadTarget::branch("main"), "feature") + .await + .unwrap(); + } + let ingest = IngestRequest { + branch: Some("feature".to_string()), + from: None, + mode: Some(LoadMode::Merge), + data: r#"{"type":"Person","data":{"name":"Zoe","age":33}}"#.to_string(), + }; + + let (status, body) = json_response( + &app, + Request::builder() + .uri("/ingest") + .method(Method::POST) + .header("content-type", "application/json") + .body(Body::from(serde_json::to_vec(&ingest).unwrap())) + .unwrap(), + ) + .await; + assert_eq!(status, StatusCode::OK); + assert_eq!(body["branch"], "feature"); + assert_eq!(body["branch_created"], false); + assert_eq!(body["base_branch"], serde_json::Value::Null); +} + #[tokio::test(flavor = "multi_thread")] async fn ingest_denies_missing_branch_without_branch_create_permission() { let (_temp, app) = app_for_loaded_graph_with_auth_tokens_and_policy( diff --git a/docs/user/server.md b/docs/user/server.md index 60988ca..0922e74 100644 --- a/docs/user/server.md +++ b/docs/user/server.md @@ -56,7 +56,7 @@ Per-graph endpoints — same body shape across modes; URLs differ: | POST | `/queries/{name}` | `/graphs/{id}/queries/{name}` | bearer + `invoke_query` (+ `change` for a stored mutation) | invoke a named query from the `queries:` registry; deny == 404 | `server_invoke_query` | | GET | `/schema` | `/graphs/{id}/schema` | bearer + `read` | get current `.pg` source | `server_schema_get` | | POST | `/schema/apply` | `/graphs/{id}/schema/apply` | bearer + `schema_apply` (target=`main`) | migrate | `server_schema_apply` | -| POST | `/ingest` | `/graphs/{id}/ingest` | bearer + `branch_create` (if new) + `change` | bulk load | `server_ingest` (32 MB body limit) | +| POST | `/ingest` | `/graphs/{id}/ingest` | bearer + `branch_create` (only when `from` is set and the branch is created) + `change` | bulk load; branch creation is opt-in via `from` — without it a missing `branch` is a 404, 
never an implicit fork | `server_ingest` (32 MB body limit) | | GET | `/branches` | `/graphs/{id}/branches` | bearer + `read` | list branches | `server_branch_list` | | POST | `/branches` | `/graphs/{id}/branches` | bearer + `branch_create` | create | `server_branch_create` | | DELETE | `/branches/{branch}` | `/graphs/{id}/branches/{branch}` | bearer + `branch_delete` | delete | `server_branch_delete` | diff --git a/openapi.json b/openapi.json index 335c0bc..85c5b8d 100644 --- a/openapi.json +++ b/openapi.json @@ -670,8 +670,8 @@ "tags": [ "mutations" ], - "summary": "Bulk-ingest NDJSON data into a branch.", - "description": "`data` is NDJSON with one record per line. `mode` controls behavior on\nexisting rows: `merge` upserts by id (default), `append` blindly inserts,\n`overwrite` replaces table contents. If `branch` does not exist it is\ncreated from `from` (defaults to `main`). **Destructive** when `mode` is\n`overwrite` or when ingest produces conflicting writes.", + "summary": "Bulk-load NDJSON data into a branch.", + "description": "`data` is NDJSON with one record per line. `mode` controls behavior on\nexisting rows: `merge` upserts by id (default), `append` blindly inserts,\n`overwrite` replaces table contents. Branch creation is opt-in by\npresence of `from`: with `from` set, a missing `branch` is created from\nit; without `from`, `branch` must already exist — a missing branch is a\n404, never an implicit fork. **Destructive** when `mode` is `overwrite`\nor when the load produces conflicting writes.", "operationId": "ingest", "requestBody": { "content": { @@ -1710,7 +1710,6 @@ "required": [ "uri", "branch", - "base_branch", "branch_created", "mode", "tables" @@ -1723,7 +1722,11 @@ ] }, "base_branch": { - "type": "string" + "type": [ + "string", + "null" + ], + "description": "Base branch a fork was requested from (the request's `from`), echoed\neven when the branch already existed. `null` when `from` was absent." 
}, "branch": { "type": "string" @@ -1756,7 +1759,7 @@ "string", "null" ], - "description": "Target branch. Created from `from` if it does not yet exist. Defaults to `main`." + "description": "Target branch. Defaults to `main`. Without `from`, the branch must\nalready exist — a missing branch is a 404, never an implicit fork." }, "data": { "type": "string", @@ -1768,7 +1771,7 @@ "string", "null" ], - "description": "Parent branch used to create `branch` if it does not exist. Defaults to `main`." + "description": "Parent branch used to create `branch` if it does not exist. Branch\ncreation is opt-in by presence of this field; omit it to require an\nexisting branch." }, "mode": { "oneOf": [