diff --git a/Cargo.lock b/Cargo.lock index 0b1a6ee..9af6392 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4686,6 +4686,7 @@ dependencies = [ "cedar-policy", "clap", "color-eyre", + "dashmap", "futures", "lance-index", "omnigraph-compiler", diff --git a/crates/omnigraph-server/src/api.rs b/crates/omnigraph-server/src/api.rs index 9dd45ee..1f01651 100644 --- a/crates/omnigraph-server/src/api.rs +++ b/crates/omnigraph-server/src/api.rs @@ -339,6 +339,12 @@ pub enum ErrorCode { BadRequest, NotFound, Conflict, + /// 429 Too Many Requests — per-actor admission cap exceeded. + /// Clients should respect the `Retry-After` header. + TooManyRequests, + /// 503 Service Unavailable — global rewrite pool exhausted + /// (compaction, index build). Clients should retry later. + ServiceUnavailable, Internal, } diff --git a/crates/omnigraph-server/src/lib.rs b/crates/omnigraph-server/src/lib.rs index ed44a13..b9ce418 100644 --- a/crates/omnigraph-server/src/lib.rs +++ b/crates/omnigraph-server/src/lib.rs @@ -48,7 +48,7 @@ use serde_json::Value; use sha2::{Digest, Sha256}; use subtle::ConstantTimeEq; use tokio::net::TcpListener; -use tokio::sync::{RwLock, mpsc}; +use tokio::sync::mpsc; use tower_http::trace::TraceLayer; use tracing::{error, info}; use tracing_subscriber::EnvFilter; @@ -119,7 +119,14 @@ pub struct ServerConfig { #[derive(Clone)] pub struct AppState { uri: String, - db: Arc>, + /// PR 2 (MR-686): the engine is now `Arc` — no global + /// write lock. Concurrent handlers call `&self` engine APIs + /// directly. Per-(table, branch) write queues inside the engine + /// serialize same-key writers; per-actor admission control on + /// `workload` isolates noisy actors. + engine: Arc, + /// Per-actor admission control. See `workload::WorkloadController`. + workload: Arc, bearer_tokens: Arc<[(BearerTokenHash, Arc)]>, policy_engine: Option>, } @@ -192,7 +199,8 @@ impl AppState { .collect(); Self { uri, - db: Arc::new(RwLock::new(db)), + engine: Arc::new(db), + workload: Arc::new(workload::WorkloadController::from_env()), bearer_tokens: Arc::from(bearer_tokens), policy_engine: policy_engine.map(Arc::new), } @@ -332,6 +340,46 @@ impl ApiError { } } + /// HTTP 429 Too Many Requests — actor exceeded their per-actor + /// admission cap (count or byte budget). Clients should respect the + /// `Retry-After` header. Mapped from `RejectReason::InFlightCountExceeded` + /// and `RejectReason::ByteBudgetExceeded`. + pub fn too_many_requests(message: impl Into) -> Self { + Self { + status: StatusCode::TOO_MANY_REQUESTS, + code: ErrorCode::TooManyRequests, + message: message.into(), + merge_conflicts: Vec::new(), + manifest_conflict: None, + } + } + + /// HTTP 503 Service Unavailable — global rewrite pool exhausted. + /// Mapped from `RejectReason::GlobalRewriteExhausted`. + pub fn service_unavailable(message: impl Into) -> Self { + Self { + status: StatusCode::SERVICE_UNAVAILABLE, + code: ErrorCode::ServiceUnavailable, + message: message.into(), + merge_conflicts: Vec::new(), + manifest_conflict: None, + } + } + + /// Convert a `WorkloadController` rejection into the matching + /// `ApiError` variant. + pub fn from_workload_reject(reject: workload::RejectReason) -> Self { + match reject { + workload::RejectReason::InFlightCountExceeded { .. } + | workload::RejectReason::ByteBudgetExceeded { .. } => { + Self::too_many_requests(reject.to_string()) + } + workload::RejectReason::GlobalRewriteExhausted { .. } => { + Self::service_unavailable(reject.to_string()) + } + } + } + fn merge_conflict(conflicts: Vec) -> Self { Self { status: StatusCode::CONFLICT, @@ -675,7 +723,7 @@ async fn server_snapshot( }, )?; let snapshot = { - let db = Arc::clone(&state.db).read_owned().await; + let db = &state.engine; db.snapshot_of(ReadTarget::branch(branch.as_str())) .await .map_err(ApiError::from_omni)? @@ -719,7 +767,7 @@ async fn server_read( let policy_branch = match &target { ReadTarget::Branch(branch) => Some(branch.clone()), ReadTarget::Snapshot(_) if state.policy_engine().is_some() && actor.is_some() => { - let db = Arc::clone(&state.db).read_owned().await; + let db = &state.engine; db.resolved_branch_of(target.clone()) .await .map(|branch| branch.or_else(|| Some("main".to_string()))) @@ -747,7 +795,7 @@ async fn server_read( .map_err(|err| ApiError::bad_request(err.to_string()))?; let result = { - let db = Arc::clone(&state.db).read_owned().await; + let db = &state.engine; db.query( target.clone(), &request.query_source, @@ -799,15 +847,15 @@ async fn server_export( target_branch: None, }, )?; - let db = Arc::clone(&state.db); + let engine = Arc::clone(&state.engine); let type_names = request.type_names.clone(); let table_keys = request.table_keys.clone(); let (tx, rx) = mpsc::unbounded_channel::>(); tokio::spawn(async move { let result = { - let db = db.read().await; let mut writer = ExportStreamWriter { sender: tx.clone() }; - db.export_jsonl_to_writer(&branch, &type_names, &table_keys, &mut writer) + engine + .export_jsonl_to_writer(&branch, &type_names, &table_keys, &mut writer) .await }; if let Err(err) = result { @@ -852,6 +900,10 @@ async fn server_change( Json(request): Json, ) -> std::result::Result, ApiError> { let branch = request.branch.unwrap_or_else(|| "main".to_string()); + let actor_arc = actor + .as_ref() + .map(|Extension(actor)| Arc::clone(&actor.0)) + .unwrap_or_else(|| Arc::::from("anonymous")); let actor_id = actor.as_ref().map(|Extension(actor)| actor.as_str()); authorize_request( &state, @@ -863,6 +915,22 @@ async fn server_change( target_branch: None, }, )?; + // Per-actor admission: bound concurrent in-flight mutations and + // estimated bytes per actor. Cedar runs FIRST so denied requests + // don't consume admission slots. Estimate uses the request body + // size as a coarse proxy; engine memory pressure can run higher + // (factorize, vector index) but the global rewrite gate covers + // the heavy paths. + let est_bytes = request.query_source.len() as u64 + + request + .params + .as_ref() + .map(|p| p.to_string().len() as u64) + .unwrap_or(0); + let _admission = state + .workload + .try_admit(&actor_arc, est_bytes) + .map_err(ApiError::from_workload_reject)?; let (selected_name, query_params) = select_named_query(&request.query_source, request.query_name.as_deref()) .map_err(|err| ApiError::bad_request(err.to_string()))?; @@ -870,7 +938,7 @@ async fn server_change( .map_err(|err| ApiError::bad_request(err.to_string()))?; let result = { - let mut db = Arc::clone(&state.db).write_owned().await; + let db = &state.engine; db.mutate_as( &branch, &request.query_source, @@ -925,7 +993,7 @@ async fn server_schema_get( }, )?; let schema_source = { - let db = Arc::clone(&state.db).read_owned().await; + let db = &state.engine; db.schema_source().to_string() }; Ok(Json(SchemaOutput { schema_source })) @@ -968,7 +1036,7 @@ async fn server_schema_apply( }, )?; let result = { - let mut db = Arc::clone(&state.db).write_owned().await; + let db = &state.engine; db.apply_schema(&request.schema_source) .await .map_err(ApiError::from_omni)? @@ -1008,7 +1076,7 @@ async fn server_ingest( let actor_id = actor.as_ref().map(|Extension(actor)| actor.as_str()); let branch_exists = { - let db = Arc::clone(&state.db).read_owned().await; + let db = &state.engine; db.branch_list() .await .map_err(ApiError::from_omni)? @@ -1040,7 +1108,7 @@ async fn server_ingest( )?; let result = { - let mut db = Arc::clone(&state.db).write_owned().await; + let db = &state.engine; db.ingest_as(&branch, Some(&from), &request.data, mode, actor_id) .await .map_err(ApiError::from_omni)? @@ -1086,7 +1154,7 @@ async fn server_branch_list( }, )?; let mut branches = { - let db = Arc::clone(&state.db).read_owned().await; + let db = &state.engine; db.branch_list().await.map_err(ApiError::from_omni)? }; branches.sort(); @@ -1133,7 +1201,7 @@ async fn server_branch_create( }, )?; { - let mut db = Arc::clone(&state.db).write_owned().await; + let db = &state.engine; db.branch_create_from(ReadTarget::branch(&from), &request.name) .await .map_err(ApiError::from_omni)?; @@ -1184,7 +1252,7 @@ async fn server_branch_delete( }, )?; { - let mut db = Arc::clone(&state.db).write_owned().await; + let db = &state.engine; db.branch_delete(&branch) .await .map_err(ApiError::from_omni)?; @@ -1235,7 +1303,7 @@ async fn server_branch_merge( }, )?; let outcome = { - let mut db = Arc::clone(&state.db).write_owned().await; + let db = &state.engine; db.branch_merge_as(&request.source, &target, actor_id) .await .map_err(ApiError::from_omni)? @@ -1284,7 +1352,7 @@ async fn server_commit_list( }, )?; let commits = { - let db = Arc::clone(&state.db).read_owned().await; + let db = &state.engine; db.list_commits(query.branch.as_deref()) .await .map_err(ApiError::from_omni)? @@ -1333,7 +1401,7 @@ async fn server_commit_show( }, )?; let commit = { - let db = Arc::clone(&state.db).read_owned().await; + let db = &state.engine; db.get_commit(&commit_id) .await .map_err(ApiError::from_omni)? diff --git a/openapi.json b/openapi.json index 4bacfe6..a7f0cad 100644 --- a/openapi.json +++ b/openapi.json @@ -1140,6 +1140,8 @@ "bad_request", "not_found", "conflict", + "too_many_requests", + "service_unavailable", "internal" ] },