server+bench: AppState::new_with_workload; bench drops set_var, exercises heavy cap

Two cubic findings on bench_actor_isolation.rs flagged together:

P2 (lib.rs:202): `unsafe { std::env::set_var(...) }` ran inside
`#[tokio::main] async fn main()` AFTER the multi-thread tokio runtime
was up. Rust 2024 made `set_var` unsafe because libc's `setenv` is
not thread-safe; concurrent env reads from logging or runtime
internals can race or read torn state.

Fix (correct by design, AGENTS.md rule 9): add a public
`AppState::new_with_workload(uri, db, bearer_tokens, workload)`
constructor that takes a caller-built `WorkloadController`. Tests and
benches override per-actor caps via the constructor instead of
mutating global env. Closes the bug class "tests need to mutate
global env to override AppState defaults."

P2 (lib.rs:130): heavy actor's `oneshot.await` inside the loop
serialized — heavy in-flight count was always 1, so cap=1 never
tripped on the heavy side. The bench validated isolation (light p99
bounded) but didn't demonstrate the rejection path.

Fix: add a `--heavy-concurrency` arg (default 4) and spawn batches
as concurrent tokio tasks bounded by an internal semaphore. With
heavy_concurrency=4 and inflight_cap=1, the bench now reports
heavy_too_many_requests > 0 and heavy_ok == 1 at peak — proving the
gate fires for the heavy actor.

Sample run on local FS (4 light actors × 30 ops, 20 heavy batches ×
50 rows, heavy_concurrency=4, cap=1):

  heavy_ok: 1
  heavy_too_many_requests: 19
  light_ok: 120
  light_too_many_requests: 0
  light_p99: 565 ms (target < 2 s)

Heavy saturates its own cap; light actors are completely unaffected.
The isolation property is now empirically proven by the rejection
counts rather than just by the latency tail.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Ragnor Comerford 2026-05-08 17:57:42 +02:00
parent 8e1a8e7d55
commit 22d76dbb40
No known key found for this signature in database
2 changed files with 111 additions and 31 deletions

View file

@ -34,6 +34,7 @@ use axum::http::{Method, Request, StatusCode};
use clap::Parser;
use omnigraph::db::Omnigraph;
use omnigraph_server::api::{ChangeRequest, IngestRequest};
use omnigraph_server::workload::WorkloadController;
use omnigraph_server::{AppState, build_app};
use serde::Serialize;
use tower::ServiceExt;
@ -53,16 +54,32 @@ struct Args {
/// Number of /change ops per light actor.
#[arg(long, default_value_t = 50)]
light_ops_per_actor: usize,
/// Number of /ingest batches the heavy actor sends back-to-back.
/// Number of /ingest batches the heavy actor sends.
#[arg(long, default_value_t = 200)]
heavy_batches: usize,
/// NDJSON rows per heavy /ingest batch.
#[arg(long, default_value_t = 200)]
heavy_rows_per_batch: usize,
/// `OMNIGRAPH_PER_ACTOR_INFLIGHT_MAX` for the run. Lower values
/// surface admission rejections faster.
/// Concurrent in-flight /ingest tasks the heavy actor maintains. With
/// `inflight_cap` smaller than this, the heavy actor exercises its own
/// admission cap (and the bench reports `heavy_too_many_requests > 0`),
/// proving the gate fires without affecting light actors. Default 4
/// against cap=1 → expect ~3/4 batches rejected.
#[arg(long, default_value_t = 4)]
heavy_concurrency: usize,
/// Per-actor in-flight cap for the run. Passed directly into the
/// `WorkloadController` constructor (no env-var fiddling). Lower
/// values surface admission rejections faster.
#[arg(long, default_value_t = 1)]
inflight_cap: u32,
/// Per-actor byte budget (bytes). Default 1 GiB so byte budget
/// doesn't bottleneck the count gate during normal bench runs.
#[arg(long, default_value_t = 1_073_741_824)]
byte_cap: u64,
/// Global rewrite-pool cap. Bench is non-rewriting so default 4
/// matches production.
#[arg(long, default_value_t = 4)]
global_rewrite_cap: u32,
/// Output file for the JSON results. Stdout always gets a copy.
#[arg(long)]
output: Option<PathBuf>,
@ -114,27 +131,53 @@ fn build_heavy_body(batch_idx: usize, rows: usize) -> String {
.unwrap()
}
async fn drive_heavy_actor(app: Router, batches: usize, rows_per_batch: usize) -> (usize, usize, usize) {
async fn send_heavy_batch(app: Router, batch_idx: usize, rows: usize) -> StatusCode {
let body = build_heavy_body(batch_idx, rows);
let req = Request::builder()
.method(Method::POST)
.uri("/ingest")
.header("authorization", format!("Bearer {HEAVY_TOKEN}"))
.header("content-type", "application/json")
.body(Body::from(body))
.unwrap();
match app.oneshot(req).await {
Ok(r) => r.status(),
Err(_) => StatusCode::INTERNAL_SERVER_ERROR,
}
}
/// Drive `batches` /ingest calls from the heavy actor with up to
/// `concurrency` in flight at a time. With `concurrency > inflight_cap`,
/// the heavy actor's own admission permits are exhausted at peak, and
/// some batches return 429. Returns (ok, 429, other) counts.
async fn drive_heavy_actor(
app: Router,
batches: usize,
rows_per_batch: usize,
concurrency: usize,
) -> (usize, usize, usize) {
use tokio::sync::Semaphore;
let limiter = Arc::new(Semaphore::new(concurrency.max(1)));
let mut handles = Vec::with_capacity(batches);
for b in 0..batches {
let app = app.clone();
let limiter = Arc::clone(&limiter);
handles.push(tokio::spawn(async move {
// Bound concurrency to `concurrency`; this is the bench's
// own pacing, not the server's admission control. The
// server's `WorkloadController` is what we're trying to
// exercise — and it has its own cap (potentially smaller).
let _permit = limiter.acquire_owned().await.unwrap();
send_heavy_batch(app, b, rows_per_batch).await
}));
}
let mut ok = 0usize;
let mut too_many = 0usize;
let mut other = 0usize;
for b in 0..batches {
let body = build_heavy_body(b, rows_per_batch);
let req = Request::builder()
.method(Method::POST)
.uri("/ingest")
.header("authorization", format!("Bearer {HEAVY_TOKEN}"))
.header("content-type", "application/json")
.body(Body::from(body))
.unwrap();
let response = match app.clone().oneshot(req).await {
Ok(r) => r,
Err(_) => {
other += 1;
continue;
}
};
match response.status() {
for h in handles {
match h.await.unwrap_or(StatusCode::INTERNAL_SERVER_ERROR) {
StatusCode::OK => ok += 1,
StatusCode::TOO_MANY_REQUESTS => too_many += 1,
_ => other += 1,
@ -143,6 +186,8 @@ async fn drive_heavy_actor(app: Router, batches: usize, rows_per_batch: usize) -
(ok, too_many, other)
}
use std::sync::Arc;
async fn drive_light_actor(
app: Router,
token: String,
@ -207,13 +252,6 @@ async fn main() {
std::process::exit(2);
}
// Override the per-actor in-flight cap before AppState is constructed
// (WorkloadController::from_env reads it at startup).
// SAFETY: single-threaded init at process start; no concurrent env reads.
unsafe {
std::env::set_var("OMNIGRAPH_PER_ACTOR_INFLIGHT_MAX", args.inflight_cap.to_string());
}
let temp = tempfile::tempdir().expect("tempdir");
let repo = temp.path().join("bench.omni");
Omnigraph::init(repo.to_str().unwrap(), SCHEMA)
@ -229,13 +267,25 @@ async fn main() {
let db = Omnigraph::open(repo.to_str().unwrap())
.await
.expect("open repo");
let state = AppState::new_with_bearer_tokens(repo.to_string_lossy().to_string(), db, tokens);
// Construct a custom WorkloadController with the requested caps and
// pass it through `AppState::new_with_workload`. Avoids the
// `unsafe { std::env::set_var(...) }` antipattern that violates
// `setenv`'s thread-safety precondition once the multi-thread tokio
// runtime is up.
let workload = WorkloadController::new(args.inflight_cap, args.byte_cap, args.global_rewrite_cap);
let state = AppState::new_with_workload(
repo.to_string_lossy().to_string(),
db,
tokens,
workload,
);
let app = build_app(state);
eprintln!(
"running heavy={}x{} light={}x{} cap={}",
"running heavy={}x{} (concurrency={}) light={}x{} cap={}",
args.heavy_batches,
args.heavy_rows_per_batch,
args.heavy_concurrency,
args.light_actors,
args.light_ops_per_actor,
args.inflight_cap,
@ -243,8 +293,15 @@ async fn main() {
let start = Instant::now();
let heavy_app = app.clone();
let heavy_concurrency = args.heavy_concurrency;
let heavy_handle = tokio::spawn(async move {
drive_heavy_actor(heavy_app, args.heavy_batches, args.heavy_rows_per_batch).await
drive_heavy_actor(
heavy_app,
args.heavy_batches,
args.heavy_rows_per_batch,
heavy_concurrency,
)
.await
});
let mut light_handles = Vec::with_capacity(args.light_actors);

View file

@ -206,6 +206,29 @@ impl AppState {
}
}
/// Construct with a caller-provided [`workload::WorkloadController`].
/// Tests and benches use this to override per-actor caps without
/// mutating global env vars (which is unsafe in Rust 2024 once the
/// async runtime is up — `setenv` isn't thread-safe).
pub fn new_with_workload(
uri: String,
db: Omnigraph,
bearer_tokens: Vec<(String, String)>,
workload: workload::WorkloadController,
) -> Self {
let bearer_tokens: Vec<(BearerTokenHash, Arc<str>)> = bearer_tokens
.into_iter()
.map(|(actor, token)| (hash_bearer_token(&token), Arc::<str>::from(actor)))
.collect();
Self {
uri,
engine: Arc::new(db),
workload: Arc::new(workload),
bearer_tokens: Arc::from(bearer_tokens),
policy_engine: None,
}
}
pub async fn open(uri: impl Into<String>) -> Result<Self> {
Self::open_with_bearer_token(uri, None).await
}