feat(server)!: POST /ingest forks only when 'from' is present

Branch creation becomes opt-in by presence of the request's 'from' field.
Previously the handler defaulted from to 'main' and always auto-created a
missing branch — a typo'd branch name silently forked main and landed the
data there, with the client none the wiser. Now a request without 'from'
against a missing branch returns 404 branch-not-found and creates nothing;
with 'from' set, fork-if-missing behaves as before. The BranchCreate
authority is only consulted when a fork will actually happen.

The handler calls the unified load_as directly (the deprecated ingest_as
shim is no longer used in the server). IngestOutput.base_branch becomes
nullable: it echoes the request's 'from' and is null when absent. OpenAPI
regenerated; the CLI's local ingest arm moves to load_file_as + the new
converter shape.

BREAKING CHANGE: clients that relied on implicit fork-from-main with 'from'
omitted must now pass from='main' explicitly. IngestOutput.base_branch is
now nullable.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
aaltshuler 2026-06-11 04:05:29 +03:00
parent c236a4c2df
commit 90676ef52f
6 changed files with 131 additions and 40 deletions

View file

@ -1587,7 +1587,7 @@ fn print_ingest_human(output: &IngestOutput) {
"ingested {} into branch {} from {} with {} ({})",
output.uri,
output.branch,
output.base_branch,
output.base_branch.as_deref().unwrap_or("main"),
output.mode.as_str(),
if output.branch_created {
"branch created"
@ -2729,11 +2729,8 @@ async fn main() -> Result<()> {
} else {
let db = open_local_db_with_policy(&graph).await?;
let actor = resolve_cli_actor(cli.as_actor.as_deref(), &config);
// Deprecated shim retained until the CLI ingest command
// becomes an alias of the unified `load` handler.
#[allow(deprecated)]
let result = db
.ingest_file_as(
.load_file_as(
&branch,
Some(&from),
&data.to_string_lossy(),
@ -2741,7 +2738,7 @@ async fn main() -> Result<()> {
actor,
)
.await?;
ingest_output(&uri, &result, None)
ingest_output(&uri, &result, mode.into(), None)
};
if json {
print_json(&payload)?;

View file

@ -1,6 +1,6 @@
use omnigraph::db::{GraphCommit, MergeOutcome, ReadTarget, SchemaApplyResult, Snapshot};
use omnigraph::error::{MergeConflict, MergeConflictKind};
use omnigraph::loader::{IngestResult, LoadMode};
use omnigraph::loader::{LoadMode, LoadResult};
use crate::queries::StoredQuery;
use omnigraph_compiler::SchemaMigrationStep;
use omnigraph_compiler::query::ast::Param;
@ -208,7 +208,9 @@ pub struct IngestTableOutput {
pub struct IngestOutput {
pub uri: String,
pub branch: String,
pub base_branch: String,
/// Base branch a fork was requested from (the request's `from`), echoed
/// even when the branch already existed. `null` when `from` was absent.
pub base_branch: Option<String>,
pub branch_created: bool,
#[schema(value_type = LoadModeSchema)]
pub mode: LoadMode,
@ -493,9 +495,12 @@ pub struct SchemaOutput {
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
pub struct IngestRequest {
/// Target branch. Created from `from` if it does not yet exist. Defaults to `main`.
/// Target branch. Defaults to `main`. Without `from`, the branch must
/// already exist — a missing branch is a 404, never an implicit fork.
pub branch: Option<String>,
/// Parent branch used to create `branch` if it does not exist. Defaults to `main`.
/// Parent branch used to create `branch` if it does not exist. Branch
/// creation is opt-in by presence of this field; omit it to require an
/// existing branch.
pub from: Option<String>,
/// How existing rows are handled. Defaults to `merge`.
#[schema(value_type = Option<LoadModeSchema>)]
@ -642,18 +647,23 @@ pub fn read_output(query_name: String, target: &ReadTarget, result: QueryResult)
}
}
pub fn ingest_output(uri: &str, result: &IngestResult, actor_id: Option<String>) -> IngestOutput {
pub fn ingest_output(
uri: &str,
result: &LoadResult,
mode: LoadMode,
actor_id: Option<String>,
) -> IngestOutput {
IngestOutput {
uri: uri.to_string(),
branch: result.branch.clone(),
base_branch: result.base_branch.clone(),
branch_created: result.branch_created,
mode: result.mode,
mode,
tables: result
.tables
.iter()
.to_ingest_tables()
.into_iter()
.map(|table| IngestTableOutput {
table_key: table.table_key.clone(),
table_key: table.table_key,
rows_loaded: table.rows_loaded,
})
.collect(),

View file

@ -2663,13 +2663,15 @@ async fn server_schema_apply(
),
security(("bearer_token" = [])),
)]
/// Bulk-ingest NDJSON data into a branch.
/// Bulk-load NDJSON data into a branch.
///
/// `data` is NDJSON with one record per line. `mode` controls behavior on
/// existing rows: `merge` upserts by id (default), `append` blindly inserts,
/// `overwrite` replaces table contents. If `branch` does not exist it is
/// created from `from` (defaults to `main`). **Destructive** when `mode` is
/// `overwrite` or when ingest produces conflicting writes.
/// `overwrite` replaces table contents. Branch creation is opt-in by
/// presence of `from`: with `from` set, a missing `branch` is created from
/// it; without `from`, `branch` must already exist — a missing branch is a
/// 404, never an implicit fork. **Destructive** when `mode` is `overwrite`
/// or when the load produces conflicting writes.
async fn server_ingest(
State(state): State<AppState>,
Extension(handle): Extension<Arc<GraphHandle>>,
@ -2677,7 +2679,7 @@ async fn server_ingest(
Json(request): Json<IngestRequest>,
) -> std::result::Result<Json<IngestOutput>, ApiError> {
let branch = request.branch.unwrap_or_else(|| "main".to_string());
let from = request.from.unwrap_or_else(|| "main".to_string());
let from = request.from;
let mode = request.mode.unwrap_or(omnigraph::loader::LoadMode::Merge);
let actor_arc = actor
.as_ref()
@ -2697,15 +2699,25 @@ async fn server_ingest(
};
if !branch_exists {
authorize_request(
actor.as_ref().map(|Extension(actor)| actor),
handle.policy.as_deref(),
PolicyRequest {
action: PolicyAction::BranchCreate,
branch: Some(from.clone()),
target_branch: Some(branch.clone()),
},
)?;
match from.as_deref() {
// Fork-if-missing is opt-in by presence of `from`; without it a
// typo'd branch name must surface as an error, not silently
// create a fork and land the data there.
None => {
return Err(ApiError::not_found(format!(
"branch '{branch}' not found; pass `from` to create it"
)));
}
Some(from) => authorize_request(
actor.as_ref().map(|Extension(actor)| actor),
handle.policy.as_deref(),
PolicyRequest {
action: PolicyAction::BranchCreate,
branch: Some(from.to_string()),
target_branch: Some(branch.clone()),
},
)?,
}
}
authorize_request(
actor.as_ref().map(|Extension(actor)| actor),
@ -2722,12 +2734,9 @@ async fn server_ingest(
.try_admit(&actor_arc, est_bytes)
.map_err(ApiError::from_workload_reject)?;
// Deprecated shim retained until the from-absent semantics change
// lands; the handler then calls `load_as` directly.
#[allow(deprecated)]
let result = {
let db = &handle.engine;
db.ingest_as(&branch, Some(&from), &request.data, mode, actor_id)
db.load_as(&branch, from.as_deref(), &request.data, mode, actor_id)
.await
.map_err(ApiError::from_omni)?
};
@ -2735,6 +2744,7 @@ async fn server_ingest(
Ok(Json(ingest_output(
handle.uri.as_str(),
&result,
mode,
actor_id.map(str::to_string),
)))
}

View file

@ -2265,6 +2265,77 @@ async fn ingest_existing_branch_skips_branch_create_policy_check() {
assert_eq!(body["base_branch"], "other-base");
}
/// Regression: branch creation is opt-in by presence of `from`. A request
/// without `from` against a branch that doesn't exist must 404 — not
/// silently fork `main` and land the data on the typo'd branch.
#[tokio::test(flavor = "multi_thread")]
async fn ingest_without_from_returns_404_for_missing_branch_and_creates_nothing() {
let (temp, app) = app_for_loaded_graph().await;
let graph = graph_path(temp.path());
let ingest = IngestRequest {
branch: Some("feature-typo".to_string()),
from: None,
mode: Some(LoadMode::Merge),
data: r#"{"type":"Person","data":{"name":"Zoe","age":33}}"#.to_string(),
};
let (status, body) = json_response(
&app,
Request::builder()
.uri("/ingest")
.method(Method::POST)
.header("content-type", "application/json")
.body(Body::from(serde_json::to_vec(&ingest).unwrap()))
.unwrap(),
)
.await;
assert_eq!(status, StatusCode::NOT_FOUND);
let error: ErrorOutput = serde_json::from_value(body).unwrap();
assert_eq!(error.code, Some(omnigraph_server::api::ErrorCode::NotFound));
let db = Omnigraph::open(graph.to_str().unwrap()).await.unwrap();
assert!(
!db.branch_list()
.await
.unwrap()
.contains(&"feature-typo".to_string()),
"a 404'd ingest must not create the branch"
);
}
#[tokio::test(flavor = "multi_thread")]
async fn ingest_without_from_loads_into_existing_branch() {
let (temp, app) = app_for_loaded_graph().await;
let graph = graph_path(temp.path());
{
let db = Omnigraph::open(graph.to_str().unwrap()).await.unwrap();
db.branch_create_from(ReadTarget::branch("main"), "feature")
.await
.unwrap();
}
let ingest = IngestRequest {
branch: Some("feature".to_string()),
from: None,
mode: Some(LoadMode::Merge),
data: r#"{"type":"Person","data":{"name":"Zoe","age":33}}"#.to_string(),
};
let (status, body) = json_response(
&app,
Request::builder()
.uri("/ingest")
.method(Method::POST)
.header("content-type", "application/json")
.body(Body::from(serde_json::to_vec(&ingest).unwrap()))
.unwrap(),
)
.await;
assert_eq!(status, StatusCode::OK);
assert_eq!(body["branch"], "feature");
assert_eq!(body["branch_created"], false);
assert_eq!(body["base_branch"], serde_json::Value::Null);
}
#[tokio::test(flavor = "multi_thread")]
async fn ingest_denies_missing_branch_without_branch_create_permission() {
let (_temp, app) = app_for_loaded_graph_with_auth_tokens_and_policy(

View file

@ -56,7 +56,7 @@ Per-graph endpoints — same body shape across modes; URLs differ:
| POST | `/queries/{name}` | `/graphs/{id}/queries/{name}` | bearer + `invoke_query` (+ `change` for a stored mutation) | invoke a named query from the `queries:` registry; deny == 404 | `server_invoke_query` |
| GET | `/schema` | `/graphs/{id}/schema` | bearer + `read` | get current `.pg` source | `server_schema_get` |
| POST | `/schema/apply` | `/graphs/{id}/schema/apply` | bearer + `schema_apply` (target=`main`) | migrate | `server_schema_apply` |
| POST | `/ingest` | `/graphs/{id}/ingest` | bearer + `branch_create` (if new) + `change` | bulk load | `server_ingest` (32 MB body limit) |
| POST | `/ingest` | `/graphs/{id}/ingest` | bearer + `branch_create` (only when `from` is set and the branch is created) + `change` | bulk load; branch creation is opt-in via `from` — without it a missing `branch` is a 404, never an implicit fork | `server_ingest` (32 MB body limit) |
| GET | `/branches` | `/graphs/{id}/branches` | bearer + `read` | list branches | `server_branch_list` |
| POST | `/branches` | `/graphs/{id}/branches` | bearer + `branch_create` | create | `server_branch_create` |
| DELETE | `/branches/{branch}` | `/graphs/{id}/branches/{branch}` | bearer + `branch_delete` | delete | `server_branch_delete` |

View file

@ -670,8 +670,8 @@
"tags": [
"mutations"
],
"summary": "Bulk-ingest NDJSON data into a branch.",
"description": "`data` is NDJSON with one record per line. `mode` controls behavior on\nexisting rows: `merge` upserts by id (default), `append` blindly inserts,\n`overwrite` replaces table contents. If `branch` does not exist it is\ncreated from `from` (defaults to `main`). **Destructive** when `mode` is\n`overwrite` or when ingest produces conflicting writes.",
"summary": "Bulk-load NDJSON data into a branch.",
"description": "`data` is NDJSON with one record per line. `mode` controls behavior on\nexisting rows: `merge` upserts by id (default), `append` blindly inserts,\n`overwrite` replaces table contents. Branch creation is opt-in by\npresence of `from`: with `from` set, a missing `branch` is created from\nit; without `from`, `branch` must already exist — a missing branch is a\n404, never an implicit fork. **Destructive** when `mode` is `overwrite`\nor when the load produces conflicting writes.",
"operationId": "ingest",
"requestBody": {
"content": {
@ -1710,7 +1710,6 @@
"required": [
"uri",
"branch",
"base_branch",
"branch_created",
"mode",
"tables"
@ -1723,7 +1722,11 @@
]
},
"base_branch": {
"type": "string"
"type": [
"string",
"null"
],
"description": "Base branch a fork was requested from (the request's `from`), echoed\neven when the branch already existed. `null` when `from` was absent."
},
"branch": {
"type": "string"
@ -1756,7 +1759,7 @@
"string",
"null"
],
"description": "Target branch. Created from `from` if it does not yet exist. Defaults to `main`."
"description": "Target branch. Defaults to `main`. Without `from`, the branch must\nalready exist — a missing branch is a 404, never an implicit fork."
},
"data": {
"type": "string",
@ -1768,7 +1771,7 @@
"string",
"null"
],
"description": "Parent branch used to create `branch` if it does not exist. Defaults to `main`."
"description": "Parent branch used to create `branch` if it does not exist. Branch\ncreation is opt-in by presence of this field; omit it to require an\nexisting branch."
},
"mode": {
"oneOf": [