From 0976cbebc5cf1ae9d7647ee95aa0cd7b4bce2abd Mon Sep 17 00:00:00 2001
From: Ragnor Comerford
Date: Fri, 8 May 2026 16:57:01 +0200
Subject: [PATCH] tests: pin /ingest admission gate + 429 Retry-After (red)

Per AGENTS.md rule 8, this commit lands the failing regression test
ahead of the fix. Currently fails on f925ad1 with 8/8 statuses
returning 200 because /ingest does not call
WorkloadController::try_admit.

The test pins:
- /ingest is gated on per-actor admission control (returns 429 when
  the cap is exceeded).
- 429 responses carry the structured `code: too_many_requests` error
  body so clients can distinguish them from generic conflicts.
- 429 responses include a `Retry-After` header so clients can
  implement bounded backoff. The doc claim at api.rs:343 and
  lib.rs:344 was that this header exists; the IntoResponse impl
  currently emits no headers.

Two follow-up commits will turn this green:
1. Wire WorkloadController::try_admit on /ingest and the four other
   mutating handlers (Block 2.1).
2. Emit the Retry-After header on 429/503 responses (Block 2.2).

The test uses #[serial] + EnvGuard to override
OMNIGRAPH_PER_ACTOR_INFLIGHT_MAX=1 without racing parallel tests, then
spawns 8 concurrent /ingest tasks aligned at a tokio::sync::Barrier so
multiple tasks reach try_admit close in time. With cap=1, at least one
must be rejected.

Co-Authored-By: Claude Opus 4.7 (1M context)
---
 crates/omnigraph-server/tests/server.rs | 116 ++++++++++++++++++++++++
 1 file changed, 116 insertions(+)

diff --git a/crates/omnigraph-server/tests/server.rs b/crates/omnigraph-server/tests/server.rs
index 6fa9787..9c17e2f 100644
--- a/crates/omnigraph-server/tests/server.rs
+++ b/crates/omnigraph-server/tests/server.rs
@@ -1,6 +1,7 @@
 use std::env;
 use std::fs;
 use std::path::{Path, PathBuf};
+use std::sync::Arc;
 
 use axum::Router;
 use axum::body::{Body, to_bytes};
@@ -2556,6 +2557,121 @@ async fn concurrent_branch_create_from_distinct_parents_does_not_corrupt_coordin
     );
 }
 
+#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
+#[serial]
+async fn ingest_per_actor_admission_cap_returns_429() {
+    // Pin the admission gate on `/ingest`. With per-actor in-flight cap of 1
+    // and 8 concurrent requests from the same actor, at least one request
+    // must be rejected with HTTP 429 and `code: too_many_requests`.
+    //
+    // Pre-fix bug class: the admission pattern at `server_change`
+    // (`crates/omnigraph-server/src/lib.rs:932`) was the only handler
+    // that called `WorkloadController::try_admit`. A heavy actor sending
+    // bulk-ingest traffic would exhaust shared engine capacity (Lance I/O
+    // threads, manifest churn) without ever hitting an admission cap.
+    // Pinned at the HTTP boundary so future refactors that drop the
+    // try_admit call from a mutating handler turn this red.
+    //
+    // Post-fix invariant: `/ingest`, `/branches/create`, `/branches/delete`,
+    // `/branches/merge`, and `/schema/apply` all gate on
+    // `state.workload.try_admit(&actor_arc, est_bytes)` after Cedar
+    // authorization and before the engine call. Cap exhaustion surfaces as
+    // 429 with `code: too_many_requests`.
+    let _guard = EnvGuard::set(&[
+        ("OMNIGRAPH_PER_ACTOR_INFLIGHT_MAX", Some("1")),
+        ("OMNIGRAPH_PER_ACTOR_BYTES_MAX", Some("1000000000")),
+    ]);
+    let (_temp, app) = app_for_loaded_repo_with_auth_tokens(&[("act-flooder", "flooder-token")]).await;
+
+    // Eight concurrent ingests, all from act-flooder. Only one fits in a
+    // cap=1 in-flight semaphore at a time; overlapping ones must 429.
+    const N: usize = 8;
+    let barrier = Arc::new(tokio::sync::Barrier::new(N));
+    let mut handles = Vec::with_capacity(N);
+    for i in 0..N {
+        let app = app.clone();
+        let barrier = Arc::clone(&barrier);
+        handles.push(tokio::spawn(async move {
+            // Align the 8 tasks at the barrier so they all attempt
+            // try_admit close in time.
+            barrier.wait().await;
+
+            let body = serde_json::to_vec(&IngestRequest {
+                data: format!(
+                    "{{\"type\":\"Person\",\"data\":{{\"name\":\"flooder-{i}\",\"age\":{i}}}}}\n"
+                ),
+                branch: Some("main".to_string()),
+                from: Some("main".to_string()),
+                mode: Some(omnigraph::loader::LoadMode::Merge),
+            })
+            .unwrap();
+            let req = Request::builder()
+                .uri("/ingest")
+                .method(Method::POST)
+                .header("authorization", "Bearer flooder-token")
+                .header("content-type", "application/json")
+                .body(Body::from(body))
+                .unwrap();
+            let response = app.oneshot(req).await.unwrap();
+            let status = response.status();
+            let headers = response.headers().clone();
+            let body = to_bytes(response.into_body(), usize::MAX).await.unwrap();
+            (status, headers, body.to_vec())
+        }));
+    }
+
+    let mut results = Vec::with_capacity(N);
+    for h in handles {
+        results.push(h.await.unwrap());
+    }
+    let statuses: Vec<StatusCode> = results.iter().map(|(s, _, _)| *s).collect();
+
+    let too_many: Vec<usize> = statuses
+        .iter()
+        .enumerate()
+        .filter(|(_, s)| **s == StatusCode::TOO_MANY_REQUESTS)
+        .map(|(i, _)| i)
+        .collect();
+    assert!(
+        !too_many.is_empty(),
+        "expected at least one /ingest under cap=1 to return 429; got statuses: {:?}",
+        statuses,
+    );
+
+    // Validate the structured error body for each 429 (body must carry
+    // the `too_many_requests` code so clients can distinguish it from
+    // generic conflicts).
+    for i in &too_many {
+        let body_value: Value = serde_json::from_slice(&results[*i].2).unwrap();
+        let error: ErrorOutput = serde_json::from_value(body_value).unwrap();
+        assert_eq!(
+            error.code,
+            Some(omnigraph_server::api::ErrorCode::TooManyRequests),
+            "429 body must carry code=too_many_requests; idx {} got {:?}",
+            i,
+            error.code,
+        );
+    }
+
+    // Validate the `Retry-After` header is set on every 429. Pinned by
+    // the same test so a future refactor that drops the header from
+    // `IntoResponse for ApiError` turns this red. Only presence is
+    // asserted; the value is left to `ApiError::into_response`.
+    for i in &too_many {
+        let retry_after = results[*i]
+            .1
+            .get(axum::http::header::RETRY_AFTER)
+            .and_then(|v| v.to_str().ok())
+            .map(str::to_string);
+        assert!(
+            retry_after.is_some(),
+            "429 response must include a Retry-After header; idx {} headers were: {:?}",
+            i,
+            results[*i].1,
+        );
+    }
+}
+
 #[tokio::test(flavor = "multi_thread")]
 async fn oversized_request_body_returns_payload_too_large() {
     let (_temp, app) = app_for_loaded_repo().await;