mirror of
https://github.com/ModernRelay/omnigraph.git
synced 2026-06-09 01:35:18 +02:00
server: add WorkloadController for per-actor admission (PR 2 Step E)
PR 2 removes the global server `RwLock<Omnigraph>` (Step F). Without
admission control, one heavy actor would exhaust shared capacity
(Lance I/O threads, manifest churn, network) and starve other actors.
The WorkloadController bounds per-actor in-flight count + bytes and
provides a global rewrite-pool semaphore for compaction / index builds.
New file: `crates/omnigraph-server/src/workload.rs` (~250 LOC + 5 tests).
API:
- `WorkloadController::new(inflight_cap, byte_cap, rewrite_cap)` /
`from_env()` / `with_defaults()`.
- `try_admit(actor_id, est_bytes) -> Result<AdmissionGuard, RejectReason>`
acquires both an in-flight count permit and adds est_bytes to the
per-actor counter atomically; returns RejectReason on either gate.
- `try_admit_rewrite() -> Result<RewriteGuard, RejectReason>` for the
global rewrite pool (Step F maps RewriteGuard exhaustion to HTTP 503).
- `RejectReason::{InFlightCountExceeded, ByteBudgetExceeded,
GlobalRewriteExhausted}`.
Race-free admission via `tokio::sync::Semaphore::try_acquire_owned()`
for the count gate (master plan Finding 6: independent atomic
load+check+add lets two callers both pass a cap-N check; the Semaphore
gate is atomic). Bytes use `fetch_add` + decrement-on-rejection so the
cap is never exceeded even on rollback.
Defaults (override via env):
- OMNIGRAPH_PER_ACTOR_INFLIGHT_MAX=16
- OMNIGRAPH_PER_ACTOR_BYTES_MAX=4_294_967_296 (4 GiB)
- OMNIGRAPH_GLOBAL_REWRITE_MAX=4
Tests cover under-cap admission, byte-budget rollback, per-actor
isolation, global rewrite cap, and the load-bearing 32-concurrent-vs-
cap-16 race test (forces real contention via a broadcast release
channel so guards can't recycle permits task-by-task; pins the
master plan's race-free invariant).
Adds workspace dep `dashmap = "6"` for per-actor state.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
1b0a2c9310
commit
17a1665002
3 changed files with 424 additions and 0 deletions
|
|
@ -37,6 +37,7 @@ futures = { workspace = true }
|
|||
sha2 = { workspace = true }
|
||||
subtle = { workspace = true }
|
||||
async-trait = { workspace = true }
|
||||
dashmap = "6"
|
||||
aws-config = { version = "1", optional = true, default-features = false, features = ["rustls", "rt-tokio", "credentials-process", "sso"] }
|
||||
aws-sdk-secretsmanager = { version = "1", optional = true, default-features = false, features = ["rustls", "rt-tokio"] }
|
||||
|
||||
|
|
|
|||
|
|
@ -2,6 +2,7 @@ pub mod api;
|
|||
pub mod auth;
|
||||
pub mod config;
|
||||
pub mod policy;
|
||||
pub mod workload;
|
||||
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::fs;
|
||||
|
|
|
|||
422
crates/omnigraph-server/src/workload.rs
Normal file
422
crates/omnigraph-server/src/workload.rs
Normal file
|
|
@ -0,0 +1,422 @@
|
|||
//! Per-actor admission control for the HTTP server (MR-686 §VII.A).
|
||||
//!
|
||||
//! The HTTP server's previous global `RwLock<Omnigraph>` serialized every
|
||||
//! mutating request across all actors. PR 2 removes that lock — engine
|
||||
//! APIs are now `&self`, so concurrent calls from different actors can
|
||||
//! run against `Arc<Omnigraph>` simultaneously. Without admission
|
||||
//! control, one heavy actor can exhaust shared capacity (Lance I/O
|
||||
//! threads, manifest churn, network) and starve other actors.
|
||||
//!
|
||||
//! This module provides:
|
||||
//!
|
||||
//! - **Per-actor in-flight count cap**: each actor has a
|
||||
//! `tokio::sync::Semaphore` with `OMNIGRAPH_PER_ACTOR_INFLIGHT_MAX`
|
||||
//! permits (default 16). `try_acquire_owned()` returns `Err` when
|
||||
//! exhausted; the server maps this to HTTP 429.
|
||||
//!
|
||||
//! - **Per-actor in-flight byte budget**: each actor accumulates an
|
||||
//! `AtomicU64` byte estimate. `fetch_add(est_bytes)` then a check
|
||||
//! against `byte_cap` is race-free via decrement-on-rejection. The
|
||||
//! server maps an over-budget result to HTTP 429 as well.
|
||||
//!
|
||||
//! - **Global rewrite semaphore**: bounds the number of concurrent
|
||||
//! compaction / index-build / similar O(table-size) rewrite paths.
|
||||
//! Default: 4. Exhaustion maps to HTTP 503 because the limit is a
|
||||
//! capacity-planning safety net rather than a per-actor abuse guard.
|
||||
//!
|
||||
//! Counts are governed by the semaphore (race-free `try_acquire_owned()`
|
||||
//! enforces the cap atomically); bytes use `fetch_add` + decrement-on-
|
||||
//! rejection. Both checks are atomic compare-and-act, never
|
||||
//! load-then-act — the test
|
||||
//! `actor_admission_race_does_not_exceed_cap` pins this contract by
|
||||
//! spawning 32 concurrent `try_admit` calls against a cap of 16 and
|
||||
//! asserting exactly 16 succeed.
|
||||
//!
|
||||
//! Acquisition order against the engine's per-(table, branch) write
|
||||
//! queue: admission FIRST (the HTTP handler reserves capacity before
|
||||
//! calling into the engine), engine queue SECOND (acquired inside
|
||||
//! `MutationStaging::commit_all`). This composes cleanly because
|
||||
//! admission is a single per-actor count + budget check, never
|
||||
//! cross-actor; nothing the engine does can change a peer actor's
|
||||
//! admission state.
|
||||
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
|
||||
use dashmap::DashMap;
|
||||
use tokio::sync::{OwnedSemaphorePermit, Semaphore, TryAcquireError};
|
||||
|
||||
/// Default per-actor in-flight count cap. Override via
|
||||
/// `OMNIGRAPH_PER_ACTOR_INFLIGHT_MAX`.
|
||||
pub const DEFAULT_PER_ACTOR_INFLIGHT_MAX: u32 = 16;
|
||||
|
||||
/// Default per-actor in-flight byte budget (4 GiB). Override via
|
||||
/// `OMNIGRAPH_PER_ACTOR_BYTES_MAX`.
|
||||
pub const DEFAULT_PER_ACTOR_BYTES_MAX: u64 = 4 * 1024 * 1024 * 1024;
|
||||
|
||||
/// Default global rewrite-pool capacity (compaction, index builds).
|
||||
/// Override via `OMNIGRAPH_GLOBAL_REWRITE_MAX`.
|
||||
pub const DEFAULT_GLOBAL_REWRITE_MAX: u32 = 4;
|
||||
|
||||
/// Why a `try_admit` call returned `Err`. The server maps each variant
|
||||
/// to a specific HTTP response code; see `WorkloadController` docs.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub enum RejectReason {
|
||||
/// Actor exceeded the per-actor in-flight count cap. HTTP 429.
|
||||
InFlightCountExceeded { cap: u32 },
|
||||
/// Actor exceeded the per-actor in-flight byte budget. HTTP 429.
|
||||
ByteBudgetExceeded { cap: u64, attempted: u64 },
|
||||
/// Global rewrite pool is full. HTTP 503.
|
||||
GlobalRewriteExhausted { cap: u32 },
|
||||
}
|
||||
|
||||
impl std::fmt::Display for RejectReason {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
RejectReason::InFlightCountExceeded { cap } => {
|
||||
write!(f, "actor in-flight count cap {} exceeded", cap)
|
||||
}
|
||||
RejectReason::ByteBudgetExceeded { cap, attempted } => write!(
|
||||
f,
|
||||
"actor byte budget exceeded: would use {} bytes against cap {}",
|
||||
attempted, cap
|
||||
),
|
||||
RejectReason::GlobalRewriteExhausted { cap } => {
|
||||
write!(f, "global rewrite pool full (cap {})", cap)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Per-actor counters. One instance per actor_id, lazily created on
|
||||
/// first admission attempt.
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct ActorState {
|
||||
/// Counts the number of concurrent in-flight requests for this
|
||||
/// actor. `try_acquire_owned()` is the count-cap gate.
|
||||
in_flight_sem: Arc<Semaphore>,
|
||||
/// Total bytes estimated to be in flight for this actor across
|
||||
/// concurrent requests. `fetch_add` + check + decrement-on-failure
|
||||
/// keeps the cap atomic.
|
||||
bytes: AtomicU64,
|
||||
/// Per-actor byte cap (snapshot of `WorkloadController.byte_cap`
|
||||
/// at construction; cap mutations don't propagate to existing
|
||||
/// ActorStates by design — controller config changes apply on
|
||||
/// next ActorState construction).
|
||||
byte_cap: u64,
|
||||
/// Per-actor count cap (same snapshot semantics as `byte_cap`).
|
||||
inflight_cap: u32,
|
||||
}
|
||||
|
||||
impl ActorState {
|
||||
fn new(inflight_cap: u32, byte_cap: u64) -> Self {
|
||||
Self {
|
||||
in_flight_sem: Arc::new(Semaphore::new(inflight_cap as usize)),
|
||||
bytes: AtomicU64::new(0),
|
||||
byte_cap,
|
||||
inflight_cap,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Server-side per-actor admission controller. Constructed once at
|
||||
/// server startup and shared via `Arc<WorkloadController>` on
|
||||
/// `AppState`.
|
||||
pub struct WorkloadController {
|
||||
per_actor: DashMap<Arc<str>, Arc<ActorState>>,
|
||||
inflight_cap: u32,
|
||||
byte_cap: u64,
|
||||
global_rewrite: Arc<Semaphore>,
|
||||
global_rewrite_cap: u32,
|
||||
}
|
||||
|
||||
impl WorkloadController {
|
||||
/// Construct from explicit caps. Tests can override.
|
||||
pub fn new(inflight_cap: u32, byte_cap: u64, global_rewrite_cap: u32) -> Self {
|
||||
Self {
|
||||
per_actor: DashMap::new(),
|
||||
inflight_cap,
|
||||
byte_cap,
|
||||
global_rewrite: Arc::new(Semaphore::new(global_rewrite_cap as usize)),
|
||||
global_rewrite_cap,
|
||||
}
|
||||
}
|
||||
|
||||
/// Construct from environment variables, falling back to defaults.
|
||||
/// Bad env values fall back to the default with a `tracing::warn!`.
|
||||
pub fn from_env() -> Self {
|
||||
let inflight_cap = parse_env_u32(
|
||||
"OMNIGRAPH_PER_ACTOR_INFLIGHT_MAX",
|
||||
DEFAULT_PER_ACTOR_INFLIGHT_MAX,
|
||||
);
|
||||
let byte_cap = parse_env_u64("OMNIGRAPH_PER_ACTOR_BYTES_MAX", DEFAULT_PER_ACTOR_BYTES_MAX);
|
||||
let global_rewrite_cap =
|
||||
parse_env_u32("OMNIGRAPH_GLOBAL_REWRITE_MAX", DEFAULT_GLOBAL_REWRITE_MAX);
|
||||
Self::new(inflight_cap, byte_cap, global_rewrite_cap)
|
||||
}
|
||||
|
||||
/// Construct with default caps. Suitable for tests / single-tenant
|
||||
/// deployments without explicit configuration.
|
||||
pub fn with_defaults() -> Self {
|
||||
Self::new(
|
||||
DEFAULT_PER_ACTOR_INFLIGHT_MAX,
|
||||
DEFAULT_PER_ACTOR_BYTES_MAX,
|
||||
DEFAULT_GLOBAL_REWRITE_MAX,
|
||||
)
|
||||
}
|
||||
|
||||
fn actor_state(&self, actor_id: &Arc<str>) -> Arc<ActorState> {
|
||||
if let Some(existing) = self.per_actor.get(actor_id) {
|
||||
return existing.clone();
|
||||
}
|
||||
// Race-on-construct is benign: DashMap's `entry().or_insert_with`
|
||||
// serializes per-key construction; the loser's freshly-built
|
||||
// ActorState gets dropped without observable effect.
|
||||
self.per_actor
|
||||
.entry(actor_id.clone())
|
||||
.or_insert_with(|| Arc::new(ActorState::new(self.inflight_cap, self.byte_cap)))
|
||||
.clone()
|
||||
}
|
||||
|
||||
/// Reserve admission for one in-flight request from `actor_id`
|
||||
/// estimated to consume `est_bytes`. Returns an `AdmissionGuard`
|
||||
/// that releases the count permit + decrements the byte total
|
||||
/// when dropped.
|
||||
///
|
||||
/// On rejection, the byte counter is decremented before returning
|
||||
/// — callers can retry without leaking budget.
|
||||
pub fn try_admit(
|
||||
&self,
|
||||
actor_id: &Arc<str>,
|
||||
est_bytes: u64,
|
||||
) -> Result<AdmissionGuard, RejectReason> {
|
||||
let state = self.actor_state(actor_id);
|
||||
|
||||
// Count gate: race-free via `try_acquire_owned()`. If exhausted,
|
||||
// immediately reject — no byte accounting needed for this request.
|
||||
let permit = match Arc::clone(&state.in_flight_sem).try_acquire_owned() {
|
||||
Ok(permit) => permit,
|
||||
Err(TryAcquireError::NoPermits) => {
|
||||
return Err(RejectReason::InFlightCountExceeded {
|
||||
cap: state.inflight_cap,
|
||||
});
|
||||
}
|
||||
Err(TryAcquireError::Closed) => {
|
||||
return Err(RejectReason::InFlightCountExceeded {
|
||||
cap: state.inflight_cap,
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
// Byte gate: atomic fetch_add then check; decrement on overflow.
|
||||
// `Ordering::SeqCst` is conservative; per-actor accounting is
|
||||
// not on the hot path of read queries.
|
||||
let prior = state.bytes.fetch_add(est_bytes, Ordering::SeqCst);
|
||||
let attempted = prior.saturating_add(est_bytes);
|
||||
if attempted > state.byte_cap {
|
||||
// Roll back the byte add. The permit drops with `permit`
|
||||
// going out of scope below.
|
||||
state.bytes.fetch_sub(est_bytes, Ordering::SeqCst);
|
||||
return Err(RejectReason::ByteBudgetExceeded {
|
||||
cap: state.byte_cap,
|
||||
attempted,
|
||||
});
|
||||
}
|
||||
|
||||
Ok(AdmissionGuard {
|
||||
_permit: permit,
|
||||
actor_state: state,
|
||||
est_bytes,
|
||||
})
|
||||
}
|
||||
|
||||
/// Reserve a global rewrite slot (compaction, index build, etc.).
|
||||
/// Returned guard releases the slot when dropped.
|
||||
pub fn try_admit_rewrite(&self) -> Result<RewriteGuard, RejectReason> {
|
||||
match Arc::clone(&self.global_rewrite).try_acquire_owned() {
|
||||
Ok(permit) => Ok(RewriteGuard { _permit: permit }),
|
||||
Err(_) => Err(RejectReason::GlobalRewriteExhausted {
|
||||
cap: self.global_rewrite_cap,
|
||||
}),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Drop-on-completion guard for an admitted request. Dropping releases
|
||||
/// the in-flight count permit (via `Drop` on the underlying semaphore
|
||||
/// permit) and decrements the actor's byte counter.
|
||||
#[derive(Debug)]
|
||||
pub struct AdmissionGuard {
|
||||
_permit: OwnedSemaphorePermit,
|
||||
actor_state: Arc<ActorState>,
|
||||
est_bytes: u64,
|
||||
}
|
||||
|
||||
impl Drop for AdmissionGuard {
|
||||
fn drop(&mut self) {
|
||||
self.actor_state
|
||||
.bytes
|
||||
.fetch_sub(self.est_bytes, Ordering::SeqCst);
|
||||
}
|
||||
}
|
||||
|
||||
/// Drop-on-completion guard for the global rewrite pool.
|
||||
#[derive(Debug)]
|
||||
pub struct RewriteGuard {
|
||||
_permit: OwnedSemaphorePermit,
|
||||
}
|
||||
|
||||
fn parse_env_u32(name: &str, default: u32) -> u32 {
|
||||
match std::env::var(name) {
|
||||
Ok(v) => v.parse::<u32>().unwrap_or_else(|err| {
|
||||
tracing::warn!(
|
||||
env = name,
|
||||
value = %v,
|
||||
error = %err,
|
||||
default,
|
||||
"invalid env value, using default"
|
||||
);
|
||||
default
|
||||
}),
|
||||
Err(_) => default,
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_env_u64(name: &str, default: u64) -> u64 {
|
||||
match std::env::var(name) {
|
||||
Ok(v) => v.parse::<u64>().unwrap_or_else(|err| {
|
||||
tracing::warn!(
|
||||
env = name,
|
||||
value = %v,
|
||||
error = %err,
|
||||
default,
|
||||
"invalid env value, using default"
|
||||
);
|
||||
default
|
||||
}),
|
||||
Err(_) => default,
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
|
||||
async fn try_admit_admits_under_cap() {
|
||||
let controller = WorkloadController::new(2, 1024, 1);
|
||||
let actor: Arc<str> = "alice".into();
|
||||
let g1 = controller.try_admit(&actor, 100).expect("first admit");
|
||||
let _g2 = controller.try_admit(&actor, 100).expect("second admit");
|
||||
let err = controller
|
||||
.try_admit(&actor, 100)
|
||||
.expect_err("third should reject on count");
|
||||
assert!(matches!(err, RejectReason::InFlightCountExceeded { cap: 2 }));
|
||||
drop(g1);
|
||||
// After drop, a new admit succeeds again.
|
||||
let _g3 = controller
|
||||
.try_admit(&actor, 100)
|
||||
.expect("admit after drop");
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
|
||||
async fn byte_budget_caps_admission() {
|
||||
let controller = WorkloadController::new(16, 1000, 1);
|
||||
let actor: Arc<str> = "alice".into();
|
||||
let _g1 = controller.try_admit(&actor, 600).expect("first admit");
|
||||
let err = controller
|
||||
.try_admit(&actor, 600)
|
||||
.expect_err("second should reject on bytes");
|
||||
match err {
|
||||
RejectReason::ByteBudgetExceeded { cap, attempted } => {
|
||||
assert_eq!(cap, 1000);
|
||||
assert_eq!(attempted, 1200);
|
||||
}
|
||||
other => panic!("expected ByteBudgetExceeded, got {:?}", other),
|
||||
}
|
||||
// Verify the byte counter was rolled back: a smaller request fits.
|
||||
let _g2 = controller.try_admit(&actor, 300).expect("smaller admit");
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
|
||||
async fn actor_admission_race_does_not_exceed_cap() {
|
||||
// Pin master plan §"WorkloadController" Finding 6: independent
|
||||
// atomic load + check + add allows two concurrent callers to
|
||||
// both pass a cap-N check. The Semaphore-based gate is
|
||||
// race-free — exactly cap_count callers succeed.
|
||||
//
|
||||
// Each task holds its admission guard until released via a
|
||||
// oneshot channel; this forces real contention because guards
|
||||
// can't drop and free permits before all 32 calls have raced.
|
||||
let controller = Arc::new(WorkloadController::new(16, u64::MAX / 4, 1));
|
||||
let actor: Arc<str> = "racer".into();
|
||||
|
||||
let (release_tx, _) = tokio::sync::broadcast::channel::<()>(1);
|
||||
|
||||
let mut handles = Vec::with_capacity(32);
|
||||
for _ in 0..32 {
|
||||
let controller = Arc::clone(&controller);
|
||||
let actor = actor.clone();
|
||||
let mut release_rx = release_tx.subscribe();
|
||||
handles.push(tokio::spawn(async move {
|
||||
let result = controller.try_admit(&actor, 1);
|
||||
let success = result.is_ok();
|
||||
// Hold the guard (if any) until the test signals release,
|
||||
// so the cap-16 contention is observable across all 32
|
||||
// tasks instead of permits being recycled task-by-task.
|
||||
let _guard = result.ok();
|
||||
let _ = release_rx.recv().await;
|
||||
success
|
||||
}));
|
||||
}
|
||||
|
||||
// Give all 32 tasks a chance to hit `try_admit` before any can
|
||||
// drop their guard. 50ms is plenty for tokio's scheduler on a
|
||||
// 4-worker runtime.
|
||||
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
|
||||
// Release every task; collect succeed/reject counts.
|
||||
let _ = release_tx.send(());
|
||||
|
||||
let mut accepted = 0u32;
|
||||
let mut rejected = 0u32;
|
||||
for h in handles {
|
||||
if h.await.unwrap() {
|
||||
accepted += 1;
|
||||
} else {
|
||||
rejected += 1;
|
||||
}
|
||||
}
|
||||
assert_eq!(accepted, 16, "expected exactly 16 successful admits");
|
||||
assert_eq!(rejected, 16, "expected exactly 16 rejections");
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
|
||||
async fn per_actor_caps_independent() {
|
||||
let controller = WorkloadController::new(1, 1024, 1);
|
||||
let alice: Arc<str> = "alice".into();
|
||||
let bob: Arc<str> = "bob".into();
|
||||
let _ga = controller.try_admit(&alice, 100).expect("alice ok");
|
||||
// Alice over count cap, Bob unaffected.
|
||||
let err = controller.try_admit(&alice, 100).expect_err("alice rejected");
|
||||
assert!(matches!(err, RejectReason::InFlightCountExceeded { .. }));
|
||||
let _gb = controller.try_admit(&bob, 100).expect("bob ok");
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
|
||||
async fn global_rewrite_cap_enforced() {
|
||||
let controller = WorkloadController::new(16, u64::MAX / 4, 2);
|
||||
let g1 = controller.try_admit_rewrite().expect("first rewrite");
|
||||
let _g2 = controller.try_admit_rewrite().expect("second rewrite");
|
||||
let err = controller
|
||||
.try_admit_rewrite()
|
||||
.expect_err("third should reject");
|
||||
assert!(matches!(
|
||||
err,
|
||||
RejectReason::GlobalRewriteExhausted { cap: 2 }
|
||||
));
|
||||
drop(g1);
|
||||
let _g3 = controller
|
||||
.try_admit_rewrite()
|
||||
.expect("rewrite after drop");
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue