From 05a8bd5de14b3bda42e5979f4f44fa3383b4ca2f Mon Sep 17 00:00:00 2001 From: Ragnor Comerford Date: Fri, 8 May 2026 16:57:53 +0200 Subject: [PATCH] server: gate /ingest /branches/* /schema/apply on per-actor admission Closes the gap that admission control only fired on /change. A heavy actor sending bulk-ingest traffic could exhaust shared engine capacity (Lance I/O threads, manifest churn) without hitting the per-actor cap. Wires `state.workload.try_admit(&actor_arc, est_bytes)` into the five remaining mutating handlers AFTER Cedar authorization (so denied requests don't consume admission slots) and BEFORE the engine call. Byte estimates per handler: - /ingest: request.data.len() (NDJSON body) - /schema/apply: request.schema_source.len() - /branches/create, /branches/delete, /branches/merge: 256 (small JSON; the heavy work is bounded per-(table, branch) by the engine's writer queue rather than by request size) The admission guard is held in `let _admission = ...` so it stays alive until handler return, releasing the count permit + decrementing the byte budget on drop. Pinned by `ingest_per_actor_admission_cap_returns_429` (previous commit). The test still fails on the Retry-After header assertion; the next commit emits the header. Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/omnigraph-server/src/lib.rs | 49 ++++++++++++++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/crates/omnigraph-server/src/lib.rs b/crates/omnigraph-server/src/lib.rs index b9ce418..cb5ca41 100644 --- a/crates/omnigraph-server/src/lib.rs +++ b/crates/omnigraph-server/src/lib.rs @@ -1024,6 +1024,10 @@ async fn server_schema_apply( actor: Option>, Json(request): Json, ) -> std::result::Result, ApiError> { + let actor_arc = actor + .as_ref() + .map(|Extension(actor)| Arc::clone(&actor.0)) + .unwrap_or_else(|| Arc::::from("anonymous")); let actor_id = actor.as_ref().map(|Extension(actor)| actor.as_str()); authorize_request( &state, @@ -1035,6 +1039,11 @@ async fn server_schema_apply( target_branch: Some("main".to_string()), }, )?; + let est_bytes = request.schema_source.len() as u64; + let _admission = state + .workload + .try_admit(&actor_arc, est_bytes) + .map_err(ApiError::from_workload_reject)?; let result = { let db = &state.engine; db.apply_schema(&request.schema_source) @@ -1073,6 +1082,10 @@ async fn server_ingest( let branch = request.branch.unwrap_or_else(|| "main".to_string()); let from = request.from.unwrap_or_else(|| "main".to_string()); let mode = request.mode.unwrap_or(omnigraph::loader::LoadMode::Merge); + let actor_arc = actor + .as_ref() + .map(|Extension(actor)| Arc::clone(&actor.0)) + .unwrap_or_else(|| Arc::::from("anonymous")); let actor_id = actor.as_ref().map(|Extension(actor)| actor.as_str()); let branch_exists = { @@ -1106,6 +1119,11 @@ async fn server_ingest( target_branch: None, }, )?; + let est_bytes = request.data.len() as u64; + let _admission = state + .workload + .try_admit(&actor_arc, est_bytes) + .map_err(ApiError::from_workload_reject)?; let result = { let db = &state.engine; @@ -1187,6 +1205,10 @@ async fn server_branch_create( Json(request): Json, ) -> std::result::Result, ApiError> { let from = request.from.unwrap_or_else(|| "main".to_string()); + let actor_arc = actor + .as_ref() + .map(|Extension(actor)| Arc::clone(&actor.0)) + .unwrap_or_else(|| Arc::::from("anonymous")); authorize_request( &state, actor.as_ref().map(|Extension(actor)| actor), @@ -1200,6 +1222,13 @@ async fn server_branch_create( target_branch: Some(request.name.clone()), }, )?; + // Branch metadata only — small constant bytes estimate. The Lance + // shallow-clone work is bounded by the parent's manifest size, not + // the request body. + let _admission = state + .workload + .try_admit(&actor_arc, 256) + .map_err(ApiError::from_workload_reject)?; { let db = &state.engine; db.branch_create_from(ReadTarget::branch(&from), &request.name) @@ -1240,6 +1269,10 @@ async fn server_branch_delete( actor: Option>, Path(branch): Path, ) -> std::result::Result, ApiError> { + let actor_arc = actor + .as_ref() + .map(|Extension(actor)| Arc::clone(&actor.0)) + .unwrap_or_else(|| Arc::::from("anonymous")); let actor_id = actor.as_ref().map(|Extension(actor)| actor.as_str()); authorize_request( &state, @@ -1251,6 +1284,11 @@ async fn server_branch_delete( target_branch: Some(branch.clone()), }, )?; + // Metadata-only manifest tombstone — small constant estimate. + let _admission = state + .workload + .try_admit(&actor_arc, 256) + .map_err(ApiError::from_workload_reject)?; { let db = &state.engine; db.branch_delete(&branch) @@ -1291,6 +1329,10 @@ async fn server_branch_merge( Json(request): Json, ) -> std::result::Result, ApiError> { let target = request.target.unwrap_or_else(|| "main".to_string()); + let actor_arc = actor + .as_ref() + .map(|Extension(actor)| Arc::clone(&actor.0)) + .unwrap_or_else(|| Arc::::from("anonymous")); let actor_id = actor.as_ref().map(|Extension(actor)| actor.as_str()); authorize_request( &state, @@ -1302,6 +1344,13 @@ async fn server_branch_merge( target_branch: Some(target.clone()), }, )?; + // Merge body is small JSON; the heavy work is in the engine but is + // bounded per-(table, branch) by the writer queue. Small constant + // estimate suffices for the actor in-flight count. + let _admission = state + .workload + .try_admit(&actor_arc, 256) + .map_err(ApiError::from_workload_reject)?; let outcome = { let db = &state.engine; db.branch_merge_as(&request.source, &target, actor_id)