diff --git a/crates/omnigraph-server/examples/bench_actor_isolation.rs b/crates/omnigraph-server/examples/bench_actor_isolation.rs index 9f8b62a..96e9cec 100644 --- a/crates/omnigraph-server/examples/bench_actor_isolation.rs +++ b/crates/omnigraph-server/examples/bench_actor_isolation.rs @@ -34,6 +34,7 @@ use axum::http::{Method, Request, StatusCode}; use clap::Parser; use omnigraph::db::Omnigraph; use omnigraph_server::api::{ChangeRequest, IngestRequest}; +use omnigraph_server::workload::WorkloadController; use omnigraph_server::{AppState, build_app}; use serde::Serialize; use tower::ServiceExt; @@ -53,16 +54,32 @@ struct Args { /// Number of /change ops per light actor. #[arg(long, default_value_t = 50)] light_ops_per_actor: usize, - /// Number of /ingest batches the heavy actor sends back-to-back. + /// Number of /ingest batches the heavy actor sends. #[arg(long, default_value_t = 200)] heavy_batches: usize, /// NDJSON rows per heavy /ingest batch. #[arg(long, default_value_t = 200)] heavy_rows_per_batch: usize, - /// `OMNIGRAPH_PER_ACTOR_INFLIGHT_MAX` for the run. Lower values - /// surface admission rejections faster. + /// Concurrent in-flight /ingest tasks the heavy actor maintains. With + /// `inflight_cap` smaller than this, the heavy actor exercises its own + /// admission cap (and the bench reports `heavy_too_many_requests > 0`), + /// proving the gate fires without affecting light actors. Default 4 + /// against cap=1 → expect ~3/4 batches rejected. + #[arg(long, default_value_t = 4)] + heavy_concurrency: usize, + /// Per-actor in-flight cap for the run. Passed directly into the + /// `WorkloadController` constructor (no env-var fiddling). Lower + /// values surface admission rejections faster. #[arg(long, default_value_t = 1)] inflight_cap: u32, + /// Per-actor byte budget (bytes). Default 1 GiB so byte budget + /// doesn't bottleneck the count gate during normal bench runs. 
+ #[arg(long, default_value_t = 1_073_741_824)] + byte_cap: u64, + /// Global rewrite-pool cap. Bench is non-rewriting so default 4 + /// matches production. + #[arg(long, default_value_t = 4)] + global_rewrite_cap: u32, /// Output file for the JSON results. Stdout always gets a copy. #[arg(long)] output: Option, @@ -114,27 +131,53 @@ fn build_heavy_body(batch_idx: usize, rows: usize) -> String { .unwrap() } -async fn drive_heavy_actor(app: Router, batches: usize, rows_per_batch: usize) -> (usize, usize, usize) { +async fn send_heavy_batch(app: Router, batch_idx: usize, rows: usize) -> StatusCode { + let body = build_heavy_body(batch_idx, rows); + let req = Request::builder() + .method(Method::POST) + .uri("/ingest") + .header("authorization", format!("Bearer {HEAVY_TOKEN}")) + .header("content-type", "application/json") + .body(Body::from(body)) + .unwrap(); + match app.oneshot(req).await { + Ok(r) => r.status(), + Err(_) => StatusCode::INTERNAL_SERVER_ERROR, + } +} + +/// Drive `batches` /ingest calls from the heavy actor with up to +/// `concurrency` in flight at a time. With `concurrency > inflight_cap`, +/// the heavy actor's own admission permits are exhausted at peak, and +/// some batches return 429. Returns (ok, 429, other) counts. +async fn drive_heavy_actor( + app: Router, + batches: usize, + rows_per_batch: usize, + concurrency: usize, +) -> (usize, usize, usize) { + use tokio::sync::Semaphore; + + let limiter = Arc::new(Semaphore::new(concurrency.max(1))); + let mut handles = Vec::with_capacity(batches); + for b in 0..batches { + let app = app.clone(); + let limiter = Arc::clone(&limiter); + handles.push(tokio::spawn(async move { + // Bound concurrency to `concurrency`; this is the bench's + // own pacing, not the server's admission control. The + // server's `WorkloadController` is what we're trying to + // exercise — and it has its own cap (potentially smaller). 
+ let _permit = limiter.acquire_owned().await.unwrap(); + send_heavy_batch(app, b, rows_per_batch).await + })); + } + let mut ok = 0usize; let mut too_many = 0usize; let mut other = 0usize; - for b in 0..batches { - let body = build_heavy_body(b, rows_per_batch); - let req = Request::builder() - .method(Method::POST) - .uri("/ingest") - .header("authorization", format!("Bearer {HEAVY_TOKEN}")) - .header("content-type", "application/json") - .body(Body::from(body)) - .unwrap(); - let response = match app.clone().oneshot(req).await { - Ok(r) => r, - Err(_) => { - other += 1; - continue; - } - }; - match response.status() { + for h in handles { + match h.await.unwrap_or(StatusCode::INTERNAL_SERVER_ERROR) { StatusCode::OK => ok += 1, StatusCode::TOO_MANY_REQUESTS => too_many += 1, _ => other += 1, @@ -143,6 +186,8 @@ async fn drive_heavy_actor(app: Router, batches: usize, rows_per_batch: usize) - (ok, too_many, other) } +use std::sync::Arc; + async fn drive_light_actor( app: Router, token: String, @@ -207,13 +252,6 @@ async fn main() { std::process::exit(2); } - // Override the per-actor in-flight cap before AppState is constructed - // (WorkloadController::from_env reads it at startup). - // SAFETY: single-threaded init at process start; no concurrent env reads. - unsafe { - std::env::set_var("OMNIGRAPH_PER_ACTOR_INFLIGHT_MAX", args.inflight_cap.to_string()); - } - let temp = tempfile::tempdir().expect("tempdir"); let repo = temp.path().join("bench.omni"); Omnigraph::init(repo.to_str().unwrap(), SCHEMA) @@ -229,13 +267,25 @@ async fn main() { let db = Omnigraph::open(repo.to_str().unwrap()) .await .expect("open repo"); - let state = AppState::new_with_bearer_tokens(repo.to_string_lossy().to_string(), db, tokens); + // Construct a custom WorkloadController with the requested caps and + // pass it through `AppState::new_with_workload`. Avoids the + // `unsafe { std::env::set_var(...) 
}` antipattern that violates + // `setenv`'s thread-safety precondition once the multi-thread tokio + // runtime is up. + let workload = WorkloadController::new(args.inflight_cap, args.byte_cap, args.global_rewrite_cap); + let state = AppState::new_with_workload( + repo.to_string_lossy().to_string(), + db, + tokens, + workload, + ); let app = build_app(state); eprintln!( - "running heavy={}x{} light={}x{} cap={}", + "running heavy={}x{} (concurrency={}) light={}x{} cap={}", args.heavy_batches, args.heavy_rows_per_batch, + args.heavy_concurrency, args.light_actors, args.light_ops_per_actor, args.inflight_cap, @@ -243,8 +293,15 @@ async fn main() { let start = Instant::now(); let heavy_app = app.clone(); + let heavy_concurrency = args.heavy_concurrency; let heavy_handle = tokio::spawn(async move { - drive_heavy_actor(heavy_app, args.heavy_batches, args.heavy_rows_per_batch).await + drive_heavy_actor( + heavy_app, + args.heavy_batches, + args.heavy_rows_per_batch, + heavy_concurrency, + ) + .await }); let mut light_handles = Vec::with_capacity(args.light_actors); diff --git a/crates/omnigraph-server/src/lib.rs b/crates/omnigraph-server/src/lib.rs index 6c6dcaf..dfd5924 100644 --- a/crates/omnigraph-server/src/lib.rs +++ b/crates/omnigraph-server/src/lib.rs @@ -206,6 +206,29 @@ impl AppState { } } + /// Construct with a caller-provided [`workload::WorkloadController`]. + /// Tests and benches use this to override per-actor caps without + /// mutating global env vars (which is unsafe in Rust 2024 once the + /// async runtime is up — `setenv` isn't thread-safe). 
+ pub fn new_with_workload( + uri: String, + db: Omnigraph, + bearer_tokens: Vec<(String, String)>, + workload: workload::WorkloadController, + ) -> Self { + let bearer_tokens: Vec<(BearerTokenHash, Arc<str>)> = bearer_tokens + .into_iter() + .map(|(actor, token)| (hash_bearer_token(&token), Arc::<str>::from(actor))) + .collect(); + Self { + uri, + engine: Arc::new(db), + workload: Arc::new(workload), + bearer_tokens: Arc::from(bearer_tokens), + policy_engine: None, + } + } + pub async fn open(uri: impl Into) -> Result { Self::open_with_bearer_token(uri, None).await }