tests: pin /ingest admission gate + 429 Retry-After (red)

Per AGENTS.md rule 8, this commit lands the failing regression test
ahead of the fix. Currently fails on f925ad1 with 8/8 statuses returning
200 because /ingest does not call WorkloadController::try_admit.

The test pins:
- /ingest is gated on per-actor admission control (returns 429 when
  the cap is exceeded).
- 429 responses carry the structured `code: too_many_requests` error
  body so clients can distinguish them from generic conflicts.
- 429 responses include a `Retry-After` header so clients can implement
  bounded backoff. The doc claim at api.rs:343 and lib.rs:344 was that
  this header exists; the IntoResponse impl currently emits no headers.

Two follow-up commits will turn this green:
1. Wire WorkloadController::try_admit on /ingest and the four other
   mutating handlers (Block 2.1).
2. Emit the Retry-After header on 429/503 responses (Block 2.2).

The test uses #[serial] + EnvGuard to override
OMNIGRAPH_PER_ACTOR_INFLIGHT_MAX=1 without racing parallel tests, then
spawns 8 concurrent /ingest tasks aligned at a tokio::sync::Barrier so
multiple tasks reach try_admit close in time. With cap=1, at least one
must be rejected.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Ragnor Comerford 2026-05-08 16:57:01 +02:00
parent c263732b1a
commit 0976cbebc5
No known key found for this signature in database

View file

@ -1,6 +1,7 @@
use std::env;
use std::fs;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use axum::Router;
use axum::body::{Body, to_bytes};
@ -2556,6 +2557,121 @@ async fn concurrent_branch_create_from_distinct_parents_does_not_corrupt_coordin
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
#[serial]
async fn ingest_per_actor_admission_cap_returns_429() {
// Pin the admission gate on `/ingest`. With per-actor in-flight cap of 1
// and 8 concurrent requests from the same actor, at least one request
// must be rejected with HTTP 429 and `code: too_many_requests`.
//
// Pre-fix bug class: the admission pattern at `server_change`
// (`crates/omnigraph-server/src/lib.rs:932`) was the only handler
// that called `WorkloadController::try_admit`. A heavy actor sending
// bulk-ingest traffic would exhaust shared engine capacity (Lance I/O
// threads, manifest churn) without ever hitting an admission cap.
// Pinned at the HTTP boundary so future refactors that drop the
// try_admit call from a mutating handler turn this red.
//
// Post-fix invariant: `/ingest`, `/branches/create`, `/branches/delete`,
// `/branches/merge`, and `/schema/apply` all gate on
// `state.workload.try_admit(&actor_arc, est_bytes)` after Cedar
// authorization and before the engine call. Cap exhaustion surfaces as
// 429 with `code: too_many_requests`.
let _guard = EnvGuard::set(&[
("OMNIGRAPH_PER_ACTOR_INFLIGHT_MAX", Some("1")),
("OMNIGRAPH_PER_ACTOR_BYTES_MAX", Some("1000000000")),
]);
let (_temp, app) = app_for_loaded_repo_with_auth_tokens(&[("act-flooder", "flooder-token")]).await;
// Eight concurrent ingests, all from act-flooder. Only one fits in a
// cap=1 in-flight semaphore; the others must 429.
const N: usize = 8;
let barrier = Arc::new(tokio::sync::Barrier::new(N));
let mut handles = Vec::with_capacity(N);
for i in 0..N {
let app = app.clone();
let barrier = Arc::clone(&barrier);
handles.push(tokio::spawn(async move {
// Align the 8 tasks at the barrier so they all attempt
// try_admit close in time.
barrier.wait().await;
let body = serde_json::to_vec(&IngestRequest {
data: format!(
"{{\"type\":\"Person\",\"data\":{{\"name\":\"flooder-{i}\",\"age\":{i}}}}}\n"
),
branch: Some("main".to_string()),
from: Some("main".to_string()),
mode: Some(omnigraph::loader::LoadMode::Merge),
})
.unwrap();
let req = Request::builder()
.uri("/ingest")
.method(Method::POST)
.header("authorization", "Bearer flooder-token")
.header("content-type", "application/json")
.body(Body::from(body))
.unwrap();
let response = app.oneshot(req).await.unwrap();
let status = response.status();
let headers = response.headers().clone();
let body = to_bytes(response.into_body(), usize::MAX).await.unwrap();
(status, headers, body.to_vec())
}));
}
let mut results = Vec::with_capacity(N);
for h in handles {
results.push(h.await.unwrap());
}
let statuses: Vec<StatusCode> = results.iter().map(|(s, _, _)| *s).collect();
let too_many: Vec<usize> = statuses
.iter()
.enumerate()
.filter(|(_, s)| **s == StatusCode::TOO_MANY_REQUESTS)
.map(|(i, _)| i)
.collect();
assert!(
!too_many.is_empty(),
"expected at least one /ingest under cap=1 to return 429; got statuses: {:?}",
statuses,
);
// Validate the structured error body for each 429 (body must carry
// the `too_many_requests` code so clients can distinguish it from
// generic conflicts).
for i in &too_many {
let body_value: Value = serde_json::from_slice(&results[*i].2).unwrap();
let error: ErrorOutput = serde_json::from_value(body_value).unwrap();
assert_eq!(
error.code,
Some(omnigraph_server::api::ErrorCode::TooManyRequests),
"429 body must carry code=too_many_requests; idx {} got {:?}",
i,
error.code,
);
}
// Validate the `Retry-After` header is set on every 429. Pinned by
// the same test so a future refactor that drops the header from
// `IntoResponse for ApiError` turns this red. The constant
// matches `crates/omnigraph-server/src/lib.rs::ApiError::into_response`.
for i in &too_many {
let retry_after = results[*i]
.1
.get(axum::http::header::RETRY_AFTER)
.and_then(|v| v.to_str().ok())
.map(str::to_string);
assert!(
retry_after.is_some(),
"429 response must include a Retry-After header; idx {} headers were: {:?}",
i,
results[*i].1,
);
}
}
#[tokio::test(flavor = "multi_thread")]
async fn oversized_request_body_returns_payload_too_large() {
let (_temp, app) = app_for_loaded_repo().await;