mirror of
https://github.com/ModernRelay/omnigraph.git
synced 2026-06-12 01:45:14 +02:00
server+bench: AppState::new_with_workload; bench drops set_var, exercises heavy cap
Two cubic findings on bench_actor_isolation.rs flagged together:
P2 (lib.rs:202): `unsafe { std::env::set_var(...) }` ran inside
`#[tokio::main] async fn main()` AFTER the multi-thread tokio runtime
was up. Rust 2024 made `set_var` unsafe because libc's `setenv` is
not thread-safe; concurrent env reads from logging or runtime
internals can race or read torn state.
Fix (correct by design, AGENTS.md rule 9): add a public
`AppState::new_with_workload(uri, db, bearer_tokens, workload)`
constructor that takes a caller-built `WorkloadController`. Tests and
benches override per-actor caps via the constructor instead of
mutating global env. Closes the bug class "tests need to mutate
global env to override AppState defaults."
P2 (lib.rs:130): heavy actor's `oneshot.await` inside the loop
serialized — heavy in-flight count was always 1, so cap=1 never
tripped on the heavy side. The bench validated isolation (light p99
bounded) but didn't demonstrate the rejection path.
Fix: add a `--heavy-concurrency` arg (default 4) and spawn batches
as concurrent tokio tasks bounded by an internal semaphore. With
heavy_concurrency=4 and inflight_cap=1, the bench now reports
heavy_too_many_requests > 0 and heavy_ok == 1 at peak — proving the
gate fires for the heavy actor.
Sample run on local FS (4 light actors × 30 ops, 20 heavy batches ×
50 rows, heavy_concurrency=4, cap=1):
heavy_ok: 1
heavy_too_many_requests: 19
light_ok: 120
light_too_many_requests: 0
light_p99: 565 ms (target < 2 s)
Heavy saturates its own cap; light actors are completely unaffected.
The isolation property is now empirically proven by the rejection
counts rather than just by the latency tail.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
8e1a8e7d55
commit
22d76dbb40
2 changed files with 111 additions and 31 deletions
|
|
@ -34,6 +34,7 @@ use axum::http::{Method, Request, StatusCode};
|
||||||
use clap::Parser;
|
use clap::Parser;
|
||||||
use omnigraph::db::Omnigraph;
|
use omnigraph::db::Omnigraph;
|
||||||
use omnigraph_server::api::{ChangeRequest, IngestRequest};
|
use omnigraph_server::api::{ChangeRequest, IngestRequest};
|
||||||
|
use omnigraph_server::workload::WorkloadController;
|
||||||
use omnigraph_server::{AppState, build_app};
|
use omnigraph_server::{AppState, build_app};
|
||||||
use serde::Serialize;
|
use serde::Serialize;
|
||||||
use tower::ServiceExt;
|
use tower::ServiceExt;
|
||||||
|
|
@ -53,16 +54,32 @@ struct Args {
|
||||||
/// Number of /change ops per light actor.
|
/// Number of /change ops per light actor.
|
||||||
#[arg(long, default_value_t = 50)]
|
#[arg(long, default_value_t = 50)]
|
||||||
light_ops_per_actor: usize,
|
light_ops_per_actor: usize,
|
||||||
/// Number of /ingest batches the heavy actor sends back-to-back.
|
/// Number of /ingest batches the heavy actor sends.
|
||||||
#[arg(long, default_value_t = 200)]
|
#[arg(long, default_value_t = 200)]
|
||||||
heavy_batches: usize,
|
heavy_batches: usize,
|
||||||
/// NDJSON rows per heavy /ingest batch.
|
/// NDJSON rows per heavy /ingest batch.
|
||||||
#[arg(long, default_value_t = 200)]
|
#[arg(long, default_value_t = 200)]
|
||||||
heavy_rows_per_batch: usize,
|
heavy_rows_per_batch: usize,
|
||||||
/// `OMNIGRAPH_PER_ACTOR_INFLIGHT_MAX` for the run. Lower values
|
/// Concurrent in-flight /ingest tasks the heavy actor maintains. With
|
||||||
/// surface admission rejections faster.
|
/// `inflight_cap` smaller than this, the heavy actor exercises its own
|
||||||
|
/// admission cap (and the bench reports `heavy_too_many_requests > 0`),
|
||||||
|
/// proving the gate fires without affecting light actors. Default 4
|
||||||
|
/// against cap=1 → expect ~3/4 batches rejected.
|
||||||
|
#[arg(long, default_value_t = 4)]
|
||||||
|
heavy_concurrency: usize,
|
||||||
|
/// Per-actor in-flight cap for the run. Passed directly into the
|
||||||
|
/// `WorkloadController` constructor (no env-var fiddling). Lower
|
||||||
|
/// values surface admission rejections faster.
|
||||||
#[arg(long, default_value_t = 1)]
|
#[arg(long, default_value_t = 1)]
|
||||||
inflight_cap: u32,
|
inflight_cap: u32,
|
||||||
|
/// Per-actor byte budget (bytes). Default 1 GiB so byte budget
|
||||||
|
/// doesn't bottleneck the count gate during normal bench runs.
|
||||||
|
#[arg(long, default_value_t = 1_073_741_824)]
|
||||||
|
byte_cap: u64,
|
||||||
|
/// Global rewrite-pool cap. Bench is non-rewriting so default 4
|
||||||
|
/// matches production.
|
||||||
|
#[arg(long, default_value_t = 4)]
|
||||||
|
global_rewrite_cap: u32,
|
||||||
/// Output file for the JSON results. Stdout always gets a copy.
|
/// Output file for the JSON results. Stdout always gets a copy.
|
||||||
#[arg(long)]
|
#[arg(long)]
|
||||||
output: Option<PathBuf>,
|
output: Option<PathBuf>,
|
||||||
|
|
@ -114,12 +131,8 @@ fn build_heavy_body(batch_idx: usize, rows: usize) -> String {
|
||||||
.unwrap()
|
.unwrap()
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn drive_heavy_actor(app: Router, batches: usize, rows_per_batch: usize) -> (usize, usize, usize) {
|
async fn send_heavy_batch(app: Router, batch_idx: usize, rows: usize) -> StatusCode {
|
||||||
let mut ok = 0usize;
|
let body = build_heavy_body(batch_idx, rows);
|
||||||
let mut too_many = 0usize;
|
|
||||||
let mut other = 0usize;
|
|
||||||
for b in 0..batches {
|
|
||||||
let body = build_heavy_body(b, rows_per_batch);
|
|
||||||
let req = Request::builder()
|
let req = Request::builder()
|
||||||
.method(Method::POST)
|
.method(Method::POST)
|
||||||
.uri("/ingest")
|
.uri("/ingest")
|
||||||
|
|
@ -127,14 +140,44 @@ async fn drive_heavy_actor(app: Router, batches: usize, rows_per_batch: usize) -
|
||||||
.header("content-type", "application/json")
|
.header("content-type", "application/json")
|
||||||
.body(Body::from(body))
|
.body(Body::from(body))
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let response = match app.clone().oneshot(req).await {
|
match app.oneshot(req).await {
|
||||||
Ok(r) => r,
|
Ok(r) => r.status(),
|
||||||
Err(_) => {
|
Err(_) => StatusCode::INTERNAL_SERVER_ERROR,
|
||||||
other += 1;
|
|
||||||
continue;
|
|
||||||
}
|
}
|
||||||
};
|
}
|
||||||
match response.status() {
|
|
||||||
|
/// Drive `batches` /ingest calls from the heavy actor with up to
|
||||||
|
/// `concurrency` in flight at a time. With `concurrency > inflight_cap`,
|
||||||
|
/// the heavy actor's own admission permits are exhausted at peak, and
|
||||||
|
/// some batches return 429. Returns (ok, 429, other) counts.
|
||||||
|
async fn drive_heavy_actor(
|
||||||
|
app: Router,
|
||||||
|
batches: usize,
|
||||||
|
rows_per_batch: usize,
|
||||||
|
concurrency: usize,
|
||||||
|
) -> (usize, usize, usize) {
|
||||||
|
use tokio::sync::Semaphore;
|
||||||
|
|
||||||
|
let limiter = Arc::new(Semaphore::new(concurrency.max(1)));
|
||||||
|
let mut handles = Vec::with_capacity(batches);
|
||||||
|
for b in 0..batches {
|
||||||
|
let app = app.clone();
|
||||||
|
let limiter = Arc::clone(&limiter);
|
||||||
|
handles.push(tokio::spawn(async move {
|
||||||
|
// Bound concurrency to `concurrency`; this is the bench's
|
||||||
|
// own pacing, not the server's admission control. The
|
||||||
|
// server's `WorkloadController` is what we're trying to
|
||||||
|
// exercise — and it has its own cap (potentially smaller).
|
||||||
|
let _permit = limiter.acquire_owned().await.unwrap();
|
||||||
|
send_heavy_batch(app, b, rows_per_batch).await
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut ok = 0usize;
|
||||||
|
let mut too_many = 0usize;
|
||||||
|
let mut other = 0usize;
|
||||||
|
for h in handles {
|
||||||
|
match h.await.unwrap_or(StatusCode::INTERNAL_SERVER_ERROR) {
|
||||||
StatusCode::OK => ok += 1,
|
StatusCode::OK => ok += 1,
|
||||||
StatusCode::TOO_MANY_REQUESTS => too_many += 1,
|
StatusCode::TOO_MANY_REQUESTS => too_many += 1,
|
||||||
_ => other += 1,
|
_ => other += 1,
|
||||||
|
|
@ -143,6 +186,8 @@ async fn drive_heavy_actor(app: Router, batches: usize, rows_per_batch: usize) -
|
||||||
(ok, too_many, other)
|
(ok, too_many, other)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
async fn drive_light_actor(
|
async fn drive_light_actor(
|
||||||
app: Router,
|
app: Router,
|
||||||
token: String,
|
token: String,
|
||||||
|
|
@ -207,13 +252,6 @@ async fn main() {
|
||||||
std::process::exit(2);
|
std::process::exit(2);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Override the per-actor in-flight cap before AppState is constructed
|
|
||||||
// (WorkloadController::from_env reads it at startup).
|
|
||||||
// SAFETY: single-threaded init at process start; no concurrent env reads.
|
|
||||||
unsafe {
|
|
||||||
std::env::set_var("OMNIGRAPH_PER_ACTOR_INFLIGHT_MAX", args.inflight_cap.to_string());
|
|
||||||
}
|
|
||||||
|
|
||||||
let temp = tempfile::tempdir().expect("tempdir");
|
let temp = tempfile::tempdir().expect("tempdir");
|
||||||
let repo = temp.path().join("bench.omni");
|
let repo = temp.path().join("bench.omni");
|
||||||
Omnigraph::init(repo.to_str().unwrap(), SCHEMA)
|
Omnigraph::init(repo.to_str().unwrap(), SCHEMA)
|
||||||
|
|
@ -229,13 +267,25 @@ async fn main() {
|
||||||
let db = Omnigraph::open(repo.to_str().unwrap())
|
let db = Omnigraph::open(repo.to_str().unwrap())
|
||||||
.await
|
.await
|
||||||
.expect("open repo");
|
.expect("open repo");
|
||||||
let state = AppState::new_with_bearer_tokens(repo.to_string_lossy().to_string(), db, tokens);
|
// Construct a custom WorkloadController with the requested caps and
|
||||||
|
// pass it through `AppState::new_with_workload`. Avoids the
|
||||||
|
// `unsafe { std::env::set_var(...) }` antipattern that violates
|
||||||
|
// `setenv`'s thread-safety precondition once the multi-thread tokio
|
||||||
|
// runtime is up.
|
||||||
|
let workload = WorkloadController::new(args.inflight_cap, args.byte_cap, args.global_rewrite_cap);
|
||||||
|
let state = AppState::new_with_workload(
|
||||||
|
repo.to_string_lossy().to_string(),
|
||||||
|
db,
|
||||||
|
tokens,
|
||||||
|
workload,
|
||||||
|
);
|
||||||
let app = build_app(state);
|
let app = build_app(state);
|
||||||
|
|
||||||
eprintln!(
|
eprintln!(
|
||||||
"running heavy={}x{} light={}x{} cap={}",
|
"running heavy={}x{} (concurrency={}) light={}x{} cap={}",
|
||||||
args.heavy_batches,
|
args.heavy_batches,
|
||||||
args.heavy_rows_per_batch,
|
args.heavy_rows_per_batch,
|
||||||
|
args.heavy_concurrency,
|
||||||
args.light_actors,
|
args.light_actors,
|
||||||
args.light_ops_per_actor,
|
args.light_ops_per_actor,
|
||||||
args.inflight_cap,
|
args.inflight_cap,
|
||||||
|
|
@ -243,8 +293,15 @@ async fn main() {
|
||||||
|
|
||||||
let start = Instant::now();
|
let start = Instant::now();
|
||||||
let heavy_app = app.clone();
|
let heavy_app = app.clone();
|
||||||
|
let heavy_concurrency = args.heavy_concurrency;
|
||||||
let heavy_handle = tokio::spawn(async move {
|
let heavy_handle = tokio::spawn(async move {
|
||||||
drive_heavy_actor(heavy_app, args.heavy_batches, args.heavy_rows_per_batch).await
|
drive_heavy_actor(
|
||||||
|
heavy_app,
|
||||||
|
args.heavy_batches,
|
||||||
|
args.heavy_rows_per_batch,
|
||||||
|
heavy_concurrency,
|
||||||
|
)
|
||||||
|
.await
|
||||||
});
|
});
|
||||||
|
|
||||||
let mut light_handles = Vec::with_capacity(args.light_actors);
|
let mut light_handles = Vec::with_capacity(args.light_actors);
|
||||||
|
|
|
||||||
|
|
@ -206,6 +206,29 @@ impl AppState {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Construct with a caller-provided [`workload::WorkloadController`].
|
||||||
|
/// Tests and benches use this to override per-actor caps without
|
||||||
|
/// mutating global env vars (which is unsafe in Rust 2024 once the
|
||||||
|
/// async runtime is up — `setenv` isn't thread-safe).
|
||||||
|
pub fn new_with_workload(
|
||||||
|
uri: String,
|
||||||
|
db: Omnigraph,
|
||||||
|
bearer_tokens: Vec<(String, String)>,
|
||||||
|
workload: workload::WorkloadController,
|
||||||
|
) -> Self {
|
||||||
|
let bearer_tokens: Vec<(BearerTokenHash, Arc<str>)> = bearer_tokens
|
||||||
|
.into_iter()
|
||||||
|
.map(|(actor, token)| (hash_bearer_token(&token), Arc::<str>::from(actor)))
|
||||||
|
.collect();
|
||||||
|
Self {
|
||||||
|
uri,
|
||||||
|
engine: Arc::new(db),
|
||||||
|
workload: Arc::new(workload),
|
||||||
|
bearer_tokens: Arc::from(bearer_tokens),
|
||||||
|
policy_engine: None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn open(uri: impl Into<String>) -> Result<Self> {
|
pub async fn open(uri: impl Into<String>) -> Result<Self> {
|
||||||
Self::open_with_bearer_token(uri, None).await
|
Self::open_with_bearer_token(uri, None).await
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue