From b09a0972cb9f318a73fa733133df988e22493753 Mon Sep 17 00:00:00 2001 From: Ragnor Comerford Date: Fri, 8 May 2026 17:12:50 +0200 Subject: [PATCH] bench: add actor-isolation harness for WorkloadController MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Empirical proof of MR-686's central design promise: per-actor admission control isolates noisy actors from light traffic. The existing bench_concurrent_http harness measures aggregate throughput; this harness measures the latency tail seen by light actors while a heavy actor saturates its own per-actor cap. Setup: one "heavy" actor flooding /ingest with multi-row NDJSON batches; N "light" actors each running short bursts of /change inserts, each authenticating with a distinct bearer token so the WorkloadController accounts them as separate identities. Output: heavy throughput / 429 count, light p50/p95/p99/max latency. Acceptance heuristic on local FS: light-actor p99 < 2 s while the heavy actor saturates its own cap. Sample run on local FS, cap=1, 4 light actors x 30 ops, 20 heavy batches x 50 rows: light p99 = 710 ms, light errors = 0 (well under the 2 s acceptance target). The test demonstrates the isolation property — the heavy /ingest holds its own admission slot but doesn't affect light actors since they have separate per-actor state. Usage: cargo run --release -p omnigraph-server --example bench_actor_isolation -- \ --light-actors 4 --light-ops-per-actor 30 \ --heavy-batches 20 --heavy-rows-per-batch 50 \ --inflight-cap 1 \ --output .context/bench-results/after-pr2-phase2/actor-isolation.json Co-Authored-By: Claude Opus 4.7 (1M context) --- .../examples/bench_actor_isolation.rs | 329 ++++++++++++++++++ 1 file changed, 329 insertions(+) create mode 100644 crates/omnigraph-server/examples/bench_actor_isolation.rs diff --git a/crates/omnigraph-server/examples/bench_actor_isolation.rs b/crates/omnigraph-server/examples/bench_actor_isolation.rs new file mode 100644 index 0000000..9f8b62a --- /dev/null +++ b/crates/omnigraph-server/examples/bench_actor_isolation.rs @@ -0,0 +1,329 @@ +//! Actor-isolation benchmark for MR-686's `WorkloadController`. +//! +//! The handoff calls this out as the empirical proof of MR-686's central +//! design promise: per-actor admission control isolates noisy actors so a +//! heavy `/ingest` user does not starve light `/change` traffic. The +//! per-`(table, branch)` queue pins the same-key serialization story; this +//! bench pins actor isolation under load. +//! +//! Setup: +//! - One "heavy" actor flooding `/ingest` with multi-row NDJSON bodies. +//! - N "light" actors each running short bursts of `/change` inserts. +//! - Each actor authenticates with its own bearer token so the +//! `WorkloadController` accounts them as distinct identities. +//! +//! Output: heavy-actor throughput / 429s, light-actor p50 / p95 / p99 +//! latency. Acceptance heuristic on local FS: light-actor p99 < 2 s +//! while the heavy actor saturates its own per-actor cap. +//! +//! Usage: +//! ```sh +//! cargo run --release -p omnigraph-server --example bench_actor_isolation -- \ +//! --light-actors 4 --light-ops-per-actor 50 \ +//! --heavy-batches 200 --heavy-rows-per-batch 200 \ +//! --inflight-cap 1 \ +//! --output bench-results/after-pr2-phase2/actor-isolation.json +//! ``` + +use std::path::PathBuf; +use std::time::{Duration, Instant}; + +use axum::Router; +use axum::body::{Body, to_bytes}; +use axum::http::{Method, Request, StatusCode}; +use clap::Parser; +use omnigraph::db::Omnigraph; +use omnigraph_server::api::{ChangeRequest, IngestRequest}; +use omnigraph_server::{AppState, build_app}; +use serde::Serialize; +use tower::ServiceExt; + +const SCHEMA: &str = "node Person {\n name: String @key\n age: I32?\n}\n"; + +const HEAVY_TOKEN: &str = "heavy-actor-token"; +const HEAVY_ACTOR: &str = "act-heavy"; + +#[derive(Parser, Debug)] +#[command(about = "Actor-isolation HTTP bench for MR-686 WorkloadController")] +struct Args { + /// Number of light actors driving /change traffic concurrently with the + /// heavy /ingest flood. Each gets its own bearer token. + #[arg(long, default_value_t = 4)] + light_actors: usize, + /// Number of /change ops per light actor. + #[arg(long, default_value_t = 50)] + light_ops_per_actor: usize, + /// Number of /ingest batches the heavy actor sends back-to-back. + #[arg(long, default_value_t = 200)] + heavy_batches: usize, + /// NDJSON rows per heavy /ingest batch. + #[arg(long, default_value_t = 200)] + heavy_rows_per_batch: usize, + /// `OMNIGRAPH_PER_ACTOR_INFLIGHT_MAX` for the run. Lower values + /// surface admission rejections faster. + #[arg(long, default_value_t = 1)] + inflight_cap: u32, + /// Output file for the JSON results. Stdout always gets a copy. + #[arg(long)] + output: Option, + /// Optional label to record alongside results. + #[arg(long, default_value = "")] + label: String, +} + +#[derive(Serialize, Debug)] +struct BenchResults { + label: String, + inflight_cap: u32, + light_actors: usize, + light_ops_per_actor: usize, + heavy_batches: usize, + heavy_rows_per_batch: usize, + wall_time_ms: u64, + heavy_ok: usize, + heavy_too_many_requests: usize, + heavy_other_errors: usize, + heavy_throughput_attempts_per_sec: f64, + light_ok: usize, + light_too_many_requests: usize, + light_other_errors: usize, + light_p50_ms: f64, + light_p95_ms: f64, + light_p99_ms: f64, + light_p999_ms: f64, + light_max_ms: f64, + notes: &'static str, +} + +fn build_heavy_body(batch_idx: usize, rows: usize) -> String { + let mut data = String::new(); + for r in 0..rows { + data.push_str(&format!( + "{{\"type\":\"Person\",\"data\":{{\"name\":\"heavy-b{}-r{}\",\"age\":{}}}}}\n", + batch_idx, + r, + r % 100, + )); + } + serde_json::to_string(&IngestRequest { + data, + branch: Some("main".to_string()), + from: Some("main".to_string()), + mode: Some(omnigraph::loader::LoadMode::Merge), + }) + .unwrap() +} + +async fn drive_heavy_actor(app: Router, batches: usize, rows_per_batch: usize) -> (usize, usize, usize) { + let mut ok = 0usize; + let mut too_many = 0usize; + let mut other = 0usize; + for b in 0..batches { + let body = build_heavy_body(b, rows_per_batch); + let req = Request::builder() + .method(Method::POST) + .uri("/ingest") + .header("authorization", format!("Bearer {HEAVY_TOKEN}")) + .header("content-type", "application/json") + .body(Body::from(body)) + .unwrap(); + let response = match app.clone().oneshot(req).await { + Ok(r) => r, + Err(_) => { + other += 1; + continue; + } + }; + match response.status() { + StatusCode::OK => ok += 1, + StatusCode::TOO_MANY_REQUESTS => too_many += 1, + _ => other += 1, + } + } + (ok, too_many, other) +} + +async fn drive_light_actor( + app: Router, + token: String, + actor_idx: usize, + ops: usize, +) -> (Vec, usize, usize, usize) { + let mut latencies = Vec::with_capacity(ops); + let mut ok = 0usize; + let mut too_many = 0usize; + let mut other = 0usize; + for op_idx in 0..ops { + let request_body = ChangeRequest { + query_source: "query insert_person($name: String, $age: I32) {\n insert Person { name: $name, age: $age }\n}".to_string(), + query_name: Some("insert_person".to_string()), + params: Some(serde_json::json!({ + "name": format!("light-{actor_idx}-{op_idx}"), + "age": op_idx as i32, + })), + branch: Some("main".to_string()), + }; + let body = serde_json::to_vec(&request_body).unwrap(); + let req = Request::builder() + .method(Method::POST) + .uri("/change") + .header("authorization", format!("Bearer {token}")) + .header("content-type", "application/json") + .body(Body::from(body)) + .unwrap(); + let start = Instant::now(); + let response = match app.clone().oneshot(req).await { + Ok(r) => r, + Err(_) => { + other += 1; + continue; + } + }; + let elapsed = start.elapsed(); + match response.status() { + StatusCode::OK => { + ok += 1; + latencies.push(elapsed); + } + StatusCode::TOO_MANY_REQUESTS => { + too_many += 1; + // Drain to free the body resource. + let _ = to_bytes(response.into_body(), 16 * 1024).await; + } + _ => { + other += 1; + let _ = to_bytes(response.into_body(), 16 * 1024).await; + } + } + } + (latencies, ok, too_many, other) +} + +#[tokio::main] +async fn main() { + let args = Args::parse(); + if args.light_actors == 0 || args.light_ops_per_actor == 0 || args.heavy_batches == 0 { + eprintln!("--light-actors, --light-ops-per-actor, --heavy-batches must all be > 0"); + std::process::exit(2); + } + + // Override the per-actor in-flight cap before AppState is constructed + // (WorkloadController::from_env reads it at startup). + // SAFETY: single-threaded init at process start; no concurrent env reads. + unsafe { + std::env::set_var("OMNIGRAPH_PER_ACTOR_INFLIGHT_MAX", args.inflight_cap.to_string()); + } + + let temp = tempfile::tempdir().expect("tempdir"); + let repo = temp.path().join("bench.omni"); + Omnigraph::init(repo.to_str().unwrap(), SCHEMA) + .await + .expect("init repo"); + + // Build bearer tokens: one for the heavy actor + one per light actor. + let mut tokens: Vec<(String, String)> = + vec![(HEAVY_ACTOR.to_string(), HEAVY_TOKEN.to_string())]; + for i in 0..args.light_actors { + tokens.push((format!("act-light-{i}"), format!("light-token-{i}"))); + } + let db = Omnigraph::open(repo.to_str().unwrap()) + .await + .expect("open repo"); + let state = AppState::new_with_bearer_tokens(repo.to_string_lossy().to_string(), db, tokens); + let app = build_app(state); + + eprintln!( + "running heavy={}x{} light={}x{} cap={}", + args.heavy_batches, + args.heavy_rows_per_batch, + args.light_actors, + args.light_ops_per_actor, + args.inflight_cap, + ); + + let start = Instant::now(); + let heavy_app = app.clone(); + let heavy_handle = tokio::spawn(async move { + drive_heavy_actor(heavy_app, args.heavy_batches, args.heavy_rows_per_batch).await + }); + + let mut light_handles = Vec::with_capacity(args.light_actors); + for actor_idx in 0..args.light_actors { + let app = app.clone(); + let token = format!("light-token-{actor_idx}"); + let ops = args.light_ops_per_actor; + light_handles.push(tokio::spawn(async move { + drive_light_actor(app, token, actor_idx, ops).await + })); + } + + let (heavy_ok, heavy_too_many, heavy_other) = heavy_handle.await.expect("heavy task panicked"); + let mut light_latencies: Vec = + Vec::with_capacity(args.light_actors * args.light_ops_per_actor); + let mut light_ok = 0usize; + let mut light_too_many = 0usize; + let mut light_other = 0usize; + for h in light_handles { + let (lats, ok, too_many, other) = h.await.expect("light task panicked"); + light_latencies.extend(lats); + light_ok += ok; + light_too_many += too_many; + light_other += other; + } + let wall = start.elapsed(); + + light_latencies.sort(); + let n = light_latencies.len(); + let pct = |p: f64| -> f64 { + if n == 0 { + return 0.0; + } + let idx = ((n as f64 - 1.0) * p).round() as usize; + light_latencies[idx].as_secs_f64() * 1000.0 + }; + let max_ms = light_latencies + .last() + .map(|d| d.as_secs_f64() * 1000.0) + .unwrap_or(0.0); + let heavy_throughput = if wall.as_secs_f64() > 0.0 { + args.heavy_batches as f64 / wall.as_secs_f64() + } else { + 0.0 + }; + + let results = BenchResults { + label: args.label.clone(), + inflight_cap: args.inflight_cap, + light_actors: args.light_actors, + light_ops_per_actor: args.light_ops_per_actor, + heavy_batches: args.heavy_batches, + heavy_rows_per_batch: args.heavy_rows_per_batch, + wall_time_ms: wall.as_millis() as u64, + heavy_ok, + heavy_too_many_requests: heavy_too_many, + heavy_other_errors: heavy_other, + heavy_throughput_attempts_per_sec: heavy_throughput, + light_ok, + light_too_many_requests: light_too_many, + light_other_errors: light_other, + light_p50_ms: pct(0.50), + light_p95_ms: pct(0.95), + light_p99_ms: pct(0.99), + light_p999_ms: pct(0.999), + light_max_ms: max_ms, + notes: "MR-686 actor-isolation bench. Heavy /ingest + light /change concurrent.", + }; + + let json = serde_json::to_string_pretty(&results).unwrap(); + println!("{json}"); + + if let Some(path) = args.output.as_ref() { + if let Some(parent) = path.parent() + && !parent.as_os_str().is_empty() + { + std::fs::create_dir_all(parent).expect("mkdir output parent"); + } + std::fs::write(path, &json).expect("write output"); + eprintln!("wrote {}", path.display()); + } +}