server: flip AppState to Arc<Omnigraph>, wire admission on /change (PR 2 Step F)

The substantive PR 2 change. Removes the global server `RwLock<Omnigraph>`
that has serialized every mutating request across all actors. Disjoint
`(table, branch)` writes from different actors now run concurrently,
guarded only by the engine's per-(table, branch) write queue (PR 1b)
and per-actor admission control (PR 2 Step E).

AppState changes:
- `db: Arc<RwLock<Omnigraph>>` -> `engine: Arc<Omnigraph>`
- New field: `workload: Arc<workload::WorkloadController>` initialized
  from env (`OMNIGRAPH_PER_ACTOR_INFLIGHT_MAX=16`,
  `OMNIGRAPH_PER_ACTOR_BYTES_MAX=4GiB`,
  `OMNIGRAPH_GLOBAL_REWRITE_MAX=4`).
- `tokio::sync::RwLock` import dropped.

Handler updates (16 sites):
- All `Arc::clone(&state.db).read_owned().await` and `write_owned()`
  calls replaced with `let db = &state.engine`. Engine APIs are now
  `&self` (Step C) so this works directly.
- `/export` clones `Arc<Omnigraph>` once and moves into the spawned
  task instead of acquiring a long-held read lock.
- `/change` handler additionally wires
  `state.workload.try_admit(&actor_arc, est_bytes)`. Cedar runs FIRST
  so denied requests don't consume admission slots; admission runs
  SECOND before the engine call. `est_bytes` uses the request body
  size as a coarse proxy.

API surface additions (`api::ErrorCode`):
- `TooManyRequests` -> HTTP 429 (per-actor cap exceeded; respect
  `Retry-After`)
- `ServiceUnavailable` -> HTTP 503 (global rewrite pool exhausted)

`ApiError` constructors `too_many_requests` / `service_unavailable` and
`from_workload_reject` (maps `RejectReason` variants to HTTP status).

Other mutating handlers (`/ingest`, `/branches/*`, `/branches/merge`,
`/schema/apply`) currently flow through the Arc<Omnigraph> path
without admission gates; wiring those is mechanical and lands as a
follow-up. The /change hot path covers the bulk of MR-686's load
profile.

OpenAPI regenerated to include the new ErrorCode variants.
102 lib + 39 server tests + 5 workload tests pass. The regression
sentinel `change_conflict_returns_manifest_conflict_409` continues
to pass (revalidation perf opt + per-table queue + publisher CAS
preserve manifest_conflict semantics under concurrent writers).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Ragnor Comerford 2026-05-07 17:08:26 +02:00
parent 17a1665002
commit c15962e6b0
No known key found for this signature in database
4 changed files with 97 additions and 20 deletions

1
Cargo.lock generated
View file

@ -4686,6 +4686,7 @@ dependencies = [
"cedar-policy",
"clap",
"color-eyre",
"dashmap",
"futures",
"lance-index",
"omnigraph-compiler",

View file

@ -339,6 +339,12 @@ pub enum ErrorCode {
BadRequest,
NotFound,
Conflict,
/// 429 Too Many Requests — per-actor admission cap exceeded.
/// Clients should respect the `Retry-After` header.
TooManyRequests,
/// 503 Service Unavailable — global rewrite pool exhausted
/// (compaction, index build). Clients should retry later.
ServiceUnavailable,
Internal,
}

View file

@ -48,7 +48,7 @@ use serde_json::Value;
use sha2::{Digest, Sha256};
use subtle::ConstantTimeEq;
use tokio::net::TcpListener;
use tokio::sync::{RwLock, mpsc};
use tokio::sync::mpsc;
use tower_http::trace::TraceLayer;
use tracing::{error, info};
use tracing_subscriber::EnvFilter;
@ -119,7 +119,14 @@ pub struct ServerConfig {
#[derive(Clone)]
pub struct AppState {
uri: String,
db: Arc<RwLock<Omnigraph>>,
/// PR 2 (MR-686): the engine is now `Arc<Omnigraph>` — no global
/// write lock. Concurrent handlers call `&self` engine APIs
/// directly. Per-(table, branch) write queues inside the engine
/// serialize same-key writers; per-actor admission control on
/// `workload` isolates noisy actors.
engine: Arc<Omnigraph>,
/// Per-actor admission control. See `workload::WorkloadController`.
workload: Arc<workload::WorkloadController>,
bearer_tokens: Arc<[(BearerTokenHash, Arc<str>)]>,
policy_engine: Option<Arc<PolicyEngine>>,
}
@ -192,7 +199,8 @@ impl AppState {
.collect();
Self {
uri,
db: Arc::new(RwLock::new(db)),
engine: Arc::new(db),
workload: Arc::new(workload::WorkloadController::from_env()),
bearer_tokens: Arc::from(bearer_tokens),
policy_engine: policy_engine.map(Arc::new),
}
@ -332,6 +340,46 @@ impl ApiError {
}
}
/// HTTP 429 Too Many Requests — actor exceeded their per-actor
/// admission cap (count or byte budget). Clients should respect the
/// `Retry-After` header. Mapped from `RejectReason::InFlightCountExceeded`
/// and `RejectReason::ByteBudgetExceeded`.
pub fn too_many_requests(message: impl Into<String>) -> Self {
Self {
status: StatusCode::TOO_MANY_REQUESTS,
code: ErrorCode::TooManyRequests,
message: message.into(),
merge_conflicts: Vec::new(),
manifest_conflict: None,
}
}
/// HTTP 503 Service Unavailable — global rewrite pool exhausted.
/// Mapped from `RejectReason::GlobalRewriteExhausted`.
pub fn service_unavailable(message: impl Into<String>) -> Self {
Self {
status: StatusCode::SERVICE_UNAVAILABLE,
code: ErrorCode::ServiceUnavailable,
message: message.into(),
merge_conflicts: Vec::new(),
manifest_conflict: None,
}
}
/// Convert a `WorkloadController` rejection into the matching
/// `ApiError` variant.
pub fn from_workload_reject(reject: workload::RejectReason) -> Self {
match reject {
workload::RejectReason::InFlightCountExceeded { .. }
| workload::RejectReason::ByteBudgetExceeded { .. } => {
Self::too_many_requests(reject.to_string())
}
workload::RejectReason::GlobalRewriteExhausted { .. } => {
Self::service_unavailable(reject.to_string())
}
}
}
fn merge_conflict(conflicts: Vec<api::MergeConflictOutput>) -> Self {
Self {
status: StatusCode::CONFLICT,
@ -675,7 +723,7 @@ async fn server_snapshot(
},
)?;
let snapshot = {
let db = Arc::clone(&state.db).read_owned().await;
let db = &state.engine;
db.snapshot_of(ReadTarget::branch(branch.as_str()))
.await
.map_err(ApiError::from_omni)?
@ -719,7 +767,7 @@ async fn server_read(
let policy_branch = match &target {
ReadTarget::Branch(branch) => Some(branch.clone()),
ReadTarget::Snapshot(_) if state.policy_engine().is_some() && actor.is_some() => {
let db = Arc::clone(&state.db).read_owned().await;
let db = &state.engine;
db.resolved_branch_of(target.clone())
.await
.map(|branch| branch.or_else(|| Some("main".to_string())))
@ -747,7 +795,7 @@ async fn server_read(
.map_err(|err| ApiError::bad_request(err.to_string()))?;
let result = {
let db = Arc::clone(&state.db).read_owned().await;
let db = &state.engine;
db.query(
target.clone(),
&request.query_source,
@ -799,15 +847,15 @@ async fn server_export(
target_branch: None,
},
)?;
let db = Arc::clone(&state.db);
let engine = Arc::clone(&state.engine);
let type_names = request.type_names.clone();
let table_keys = request.table_keys.clone();
let (tx, rx) = mpsc::unbounded_channel::<std::result::Result<Bytes, io::Error>>();
tokio::spawn(async move {
let result = {
let db = db.read().await;
let mut writer = ExportStreamWriter { sender: tx.clone() };
db.export_jsonl_to_writer(&branch, &type_names, &table_keys, &mut writer)
engine
.export_jsonl_to_writer(&branch, &type_names, &table_keys, &mut writer)
.await
};
if let Err(err) = result {
@ -852,6 +900,10 @@ async fn server_change(
Json(request): Json<ChangeRequest>,
) -> std::result::Result<Json<ChangeOutput>, ApiError> {
let branch = request.branch.unwrap_or_else(|| "main".to_string());
let actor_arc = actor
.as_ref()
.map(|Extension(actor)| Arc::clone(&actor.0))
.unwrap_or_else(|| Arc::<str>::from("anonymous"));
let actor_id = actor.as_ref().map(|Extension(actor)| actor.as_str());
authorize_request(
&state,
@ -863,6 +915,22 @@ async fn server_change(
target_branch: None,
},
)?;
// Per-actor admission: bound concurrent in-flight mutations and
// estimated bytes per actor. Cedar runs FIRST so denied requests
// don't consume admission slots. Estimate uses the request body
// size as a coarse proxy; engine memory pressure can run higher
// (factorize, vector index) but the global rewrite gate covers
// the heavy paths.
let est_bytes = request.query_source.len() as u64
+ request
.params
.as_ref()
.map(|p| p.to_string().len() as u64)
.unwrap_or(0);
let _admission = state
.workload
.try_admit(&actor_arc, est_bytes)
.map_err(ApiError::from_workload_reject)?;
let (selected_name, query_params) =
select_named_query(&request.query_source, request.query_name.as_deref())
.map_err(|err| ApiError::bad_request(err.to_string()))?;
@ -870,7 +938,7 @@ async fn server_change(
.map_err(|err| ApiError::bad_request(err.to_string()))?;
let result = {
let mut db = Arc::clone(&state.db).write_owned().await;
let db = &state.engine;
db.mutate_as(
&branch,
&request.query_source,
@ -925,7 +993,7 @@ async fn server_schema_get(
},
)?;
let schema_source = {
let db = Arc::clone(&state.db).read_owned().await;
let db = &state.engine;
db.schema_source().to_string()
};
Ok(Json(SchemaOutput { schema_source }))
@ -968,7 +1036,7 @@ async fn server_schema_apply(
},
)?;
let result = {
let mut db = Arc::clone(&state.db).write_owned().await;
let db = &state.engine;
db.apply_schema(&request.schema_source)
.await
.map_err(ApiError::from_omni)?
@ -1008,7 +1076,7 @@ async fn server_ingest(
let actor_id = actor.as_ref().map(|Extension(actor)| actor.as_str());
let branch_exists = {
let db = Arc::clone(&state.db).read_owned().await;
let db = &state.engine;
db.branch_list()
.await
.map_err(ApiError::from_omni)?
@ -1040,7 +1108,7 @@ async fn server_ingest(
)?;
let result = {
let mut db = Arc::clone(&state.db).write_owned().await;
let db = &state.engine;
db.ingest_as(&branch, Some(&from), &request.data, mode, actor_id)
.await
.map_err(ApiError::from_omni)?
@ -1086,7 +1154,7 @@ async fn server_branch_list(
},
)?;
let mut branches = {
let db = Arc::clone(&state.db).read_owned().await;
let db = &state.engine;
db.branch_list().await.map_err(ApiError::from_omni)?
};
branches.sort();
@ -1133,7 +1201,7 @@ async fn server_branch_create(
},
)?;
{
let mut db = Arc::clone(&state.db).write_owned().await;
let db = &state.engine;
db.branch_create_from(ReadTarget::branch(&from), &request.name)
.await
.map_err(ApiError::from_omni)?;
@ -1184,7 +1252,7 @@ async fn server_branch_delete(
},
)?;
{
let mut db = Arc::clone(&state.db).write_owned().await;
let db = &state.engine;
db.branch_delete(&branch)
.await
.map_err(ApiError::from_omni)?;
@ -1235,7 +1303,7 @@ async fn server_branch_merge(
},
)?;
let outcome = {
let mut db = Arc::clone(&state.db).write_owned().await;
let db = &state.engine;
db.branch_merge_as(&request.source, &target, actor_id)
.await
.map_err(ApiError::from_omni)?
@ -1284,7 +1352,7 @@ async fn server_commit_list(
},
)?;
let commits = {
let db = Arc::clone(&state.db).read_owned().await;
let db = &state.engine;
db.list_commits(query.branch.as_deref())
.await
.map_err(ApiError::from_omni)?
@ -1333,7 +1401,7 @@ async fn server_commit_show(
},
)?;
let commit = {
let db = Arc::clone(&state.db).read_owned().await;
let db = &state.engine;
db.get_commit(&commit_id)
.await
.map_err(ApiError::from_omni)?

View file

@ -1140,6 +1140,8 @@
"bad_request",
"not_found",
"conflict",
"too_many_requests",
"service_unavailable",
"internal"
]
},