diff --git a/crates/omnigraph-server/examples/bench_actor_isolation.rs b/crates/omnigraph-server/examples/bench_actor_isolation.rs index c4ffd8d..1eca032 100644 --- a/crates/omnigraph-server/examples/bench_actor_isolation.rs +++ b/crates/omnigraph-server/examples/bench_actor_isolation.rs @@ -76,10 +76,6 @@ struct Args { /// doesn't bottleneck the count gate during normal bench runs. #[arg(long, default_value_t = 1_073_741_824)] byte_cap: u64, - /// Global rewrite-pool cap. Bench is non-rewriting so default 4 - /// matches production. - #[arg(long, default_value_t = 4)] - global_rewrite_cap: u32, /// Output file for the JSON results. Stdout always gets a copy. #[arg(long)] output: Option, @@ -282,7 +278,7 @@ async fn main() { // `unsafe { std::env::set_var(...) }` antipattern that violates // `setenv`'s thread-safety precondition once the multi-thread tokio // runtime is up. - let workload = WorkloadController::new(args.inflight_cap, args.byte_cap, args.global_rewrite_cap); + let workload = WorkloadController::new(args.inflight_cap, args.byte_cap); let state = AppState::new_with_workload( repo.to_string_lossy().to_string(), db, diff --git a/crates/omnigraph-server/src/api.rs b/crates/omnigraph-server/src/api.rs index 1f01651..89534f5 100644 --- a/crates/omnigraph-server/src/api.rs +++ b/crates/omnigraph-server/src/api.rs @@ -342,9 +342,6 @@ pub enum ErrorCode { /// 429 Too Many Requests — per-actor admission cap exceeded. /// Clients should respect the `Retry-After` header. TooManyRequests, - /// 503 Service Unavailable — global rewrite pool exhausted - /// (compaction, index build). Clients should retry later. - ServiceUnavailable, Internal, } diff --git a/crates/omnigraph-server/src/lib.rs b/crates/omnigraph-server/src/lib.rs index 2c1e241..bb4601f 100644 --- a/crates/omnigraph-server/src/lib.rs +++ b/crates/omnigraph-server/src/lib.rs @@ -377,18 +377,6 @@ impl ApiError { } } - /// HTTP 503 Service Unavailable — global rewrite pool exhausted. - /// Mapped from `RejectReason::GlobalRewriteExhausted`. - pub fn service_unavailable(message: impl Into) -> Self { - Self { - status: StatusCode::SERVICE_UNAVAILABLE, - code: ErrorCode::ServiceUnavailable, - message: message.into(), - merge_conflicts: Vec::new(), - manifest_conflict: None, - } - } - /// Convert a `WorkloadController` rejection into the matching /// `ApiError` variant. pub fn from_workload_reject(reject: workload::RejectReason) -> Self { @@ -397,9 +385,6 @@ impl ApiError { | workload::RejectReason::ByteBudgetExceeded { .. } => { Self::too_many_requests(reject.to_string()) } - workload::RejectReason::GlobalRewriteExhausted { .. } => { - Self::service_unavailable(reject.to_string()) - } } } @@ -490,21 +475,13 @@ fn summarize_merge_conflicts(conflicts: &[api::MergeConflictOutput]) -> String { format!("merge conflicts: {}{}", preview.join("; "), suffix) } -/// Constant `Retry-After` value (seconds) emitted on 429 / 503 responses. -/// Matches the doc claim at `ApiError::too_many_requests` and -/// `ApiError::service_unavailable`. Plumbing per-RejectReason durations -/// through is a follow-up; the admission rejects we surface today are -/// uniformly bounded by the in-flight cap recovery time, which is -/// dominated by request handler duration rather than calendar wait. +/// Constant `Retry-After` value (seconds) emitted on 429 responses. const RETRY_AFTER_SECONDS: &str = "60"; impl IntoResponse for ApiError { fn into_response(self) -> Response { let mut headers = axum::http::HeaderMap::new(); - if matches!( - self.code, - ErrorCode::TooManyRequests | ErrorCode::ServiceUnavailable - ) { + if matches!(self.code, ErrorCode::TooManyRequests) { headers.insert( axum::http::header::RETRY_AFTER, axum::http::HeaderValue::from_static(RETRY_AFTER_SECONDS), diff --git a/crates/omnigraph-server/src/workload.rs b/crates/omnigraph-server/src/workload.rs index 0e83c0d..efc7068 100644 --- a/crates/omnigraph-server/src/workload.rs +++ b/crates/omnigraph-server/src/workload.rs @@ -19,11 +19,6 @@ //! against `byte_cap` is race-free via decrement-on-rejection. The //! server maps an over-budget result to HTTP 429 as well. //! -//! - **Global rewrite semaphore**: bounds the number of concurrent -//! compaction / index-build / similar O(table-size) rewrite paths. -//! Default: 4. Exhaustion maps to HTTP 503 because the limit is a -//! capacity-planning safety net rather than a per-actor abuse guard. -//! //! Counts are governed by the semaphore (race-free `try_acquire_owned()` //! enforces the cap atomically); bytes use `fetch_add` + decrement-on- //! rejection. Both checks are atomic compare-and-act, never @@ -54,10 +49,6 @@ pub const DEFAULT_PER_ACTOR_INFLIGHT_MAX: u32 = 16; /// `OMNIGRAPH_PER_ACTOR_BYTES_MAX`. pub const DEFAULT_PER_ACTOR_BYTES_MAX: u64 = 4 * 1024 * 1024 * 1024; -/// Default global rewrite-pool capacity (compaction, index builds). -/// Override via `OMNIGRAPH_GLOBAL_REWRITE_MAX`. -pub const DEFAULT_GLOBAL_REWRITE_MAX: u32 = 4; - /// Why a `try_admit` call returned `Err`. The server maps each variant /// to a specific HTTP response code; see `WorkloadController` docs. #[derive(Debug, Clone, PartialEq, Eq)] @@ -66,8 +57,6 @@ pub enum RejectReason { InFlightCountExceeded { cap: u32 }, /// Actor exceeded the per-actor in-flight byte budget. HTTP 429. ByteBudgetExceeded { cap: u64, attempted: u64 }, - /// Global rewrite pool is full. HTTP 503. - GlobalRewriteExhausted { cap: u32 }, } impl std::fmt::Display for RejectReason { @@ -81,9 +70,6 @@ impl std::fmt::Display for RejectReason { "actor byte budget exceeded: would use {} bytes against cap {}", attempted, cap ), - RejectReason::GlobalRewriteExhausted { cap } => { - write!(f, "global rewrite pool full (cap {})", cap) - } } } } @@ -126,19 +112,15 @@ pub struct WorkloadController { per_actor: DashMap, Arc>, inflight_cap: u32, byte_cap: u64, - global_rewrite: Arc, - global_rewrite_cap: u32, } impl WorkloadController { /// Construct from explicit caps. Tests can override. - pub fn new(inflight_cap: u32, byte_cap: u64, global_rewrite_cap: u32) -> Self { + pub fn new(inflight_cap: u32, byte_cap: u64) -> Self { Self { per_actor: DashMap::new(), inflight_cap, byte_cap, - global_rewrite: Arc::new(Semaphore::new(global_rewrite_cap as usize)), - global_rewrite_cap, } } @@ -150,19 +132,13 @@ impl WorkloadController { DEFAULT_PER_ACTOR_INFLIGHT_MAX, ); let byte_cap = parse_env_u64("OMNIGRAPH_PER_ACTOR_BYTES_MAX", DEFAULT_PER_ACTOR_BYTES_MAX); - let global_rewrite_cap = - parse_env_u32("OMNIGRAPH_GLOBAL_REWRITE_MAX", DEFAULT_GLOBAL_REWRITE_MAX); - Self::new(inflight_cap, byte_cap, global_rewrite_cap) + Self::new(inflight_cap, byte_cap) } /// Construct with default caps. Suitable for tests / single-tenant /// deployments without explicit configuration. pub fn with_defaults() -> Self { - Self::new( - DEFAULT_PER_ACTOR_INFLIGHT_MAX, - DEFAULT_PER_ACTOR_BYTES_MAX, - DEFAULT_GLOBAL_REWRITE_MAX, - ) + Self::new(DEFAULT_PER_ACTOR_INFLIGHT_MAX, DEFAULT_PER_ACTOR_BYTES_MAX) } fn actor_state(&self, actor_id: &Arc) -> Arc { @@ -229,17 +205,6 @@ impl WorkloadController { est_bytes, }) } - - /// Reserve a global rewrite slot (compaction, index build, etc.). - /// Returned guard releases the slot when dropped. - pub fn try_admit_rewrite(&self) -> Result { - match Arc::clone(&self.global_rewrite).try_acquire_owned() { - Ok(permit) => Ok(RewriteGuard { _permit: permit }), - Err(_) => Err(RejectReason::GlobalRewriteExhausted { - cap: self.global_rewrite_cap, - }), - } - } } /// Drop-on-completion guard for an admitted request. Dropping releases @@ -260,12 +225,6 @@ impl Drop for AdmissionGuard { } } -/// Drop-on-completion guard for the global rewrite pool. -#[derive(Debug)] -pub struct RewriteGuard { - _permit: OwnedSemaphorePermit, -} - fn parse_env_u32(name: &str, default: u32) -> u32 { match std::env::var(name) { Ok(v) => v.parse::().unwrap_or_else(|err| { @@ -304,7 +263,7 @@ mod tests { #[tokio::test(flavor = "multi_thread", worker_threads = 4)] async fn try_admit_admits_under_cap() { - let controller = WorkloadController::new(2, 1024, 1); + let controller = WorkloadController::new(2, 1024); let actor: Arc = "alice".into(); let g1 = controller.try_admit(&actor, 100).expect("first admit"); let _g2 = controller.try_admit(&actor, 100).expect("second admit"); @@ -321,7 +280,7 @@ mod tests { #[tokio::test(flavor = "multi_thread", worker_threads = 4)] async fn byte_budget_caps_admission() { - let controller = WorkloadController::new(16, 1000, 1); + let controller = WorkloadController::new(16, 1000); let actor: Arc = "alice".into(); let _g1 = controller.try_admit(&actor, 600).expect("first admit"); let err = controller @@ -348,7 +307,7 @@ mod tests { // Each task holds its admission guard until released via a // oneshot channel; this forces real contention because guards // can't drop and free permits before all 32 calls have raced. - let controller = Arc::new(WorkloadController::new(16, u64::MAX / 4, 1)); + let controller = Arc::new(WorkloadController::new(16, u64::MAX / 4)); let actor: Arc = "racer".into(); let (release_tx, _) = tokio::sync::broadcast::channel::<()>(1); @@ -392,7 +351,7 @@ mod tests { #[tokio::test(flavor = "multi_thread", worker_threads = 4)] async fn per_actor_caps_independent() { - let controller = WorkloadController::new(1, 1024, 1); + let controller = WorkloadController::new(1, 1024); let alice: Arc = "alice".into(); let bob: Arc = "bob".into(); let _ga = controller.try_admit(&alice, 100).expect("alice ok"); @@ -401,22 +360,4 @@ mod tests { assert!(matches!(err, RejectReason::InFlightCountExceeded { .. })); let _gb = controller.try_admit(&bob, 100).expect("bob ok"); } - - #[tokio::test(flavor = "multi_thread", worker_threads = 4)] - async fn global_rewrite_cap_enforced() { - let controller = WorkloadController::new(16, u64::MAX / 4, 2); - let g1 = controller.try_admit_rewrite().expect("first rewrite"); - let _g2 = controller.try_admit_rewrite().expect("second rewrite"); - let err = controller - .try_admit_rewrite() - .expect_err("third should reject"); - assert!(matches!( - err, - RejectReason::GlobalRewriteExhausted { cap: 2 } - )); - drop(g1); - let _g3 = controller - .try_admit_rewrite() - .expect("rewrite after drop"); - } } diff --git a/crates/omnigraph-server/tests/server.rs b/crates/omnigraph-server/tests/server.rs index 5f8ca31..03f4aa7 100644 --- a/crates/omnigraph-server/tests/server.rs +++ b/crates/omnigraph-server/tests/server.rs @@ -3360,7 +3360,6 @@ async fn ingest_per_actor_admission_cap_returns_429() { let workload = omnigraph_server::workload::WorkloadController::new( 1, // per-actor in-flight cap (the fixture under test) 1_000_000_000, // per-actor byte budget — large so it never bottlenecks - 4, // global rewrite cap (default-equivalent) ); let state = AppState::new_with_workload( repo.to_string_lossy().to_string(), diff --git a/docs/server.md b/docs/server.md index ba2130e..bfac282 100644 --- a/docs/server.md +++ b/docs/server.md @@ -28,7 +28,7 @@ Only `/export` streams (`application/x-ndjson`, MPSC channel + `Body::from_strea ## Error model -Uniform `ErrorOutput { error, code?, merge_conflicts[], manifest_conflict? }` with `code ∈ unauthorized | forbidden | bad_request | not_found | conflict | too_many_requests | service_unavailable | internal`. Merge conflicts attach structured `MergeConflictOutput { table_key, row_id?, kind, message }`. +Uniform `ErrorOutput { error, code?, merge_conflicts[], manifest_conflict? }` with `code ∈ unauthorized | forbidden | bad_request | not_found | conflict | too_many_requests | internal`. Merge conflicts attach structured `MergeConflictOutput { table_key, row_id?, kind, message }`. `manifest_conflict` is set on **publisher CAS rejections** (HTTP 409): the caller's pre-write view of one table's manifest version was stale. @@ -37,7 +37,7 @@ which table to refresh and retry. This is the conflict shape produced by concurrent `/change` or `/ingest` calls landing the same `(table, branch)` race (MR-771 / MR-766). -HTTP status codes used: 200, 400, 401, 403, 404, 409, 429, 500, 503. +HTTP status codes used: 200, 400, 401, 403, 404, 409, 429, 500. ## Per-actor admission control (MR-686) @@ -52,18 +52,12 @@ churn, network), the server gates mutating handlers through a |---|---|---| | `OMNIGRAPH_PER_ACTOR_INFLIGHT_MAX` | 16 | Concurrent in-flight mutations per actor | | `OMNIGRAPH_PER_ACTOR_BYTES_MAX` | 4 GiB | In-flight estimated bytes per actor | -| `OMNIGRAPH_GLOBAL_REWRITE_MAX` | 4 | Concurrent compaction / index-build slots | When an actor exceeds its in-flight count or byte budget, the server returns **HTTP 429 Too Many Requests** with `code: too_many_requests` and a `Retry-After` header (seconds). The actor should back off; other actors are unaffected. -When the global rewrite pool is exhausted (compaction, index build), -the server returns **HTTP 503 Service Unavailable** with -`code: service_unavailable`. Clients can retry; the rewrite pool -empties as in-flight rewrites complete. - Cedar policy authorization runs **before** admission accounting so denied requests don't consume admission slots. diff --git a/openapi.json b/openapi.json index 0934925..5e7f358 100644 --- a/openapi.json +++ b/openapi.json @@ -1201,7 +1201,6 @@ "not_found", "conflict", "too_many_requests", - "service_unavailable", "internal" ] },