mirror of
https://github.com/ModernRelay/omnigraph.git
synced 2026-06-21 02:28:07 +02:00
server: drop unwired try_admit_rewrite / 503 admission surface
This commit is contained in:
parent
4bb7964af9
commit
6a3f0677ae
7 changed files with 12 additions and 109 deletions
|
|
@ -19,11 +19,6 @@
|
|||
//! against `byte_cap` is race-free via decrement-on-rejection. The
|
||||
//! server maps an over-budget result to HTTP 429 as well.
|
||||
//!
|
||||
//! - **Global rewrite semaphore**: bounds the number of concurrent
|
||||
//! compaction / index-build / similar O(table-size) rewrite paths.
|
||||
//! Default: 4. Exhaustion maps to HTTP 503 because the limit is a
|
||||
//! capacity-planning safety net rather than a per-actor abuse guard.
|
||||
//!
|
||||
//! Counts are governed by the semaphore (race-free `try_acquire_owned()`
|
||||
//! enforces the cap atomically); bytes use `fetch_add` + decrement-on-
|
||||
//! rejection. Both checks are atomic compare-and-act, never
|
||||
|
|
@ -54,10 +49,6 @@ pub const DEFAULT_PER_ACTOR_INFLIGHT_MAX: u32 = 16;
|
|||
/// `OMNIGRAPH_PER_ACTOR_BYTES_MAX`.
|
||||
pub const DEFAULT_PER_ACTOR_BYTES_MAX: u64 = 4 * 1024 * 1024 * 1024;
|
||||
|
||||
/// Default global rewrite-pool capacity (compaction, index builds).
|
||||
/// Override via `OMNIGRAPH_GLOBAL_REWRITE_MAX`.
|
||||
pub const DEFAULT_GLOBAL_REWRITE_MAX: u32 = 4;
|
||||
|
||||
/// Why a `try_admit` call returned `Err`. The server maps each variant
|
||||
/// to a specific HTTP response code; see `WorkloadController` docs.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
|
|
@ -66,8 +57,6 @@ pub enum RejectReason {
|
|||
InFlightCountExceeded { cap: u32 },
|
||||
/// Actor exceeded the per-actor in-flight byte budget. HTTP 429.
|
||||
ByteBudgetExceeded { cap: u64, attempted: u64 },
|
||||
/// Global rewrite pool is full. HTTP 503.
|
||||
GlobalRewriteExhausted { cap: u32 },
|
||||
}
|
||||
|
||||
impl std::fmt::Display for RejectReason {
|
||||
|
|
@ -81,9 +70,6 @@ impl std::fmt::Display for RejectReason {
|
|||
"actor byte budget exceeded: would use {} bytes against cap {}",
|
||||
attempted, cap
|
||||
),
|
||||
RejectReason::GlobalRewriteExhausted { cap } => {
|
||||
write!(f, "global rewrite pool full (cap {})", cap)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -126,19 +112,15 @@ pub struct WorkloadController {
|
|||
per_actor: DashMap<Arc<str>, Arc<ActorState>>,
|
||||
inflight_cap: u32,
|
||||
byte_cap: u64,
|
||||
global_rewrite: Arc<Semaphore>,
|
||||
global_rewrite_cap: u32,
|
||||
}
|
||||
|
||||
impl WorkloadController {
|
||||
/// Construct from explicit caps. Tests can override.
|
||||
pub fn new(inflight_cap: u32, byte_cap: u64, global_rewrite_cap: u32) -> Self {
|
||||
pub fn new(inflight_cap: u32, byte_cap: u64) -> Self {
|
||||
Self {
|
||||
per_actor: DashMap::new(),
|
||||
inflight_cap,
|
||||
byte_cap,
|
||||
global_rewrite: Arc::new(Semaphore::new(global_rewrite_cap as usize)),
|
||||
global_rewrite_cap,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -150,19 +132,13 @@ impl WorkloadController {
|
|||
DEFAULT_PER_ACTOR_INFLIGHT_MAX,
|
||||
);
|
||||
let byte_cap = parse_env_u64("OMNIGRAPH_PER_ACTOR_BYTES_MAX", DEFAULT_PER_ACTOR_BYTES_MAX);
|
||||
let global_rewrite_cap =
|
||||
parse_env_u32("OMNIGRAPH_GLOBAL_REWRITE_MAX", DEFAULT_GLOBAL_REWRITE_MAX);
|
||||
Self::new(inflight_cap, byte_cap, global_rewrite_cap)
|
||||
Self::new(inflight_cap, byte_cap)
|
||||
}
|
||||
|
||||
/// Construct with default caps. Suitable for tests / single-tenant
|
||||
/// deployments without explicit configuration.
|
||||
pub fn with_defaults() -> Self {
|
||||
Self::new(
|
||||
DEFAULT_PER_ACTOR_INFLIGHT_MAX,
|
||||
DEFAULT_PER_ACTOR_BYTES_MAX,
|
||||
DEFAULT_GLOBAL_REWRITE_MAX,
|
||||
)
|
||||
Self::new(DEFAULT_PER_ACTOR_INFLIGHT_MAX, DEFAULT_PER_ACTOR_BYTES_MAX)
|
||||
}
|
||||
|
||||
fn actor_state(&self, actor_id: &Arc<str>) -> Arc<ActorState> {
|
||||
|
|
@ -229,17 +205,6 @@ impl WorkloadController {
|
|||
est_bytes,
|
||||
})
|
||||
}
|
||||
|
||||
/// Reserve a global rewrite slot (compaction, index build, etc.).
|
||||
/// Returned guard releases the slot when dropped.
|
||||
pub fn try_admit_rewrite(&self) -> Result<RewriteGuard, RejectReason> {
|
||||
match Arc::clone(&self.global_rewrite).try_acquire_owned() {
|
||||
Ok(permit) => Ok(RewriteGuard { _permit: permit }),
|
||||
Err(_) => Err(RejectReason::GlobalRewriteExhausted {
|
||||
cap: self.global_rewrite_cap,
|
||||
}),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Drop-on-completion guard for an admitted request. Dropping releases
|
||||
|
|
@ -260,12 +225,6 @@ impl Drop for AdmissionGuard {
|
|||
}
|
||||
}
|
||||
|
||||
/// Drop-on-completion guard for the global rewrite pool.
|
||||
#[derive(Debug)]
|
||||
pub struct RewriteGuard {
|
||||
_permit: OwnedSemaphorePermit,
|
||||
}
|
||||
|
||||
fn parse_env_u32(name: &str, default: u32) -> u32 {
|
||||
match std::env::var(name) {
|
||||
Ok(v) => v.parse::<u32>().unwrap_or_else(|err| {
|
||||
|
|
@ -304,7 +263,7 @@ mod tests {
|
|||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
|
||||
async fn try_admit_admits_under_cap() {
|
||||
let controller = WorkloadController::new(2, 1024, 1);
|
||||
let controller = WorkloadController::new(2, 1024);
|
||||
let actor: Arc<str> = "alice".into();
|
||||
let g1 = controller.try_admit(&actor, 100).expect("first admit");
|
||||
let _g2 = controller.try_admit(&actor, 100).expect("second admit");
|
||||
|
|
@ -321,7 +280,7 @@ mod tests {
|
|||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
|
||||
async fn byte_budget_caps_admission() {
|
||||
let controller = WorkloadController::new(16, 1000, 1);
|
||||
let controller = WorkloadController::new(16, 1000);
|
||||
let actor: Arc<str> = "alice".into();
|
||||
let _g1 = controller.try_admit(&actor, 600).expect("first admit");
|
||||
let err = controller
|
||||
|
|
@ -348,7 +307,7 @@ mod tests {
|
|||
// Each task holds its admission guard until released via a
|
||||
// oneshot channel; this forces real contention because guards
|
||||
// can't drop and free permits before all 32 calls have raced.
|
||||
let controller = Arc::new(WorkloadController::new(16, u64::MAX / 4, 1));
|
||||
let controller = Arc::new(WorkloadController::new(16, u64::MAX / 4));
|
||||
let actor: Arc<str> = "racer".into();
|
||||
|
||||
let (release_tx, _) = tokio::sync::broadcast::channel::<()>(1);
|
||||
|
|
@ -392,7 +351,7 @@ mod tests {
|
|||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
|
||||
async fn per_actor_caps_independent() {
|
||||
let controller = WorkloadController::new(1, 1024, 1);
|
||||
let controller = WorkloadController::new(1, 1024);
|
||||
let alice: Arc<str> = "alice".into();
|
||||
let bob: Arc<str> = "bob".into();
|
||||
let _ga = controller.try_admit(&alice, 100).expect("alice ok");
|
||||
|
|
@ -401,22 +360,4 @@ mod tests {
|
|||
assert!(matches!(err, RejectReason::InFlightCountExceeded { .. }));
|
||||
let _gb = controller.try_admit(&bob, 100).expect("bob ok");
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
|
||||
async fn global_rewrite_cap_enforced() {
|
||||
let controller = WorkloadController::new(16, u64::MAX / 4, 2);
|
||||
let g1 = controller.try_admit_rewrite().expect("first rewrite");
|
||||
let _g2 = controller.try_admit_rewrite().expect("second rewrite");
|
||||
let err = controller
|
||||
.try_admit_rewrite()
|
||||
.expect_err("third should reject");
|
||||
assert!(matches!(
|
||||
err,
|
||||
RejectReason::GlobalRewriteExhausted { cap: 2 }
|
||||
));
|
||||
drop(g1);
|
||||
let _g3 = controller
|
||||
.try_admit_rewrite()
|
||||
.expect("rewrite after drop");
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue