server: gate /ingest /branches/* /schema/apply on per-actor admission

Closes the gap that admission control only fired on /change. A heavy
actor sending bulk-ingest traffic could exhaust shared engine capacity
(Lance I/O threads, manifest churn) without hitting the per-actor cap.

Wires `state.workload.try_admit(&actor_arc, est_bytes)` into the five
remaining mutating handlers AFTER Cedar authorization (so denied
requests don't consume admission slots) and BEFORE the engine call.
Byte estimates per handler:

- /ingest: request.data.len() (NDJSON body)
- /schema/apply: request.schema_source.len()
- /branches/create, /branches/delete, /branches/merge: 256
  (small JSON; the heavy work is bounded per-(table, branch) by the
  engine's writer queue rather than by request size)

The admission guard is held in `let _admission = ...` so it stays
alive until handler return, releasing the count permit + decrementing
the byte budget on drop.

Pinned by `ingest_per_actor_admission_cap_returns_429` (previous
commit). The test still fails on the Retry-After header assertion;
the next commit emits the header.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Ragnor Comerford 2026-05-08 16:57:53 +02:00
parent 0976cbebc5
commit 05a8bd5de1
No known key found for this signature in database

View file

@ -1024,6 +1024,10 @@ async fn server_schema_apply(
actor: Option<Extension<AuthenticatedActor>>,
Json(request): Json<SchemaApplyRequest>,
) -> std::result::Result<Json<SchemaApplyOutput>, ApiError> {
let actor_arc = actor
.as_ref()
.map(|Extension(actor)| Arc::clone(&actor.0))
.unwrap_or_else(|| Arc::<str>::from("anonymous"));
let actor_id = actor.as_ref().map(|Extension(actor)| actor.as_str());
authorize_request(
&state,
@ -1035,6 +1039,11 @@ async fn server_schema_apply(
target_branch: Some("main".to_string()),
},
)?;
let est_bytes = request.schema_source.len() as u64;
let _admission = state
.workload
.try_admit(&actor_arc, est_bytes)
.map_err(ApiError::from_workload_reject)?;
let result = {
let db = &state.engine;
db.apply_schema(&request.schema_source)
@ -1073,6 +1082,10 @@ async fn server_ingest(
let branch = request.branch.unwrap_or_else(|| "main".to_string());
let from = request.from.unwrap_or_else(|| "main".to_string());
let mode = request.mode.unwrap_or(omnigraph::loader::LoadMode::Merge);
let actor_arc = actor
.as_ref()
.map(|Extension(actor)| Arc::clone(&actor.0))
.unwrap_or_else(|| Arc::<str>::from("anonymous"));
let actor_id = actor.as_ref().map(|Extension(actor)| actor.as_str());
let branch_exists = {
@ -1106,6 +1119,11 @@ async fn server_ingest(
target_branch: None,
},
)?;
let est_bytes = request.data.len() as u64;
let _admission = state
.workload
.try_admit(&actor_arc, est_bytes)
.map_err(ApiError::from_workload_reject)?;
let result = {
let db = &state.engine;
@ -1187,6 +1205,10 @@ async fn server_branch_create(
Json(request): Json<BranchCreateRequest>,
) -> std::result::Result<Json<BranchCreateOutput>, ApiError> {
let from = request.from.unwrap_or_else(|| "main".to_string());
let actor_arc = actor
.as_ref()
.map(|Extension(actor)| Arc::clone(&actor.0))
.unwrap_or_else(|| Arc::<str>::from("anonymous"));
authorize_request(
&state,
actor.as_ref().map(|Extension(actor)| actor),
@ -1200,6 +1222,13 @@ async fn server_branch_create(
target_branch: Some(request.name.clone()),
},
)?;
// Branch metadata only — small constant bytes estimate. The Lance
// shallow-clone work is bounded by the parent's manifest size, not
// the request body.
let _admission = state
.workload
.try_admit(&actor_arc, 256)
.map_err(ApiError::from_workload_reject)?;
{
let db = &state.engine;
db.branch_create_from(ReadTarget::branch(&from), &request.name)
@ -1240,6 +1269,10 @@ async fn server_branch_delete(
actor: Option<Extension<AuthenticatedActor>>,
Path(branch): Path<String>,
) -> std::result::Result<Json<BranchDeleteOutput>, ApiError> {
let actor_arc = actor
.as_ref()
.map(|Extension(actor)| Arc::clone(&actor.0))
.unwrap_or_else(|| Arc::<str>::from("anonymous"));
let actor_id = actor.as_ref().map(|Extension(actor)| actor.as_str());
authorize_request(
&state,
@ -1251,6 +1284,11 @@ async fn server_branch_delete(
target_branch: Some(branch.clone()),
},
)?;
// Metadata-only manifest tombstone — small constant estimate.
let _admission = state
.workload
.try_admit(&actor_arc, 256)
.map_err(ApiError::from_workload_reject)?;
{
let db = &state.engine;
db.branch_delete(&branch)
@ -1291,6 +1329,10 @@ async fn server_branch_merge(
Json(request): Json<BranchMergeRequest>,
) -> std::result::Result<Json<BranchMergeOutput>, ApiError> {
let target = request.target.unwrap_or_else(|| "main".to_string());
let actor_arc = actor
.as_ref()
.map(|Extension(actor)| Arc::clone(&actor.0))
.unwrap_or_else(|| Arc::<str>::from("anonymous"));
let actor_id = actor.as_ref().map(|Extension(actor)| actor.as_str());
authorize_request(
&state,
@ -1302,6 +1344,13 @@ async fn server_branch_merge(
target_branch: Some(target.clone()),
},
)?;
// Merge body is small JSON; the heavy work is in the engine but is
// bounded per-(table, branch) by the writer queue. Small constant
// estimate suffices for the actor in-flight count.
let _admission = state
.workload
.try_admit(&actor_arc, 256)
.map_err(ApiError::from_workload_reject)?;
let outcome = {
let db = &state.engine;
db.branch_merge_as(&request.source, &target, actor_id)