omnigraph/crates/omnigraph-server/src/lib.rs

1806 lines
60 KiB
Rust
Raw Normal View History

2026-04-10 20:49:41 +03:00
pub mod api;
pub mod auth;
2026-04-10 20:49:41 +03:00
pub mod config;
pub mod policy;
server: add WorkloadController for per-actor admission (PR 2 Step E) PR 2 removes the global server `RwLock<Omnigraph>` (Step F). Without admission control, one heavy actor would exhaust shared capacity (Lance I/O threads, manifest churn, network) and starve other actors. The WorkloadController bounds per-actor in-flight count + bytes and provides a global rewrite-pool semaphore for compaction / index builds. New file: `crates/omnigraph-server/src/workload.rs` (~250 LOC + 5 tests). API: - `WorkloadController::new(inflight_cap, byte_cap, rewrite_cap)` / `from_env()` / `with_defaults()`. - `try_admit(actor_id, est_bytes) -> Result<AdmissionGuard, RejectReason>` acquires both an in-flight count permit and adds est_bytes to the per-actor counter atomically; returns RejectReason on either gate. - `try_admit_rewrite() -> Result<RewriteGuard, RejectReason>` for the global rewrite pool (Step F maps RewriteGuard exhaustion to HTTP 503). - `RejectReason::{InFlightCountExceeded, ByteBudgetExceeded, GlobalRewriteExhausted}`. Race-free admission via `tokio::sync::Semaphore::try_acquire_owned()` for the count gate (master plan Finding 6: independent atomic load+check+add lets two callers both pass a cap-N check; the Semaphore gate is atomic). Bytes use `fetch_add` + decrement-on-rejection so the cap is never exceeded even on rollback. Defaults (override via env): - OMNIGRAPH_PER_ACTOR_INFLIGHT_MAX=16 - OMNIGRAPH_PER_ACTOR_BYTES_MAX=4_294_967_296 (4 GiB) - OMNIGRAPH_GLOBAL_REWRITE_MAX=4 Tests cover under-cap admission, byte-budget rollback, per-actor isolation, global rewrite cap, and the load-bearing 32-concurrent-vs- cap-16 race test (forces real contention via a broadcast release channel so guards can't recycle permits task-by-task; pins the master plan's race-free invariant). Adds workspace dep `dashmap = "6"` for per-actor state. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-07 16:59:45 +02:00
pub mod workload;
2026-04-10 20:49:41 +03:00
use std::collections::{HashMap, HashSet};
use std::fs;
use std::io;
use std::io::Write;
2026-04-10 20:49:41 +03:00
use std::path::PathBuf;
use std::sync::Arc;
use api::{
BranchCreateOutput, BranchCreateRequest, BranchDeleteOutput, BranchListOutput,
BranchMergeOutput, BranchMergeRequest, ChangeOutput, ChangeRequest, CommitListOutput,
CommitListQuery, ErrorCode, ErrorOutput, ExportRequest, HealthOutput, IngestOutput,
IngestRequest, ReadOutput, ReadRequest, SchemaApplyOutput, SchemaApplyRequest, SchemaOutput,
SnapshotQuery, ingest_output, schema_apply_output, snapshot_payload,
2026-04-10 20:49:41 +03:00
};
use axum::body::{Body, Bytes};
2026-04-10 20:49:41 +03:00
use axum::extract::DefaultBodyLimit;
use axum::extract::{Extension, Path, Query, Request, State};
use axum::http::StatusCode;
use axum::http::header::{AUTHORIZATION, CONTENT_TYPE};
use axum::middleware::{self, Next};
use axum::response::{IntoResponse, Response};
use axum::routing::{delete, get, post};
use axum::{Json, Router};
use color_eyre::eyre::{Result, WrapErr, bail};
pub use config::{
AliasCommand, AliasConfig, CliDefaults, DEFAULT_CONFIG_FILE, OmnigraphConfig, PolicySettings,
ProjectConfig, QueryDefaults, ReadOutputFormat, ServerDefaults, TableCellLayout, TargetConfig,
load_config,
};
use futures::stream;
use omnigraph::db::{Omnigraph, ReadTarget};
use omnigraph::error::{ManifestConflictDetails, ManifestErrorKind, OmniError};
2026-04-10 20:49:41 +03:00
use omnigraph_compiler::json_params_to_param_map;
use omnigraph_compiler::query::parser::parse_query;
use omnigraph_compiler::{JsonParamMode, ParamMap};
Add aws feature + SecretsManagerTokenSource backend Introduces an opt-in AWS Secrets Manager backend for bearer tokens, behind the `aws` Cargo feature. Default builds (on-prem, local dev) don't pull in the AWS SDK and don't pay its compile cost. - New Cargo feature `aws` gates the `aws-config` + `aws-sdk-secretsmanager` optional deps. Default features remain empty. - New `auth::aws::SecretsManagerTokenSource` implements `TokenSource` by fetching a JSON `{"actor_id": "token", ...}` payload from a named Secrets Manager secret. Credentials resolve via the AWS default chain (env, shared config, IMDSv2 instance role, ECS task role) so no explicit plumbing is needed under an IAM role. - New `resolve_token_source()` dispatches based on the `OMNIGRAPH_SERVER_BEARER_TOKENS_AWS_SECRET` env var. If the var is set but the binary was built without `--features aws`, returns a clear rebuild instruction rather than silently falling back. - `serve()` now uses `resolve_token_source()` and logs which source was selected at startup. - `parse_json_secret_payload()` is factored out as a free function so the payload validation (trim whitespace, reject blank actor/token, reject non-object) is unit-testable without the AWS SDK. - New CI job `test_aws_feature` builds + tests with `--features aws`. Not in this PR (follow-ups): - Background refresh loop for rotation. `SecretsManagerTokenSource` advertises `supports_refresh: true` but the AppState-level refresh task isn't wired yet. - Config-YAML dispatch (today the AWS source is selected via env var only; eventually `server.bearer_tokens.source` in `omnigraph.yaml`). Tests: - Default-feature build: 33 lib + 41 integration + 64 openapi. - `--features aws` build: 32 lib (one test is cfg-gated) + 41 + 64. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 03:48:51 +03:00
pub use auth::{AWS_SECRET_ENV, EnvOrFileTokenSource, TokenSource, resolve_token_source};
2026-04-10 20:49:41 +03:00
pub use policy::{
PolicyAction, PolicyCompiler, PolicyConfig, PolicyDecision, PolicyEngine, PolicyExpectation,
PolicyRequest, PolicyTestConfig,
};
use serde_json::Value;
use sha2::{Digest, Sha256};
use subtle::ConstantTimeEq;
2026-04-10 20:49:41 +03:00
use tokio::net::TcpListener;
server: flip AppState to Arc<Omnigraph>, wire admission on /change (PR 2 Step F) The substantive PR 2 change. Removes the global server `RwLock<Omnigraph>` that has serialized every mutating request across all actors. Disjoint `(table, branch)` writes from different actors now run concurrently, guarded only by the engine's per-(table, branch) write queue (PR 1b) and per-actor admission control (PR 2 Step E). AppState changes: - `db: Arc<RwLock<Omnigraph>>` -> `engine: Arc<Omnigraph>` - New field: `workload: Arc<workload::WorkloadController>` initialized from env (`OMNIGRAPH_PER_ACTOR_INFLIGHT_MAX=16`, `OMNIGRAPH_PER_ACTOR_BYTES_MAX=4GiB`, `OMNIGRAPH_GLOBAL_REWRITE_MAX=4`). - `tokio::sync::RwLock` import dropped. Handler updates (16 sites): - All `Arc::clone(&state.db).read_owned().await` and `write_owned()` calls replaced with `let db = &state.engine`. Engine APIs are now `&self` (Step C) so this works directly. - `/export` clones `Arc<Omnigraph>` once and moves into the spawned task instead of acquiring a long-held read lock. - `/change` handler additionally wires `state.workload.try_admit(&actor_arc, est_bytes)`. Cedar runs FIRST so denied requests don't consume admission slots; admission runs SECOND before the engine call. `est_bytes` uses the request body size as a coarse proxy. API surface additions (`api::ErrorCode`): - `TooManyRequests` -> HTTP 429 (per-actor cap exceeded; respect `Retry-After`) - `ServiceUnavailable` -> HTTP 503 (global rewrite pool exhausted) `ApiError` constructors `too_many_requests` / `service_unavailable` and `from_workload_reject` (maps `RejectReason` variants to HTTP status). Other mutating handlers (`/ingest`, `/branches/*`, `/branches/merge`, `/schema/apply`) currently flow through the Arc<Omnigraph> path without admission gates; wiring those is mechanical and lands as a follow-up. The /change hot path covers the bulk of MR-686's load profile. OpenAPI regenerated to include the new ErrorCode variants. 102 lib + 39 server tests + 5 workload tests pass. The regression sentinel `change_conflict_returns_manifest_conflict_409` continues to pass (revalidation perf opt + per-table queue + publisher CAS preserve manifest_conflict semantics under concurrent writers). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-07 17:08:26 +02:00
use tokio::sync::mpsc;
2026-04-10 20:49:41 +03:00
use tower_http::trace::TraceLayer;
use tracing::{error, info};
use tracing_subscriber::EnvFilter;
use utoipa::OpenApi;
use utoipa::openapi::security::{Http, HttpAuthScheme, SecurityScheme};
type BearerTokenHash = [u8; 32];
fn hash_bearer_token(token: &str) -> BearerTokenHash {
let digest = Sha256::digest(token.as_bytes());
let mut out = [0u8; 32];
out.copy_from_slice(&digest);
out
}
#[derive(OpenApi)]
#[openapi(
info(
title = "Omnigraph API",
description = "HTTP API for the Omnigraph graph database",
),
paths(
server_health,
server_snapshot,
server_read,
server_export,
server_change,
server_schema_apply,
server_schema_get,
server_ingest,
server_branch_list,
server_branch_create,
server_branch_delete,
server_branch_merge,
server_commit_list,
server_commit_show,
),
modifiers(&SecurityAddon),
)]
pub struct ApiDoc;
struct SecurityAddon;
impl utoipa::Modify for SecurityAddon {
fn modify(&self, openapi: &mut utoipa::openapi::OpenApi) {
openapi
.components
.get_or_insert_with(Default::default)
.add_security_scheme(
"bearer_token",
SecurityScheme::Http(Http::new(HttpAuthScheme::Bearer)),
);
}
}
2026-04-10 20:49:41 +03:00
const DEFAULT_REQUEST_BODY_LIMIT_BYTES: usize = 1_048_576;
const INGEST_REQUEST_BODY_LIMIT_BYTES: usize = 32 * 1024 * 1024;
const SERVER_VERSION: &str = env!("CARGO_PKG_VERSION");
const SERVER_SOURCE_VERSION: Option<&str> = option_env!("OMNIGRAPH_SOURCE_VERSION");
#[derive(Debug, Clone)]
pub struct ServerConfig {
pub uri: String,
pub bind: String,
pub policy_file: Option<PathBuf>,
}
#[derive(Clone)]
pub struct AppState {
uri: String,
server: flip AppState to Arc<Omnigraph>, wire admission on /change (PR 2 Step F) The substantive PR 2 change. Removes the global server `RwLock<Omnigraph>` that has serialized every mutating request across all actors. Disjoint `(table, branch)` writes from different actors now run concurrently, guarded only by the engine's per-(table, branch) write queue (PR 1b) and per-actor admission control (PR 2 Step E). AppState changes: - `db: Arc<RwLock<Omnigraph>>` -> `engine: Arc<Omnigraph>` - New field: `workload: Arc<workload::WorkloadController>` initialized from env (`OMNIGRAPH_PER_ACTOR_INFLIGHT_MAX=16`, `OMNIGRAPH_PER_ACTOR_BYTES_MAX=4GiB`, `OMNIGRAPH_GLOBAL_REWRITE_MAX=4`). - `tokio::sync::RwLock` import dropped. Handler updates (16 sites): - All `Arc::clone(&state.db).read_owned().await` and `write_owned()` calls replaced with `let db = &state.engine`. Engine APIs are now `&self` (Step C) so this works directly. - `/export` clones `Arc<Omnigraph>` once and moves into the spawned task instead of acquiring a long-held read lock. - `/change` handler additionally wires `state.workload.try_admit(&actor_arc, est_bytes)`. Cedar runs FIRST so denied requests don't consume admission slots; admission runs SECOND before the engine call. `est_bytes` uses the request body size as a coarse proxy. API surface additions (`api::ErrorCode`): - `TooManyRequests` -> HTTP 429 (per-actor cap exceeded; respect `Retry-After`) - `ServiceUnavailable` -> HTTP 503 (global rewrite pool exhausted) `ApiError` constructors `too_many_requests` / `service_unavailable` and `from_workload_reject` (maps `RejectReason` variants to HTTP status). Other mutating handlers (`/ingest`, `/branches/*`, `/branches/merge`, `/schema/apply`) currently flow through the Arc<Omnigraph> path without admission gates; wiring those is mechanical and lands as a follow-up. The /change hot path covers the bulk of MR-686's load profile. OpenAPI regenerated to include the new ErrorCode variants. 102 lib + 39 server tests + 5 workload tests pass. The regression sentinel `change_conflict_returns_manifest_conflict_409` continues to pass (revalidation perf opt + per-table queue + publisher CAS preserve manifest_conflict semantics under concurrent writers). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-07 17:08:26 +02:00
/// PR 2 (MR-686): the engine is now `Arc<Omnigraph>` — no global
/// write lock. Concurrent handlers call `&self` engine APIs
/// directly. Per-(table, branch) write queues inside the engine
/// serialize same-key writers; per-actor admission control on
/// `workload` isolates noisy actors.
engine: Arc<Omnigraph>,
/// Per-actor admission control. See `workload::WorkloadController`.
workload: Arc<workload::WorkloadController>,
bearer_tokens: Arc<[(BearerTokenHash, Arc<str>)]>,
2026-04-10 20:49:41 +03:00
policy_engine: Option<Arc<PolicyEngine>>,
}
#[derive(Debug, Clone)]
struct AuthenticatedActor(Arc<str>);
struct ExportStreamWriter {
sender: mpsc::UnboundedSender<std::result::Result<Bytes, io::Error>>,
}
impl Write for ExportStreamWriter {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.sender
.send(Ok(Bytes::copy_from_slice(buf)))
.map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "export stream closed"))?;
Ok(buf.len())
}
fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}
2026-04-10 20:49:41 +03:00
impl AuthenticatedActor {
fn as_str(&self) -> &str {
&self.0
}
}
#[derive(Debug)]
pub struct ApiError {
status: StatusCode,
code: ErrorCode,
message: String,
merge_conflicts: Vec<api::MergeConflictOutput>,
manifest_conflict: Option<api::ManifestConflictOutput>,
2026-04-10 20:49:41 +03:00
}
impl AppState {
pub fn new(uri: String, db: Omnigraph) -> Self {
Self::new_with_bearer_tokens(uri, db, Vec::new())
}
pub fn new_with_bearer_token(uri: String, db: Omnigraph, bearer_token: Option<String>) -> Self {
let bearer_tokens = normalize_bearer_token(bearer_token)
.into_iter()
.map(|token| ("default".to_string(), token))
.collect();
Self::new_with_bearer_tokens(uri, db, bearer_tokens)
}
pub fn new_with_bearer_tokens(
uri: String,
db: Omnigraph,
bearer_tokens: Vec<(String, String)>,
) -> Self {
Self::new_with_bearer_tokens_and_policy(uri, db, bearer_tokens, None)
}
pub fn new_with_bearer_tokens_and_policy(
uri: String,
db: Omnigraph,
bearer_tokens: Vec<(String, String)>,
policy_engine: Option<PolicyEngine>,
) -> Self {
let bearer_tokens: Vec<(BearerTokenHash, Arc<str>)> = bearer_tokens
2026-04-10 20:49:41 +03:00
.into_iter()
.map(|(actor, token)| (hash_bearer_token(&token), Arc::<str>::from(actor)))
2026-04-10 20:49:41 +03:00
.collect();
Self {
uri,
server: flip AppState to Arc<Omnigraph>, wire admission on /change (PR 2 Step F) The substantive PR 2 change. Removes the global server `RwLock<Omnigraph>` that has serialized every mutating request across all actors. Disjoint `(table, branch)` writes from different actors now run concurrently, guarded only by the engine's per-(table, branch) write queue (PR 1b) and per-actor admission control (PR 2 Step E). AppState changes: - `db: Arc<RwLock<Omnigraph>>` -> `engine: Arc<Omnigraph>` - New field: `workload: Arc<workload::WorkloadController>` initialized from env (`OMNIGRAPH_PER_ACTOR_INFLIGHT_MAX=16`, `OMNIGRAPH_PER_ACTOR_BYTES_MAX=4GiB`, `OMNIGRAPH_GLOBAL_REWRITE_MAX=4`). - `tokio::sync::RwLock` import dropped. Handler updates (16 sites): - All `Arc::clone(&state.db).read_owned().await` and `write_owned()` calls replaced with `let db = &state.engine`. Engine APIs are now `&self` (Step C) so this works directly. - `/export` clones `Arc<Omnigraph>` once and moves into the spawned task instead of acquiring a long-held read lock. - `/change` handler additionally wires `state.workload.try_admit(&actor_arc, est_bytes)`. Cedar runs FIRST so denied requests don't consume admission slots; admission runs SECOND before the engine call. `est_bytes` uses the request body size as a coarse proxy. API surface additions (`api::ErrorCode`): - `TooManyRequests` -> HTTP 429 (per-actor cap exceeded; respect `Retry-After`) - `ServiceUnavailable` -> HTTP 503 (global rewrite pool exhausted) `ApiError` constructors `too_many_requests` / `service_unavailable` and `from_workload_reject` (maps `RejectReason` variants to HTTP status). Other mutating handlers (`/ingest`, `/branches/*`, `/branches/merge`, `/schema/apply`) currently flow through the Arc<Omnigraph> path without admission gates; wiring those is mechanical and lands as a follow-up. The /change hot path covers the bulk of MR-686's load profile. OpenAPI regenerated to include the new ErrorCode variants. 102 lib + 39 server tests + 5 workload tests pass. The regression sentinel `change_conflict_returns_manifest_conflict_409` continues to pass (revalidation perf opt + per-table queue + publisher CAS preserve manifest_conflict semantics under concurrent writers). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-07 17:08:26 +02:00
engine: Arc::new(db),
workload: Arc::new(workload::WorkloadController::from_env()),
bearer_tokens: Arc::from(bearer_tokens),
2026-04-10 20:49:41 +03:00
policy_engine: policy_engine.map(Arc::new),
}
}
server+bench: AppState::new_with_workload; bench drops set_var, exercises heavy cap Two cubic findings on bench_actor_isolation.rs flagged together: P2 (lib.rs:202): `unsafe { std::env::set_var(...) }` ran inside `#[tokio::main] async fn main()` AFTER the multi-thread tokio runtime was up. Rust 2024 made `set_var` unsafe because libc's `setenv` is not thread-safe; concurrent env reads from logging or runtime internals can race or read torn state. Fix (correct by design, AGENTS.md rule 9): add a public `AppState::new_with_workload(uri, db, bearer_tokens, workload)` constructor that takes a caller-built `WorkloadController`. Tests and benches override per-actor caps via the constructor instead of mutating global env. Closes the bug class "tests need to mutate global env to override AppState defaults." P2 (lib.rs:130): heavy actor's `oneshot.await` inside the loop serialized — heavy in-flight count was always 1, so cap=1 never tripped on the heavy side. The bench validated isolation (light p99 bounded) but didn't demonstrate the rejection path. Fix: add a `--heavy-concurrency` arg (default 4) and spawn batches as concurrent tokio tasks bounded by an internal semaphore. With heavy_concurrency=4 and inflight_cap=1, the bench now reports heavy_too_many_requests > 0 and heavy_ok == 1 at peak — proving the gate fires for the heavy actor. Sample run on local FS (4 light actors × 30 ops, 20 heavy batches × 50 rows, heavy_concurrency=4, cap=1): heavy_ok: 1 heavy_too_many_requests: 19 light_ok: 120 light_too_many_requests: 0 light_p99: 565 ms (target < 2 s) Heavy saturates its own cap; light actors are completely unaffected. The isolation property is now empirically proven by the rejection counts rather than just by the latency tail. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 17:57:42 +02:00
/// Construct with a caller-provided [`workload::WorkloadController`].
/// Tests and benches use this to override per-actor caps without
/// mutating global env vars (which is unsafe in Rust 2024 once the
/// async runtime is up — `setenv` isn't thread-safe).
pub fn new_with_workload(
uri: String,
db: Omnigraph,
bearer_tokens: Vec<(String, String)>,
workload: workload::WorkloadController,
) -> Self {
let bearer_tokens: Vec<(BearerTokenHash, Arc<str>)> = bearer_tokens
.into_iter()
.map(|(actor, token)| (hash_bearer_token(&token), Arc::<str>::from(actor)))
.collect();
Self {
uri,
engine: Arc::new(db),
workload: Arc::new(workload),
bearer_tokens: Arc::from(bearer_tokens),
policy_engine: None,
}
}
2026-04-10 20:49:41 +03:00
pub async fn open(uri: impl Into<String>) -> Result<Self> {
Self::open_with_bearer_token(uri, None).await
}
pub async fn open_with_bearer_token(
uri: impl Into<String>,
bearer_token: Option<String>,
) -> Result<Self> {
let bearer_tokens = normalize_bearer_token(bearer_token)
.into_iter()
.map(|token| ("default".to_string(), token))
.collect();
Self::open_with_bearer_tokens(uri, bearer_tokens).await
}
pub async fn open_with_bearer_tokens(
uri: impl Into<String>,
bearer_tokens: Vec<(String, String)>,
) -> Result<Self> {
let uri = uri.into();
let db = Omnigraph::open(&uri).await?;
Ok(Self::new_with_bearer_tokens(uri, db, bearer_tokens))
}
pub async fn open_with_bearer_tokens_and_policy(
uri: impl Into<String>,
bearer_tokens: Vec<(String, String)>,
policy_file: Option<&PathBuf>,
) -> Result<Self> {
let uri = uri.into();
let db = Omnigraph::open(&uri).await?;
let policy_engine = match policy_file {
Some(path) => Some(PolicyEngine::load(path, &uri)?),
None => None,
};
if policy_engine.is_some() && bearer_tokens.is_empty() {
bail!("policy requires at least one configured bearer token actor");
}
Ok(Self::new_with_bearer_tokens_and_policy(
uri,
db,
bearer_tokens,
policy_engine,
))
}
pub fn uri(&self) -> &str {
&self.uri
}
fn requires_bearer_auth(&self) -> bool {
!self.bearer_tokens.is_empty() || self.policy_engine.is_some()
}
fn authenticate_bearer_token(&self, provided_token: &str) -> Option<Arc<str>> {
// Hash the incoming token and compare against every stored digest in
// constant time. Iterate all entries unconditionally so total work —
// and therefore response timing — doesn't depend on which slot matches.
let provided_hash = hash_bearer_token(provided_token);
let mut matched: Option<Arc<str>> = None;
for (hash, actor) in self.bearer_tokens.iter() {
if bool::from(hash.ct_eq(&provided_hash)) && matched.is_none() {
matched = Some(Arc::clone(actor));
}
}
matched
2026-04-10 20:49:41 +03:00
}
fn policy_engine(&self) -> Option<&PolicyEngine> {
self.policy_engine.as_deref()
}
}
impl ApiError {
pub fn unauthorized(message: impl Into<String>) -> Self {
Self {
status: StatusCode::UNAUTHORIZED,
code: ErrorCode::Unauthorized,
message: message.into(),
merge_conflicts: Vec::new(),
manifest_conflict: None,
2026-04-10 20:49:41 +03:00
}
}
pub fn forbidden(message: impl Into<String>) -> Self {
Self {
status: StatusCode::FORBIDDEN,
code: ErrorCode::Forbidden,
message: message.into(),
merge_conflicts: Vec::new(),
manifest_conflict: None,
2026-04-10 20:49:41 +03:00
}
}
pub fn bad_request(message: impl Into<String>) -> Self {
Self {
status: StatusCode::BAD_REQUEST,
code: ErrorCode::BadRequest,
message: message.into(),
merge_conflicts: Vec::new(),
manifest_conflict: None,
2026-04-10 20:49:41 +03:00
}
}
pub fn not_found(message: impl Into<String>) -> Self {
Self {
status: StatusCode::NOT_FOUND,
code: ErrorCode::NotFound,
message: message.into(),
merge_conflicts: Vec::new(),
manifest_conflict: None,
2026-04-10 20:49:41 +03:00
}
}
pub fn conflict(message: impl Into<String>) -> Self {
Self {
status: StatusCode::CONFLICT,
code: ErrorCode::Conflict,
message: message.into(),
merge_conflicts: Vec::new(),
manifest_conflict: None,
2026-04-10 20:49:41 +03:00
}
}
pub fn internal(message: impl Into<String>) -> Self {
Self {
status: StatusCode::INTERNAL_SERVER_ERROR,
code: ErrorCode::Internal,
message: message.into(),
merge_conflicts: Vec::new(),
manifest_conflict: None,
2026-04-10 20:49:41 +03:00
}
}
server: flip AppState to Arc<Omnigraph>, wire admission on /change (PR 2 Step F) The substantive PR 2 change. Removes the global server `RwLock<Omnigraph>` that has serialized every mutating request across all actors. Disjoint `(table, branch)` writes from different actors now run concurrently, guarded only by the engine's per-(table, branch) write queue (PR 1b) and per-actor admission control (PR 2 Step E). AppState changes: - `db: Arc<RwLock<Omnigraph>>` -> `engine: Arc<Omnigraph>` - New field: `workload: Arc<workload::WorkloadController>` initialized from env (`OMNIGRAPH_PER_ACTOR_INFLIGHT_MAX=16`, `OMNIGRAPH_PER_ACTOR_BYTES_MAX=4GiB`, `OMNIGRAPH_GLOBAL_REWRITE_MAX=4`). - `tokio::sync::RwLock` import dropped. Handler updates (16 sites): - All `Arc::clone(&state.db).read_owned().await` and `write_owned()` calls replaced with `let db = &state.engine`. Engine APIs are now `&self` (Step C) so this works directly. - `/export` clones `Arc<Omnigraph>` once and moves into the spawned task instead of acquiring a long-held read lock. - `/change` handler additionally wires `state.workload.try_admit(&actor_arc, est_bytes)`. Cedar runs FIRST so denied requests don't consume admission slots; admission runs SECOND before the engine call. `est_bytes` uses the request body size as a coarse proxy. API surface additions (`api::ErrorCode`): - `TooManyRequests` -> HTTP 429 (per-actor cap exceeded; respect `Retry-After`) - `ServiceUnavailable` -> HTTP 503 (global rewrite pool exhausted) `ApiError` constructors `too_many_requests` / `service_unavailable` and `from_workload_reject` (maps `RejectReason` variants to HTTP status). Other mutating handlers (`/ingest`, `/branches/*`, `/branches/merge`, `/schema/apply`) currently flow through the Arc<Omnigraph> path without admission gates; wiring those is mechanical and lands as a follow-up. The /change hot path covers the bulk of MR-686's load profile. OpenAPI regenerated to include the new ErrorCode variants. 102 lib + 39 server tests + 5 workload tests pass. The regression sentinel `change_conflict_returns_manifest_conflict_409` continues to pass (revalidation perf opt + per-table queue + publisher CAS preserve manifest_conflict semantics under concurrent writers). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-07 17:08:26 +02:00
/// HTTP 429 Too Many Requests — actor exceeded their per-actor
/// admission cap (count or byte budget). Clients should respect the
/// `Retry-After` header. Mapped from `RejectReason::InFlightCountExceeded`
/// and `RejectReason::ByteBudgetExceeded`.
pub fn too_many_requests(message: impl Into<String>) -> Self {
Self {
status: StatusCode::TOO_MANY_REQUESTS,
code: ErrorCode::TooManyRequests,
message: message.into(),
merge_conflicts: Vec::new(),
manifest_conflict: None,
}
}
/// Convert a `WorkloadController` rejection into the matching
/// `ApiError` variant.
pub fn from_workload_reject(reject: workload::RejectReason) -> Self {
match reject {
workload::RejectReason::InFlightCountExceeded { .. }
| workload::RejectReason::ByteBudgetExceeded { .. } => {
Self::too_many_requests(reject.to_string())
}
}
}
2026-04-10 20:49:41 +03:00
fn merge_conflict(conflicts: Vec<api::MergeConflictOutput>) -> Self {
Self {
status: StatusCode::CONFLICT,
code: ErrorCode::Conflict,
message: summarize_merge_conflicts(&conflicts),
merge_conflicts: conflicts,
manifest_conflict: None,
}
}
fn manifest_version_conflict(
message: String,
details: api::ManifestConflictOutput,
) -> Self {
Self {
status: StatusCode::CONFLICT,
code: ErrorCode::Conflict,
message,
merge_conflicts: Vec::new(),
manifest_conflict: Some(details),
2026-04-10 20:49:41 +03:00
}
}
fn from_omni(err: OmniError) -> Self {
match err {
OmniError::Compiler(err) => Self::bad_request(err.to_string()),
OmniError::DataFusion(message) => Self::bad_request(format!("query: {message}")),
OmniError::Manifest(err) => match err.kind {
ManifestErrorKind::BadRequest => Self::bad_request(err.message),
ManifestErrorKind::NotFound => Self::not_found(err.message),
ManifestErrorKind::Conflict => match err.details {
Some(ManifestConflictDetails::ExpectedVersionMismatch {
table_key,
expected,
actual,
}) => Self::manifest_version_conflict(
err.message,
api::ManifestConflictOutput {
table_key,
expected,
actual,
},
),
_ => Self::conflict(err.message),
},
2026-04-10 20:49:41 +03:00
ManifestErrorKind::Internal => Self::internal(err.message),
},
OmniError::MergeConflicts(conflicts) => Self::merge_conflict(
conflicts
.iter()
.map(api::MergeConflictOutput::from)
.collect(),
),
OmniError::Lance(message) => Self::internal(format!("storage: {message}")),
OmniError::Io(err) => Self::internal(format!("io: {err}")),
}
}
}
fn summarize_merge_conflicts(conflicts: &[api::MergeConflictOutput]) -> String {
if conflicts.is_empty() {
return "merge conflicts".to_string();
}
let preview: Vec<String> = conflicts
.iter()
.take(3)
.map(|conflict| match conflict.row_id.as_deref() {
Some(row_id) => format!(
"{}:{} ({})",
conflict.table_key,
row_id,
conflict.kind.as_str()
),
None => format!("{} ({})", conflict.table_key, conflict.kind.as_str()),
})
.collect();
let suffix = if conflicts.len() > preview.len() {
format!("; and {} more", conflicts.len() - preview.len())
} else {
String::new()
};
format!("merge conflicts: {}{}", preview.join("; "), suffix)
}
/// Constant `Retry-After` value (seconds) emitted on 429 responses.
const RETRY_AFTER_SECONDS: &str = "60";
2026-04-10 20:49:41 +03:00
impl IntoResponse for ApiError {
fn into_response(self) -> Response {
let mut headers = axum::http::HeaderMap::new();
if matches!(self.code, ErrorCode::TooManyRequests) {
headers.insert(
axum::http::header::RETRY_AFTER,
axum::http::HeaderValue::from_static(RETRY_AFTER_SECONDS),
);
}
2026-04-10 20:49:41 +03:00
(
self.status,
headers,
2026-04-10 20:49:41 +03:00
Json(ErrorOutput {
error: self.message,
code: Some(self.code),
merge_conflicts: self.merge_conflicts,
manifest_conflict: self.manifest_conflict,
2026-04-10 20:49:41 +03:00
}),
)
.into_response()
}
}
pub fn init_tracing() {
let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
let _ = tracing_subscriber::fmt().with_env_filter(filter).try_init();
}
pub fn load_server_settings(
config_path: Option<&PathBuf>,
cli_uri: Option<String>,
cli_target: Option<String>,
cli_bind: Option<String>,
) -> Result<ServerConfig> {
let config = load_config(config_path)?;
let uri =
2026-04-14 04:12:14 +03:00
config.resolve_target_uri(cli_uri, cli_target.as_deref(), config.server_graph_name())?;
2026-04-10 20:49:41 +03:00
let bind = cli_bind.unwrap_or_else(|| config.server_bind().to_string());
let policy_file = config.resolve_policy_file();
Ok(ServerConfig {
uri,
bind,
policy_file,
})
}
pub fn build_app(state: AppState) -> Router {
let protected = Router::new()
.route("/snapshot", get(server_snapshot))
.route("/export", post(server_export))
.route("/read", post(server_read))
.route("/change", post(server_change))
.route("/schema", get(server_schema_get))
.route("/schema/apply", post(server_schema_apply))
2026-04-10 20:49:41 +03:00
.route(
"/ingest",
post(server_ingest).layer(DefaultBodyLimit::max(INGEST_REQUEST_BODY_LIMIT_BYTES)),
)
.route(
"/branches",
get(server_branch_list).post(server_branch_create),
)
.route("/branches/{branch}", delete(server_branch_delete))
.route("/branches/merge", post(server_branch_merge))
.route("/commits", get(server_commit_list))
.route("/commits/{commit_id}", get(server_commit_show))
.route_layer(middleware::from_fn_with_state(
state.clone(),
require_bearer_auth,
));
Router::new()
.route("/healthz", get(server_health))
.route("/openapi.json", get(server_openapi))
2026-04-10 20:49:41 +03:00
.merge(protected)
.layer(DefaultBodyLimit::max(DEFAULT_REQUEST_BODY_LIMIT_BYTES))
.layer(TraceLayer::new_for_http())
.with_state(state)
}
pub async fn serve(config: ServerConfig) -> Result<()> {
Add aws feature + SecretsManagerTokenSource backend Introduces an opt-in AWS Secrets Manager backend for bearer tokens, behind the `aws` Cargo feature. Default builds (on-prem, local dev) don't pull in the AWS SDK and don't pay its compile cost. - New Cargo feature `aws` gates the `aws-config` + `aws-sdk-secretsmanager` optional deps. Default features remain empty. - New `auth::aws::SecretsManagerTokenSource` implements `TokenSource` by fetching a JSON `{"actor_id": "token", ...}` payload from a named Secrets Manager secret. Credentials resolve via the AWS default chain (env, shared config, IMDSv2 instance role, ECS task role) so no explicit plumbing is needed under an IAM role. - New `resolve_token_source()` dispatches based on the `OMNIGRAPH_SERVER_BEARER_TOKENS_AWS_SECRET` env var. If the var is set but the binary was built without `--features aws`, returns a clear rebuild instruction rather than silently falling back. - `serve()` now uses `resolve_token_source()` and logs which source was selected at startup. - `parse_json_secret_payload()` is factored out as a free function so the payload validation (trim whitespace, reject blank actor/token, reject non-object) is unit-testable without the AWS SDK. - New CI job `test_aws_feature` builds + tests with `--features aws`. Not in this PR (follow-ups): - Background refresh loop for rotation. `SecretsManagerTokenSource` advertises `supports_refresh: true` but the AppState-level refresh task isn't wired yet. - Config-YAML dispatch (today the AWS source is selected via env var only; eventually `server.bearer_tokens.source` in `omnigraph.yaml`). Tests: - Default-feature build: 33 lib + 41 integration + 64 openapi. - `--features aws` build: 32 lib (one test is cfg-gated) + 41 + 64. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 03:48:51 +03:00
let token_source = resolve_token_source().await?;
info!(source = token_source.name(), "loaded bearer token source");
2026-04-10 20:49:41 +03:00
let state = AppState::open_with_bearer_tokens_and_policy(
config.uri.clone(),
token_source.load().await?,
2026-04-10 20:49:41 +03:00
config.policy_file.as_ref(),
)
.await?;
let listener = TcpListener::bind(&config.bind).await?;
info!(uri = %config.uri, bind = %config.bind, "serving omnigraph");
axum::serve(listener, build_app(state))
.with_graceful_shutdown(shutdown_signal())
.await?;
Ok(())
}
async fn shutdown_signal() {
if let Err(err) = tokio::signal::ctrl_c().await {
error!(error = %err, "failed to install ctrl-c handler");
return;
}
info!("shutdown signal received");
}
#[utoipa::path(
get,
path = "/healthz",
tag = "health",
operation_id = "health",
responses(
(status = 200, description = "Server is healthy", body = HealthOutput),
),
)]
/// Liveness probe.
///
/// Returns server status and version. Unauthenticated; safe to call from any
/// caller. Use this to confirm the server is reachable before invoking other
/// endpoints.
2026-04-10 20:49:41 +03:00
async fn server_health() -> Json<HealthOutput> {
Json(HealthOutput {
status: "ok".to_string(),
version: SERVER_VERSION.to_string(),
source_version: SERVER_SOURCE_VERSION.map(str::to_string),
})
}
async fn server_openapi(State(state): State<AppState>) -> Json<utoipa::openapi::OpenApi> {
let mut doc = ApiDoc::openapi();
if !state.requires_bearer_auth() {
strip_security(&mut doc);
}
Json(doc)
}
fn strip_security(doc: &mut utoipa::openapi::OpenApi) {
if let Some(components) = doc.components.as_mut() {
components.security_schemes.clear();
}
for path_item in doc.paths.paths.values_mut() {
for op in [
path_item.get.as_mut(),
path_item.post.as_mut(),
path_item.put.as_mut(),
path_item.delete.as_mut(),
path_item.options.as_mut(),
path_item.head.as_mut(),
path_item.patch.as_mut(),
path_item.trace.as_mut(),
]
.into_iter()
.flatten()
{
op.security = None;
}
}
}
2026-04-10 20:49:41 +03:00
async fn require_bearer_auth(
State(state): State<AppState>,
mut request: Request,
next: Next,
) -> std::result::Result<Response, ApiError> {
if !state.requires_bearer_auth() {
return Ok(next.run(request).await);
}
let Some(header) = request
.headers()
.get(AUTHORIZATION)
.and_then(|value| value.to_str().ok())
else {
return Err(ApiError::unauthorized("missing bearer token"));
};
let Some(provided_token) = header.strip_prefix("Bearer ") else {
return Err(ApiError::unauthorized("missing bearer token"));
};
let Some(actor) = state.authenticate_bearer_token(provided_token) else {
return Err(ApiError::unauthorized("invalid bearer token"));
};
request.extensions_mut().insert(AuthenticatedActor(actor));
Ok(next.run(request).await)
}
fn log_policy_decision(actor_id: &str, request: &PolicyRequest, decision: &PolicyDecision) {
info!(
actor_id = actor_id,
action = %request.action,
branch = request.branch.as_deref().unwrap_or(""),
target_branch = request.target_branch.as_deref().unwrap_or(""),
allowed = decision.allowed,
matched_rule_id = decision.matched_rule_id.as_deref().unwrap_or(""),
"policy decision"
);
}
fn authorize_request(
state: &AppState,
actor: Option<&AuthenticatedActor>,
mut request: PolicyRequest,
2026-04-10 20:49:41 +03:00
) -> std::result::Result<(), ApiError> {
let Some(engine) = state.policy_engine() else {
return Ok(());
};
let Some(actor) = actor else {
return Err(ApiError::unauthorized("missing bearer token"));
};
policy: codify signed-token-claim-only actor identity (MR-731) (#101) Warm-up commit for the policy chassis epic (MR-722). PR #1 of the chassis series — same role as schema-lint v1's commit #1 baseline. Zero behavioral change; establishes the regression test, the load-bearing doc comment, and the user-doc paragraph for an invariant already true in code. Server auth already resolves `actor_id` from the matched bearer token at `omnigraph-server/src/lib.rs:692-694`, overwriting whatever the handler put in the PolicyRequest. The principle is named in docs/dev/invariants.md Hard Invariant 11 ("clients cannot set actor identity directly"). What was missing: a regression test, a load-bearing doc comment at the resolution site, and a user-facing documentation paragraph. This commit adds all three. Why first. The actor-identity invariant is the foundation every other policy decision stands on. If `actor_id` can be spoofed, every chassis primitive (per-row scope, audit log, two-person rule) becomes ungated. Pinning the invariant first means PR #2 (the chassis core) doesn't have to re-prove this assertion. Changes: * crates/omnigraph-server/tests/server.rs — new regression test actor_id_resolves_from_bearer_token_ignoring_client_supplied_headers with three sub-assertions: - spoof-up: bearer for denied actor + X-Actor-Id naming allowed actor → 403 (header doesn't promote) - spoof-down: bearer for allowed actor + X-Actor-Id naming denied actor → 200 (header doesn't demote) - empty-string spoof: empty X-Actor-Id doesn't clear resolved actor Cross-link to MR-777 (auth boundary cases — actor-id collision + malformed bearer) noted in the test docstring. * crates/omnigraph-server/src/lib.rs — expanded doc comment at the actor-resolution site explaining the SECURITY INVARIANT, citing Hard Invariant 11, the Supabase RLS history footgun, and the regression test that pins the contract. Reader thinking "I should let clients override actor_id for impersonation" hits this comment first. * docs/user/policy.md — new "Actor identity (signed-claim-only)" section near the existing Server enforcement section. Closes the user-facing doc gap MR-731's "Done when" requires. Architectural decisions for PR #2+ pinned this session (not implemented here, recorded so future implementers don't re-litigate): - PolicyEngine moves to new `omnigraph-policy` workspace crate so both engine and server can depend on it (Q2). - `enforce(action, scope, actor)` will take a new `ResourceScope` enum, leaving room for MR-725's per-type and per-row variants (Q3). - `PolicyAction::Admin` is kept and wired (Option A) — meta-action for policy-management surfaces (hot reload, audit log query, approvals list) as those consumer features land (Q4). Test results: - cargo test -p omnigraph-server --test server: 45 pass (44 existing + 1 new); no regressions - scripts/check-agents-md.sh: passes (34 links / 33 docs OK) Out of scope (PR #2+): - Omnigraph::with_policy() + enforce() method - omnigraph-policy crate creation - ResourceScope enum - CLI policy injection into Omnigraph - HTTP-layer redundant-check removal - MR-724 Admin action wiring (PR #2) - MR-723 default-deny 3-state (PR #4) - MR-736 severity warn/deny (PR #5) 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-17 02:51:34 +03:00
// SECURITY INVARIANT (MR-731): actor identity comes from the matched
// bearer token, never from a client-supplied request header, query
// parameter, or body field. This line is the single chokepoint where
// the authoritative actor (resolved from the bearer match by
// `require_bearer_auth`) overwrites whatever the handler put in the
// PolicyRequest. Removing or weakening it lets clients spoof identity —
// exactly the Supabase RLS footgun ("trusting raw_user_meta_data is
// asking the attacker if they're an admin"). The principle is codified
// in `docs/dev/invariants.md` Hard Invariant 11 ("clients cannot set
// actor identity directly") and pinned by the regression test
// `actor_id_resolves_from_bearer_token_ignoring_client_supplied_headers`
// in `crates/omnigraph-server/tests/server.rs`.
//
// Side effect: also prevents an empty-string default at any handler
// call site from ever reaching the engine as a policy subject.
request.actor_id = actor.as_str().to_string();
2026-04-10 20:49:41 +03:00
let decision = engine
.authorize(&request)
.map_err(|err| ApiError::internal(format!("policy: {err}")))?;
log_policy_decision(actor.as_str(), &request, &decision);
if decision.allowed {
Ok(())
} else {
Err(ApiError::forbidden(decision.message))
}
}
#[utoipa::path(
get,
path = "/snapshot",
tag = "snapshots",
operation_id = "getSnapshot",
params(SnapshotQuery),
responses(
(status = 200, description = "Database snapshot", body = api::SnapshotOutput),
(status = 401, description = "Unauthorized", body = ErrorOutput),
(status = 403, description = "Forbidden", body = ErrorOutput),
),
security(("bearer_token" = [])),
)]
/// Read the current snapshot of a branch.
///
/// Returns the manifest version plus per-table metadata (path, version, row
/// count) for every table on the branch. Defaults to `main` when `branch` is
/// omitted. Read-only.
2026-04-10 20:49:41 +03:00
async fn server_snapshot(
State(state): State<AppState>,
actor: Option<Extension<AuthenticatedActor>>,
Query(query): Query<SnapshotQuery>,
) -> std::result::Result<Json<api::SnapshotOutput>, ApiError> {
let branch = query.branch.unwrap_or_else(|| "main".to_string());
authorize_request(
&state,
actor.as_ref().map(|Extension(actor)| actor),
PolicyRequest {
actor_id: actor
.as_ref()
.map(|Extension(actor)| actor.as_str().to_string())
.unwrap_or_default(),
action: PolicyAction::Read,
branch: Some(branch.clone()),
target_branch: None,
},
)?;
let snapshot = {
server: flip AppState to Arc<Omnigraph>, wire admission on /change (PR 2 Step F) The substantive PR 2 change. Removes the global server `RwLock<Omnigraph>` that has serialized every mutating request across all actors. Disjoint `(table, branch)` writes from different actors now run concurrently, guarded only by the engine's per-(table, branch) write queue (PR 1b) and per-actor admission control (PR 2 Step E). AppState changes: - `db: Arc<RwLock<Omnigraph>>` -> `engine: Arc<Omnigraph>` - New field: `workload: Arc<workload::WorkloadController>` initialized from env (`OMNIGRAPH_PER_ACTOR_INFLIGHT_MAX=16`, `OMNIGRAPH_PER_ACTOR_BYTES_MAX=4GiB`, `OMNIGRAPH_GLOBAL_REWRITE_MAX=4`). - `tokio::sync::RwLock` import dropped. Handler updates (16 sites): - All `Arc::clone(&state.db).read_owned().await` and `write_owned()` calls replaced with `let db = &state.engine`. Engine APIs are now `&self` (Step C) so this works directly. - `/export` clones `Arc<Omnigraph>` once and moves into the spawned task instead of acquiring a long-held read lock. - `/change` handler additionally wires `state.workload.try_admit(&actor_arc, est_bytes)`. Cedar runs FIRST so denied requests don't consume admission slots; admission runs SECOND before the engine call. `est_bytes` uses the request body size as a coarse proxy. API surface additions (`api::ErrorCode`): - `TooManyRequests` -> HTTP 429 (per-actor cap exceeded; respect `Retry-After`) - `ServiceUnavailable` -> HTTP 503 (global rewrite pool exhausted) `ApiError` constructors `too_many_requests` / `service_unavailable` and `from_workload_reject` (maps `RejectReason` variants to HTTP status). Other mutating handlers (`/ingest`, `/branches/*`, `/branches/merge`, `/schema/apply`) currently flow through the Arc<Omnigraph> path without admission gates; wiring those is mechanical and lands as a follow-up. The /change hot path covers the bulk of MR-686's load profile. OpenAPI regenerated to include the new ErrorCode variants. 102 lib + 39 server tests + 5 workload tests pass. The regression sentinel `change_conflict_returns_manifest_conflict_409` continues to pass (revalidation perf opt + per-table queue + publisher CAS preserve manifest_conflict semantics under concurrent writers). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-07 17:08:26 +02:00
let db = &state.engine;
2026-04-10 20:49:41 +03:00
db.snapshot_of(ReadTarget::branch(branch.as_str()))
.await
.map_err(ApiError::from_omni)?
};
Ok(Json(snapshot_payload(&branch, &snapshot)))
}
#[utoipa::path(
post,
path = "/read",
tag = "queries",
operation_id = "read",
request_body = ReadRequest,
responses(
(status = 200, description = "Query results", body = ReadOutput),
(status = 400, description = "Bad request", body = ErrorOutput),
(status = 401, description = "Unauthorized", body = ErrorOutput),
(status = 403, description = "Forbidden", body = ErrorOutput),
),
security(("bearer_token" = [])),
)]
/// Execute a GQ read query.
///
/// Runs the query in `query_source` against either a branch or a frozen
/// snapshot (mutually exclusive). When `query_source` defines multiple named
/// queries, pick one with `query_name`. `params` is a JSON object whose keys
/// match the parameters declared by the query. Returns rows as a JSON array
/// plus a `columns` list. Read-only.
2026-04-10 20:49:41 +03:00
async fn server_read(
State(state): State<AppState>,
actor: Option<Extension<AuthenticatedActor>>,
Json(request): Json<ReadRequest>,
) -> std::result::Result<Json<ReadOutput>, ApiError> {
if request.branch.is_some() && request.snapshot.is_some() {
return Err(ApiError::bad_request(
"read request may specify branch or snapshot, not both",
));
}
let target = read_target_from_request(request.branch, request.snapshot);
let policy_branch = match &target {
ReadTarget::Branch(branch) => Some(branch.clone()),
ReadTarget::Snapshot(_) if state.policy_engine().is_some() && actor.is_some() => {
server: flip AppState to Arc<Omnigraph>, wire admission on /change (PR 2 Step F) The substantive PR 2 change. Removes the global server `RwLock<Omnigraph>` that has serialized every mutating request across all actors. Disjoint `(table, branch)` writes from different actors now run concurrently, guarded only by the engine's per-(table, branch) write queue (PR 1b) and per-actor admission control (PR 2 Step E). AppState changes: - `db: Arc<RwLock<Omnigraph>>` -> `engine: Arc<Omnigraph>` - New field: `workload: Arc<workload::WorkloadController>` initialized from env (`OMNIGRAPH_PER_ACTOR_INFLIGHT_MAX=16`, `OMNIGRAPH_PER_ACTOR_BYTES_MAX=4GiB`, `OMNIGRAPH_GLOBAL_REWRITE_MAX=4`). - `tokio::sync::RwLock` import dropped. Handler updates (16 sites): - All `Arc::clone(&state.db).read_owned().await` and `write_owned()` calls replaced with `let db = &state.engine`. Engine APIs are now `&self` (Step C) so this works directly. - `/export` clones `Arc<Omnigraph>` once and moves into the spawned task instead of acquiring a long-held read lock. - `/change` handler additionally wires `state.workload.try_admit(&actor_arc, est_bytes)`. Cedar runs FIRST so denied requests don't consume admission slots; admission runs SECOND before the engine call. `est_bytes` uses the request body size as a coarse proxy. API surface additions (`api::ErrorCode`): - `TooManyRequests` -> HTTP 429 (per-actor cap exceeded; respect `Retry-After`) - `ServiceUnavailable` -> HTTP 503 (global rewrite pool exhausted) `ApiError` constructors `too_many_requests` / `service_unavailable` and `from_workload_reject` (maps `RejectReason` variants to HTTP status). Other mutating handlers (`/ingest`, `/branches/*`, `/branches/merge`, `/schema/apply`) currently flow through the Arc<Omnigraph> path without admission gates; wiring those is mechanical and lands as a follow-up. The /change hot path covers the bulk of MR-686's load profile. OpenAPI regenerated to include the new ErrorCode variants. 102 lib + 39 server tests + 5 workload tests pass. The regression sentinel `change_conflict_returns_manifest_conflict_409` continues to pass (revalidation perf opt + per-table queue + publisher CAS preserve manifest_conflict semantics under concurrent writers). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-07 17:08:26 +02:00
let db = &state.engine;
2026-04-10 20:49:41 +03:00
db.resolved_branch_of(target.clone())
.await
.map(|branch| branch.or_else(|| Some("main".to_string())))
.map_err(ApiError::from_omni)?
}
ReadTarget::Snapshot(_) => None,
};
authorize_request(
&state,
actor.as_ref().map(|Extension(actor)| actor),
PolicyRequest {
actor_id: actor
.as_ref()
.map(|Extension(actor)| actor.as_str().to_string())
.unwrap_or_default(),
action: PolicyAction::Read,
branch: policy_branch,
target_branch: None,
},
)?;
let (selected_name, query_params) =
select_named_query(&request.query_source, request.query_name.as_deref())
.map_err(|err| ApiError::bad_request(err.to_string()))?;
let params = query_params_from_json(&query_params, request.params.as_ref())
.map_err(|err| ApiError::bad_request(err.to_string()))?;
let result = {
server: flip AppState to Arc<Omnigraph>, wire admission on /change (PR 2 Step F) The substantive PR 2 change. Removes the global server `RwLock<Omnigraph>` that has serialized every mutating request across all actors. Disjoint `(table, branch)` writes from different actors now run concurrently, guarded only by the engine's per-(table, branch) write queue (PR 1b) and per-actor admission control (PR 2 Step E). AppState changes: - `db: Arc<RwLock<Omnigraph>>` -> `engine: Arc<Omnigraph>` - New field: `workload: Arc<workload::WorkloadController>` initialized from env (`OMNIGRAPH_PER_ACTOR_INFLIGHT_MAX=16`, `OMNIGRAPH_PER_ACTOR_BYTES_MAX=4GiB`, `OMNIGRAPH_GLOBAL_REWRITE_MAX=4`). - `tokio::sync::RwLock` import dropped. Handler updates (16 sites): - All `Arc::clone(&state.db).read_owned().await` and `write_owned()` calls replaced with `let db = &state.engine`. Engine APIs are now `&self` (Step C) so this works directly. - `/export` clones `Arc<Omnigraph>` once and moves into the spawned task instead of acquiring a long-held read lock. - `/change` handler additionally wires `state.workload.try_admit(&actor_arc, est_bytes)`. Cedar runs FIRST so denied requests don't consume admission slots; admission runs SECOND before the engine call. `est_bytes` uses the request body size as a coarse proxy. API surface additions (`api::ErrorCode`): - `TooManyRequests` -> HTTP 429 (per-actor cap exceeded; respect `Retry-After`) - `ServiceUnavailable` -> HTTP 503 (global rewrite pool exhausted) `ApiError` constructors `too_many_requests` / `service_unavailable` and `from_workload_reject` (maps `RejectReason` variants to HTTP status). Other mutating handlers (`/ingest`, `/branches/*`, `/branches/merge`, `/schema/apply`) currently flow through the Arc<Omnigraph> path without admission gates; wiring those is mechanical and lands as a follow-up. The /change hot path covers the bulk of MR-686's load profile. OpenAPI regenerated to include the new ErrorCode variants. 102 lib + 39 server tests + 5 workload tests pass. The regression sentinel `change_conflict_returns_manifest_conflict_409` continues to pass (revalidation perf opt + per-table queue + publisher CAS preserve manifest_conflict semantics under concurrent writers). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-07 17:08:26 +02:00
let db = &state.engine;
2026-04-10 20:49:41 +03:00
db.query(
target.clone(),
&request.query_source,
&selected_name,
&params,
)
.await
.map_err(ApiError::from_omni)?
};
Ok(Json(api::read_output(selected_name, &target, result)))
}
#[utoipa::path(
post,
path = "/export",
tag = "queries",
operation_id = "export",
request_body = ExportRequest,
responses(
(status = 200, description = "Exported data as NDJSON", content_type = "application/x-ndjson"),
(status = 400, description = "Bad request", body = ErrorOutput),
(status = 401, description = "Unauthorized", body = ErrorOutput),
(status = 403, description = "Forbidden", body = ErrorOutput),
),
security(("bearer_token" = [])),
)]
/// Stream the contents of a branch as NDJSON.
///
/// Emits one JSON object per line (`application/x-ndjson`). Filter with
/// `type_names` (node/edge type names) and/or `table_keys`; both empty
/// streams the entire branch. Suitable for large exports — the response is
/// streamed, not buffered. Read-only.
2026-04-10 20:49:41 +03:00
async fn server_export(
State(state): State<AppState>,
actor: Option<Extension<AuthenticatedActor>>,
Json(request): Json<ExportRequest>,
) -> std::result::Result<Response, ApiError> {
let branch = request.branch.unwrap_or_else(|| "main".to_string());
authorize_request(
&state,
actor.as_ref().map(|Extension(actor)| actor),
PolicyRequest {
actor_id: actor
.as_ref()
.map(|Extension(actor)| actor.as_str().to_string())
.unwrap_or_default(),
action: PolicyAction::Export,
branch: Some(branch.clone()),
target_branch: None,
},
)?;
server: flip AppState to Arc<Omnigraph>, wire admission on /change (PR 2 Step F) The substantive PR 2 change. Removes the global server `RwLock<Omnigraph>` that has serialized every mutating request across all actors. Disjoint `(table, branch)` writes from different actors now run concurrently, guarded only by the engine's per-(table, branch) write queue (PR 1b) and per-actor admission control (PR 2 Step E). AppState changes: - `db: Arc<RwLock<Omnigraph>>` -> `engine: Arc<Omnigraph>` - New field: `workload: Arc<workload::WorkloadController>` initialized from env (`OMNIGRAPH_PER_ACTOR_INFLIGHT_MAX=16`, `OMNIGRAPH_PER_ACTOR_BYTES_MAX=4GiB`, `OMNIGRAPH_GLOBAL_REWRITE_MAX=4`). - `tokio::sync::RwLock` import dropped. Handler updates (16 sites): - All `Arc::clone(&state.db).read_owned().await` and `write_owned()` calls replaced with `let db = &state.engine`. Engine APIs are now `&self` (Step C) so this works directly. - `/export` clones `Arc<Omnigraph>` once and moves into the spawned task instead of acquiring a long-held read lock. - `/change` handler additionally wires `state.workload.try_admit(&actor_arc, est_bytes)`. Cedar runs FIRST so denied requests don't consume admission slots; admission runs SECOND before the engine call. `est_bytes` uses the request body size as a coarse proxy. API surface additions (`api::ErrorCode`): - `TooManyRequests` -> HTTP 429 (per-actor cap exceeded; respect `Retry-After`) - `ServiceUnavailable` -> HTTP 503 (global rewrite pool exhausted) `ApiError` constructors `too_many_requests` / `service_unavailable` and `from_workload_reject` (maps `RejectReason` variants to HTTP status). Other mutating handlers (`/ingest`, `/branches/*`, `/branches/merge`, `/schema/apply`) currently flow through the Arc<Omnigraph> path without admission gates; wiring those is mechanical and lands as a follow-up. The /change hot path covers the bulk of MR-686's load profile. OpenAPI regenerated to include the new ErrorCode variants. 102 lib + 39 server tests + 5 workload tests pass. The regression sentinel `change_conflict_returns_manifest_conflict_409` continues to pass (revalidation perf opt + per-table queue + publisher CAS preserve manifest_conflict semantics under concurrent writers). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-07 17:08:26 +02:00
let engine = Arc::clone(&state.engine);
let type_names = request.type_names.clone();
let table_keys = request.table_keys.clone();
let (tx, rx) = mpsc::unbounded_channel::<std::result::Result<Bytes, io::Error>>();
tokio::spawn(async move {
let result = {
let mut writer = ExportStreamWriter { sender: tx.clone() };
server: flip AppState to Arc<Omnigraph>, wire admission on /change (PR 2 Step F) The substantive PR 2 change. Removes the global server `RwLock<Omnigraph>` that has serialized every mutating request across all actors. Disjoint `(table, branch)` writes from different actors now run concurrently, guarded only by the engine's per-(table, branch) write queue (PR 1b) and per-actor admission control (PR 2 Step E). AppState changes: - `db: Arc<RwLock<Omnigraph>>` -> `engine: Arc<Omnigraph>` - New field: `workload: Arc<workload::WorkloadController>` initialized from env (`OMNIGRAPH_PER_ACTOR_INFLIGHT_MAX=16`, `OMNIGRAPH_PER_ACTOR_BYTES_MAX=4GiB`, `OMNIGRAPH_GLOBAL_REWRITE_MAX=4`). - `tokio::sync::RwLock` import dropped. Handler updates (16 sites): - All `Arc::clone(&state.db).read_owned().await` and `write_owned()` calls replaced with `let db = &state.engine`. Engine APIs are now `&self` (Step C) so this works directly. - `/export` clones `Arc<Omnigraph>` once and moves into the spawned task instead of acquiring a long-held read lock. - `/change` handler additionally wires `state.workload.try_admit(&actor_arc, est_bytes)`. Cedar runs FIRST so denied requests don't consume admission slots; admission runs SECOND before the engine call. `est_bytes` uses the request body size as a coarse proxy. API surface additions (`api::ErrorCode`): - `TooManyRequests` -> HTTP 429 (per-actor cap exceeded; respect `Retry-After`) - `ServiceUnavailable` -> HTTP 503 (global rewrite pool exhausted) `ApiError` constructors `too_many_requests` / `service_unavailable` and `from_workload_reject` (maps `RejectReason` variants to HTTP status). Other mutating handlers (`/ingest`, `/branches/*`, `/branches/merge`, `/schema/apply`) currently flow through the Arc<Omnigraph> path without admission gates; wiring those is mechanical and lands as a follow-up. The /change hot path covers the bulk of MR-686's load profile. OpenAPI regenerated to include the new ErrorCode variants. 102 lib + 39 server tests + 5 workload tests pass. The regression sentinel `change_conflict_returns_manifest_conflict_409` continues to pass (revalidation perf opt + per-table queue + publisher CAS preserve manifest_conflict semantics under concurrent writers). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-07 17:08:26 +02:00
engine
.export_jsonl_to_writer(&branch, &type_names, &table_keys, &mut writer)
.await
};
if let Err(err) = result {
let _ = tx.send(Err(io::Error::other(err.to_string())));
}
});
let body = Body::from_stream(stream::unfold(rx, |mut rx| async move {
rx.recv().await.map(|item| (item, rx))
}));
2026-04-10 20:49:41 +03:00
Ok((
StatusCode::OK,
[(CONTENT_TYPE, "application/x-ndjson; charset=utf-8")],
body,
2026-04-10 20:49:41 +03:00
)
.into_response())
}
#[utoipa::path(
post,
path = "/change",
tag = "mutations",
operation_id = "change",
request_body = ChangeRequest,
responses(
(status = 200, description = "Mutation results", body = ChangeOutput),
(status = 400, description = "Bad request", body = ErrorOutput),
(status = 401, description = "Unauthorized", body = ErrorOutput),
(status = 403, description = "Forbidden", body = ErrorOutput),
(status = 409, description = "Merge conflict", body = ErrorOutput),
(status = 429, description = "Per-actor admission cap exceeded; honor `Retry-After` header", body = ErrorOutput),
),
security(("bearer_token" = [])),
)]
/// Apply a GQ mutation to a branch.
///
/// Writes to the named `branch` (defaults to `main`). Mutations are atomic
/// per call and produce a new commit. Returns counts of nodes and edges
/// affected. **Destructive**: on success the branch is updated; rejected
/// mutations may still acquire locks briefly. Returns 409 on merge conflict.
2026-04-10 20:49:41 +03:00
async fn server_change(
State(state): State<AppState>,
actor: Option<Extension<AuthenticatedActor>>,
Json(request): Json<ChangeRequest>,
) -> std::result::Result<Json<ChangeOutput>, ApiError> {
let branch = request.branch.unwrap_or_else(|| "main".to_string());
server: flip AppState to Arc<Omnigraph>, wire admission on /change (PR 2 Step F) The substantive PR 2 change. Removes the global server `RwLock<Omnigraph>` that has serialized every mutating request across all actors. Disjoint `(table, branch)` writes from different actors now run concurrently, guarded only by the engine's per-(table, branch) write queue (PR 1b) and per-actor admission control (PR 2 Step E). AppState changes: - `db: Arc<RwLock<Omnigraph>>` -> `engine: Arc<Omnigraph>` - New field: `workload: Arc<workload::WorkloadController>` initialized from env (`OMNIGRAPH_PER_ACTOR_INFLIGHT_MAX=16`, `OMNIGRAPH_PER_ACTOR_BYTES_MAX=4GiB`, `OMNIGRAPH_GLOBAL_REWRITE_MAX=4`). - `tokio::sync::RwLock` import dropped. Handler updates (16 sites): - All `Arc::clone(&state.db).read_owned().await` and `write_owned()` calls replaced with `let db = &state.engine`. Engine APIs are now `&self` (Step C) so this works directly. - `/export` clones `Arc<Omnigraph>` once and moves into the spawned task instead of acquiring a long-held read lock. - `/change` handler additionally wires `state.workload.try_admit(&actor_arc, est_bytes)`. Cedar runs FIRST so denied requests don't consume admission slots; admission runs SECOND before the engine call. `est_bytes` uses the request body size as a coarse proxy. API surface additions (`api::ErrorCode`): - `TooManyRequests` -> HTTP 429 (per-actor cap exceeded; respect `Retry-After`) - `ServiceUnavailable` -> HTTP 503 (global rewrite pool exhausted) `ApiError` constructors `too_many_requests` / `service_unavailable` and `from_workload_reject` (maps `RejectReason` variants to HTTP status). Other mutating handlers (`/ingest`, `/branches/*`, `/branches/merge`, `/schema/apply`) currently flow through the Arc<Omnigraph> path without admission gates; wiring those is mechanical and lands as a follow-up. The /change hot path covers the bulk of MR-686's load profile. OpenAPI regenerated to include the new ErrorCode variants. 102 lib + 39 server tests + 5 workload tests pass. The regression sentinel `change_conflict_returns_manifest_conflict_409` continues to pass (revalidation perf opt + per-table queue + publisher CAS preserve manifest_conflict semantics under concurrent writers). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-07 17:08:26 +02:00
let actor_arc = actor
.as_ref()
.map(|Extension(actor)| Arc::clone(&actor.0))
.unwrap_or_else(|| Arc::<str>::from("anonymous"));
2026-04-10 20:49:41 +03:00
let actor_id = actor.as_ref().map(|Extension(actor)| actor.as_str());
authorize_request(
&state,
actor.as_ref().map(|Extension(actor)| actor),
PolicyRequest {
actor_id: actor_id.map(str::to_string).unwrap_or_default(),
action: PolicyAction::Change,
branch: Some(branch.clone()),
target_branch: None,
},
)?;
server: flip AppState to Arc<Omnigraph>, wire admission on /change (PR 2 Step F) The substantive PR 2 change. Removes the global server `RwLock<Omnigraph>` that has serialized every mutating request across all actors. Disjoint `(table, branch)` writes from different actors now run concurrently, guarded only by the engine's per-(table, branch) write queue (PR 1b) and per-actor admission control (PR 2 Step E). AppState changes: - `db: Arc<RwLock<Omnigraph>>` -> `engine: Arc<Omnigraph>` - New field: `workload: Arc<workload::WorkloadController>` initialized from env (`OMNIGRAPH_PER_ACTOR_INFLIGHT_MAX=16`, `OMNIGRAPH_PER_ACTOR_BYTES_MAX=4GiB`, `OMNIGRAPH_GLOBAL_REWRITE_MAX=4`). - `tokio::sync::RwLock` import dropped. Handler updates (16 sites): - All `Arc::clone(&state.db).read_owned().await` and `write_owned()` calls replaced with `let db = &state.engine`. Engine APIs are now `&self` (Step C) so this works directly. - `/export` clones `Arc<Omnigraph>` once and moves into the spawned task instead of acquiring a long-held read lock. - `/change` handler additionally wires `state.workload.try_admit(&actor_arc, est_bytes)`. Cedar runs FIRST so denied requests don't consume admission slots; admission runs SECOND before the engine call. `est_bytes` uses the request body size as a coarse proxy. API surface additions (`api::ErrorCode`): - `TooManyRequests` -> HTTP 429 (per-actor cap exceeded; respect `Retry-After`) - `ServiceUnavailable` -> HTTP 503 (global rewrite pool exhausted) `ApiError` constructors `too_many_requests` / `service_unavailable` and `from_workload_reject` (maps `RejectReason` variants to HTTP status). Other mutating handlers (`/ingest`, `/branches/*`, `/branches/merge`, `/schema/apply`) currently flow through the Arc<Omnigraph> path without admission gates; wiring those is mechanical and lands as a follow-up. The /change hot path covers the bulk of MR-686's load profile. OpenAPI regenerated to include the new ErrorCode variants. 102 lib + 39 server tests + 5 workload tests pass. The regression sentinel `change_conflict_returns_manifest_conflict_409` continues to pass (revalidation perf opt + per-table queue + publisher CAS preserve manifest_conflict semantics under concurrent writers). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-07 17:08:26 +02:00
// Per-actor admission: bound concurrent in-flight mutations and
// estimated bytes per actor. Cedar runs FIRST so denied requests
// don't consume admission slots. Estimate uses the request body
2026-05-10 14:16:26 +00:00
// size as a coarse proxy; engine memory pressure can run higher.
server: flip AppState to Arc<Omnigraph>, wire admission on /change (PR 2 Step F) The substantive PR 2 change. Removes the global server `RwLock<Omnigraph>` that has serialized every mutating request across all actors. Disjoint `(table, branch)` writes from different actors now run concurrently, guarded only by the engine's per-(table, branch) write queue (PR 1b) and per-actor admission control (PR 2 Step E). AppState changes: - `db: Arc<RwLock<Omnigraph>>` -> `engine: Arc<Omnigraph>` - New field: `workload: Arc<workload::WorkloadController>` initialized from env (`OMNIGRAPH_PER_ACTOR_INFLIGHT_MAX=16`, `OMNIGRAPH_PER_ACTOR_BYTES_MAX=4GiB`, `OMNIGRAPH_GLOBAL_REWRITE_MAX=4`). - `tokio::sync::RwLock` import dropped. Handler updates (16 sites): - All `Arc::clone(&state.db).read_owned().await` and `write_owned()` calls replaced with `let db = &state.engine`. Engine APIs are now `&self` (Step C) so this works directly. - `/export` clones `Arc<Omnigraph>` once and moves into the spawned task instead of acquiring a long-held read lock. - `/change` handler additionally wires `state.workload.try_admit(&actor_arc, est_bytes)`. Cedar runs FIRST so denied requests don't consume admission slots; admission runs SECOND before the engine call. `est_bytes` uses the request body size as a coarse proxy. API surface additions (`api::ErrorCode`): - `TooManyRequests` -> HTTP 429 (per-actor cap exceeded; respect `Retry-After`) - `ServiceUnavailable` -> HTTP 503 (global rewrite pool exhausted) `ApiError` constructors `too_many_requests` / `service_unavailable` and `from_workload_reject` (maps `RejectReason` variants to HTTP status). Other mutating handlers (`/ingest`, `/branches/*`, `/branches/merge`, `/schema/apply`) currently flow through the Arc<Omnigraph> path without admission gates; wiring those is mechanical and lands as a follow-up. The /change hot path covers the bulk of MR-686's load profile. OpenAPI regenerated to include the new ErrorCode variants. 102 lib + 39 server tests + 5 workload tests pass. The regression sentinel `change_conflict_returns_manifest_conflict_409` continues to pass (revalidation perf opt + per-table queue + publisher CAS preserve manifest_conflict semantics under concurrent writers). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-07 17:08:26 +02:00
let est_bytes = request.query_source.len() as u64
+ request
.params
.as_ref()
.map(|p| p.to_string().len() as u64)
.unwrap_or(0);
let _admission = state
.workload
.try_admit(&actor_arc, est_bytes)
.map_err(ApiError::from_workload_reject)?;
2026-04-10 20:49:41 +03:00
let (selected_name, query_params) =
select_named_query(&request.query_source, request.query_name.as_deref())
.map_err(|err| ApiError::bad_request(err.to_string()))?;
let params = query_params_from_json(&query_params, request.params.as_ref())
.map_err(|err| ApiError::bad_request(err.to_string()))?;
let result = {
server: flip AppState to Arc<Omnigraph>, wire admission on /change (PR 2 Step F) The substantive PR 2 change. Removes the global server `RwLock<Omnigraph>` that has serialized every mutating request across all actors. Disjoint `(table, branch)` writes from different actors now run concurrently, guarded only by the engine's per-(table, branch) write queue (PR 1b) and per-actor admission control (PR 2 Step E). AppState changes: - `db: Arc<RwLock<Omnigraph>>` -> `engine: Arc<Omnigraph>` - New field: `workload: Arc<workload::WorkloadController>` initialized from env (`OMNIGRAPH_PER_ACTOR_INFLIGHT_MAX=16`, `OMNIGRAPH_PER_ACTOR_BYTES_MAX=4GiB`, `OMNIGRAPH_GLOBAL_REWRITE_MAX=4`). - `tokio::sync::RwLock` import dropped. Handler updates (16 sites): - All `Arc::clone(&state.db).read_owned().await` and `write_owned()` calls replaced with `let db = &state.engine`. Engine APIs are now `&self` (Step C) so this works directly. - `/export` clones `Arc<Omnigraph>` once and moves into the spawned task instead of acquiring a long-held read lock. - `/change` handler additionally wires `state.workload.try_admit(&actor_arc, est_bytes)`. Cedar runs FIRST so denied requests don't consume admission slots; admission runs SECOND before the engine call. `est_bytes` uses the request body size as a coarse proxy. API surface additions (`api::ErrorCode`): - `TooManyRequests` -> HTTP 429 (per-actor cap exceeded; respect `Retry-After`) - `ServiceUnavailable` -> HTTP 503 (global rewrite pool exhausted) `ApiError` constructors `too_many_requests` / `service_unavailable` and `from_workload_reject` (maps `RejectReason` variants to HTTP status). Other mutating handlers (`/ingest`, `/branches/*`, `/branches/merge`, `/schema/apply`) currently flow through the Arc<Omnigraph> path without admission gates; wiring those is mechanical and lands as a follow-up. The /change hot path covers the bulk of MR-686's load profile. OpenAPI regenerated to include the new ErrorCode variants. 102 lib + 39 server tests + 5 workload tests pass. The regression sentinel `change_conflict_returns_manifest_conflict_409` continues to pass (revalidation perf opt + per-table queue + publisher CAS preserve manifest_conflict semantics under concurrent writers). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-07 17:08:26 +02:00
let db = &state.engine;
2026-04-10 20:49:41 +03:00
db.mutate_as(
&branch,
&request.query_source,
&selected_name,
&params,
actor_id,
)
.await
.map_err(ApiError::from_omni)?
};
Ok(Json(ChangeOutput {
branch,
query_name: selected_name,
affected_nodes: result.affected_nodes,
affected_edges: result.affected_edges,
actor_id: actor_id.map(str::to_string),
}))
}
#[utoipa::path(
get,
path = "/schema",
tag = "schema",
operation_id = "getSchema",
responses(
(status = 200, description = "Current schema source", body = SchemaOutput),
(status = 401, description = "Unauthorized", body = ErrorOutput),
(status = 403, description = "Forbidden", body = ErrorOutput),
),
security(("bearer_token" = [])),
)]
/// Read the current schema source.
///
/// Returns the project's schema as a single string in `.pg` source form.
/// Useful for clients that want to introspect available types and tables
/// before constructing GQ queries. Read-only.
async fn server_schema_get(
State(state): State<AppState>,
actor: Option<Extension<AuthenticatedActor>>,
) -> std::result::Result<Json<SchemaOutput>, ApiError> {
authorize_request(
&state,
actor.as_ref().map(|Extension(actor)| actor),
PolicyRequest {
actor_id: actor
.as_ref()
.map(|Extension(actor)| actor.as_str().to_string())
.unwrap_or_default(),
action: PolicyAction::Read,
branch: None,
target_branch: None,
},
)?;
let schema_source = {
server: flip AppState to Arc<Omnigraph>, wire admission on /change (PR 2 Step F) The substantive PR 2 change. Removes the global server `RwLock<Omnigraph>` that has serialized every mutating request across all actors. Disjoint `(table, branch)` writes from different actors now run concurrently, guarded only by the engine's per-(table, branch) write queue (PR 1b) and per-actor admission control (PR 2 Step E). AppState changes: - `db: Arc<RwLock<Omnigraph>>` -> `engine: Arc<Omnigraph>` - New field: `workload: Arc<workload::WorkloadController>` initialized from env (`OMNIGRAPH_PER_ACTOR_INFLIGHT_MAX=16`, `OMNIGRAPH_PER_ACTOR_BYTES_MAX=4GiB`, `OMNIGRAPH_GLOBAL_REWRITE_MAX=4`). - `tokio::sync::RwLock` import dropped. Handler updates (16 sites): - All `Arc::clone(&state.db).read_owned().await` and `write_owned()` calls replaced with `let db = &state.engine`. Engine APIs are now `&self` (Step C) so this works directly. - `/export` clones `Arc<Omnigraph>` once and moves into the spawned task instead of acquiring a long-held read lock. - `/change` handler additionally wires `state.workload.try_admit(&actor_arc, est_bytes)`. Cedar runs FIRST so denied requests don't consume admission slots; admission runs SECOND before the engine call. `est_bytes` uses the request body size as a coarse proxy. API surface additions (`api::ErrorCode`): - `TooManyRequests` -> HTTP 429 (per-actor cap exceeded; respect `Retry-After`) - `ServiceUnavailable` -> HTTP 503 (global rewrite pool exhausted) `ApiError` constructors `too_many_requests` / `service_unavailable` and `from_workload_reject` (maps `RejectReason` variants to HTTP status). Other mutating handlers (`/ingest`, `/branches/*`, `/branches/merge`, `/schema/apply`) currently flow through the Arc<Omnigraph> path without admission gates; wiring those is mechanical and lands as a follow-up. The /change hot path covers the bulk of MR-686's load profile. OpenAPI regenerated to include the new ErrorCode variants. 102 lib + 39 server tests + 5 workload tests pass. The regression sentinel `change_conflict_returns_manifest_conflict_409` continues to pass (revalidation perf opt + per-table queue + publisher CAS preserve manifest_conflict semantics under concurrent writers). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-07 17:08:26 +02:00
let db = &state.engine;
db.schema_source().to_string()
};
Ok(Json(SchemaOutput { schema_source }))
}
#[utoipa::path(
post,
path = "/schema/apply",
tag = "mutations",
operation_id = "applySchema",
request_body = SchemaApplyRequest,
responses(
(status = 200, description = "Schema apply results", body = SchemaApplyOutput),
(status = 400, description = "Bad request", body = ErrorOutput),
(status = 401, description = "Unauthorized", body = ErrorOutput),
(status = 403, description = "Forbidden", body = ErrorOutput),
(status = 429, description = "Per-actor admission cap exceeded; honor `Retry-After` header", body = ErrorOutput),
),
security(("bearer_token" = [])),
)]
/// Apply a schema migration.
///
/// Diffs `schema_source` against the current schema and applies the resulting
/// migration steps (add/drop type, add/drop column, etc.). **Destructive**:
/// some steps drop data. Returns the list of steps applied; if `applied` is
/// false the diff was unsupported and no changes were made.
async fn server_schema_apply(
State(state): State<AppState>,
actor: Option<Extension<AuthenticatedActor>>,
Json(request): Json<SchemaApplyRequest>,
) -> std::result::Result<Json<SchemaApplyOutput>, ApiError> {
let actor_arc = actor
.as_ref()
.map(|Extension(actor)| Arc::clone(&actor.0))
.unwrap_or_else(|| Arc::<str>::from("anonymous"));
let actor_id = actor.as_ref().map(|Extension(actor)| actor.as_str());
authorize_request(
&state,
actor.as_ref().map(|Extension(actor)| actor),
PolicyRequest {
actor_id: actor_id.map(str::to_string).unwrap_or_default(),
action: PolicyAction::SchemaApply,
branch: None,
target_branch: Some("main".to_string()),
},
)?;
let est_bytes = request.schema_source.len() as u64;
let _admission = state
.workload
.try_admit(&actor_arc, est_bytes)
.map_err(ApiError::from_workload_reject)?;
let result = {
server: flip AppState to Arc<Omnigraph>, wire admission on /change (PR 2 Step F) The substantive PR 2 change. Removes the global server `RwLock<Omnigraph>` that has serialized every mutating request across all actors. Disjoint `(table, branch)` writes from different actors now run concurrently, guarded only by the engine's per-(table, branch) write queue (PR 1b) and per-actor admission control (PR 2 Step E). AppState changes: - `db: Arc<RwLock<Omnigraph>>` -> `engine: Arc<Omnigraph>` - New field: `workload: Arc<workload::WorkloadController>` initialized from env (`OMNIGRAPH_PER_ACTOR_INFLIGHT_MAX=16`, `OMNIGRAPH_PER_ACTOR_BYTES_MAX=4GiB`, `OMNIGRAPH_GLOBAL_REWRITE_MAX=4`). - `tokio::sync::RwLock` import dropped. Handler updates (16 sites): - All `Arc::clone(&state.db).read_owned().await` and `write_owned()` calls replaced with `let db = &state.engine`. Engine APIs are now `&self` (Step C) so this works directly. - `/export` clones `Arc<Omnigraph>` once and moves into the spawned task instead of acquiring a long-held read lock. - `/change` handler additionally wires `state.workload.try_admit(&actor_arc, est_bytes)`. Cedar runs FIRST so denied requests don't consume admission slots; admission runs SECOND before the engine call. `est_bytes` uses the request body size as a coarse proxy. API surface additions (`api::ErrorCode`): - `TooManyRequests` -> HTTP 429 (per-actor cap exceeded; respect `Retry-After`) - `ServiceUnavailable` -> HTTP 503 (global rewrite pool exhausted) `ApiError` constructors `too_many_requests` / `service_unavailable` and `from_workload_reject` (maps `RejectReason` variants to HTTP status). Other mutating handlers (`/ingest`, `/branches/*`, `/branches/merge`, `/schema/apply`) currently flow through the Arc<Omnigraph> path without admission gates; wiring those is mechanical and lands as a follow-up. The /change hot path covers the bulk of MR-686's load profile. OpenAPI regenerated to include the new ErrorCode variants. 102 lib + 39 server tests + 5 workload tests pass. The regression sentinel `change_conflict_returns_manifest_conflict_409` continues to pass (revalidation perf opt + per-table queue + publisher CAS preserve manifest_conflict semantics under concurrent writers). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-07 17:08:26 +02:00
let db = &state.engine;
db.apply_schema(&request.schema_source)
.await
.map_err(ApiError::from_omni)?
};
Ok(Json(schema_apply_output(state.uri(), result)))
}
#[utoipa::path(
post,
path = "/ingest",
tag = "mutations",
operation_id = "ingest",
request_body = IngestRequest,
responses(
(status = 200, description = "Ingest results", body = IngestOutput),
(status = 400, description = "Bad request", body = ErrorOutput),
(status = 401, description = "Unauthorized", body = ErrorOutput),
(status = 403, description = "Forbidden", body = ErrorOutput),
(status = 429, description = "Per-actor admission cap exceeded; honor `Retry-After` header", body = ErrorOutput),
),
security(("bearer_token" = [])),
)]
/// Bulk-ingest NDJSON data into a branch.
///
/// `data` is NDJSON with one record per line. `mode` controls behavior on
/// existing rows: `merge` upserts by id (default), `append` blindly inserts,
/// `overwrite` replaces table contents. If `branch` does not exist it is
/// created from `from` (defaults to `main`). **Destructive** when `mode` is
/// `overwrite` or when ingest produces conflicting writes.
2026-04-10 20:49:41 +03:00
async fn server_ingest(
State(state): State<AppState>,
actor: Option<Extension<AuthenticatedActor>>,
Json(request): Json<IngestRequest>,
) -> std::result::Result<Json<IngestOutput>, ApiError> {
let branch = request.branch.unwrap_or_else(|| "main".to_string());
let from = request.from.unwrap_or_else(|| "main".to_string());
let mode = request.mode.unwrap_or(omnigraph::loader::LoadMode::Merge);
let actor_arc = actor
.as_ref()
.map(|Extension(actor)| Arc::clone(&actor.0))
.unwrap_or_else(|| Arc::<str>::from("anonymous"));
2026-04-10 20:49:41 +03:00
let actor_id = actor.as_ref().map(|Extension(actor)| actor.as_str());
let branch_exists = {
server: flip AppState to Arc<Omnigraph>, wire admission on /change (PR 2 Step F) The substantive PR 2 change. Removes the global server `RwLock<Omnigraph>` that has serialized every mutating request across all actors. Disjoint `(table, branch)` writes from different actors now run concurrently, guarded only by the engine's per-(table, branch) write queue (PR 1b) and per-actor admission control (PR 2 Step E). AppState changes: - `db: Arc<RwLock<Omnigraph>>` -> `engine: Arc<Omnigraph>` - New field: `workload: Arc<workload::WorkloadController>` initialized from env (`OMNIGRAPH_PER_ACTOR_INFLIGHT_MAX=16`, `OMNIGRAPH_PER_ACTOR_BYTES_MAX=4GiB`, `OMNIGRAPH_GLOBAL_REWRITE_MAX=4`). - `tokio::sync::RwLock` import dropped. Handler updates (16 sites): - All `Arc::clone(&state.db).read_owned().await` and `write_owned()` calls replaced with `let db = &state.engine`. Engine APIs are now `&self` (Step C) so this works directly. - `/export` clones `Arc<Omnigraph>` once and moves into the spawned task instead of acquiring a long-held read lock. - `/change` handler additionally wires `state.workload.try_admit(&actor_arc, est_bytes)`. Cedar runs FIRST so denied requests don't consume admission slots; admission runs SECOND before the engine call. `est_bytes` uses the request body size as a coarse proxy. API surface additions (`api::ErrorCode`): - `TooManyRequests` -> HTTP 429 (per-actor cap exceeded; respect `Retry-After`) - `ServiceUnavailable` -> HTTP 503 (global rewrite pool exhausted) `ApiError` constructors `too_many_requests` / `service_unavailable` and `from_workload_reject` (maps `RejectReason` variants to HTTP status). Other mutating handlers (`/ingest`, `/branches/*`, `/branches/merge`, `/schema/apply`) currently flow through the Arc<Omnigraph> path without admission gates; wiring those is mechanical and lands as a follow-up. The /change hot path covers the bulk of MR-686's load profile. OpenAPI regenerated to include the new ErrorCode variants. 102 lib + 39 server tests + 5 workload tests pass. The regression sentinel `change_conflict_returns_manifest_conflict_409` continues to pass (revalidation perf opt + per-table queue + publisher CAS preserve manifest_conflict semantics under concurrent writers). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-07 17:08:26 +02:00
let db = &state.engine;
2026-04-10 20:49:41 +03:00
db.branch_list()
.await
.map_err(ApiError::from_omni)?
.into_iter()
.any(|name| name == branch)
};
if !branch_exists {
authorize_request(
&state,
actor.as_ref().map(|Extension(actor)| actor),
PolicyRequest {
actor_id: actor_id.map(str::to_string).unwrap_or_default(),
action: PolicyAction::BranchCreate,
branch: Some(from.clone()),
target_branch: Some(branch.clone()),
},
)?;
}
authorize_request(
&state,
actor.as_ref().map(|Extension(actor)| actor),
PolicyRequest {
actor_id: actor_id.map(str::to_string).unwrap_or_default(),
action: PolicyAction::Change,
branch: Some(branch.clone()),
target_branch: None,
},
)?;
let est_bytes = request.data.len() as u64;
let _admission = state
.workload
.try_admit(&actor_arc, est_bytes)
.map_err(ApiError::from_workload_reject)?;
2026-04-10 20:49:41 +03:00
let result = {
server: flip AppState to Arc<Omnigraph>, wire admission on /change (PR 2 Step F) The substantive PR 2 change. Removes the global server `RwLock<Omnigraph>` that has serialized every mutating request across all actors. Disjoint `(table, branch)` writes from different actors now run concurrently, guarded only by the engine's per-(table, branch) write queue (PR 1b) and per-actor admission control (PR 2 Step E). AppState changes: - `db: Arc<RwLock<Omnigraph>>` -> `engine: Arc<Omnigraph>` - New field: `workload: Arc<workload::WorkloadController>` initialized from env (`OMNIGRAPH_PER_ACTOR_INFLIGHT_MAX=16`, `OMNIGRAPH_PER_ACTOR_BYTES_MAX=4GiB`, `OMNIGRAPH_GLOBAL_REWRITE_MAX=4`). - `tokio::sync::RwLock` import dropped. Handler updates (16 sites): - All `Arc::clone(&state.db).read_owned().await` and `write_owned()` calls replaced with `let db = &state.engine`. Engine APIs are now `&self` (Step C) so this works directly. - `/export` clones `Arc<Omnigraph>` once and moves into the spawned task instead of acquiring a long-held read lock. - `/change` handler additionally wires `state.workload.try_admit(&actor_arc, est_bytes)`. Cedar runs FIRST so denied requests don't consume admission slots; admission runs SECOND before the engine call. `est_bytes` uses the request body size as a coarse proxy. API surface additions (`api::ErrorCode`): - `TooManyRequests` -> HTTP 429 (per-actor cap exceeded; respect `Retry-After`) - `ServiceUnavailable` -> HTTP 503 (global rewrite pool exhausted) `ApiError` constructors `too_many_requests` / `service_unavailable` and `from_workload_reject` (maps `RejectReason` variants to HTTP status). Other mutating handlers (`/ingest`, `/branches/*`, `/branches/merge`, `/schema/apply`) currently flow through the Arc<Omnigraph> path without admission gates; wiring those is mechanical and lands as a follow-up. The /change hot path covers the bulk of MR-686's load profile. OpenAPI regenerated to include the new ErrorCode variants. 102 lib + 39 server tests + 5 workload tests pass. The regression sentinel `change_conflict_returns_manifest_conflict_409` continues to pass (revalidation perf opt + per-table queue + publisher CAS preserve manifest_conflict semantics under concurrent writers). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-07 17:08:26 +02:00
let db = &state.engine;
2026-04-10 20:49:41 +03:00
db.ingest_as(&branch, Some(&from), &request.data, mode, actor_id)
.await
.map_err(ApiError::from_omni)?
};
Ok(Json(ingest_output(
state.uri(),
&result,
actor_id.map(str::to_string),
)))
}
#[utoipa::path(
get,
path = "/branches",
tag = "branches",
operation_id = "listBranches",
responses(
(status = 200, description = "List of branches", body = BranchListOutput),
(status = 401, description = "Unauthorized", body = ErrorOutput),
(status = 403, description = "Forbidden", body = ErrorOutput),
),
security(("bearer_token" = [])),
)]
/// List all branches.
///
/// Returns branch names sorted alphabetically. Read-only.
2026-04-10 20:49:41 +03:00
async fn server_branch_list(
State(state): State<AppState>,
actor: Option<Extension<AuthenticatedActor>>,
) -> std::result::Result<Json<BranchListOutput>, ApiError> {
authorize_request(
&state,
actor.as_ref().map(|Extension(actor)| actor),
PolicyRequest {
actor_id: actor
.as_ref()
.map(|Extension(actor)| actor.as_str().to_string())
.unwrap_or_default(),
action: PolicyAction::Read,
branch: None,
target_branch: None,
},
)?;
let mut branches = {
server: flip AppState to Arc<Omnigraph>, wire admission on /change (PR 2 Step F) The substantive PR 2 change. Removes the global server `RwLock<Omnigraph>` that has serialized every mutating request across all actors. Disjoint `(table, branch)` writes from different actors now run concurrently, guarded only by the engine's per-(table, branch) write queue (PR 1b) and per-actor admission control (PR 2 Step E). AppState changes: - `db: Arc<RwLock<Omnigraph>>` -> `engine: Arc<Omnigraph>` - New field: `workload: Arc<workload::WorkloadController>` initialized from env (`OMNIGRAPH_PER_ACTOR_INFLIGHT_MAX=16`, `OMNIGRAPH_PER_ACTOR_BYTES_MAX=4GiB`, `OMNIGRAPH_GLOBAL_REWRITE_MAX=4`). - `tokio::sync::RwLock` import dropped. Handler updates (16 sites): - All `Arc::clone(&state.db).read_owned().await` and `write_owned()` calls replaced with `let db = &state.engine`. Engine APIs are now `&self` (Step C) so this works directly. - `/export` clones `Arc<Omnigraph>` once and moves into the spawned task instead of acquiring a long-held read lock. - `/change` handler additionally wires `state.workload.try_admit(&actor_arc, est_bytes)`. Cedar runs FIRST so denied requests don't consume admission slots; admission runs SECOND before the engine call. `est_bytes` uses the request body size as a coarse proxy. API surface additions (`api::ErrorCode`): - `TooManyRequests` -> HTTP 429 (per-actor cap exceeded; respect `Retry-After`) - `ServiceUnavailable` -> HTTP 503 (global rewrite pool exhausted) `ApiError` constructors `too_many_requests` / `service_unavailable` and `from_workload_reject` (maps `RejectReason` variants to HTTP status). Other mutating handlers (`/ingest`, `/branches/*`, `/branches/merge`, `/schema/apply`) currently flow through the Arc<Omnigraph> path without admission gates; wiring those is mechanical and lands as a follow-up. The /change hot path covers the bulk of MR-686's load profile. OpenAPI regenerated to include the new ErrorCode variants. 102 lib + 39 server tests + 5 workload tests pass. The regression sentinel `change_conflict_returns_manifest_conflict_409` continues to pass (revalidation perf opt + per-table queue + publisher CAS preserve manifest_conflict semantics under concurrent writers). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-07 17:08:26 +02:00
let db = &state.engine;
2026-04-10 20:49:41 +03:00
db.branch_list().await.map_err(ApiError::from_omni)?
};
branches.sort();
Ok(Json(BranchListOutput { branches }))
}
#[utoipa::path(
post,
path = "/branches",
tag = "branches",
operation_id = "createBranch",
request_body = BranchCreateRequest,
responses(
(status = 200, description = "Branch created", body = BranchCreateOutput),
(status = 400, description = "Bad request", body = ErrorOutput),
(status = 401, description = "Unauthorized", body = ErrorOutput),
(status = 403, description = "Forbidden", body = ErrorOutput),
(status = 409, description = "Branch already exists", body = ErrorOutput),
(status = 429, description = "Per-actor admission cap exceeded; honor `Retry-After` header", body = ErrorOutput),
),
security(("bearer_token" = [])),
)]
/// Create a new branch.
///
/// Forks `name` off of `from` (defaults to `main`). The new branch shares
/// table data with its parent until it is mutated. Returns 409 if `name`
/// already exists.
2026-04-10 20:49:41 +03:00
async fn server_branch_create(
State(state): State<AppState>,
actor: Option<Extension<AuthenticatedActor>>,
Json(request): Json<BranchCreateRequest>,
) -> std::result::Result<Json<BranchCreateOutput>, ApiError> {
let from = request.from.unwrap_or_else(|| "main".to_string());
let actor_arc = actor
.as_ref()
.map(|Extension(actor)| Arc::clone(&actor.0))
.unwrap_or_else(|| Arc::<str>::from("anonymous"));
2026-04-10 20:49:41 +03:00
authorize_request(
&state,
actor.as_ref().map(|Extension(actor)| actor),
PolicyRequest {
actor_id: actor
.as_ref()
.map(|Extension(actor)| actor.as_str().to_string())
.unwrap_or_default(),
action: PolicyAction::BranchCreate,
branch: Some(from.clone()),
target_branch: Some(request.name.clone()),
},
)?;
// Branch metadata only — small constant bytes estimate. The Lance
// shallow-clone work is bounded by the parent's manifest size, not
// the request body.
let _admission = state
.workload
.try_admit(&actor_arc, 256)
.map_err(ApiError::from_workload_reject)?;
2026-04-10 20:49:41 +03:00
{
server: flip AppState to Arc<Omnigraph>, wire admission on /change (PR 2 Step F) The substantive PR 2 change. Removes the global server `RwLock<Omnigraph>` that has serialized every mutating request across all actors. Disjoint `(table, branch)` writes from different actors now run concurrently, guarded only by the engine's per-(table, branch) write queue (PR 1b) and per-actor admission control (PR 2 Step E). AppState changes: - `db: Arc<RwLock<Omnigraph>>` -> `engine: Arc<Omnigraph>` - New field: `workload: Arc<workload::WorkloadController>` initialized from env (`OMNIGRAPH_PER_ACTOR_INFLIGHT_MAX=16`, `OMNIGRAPH_PER_ACTOR_BYTES_MAX=4GiB`, `OMNIGRAPH_GLOBAL_REWRITE_MAX=4`). - `tokio::sync::RwLock` import dropped. Handler updates (16 sites): - All `Arc::clone(&state.db).read_owned().await` and `write_owned()` calls replaced with `let db = &state.engine`. Engine APIs are now `&self` (Step C) so this works directly. - `/export` clones `Arc<Omnigraph>` once and moves into the spawned task instead of acquiring a long-held read lock. - `/change` handler additionally wires `state.workload.try_admit(&actor_arc, est_bytes)`. Cedar runs FIRST so denied requests don't consume admission slots; admission runs SECOND before the engine call. `est_bytes` uses the request body size as a coarse proxy. API surface additions (`api::ErrorCode`): - `TooManyRequests` -> HTTP 429 (per-actor cap exceeded; respect `Retry-After`) - `ServiceUnavailable` -> HTTP 503 (global rewrite pool exhausted) `ApiError` constructors `too_many_requests` / `service_unavailable` and `from_workload_reject` (maps `RejectReason` variants to HTTP status). Other mutating handlers (`/ingest`, `/branches/*`, `/branches/merge`, `/schema/apply`) currently flow through the Arc<Omnigraph> path without admission gates; wiring those is mechanical and lands as a follow-up. The /change hot path covers the bulk of MR-686's load profile. OpenAPI regenerated to include the new ErrorCode variants. 102 lib + 39 server tests + 5 workload tests pass. The regression sentinel `change_conflict_returns_manifest_conflict_409` continues to pass (revalidation perf opt + per-table queue + publisher CAS preserve manifest_conflict semantics under concurrent writers). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-07 17:08:26 +02:00
let db = &state.engine;
2026-04-10 20:49:41 +03:00
db.branch_create_from(ReadTarget::branch(&from), &request.name)
.await
.map_err(ApiError::from_omni)?;
}
Ok(Json(BranchCreateOutput {
uri: state.uri().to_string(),
from,
name: request.name,
actor_id: actor.map(|Extension(actor)| actor.as_str().to_string()),
}))
}
#[utoipa::path(
delete,
path = "/branches/{branch}",
tag = "branches",
operation_id = "deleteBranch",
params(
("branch" = String, Path, description = "Branch name to delete"),
),
responses(
(status = 200, description = "Branch deleted", body = BranchDeleteOutput),
(status = 401, description = "Unauthorized", body = ErrorOutput),
(status = 403, description = "Forbidden", body = ErrorOutput),
(status = 404, description = "Branch not found", body = ErrorOutput),
(status = 429, description = "Per-actor admission cap exceeded; honor `Retry-After` header", body = ErrorOutput),
),
security(("bearer_token" = [])),
)]
/// Delete a branch.
///
/// **Irreversible.** Removes the branch pointer; commits remain reachable
/// only if referenced by another branch. Returns 404 if the branch does not
/// exist.
2026-04-10 20:49:41 +03:00
async fn server_branch_delete(
State(state): State<AppState>,
actor: Option<Extension<AuthenticatedActor>>,
Path(branch): Path<String>,
) -> std::result::Result<Json<BranchDeleteOutput>, ApiError> {
let actor_arc = actor
.as_ref()
.map(|Extension(actor)| Arc::clone(&actor.0))
.unwrap_or_else(|| Arc::<str>::from("anonymous"));
2026-04-10 20:49:41 +03:00
let actor_id = actor.as_ref().map(|Extension(actor)| actor.as_str());
authorize_request(
&state,
actor.as_ref().map(|Extension(actor)| actor),
PolicyRequest {
actor_id: actor_id.map(str::to_string).unwrap_or_default(),
action: PolicyAction::BranchDelete,
branch: None,
target_branch: Some(branch.clone()),
},
)?;
// Metadata-only manifest tombstone — small constant estimate.
let _admission = state
.workload
.try_admit(&actor_arc, 256)
.map_err(ApiError::from_workload_reject)?;
2026-04-10 20:49:41 +03:00
{
server: flip AppState to Arc<Omnigraph>, wire admission on /change (PR 2 Step F) The substantive PR 2 change. Removes the global server `RwLock<Omnigraph>` that has serialized every mutating request across all actors. Disjoint `(table, branch)` writes from different actors now run concurrently, guarded only by the engine's per-(table, branch) write queue (PR 1b) and per-actor admission control (PR 2 Step E). AppState changes: - `db: Arc<RwLock<Omnigraph>>` -> `engine: Arc<Omnigraph>` - New field: `workload: Arc<workload::WorkloadController>` initialized from env (`OMNIGRAPH_PER_ACTOR_INFLIGHT_MAX=16`, `OMNIGRAPH_PER_ACTOR_BYTES_MAX=4GiB`, `OMNIGRAPH_GLOBAL_REWRITE_MAX=4`). - `tokio::sync::RwLock` import dropped. Handler updates (16 sites): - All `Arc::clone(&state.db).read_owned().await` and `write_owned()` calls replaced with `let db = &state.engine`. Engine APIs are now `&self` (Step C) so this works directly. - `/export` clones `Arc<Omnigraph>` once and moves into the spawned task instead of acquiring a long-held read lock. - `/change` handler additionally wires `state.workload.try_admit(&actor_arc, est_bytes)`. Cedar runs FIRST so denied requests don't consume admission slots; admission runs SECOND before the engine call. `est_bytes` uses the request body size as a coarse proxy. API surface additions (`api::ErrorCode`): - `TooManyRequests` -> HTTP 429 (per-actor cap exceeded; respect `Retry-After`) - `ServiceUnavailable` -> HTTP 503 (global rewrite pool exhausted) `ApiError` constructors `too_many_requests` / `service_unavailable` and `from_workload_reject` (maps `RejectReason` variants to HTTP status). Other mutating handlers (`/ingest`, `/branches/*`, `/branches/merge`, `/schema/apply`) currently flow through the Arc<Omnigraph> path without admission gates; wiring those is mechanical and lands as a follow-up. The /change hot path covers the bulk of MR-686's load profile. OpenAPI regenerated to include the new ErrorCode variants. 102 lib + 39 server tests + 5 workload tests pass. The regression sentinel `change_conflict_returns_manifest_conflict_409` continues to pass (revalidation perf opt + per-table queue + publisher CAS preserve manifest_conflict semantics under concurrent writers). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-07 17:08:26 +02:00
let db = &state.engine;
2026-04-10 20:49:41 +03:00
db.branch_delete(&branch)
.await
.map_err(ApiError::from_omni)?;
}
Ok(Json(BranchDeleteOutput {
uri: state.uri().to_string(),
name: branch,
actor_id: actor_id.map(str::to_string),
}))
}
#[utoipa::path(
post,
path = "/branches/merge",
tag = "branches",
operation_id = "mergeBranches",
request_body = BranchMergeRequest,
responses(
(status = 200, description = "Branches merged", body = BranchMergeOutput),
(status = 400, description = "Bad request", body = ErrorOutput),
(status = 401, description = "Unauthorized", body = ErrorOutput),
(status = 403, description = "Forbidden", body = ErrorOutput),
(status = 409, description = "Merge conflict", body = ErrorOutput),
(status = 429, description = "Per-actor admission cap exceeded; honor `Retry-After` header", body = ErrorOutput),
),
security(("bearer_token" = [])),
)]
/// Merge one branch into another.
///
/// Merges `source` into `target` (defaults to `main`). Outcome is one of
/// `already_up_to_date`, `fast_forward`, or `merged`. Returns 409 with the
/// list of conflicts if the merge cannot be completed; the target is left
/// unchanged in that case. **Destructive** to `target` on success.
2026-04-10 20:49:41 +03:00
async fn server_branch_merge(
State(state): State<AppState>,
actor: Option<Extension<AuthenticatedActor>>,
Json(request): Json<BranchMergeRequest>,
) -> std::result::Result<Json<BranchMergeOutput>, ApiError> {
let target = request.target.unwrap_or_else(|| "main".to_string());
let actor_arc = actor
.as_ref()
.map(|Extension(actor)| Arc::clone(&actor.0))
.unwrap_or_else(|| Arc::<str>::from("anonymous"));
2026-04-10 20:49:41 +03:00
let actor_id = actor.as_ref().map(|Extension(actor)| actor.as_str());
authorize_request(
&state,
actor.as_ref().map(|Extension(actor)| actor),
PolicyRequest {
actor_id: actor_id.map(str::to_string).unwrap_or_default(),
action: PolicyAction::BranchMerge,
branch: Some(request.source.clone()),
target_branch: Some(target.clone()),
},
)?;
// Merge body is small JSON; the heavy work is in the engine but is
// bounded per-(table, branch) by the writer queue. Small constant
// estimate suffices for the actor in-flight count.
let _admission = state
.workload
.try_admit(&actor_arc, 256)
.map_err(ApiError::from_workload_reject)?;
2026-04-10 20:49:41 +03:00
let outcome = {
server: flip AppState to Arc<Omnigraph>, wire admission on /change (PR 2 Step F) The substantive PR 2 change. Removes the global server `RwLock<Omnigraph>` that has serialized every mutating request across all actors. Disjoint `(table, branch)` writes from different actors now run concurrently, guarded only by the engine's per-(table, branch) write queue (PR 1b) and per-actor admission control (PR 2 Step E). AppState changes: - `db: Arc<RwLock<Omnigraph>>` -> `engine: Arc<Omnigraph>` - New field: `workload: Arc<workload::WorkloadController>` initialized from env (`OMNIGRAPH_PER_ACTOR_INFLIGHT_MAX=16`, `OMNIGRAPH_PER_ACTOR_BYTES_MAX=4GiB`, `OMNIGRAPH_GLOBAL_REWRITE_MAX=4`). - `tokio::sync::RwLock` import dropped. Handler updates (16 sites): - All `Arc::clone(&state.db).read_owned().await` and `write_owned()` calls replaced with `let db = &state.engine`. Engine APIs are now `&self` (Step C) so this works directly. - `/export` clones `Arc<Omnigraph>` once and moves into the spawned task instead of acquiring a long-held read lock. - `/change` handler additionally wires `state.workload.try_admit(&actor_arc, est_bytes)`. Cedar runs FIRST so denied requests don't consume admission slots; admission runs SECOND before the engine call. `est_bytes` uses the request body size as a coarse proxy. API surface additions (`api::ErrorCode`): - `TooManyRequests` -> HTTP 429 (per-actor cap exceeded; respect `Retry-After`) - `ServiceUnavailable` -> HTTP 503 (global rewrite pool exhausted) `ApiError` constructors `too_many_requests` / `service_unavailable` and `from_workload_reject` (maps `RejectReason` variants to HTTP status). Other mutating handlers (`/ingest`, `/branches/*`, `/branches/merge`, `/schema/apply`) currently flow through the Arc<Omnigraph> path without admission gates; wiring those is mechanical and lands as a follow-up. The /change hot path covers the bulk of MR-686's load profile. OpenAPI regenerated to include the new ErrorCode variants. 102 lib + 39 server tests + 5 workload tests pass. The regression sentinel `change_conflict_returns_manifest_conflict_409` continues to pass (revalidation perf opt + per-table queue + publisher CAS preserve manifest_conflict semantics under concurrent writers). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-07 17:08:26 +02:00
let db = &state.engine;
2026-04-10 20:49:41 +03:00
db.branch_merge_as(&request.source, &target, actor_id)
.await
.map_err(ApiError::from_omni)?
};
Ok(Json(BranchMergeOutput {
source: request.source,
target,
outcome: outcome.into(),
actor_id: actor_id.map(str::to_string),
}))
}
#[utoipa::path(
get,
path = "/commits",
tag = "commits",
operation_id = "listCommits",
params(CommitListQuery),
responses(
(status = 200, description = "List of commits", body = CommitListOutput),
(status = 401, description = "Unauthorized", body = ErrorOutput),
(status = 403, description = "Forbidden", body = ErrorOutput),
),
security(("bearer_token" = [])),
)]
/// List commits.
///
/// Filter by `branch` to get the commits on a single branch (most recent
/// first); omit to list across all branches. Read-only.
2026-04-10 20:49:41 +03:00
async fn server_commit_list(
State(state): State<AppState>,
actor: Option<Extension<AuthenticatedActor>>,
Query(query): Query<CommitListQuery>,
) -> std::result::Result<Json<CommitListOutput>, ApiError> {
authorize_request(
&state,
actor.as_ref().map(|Extension(actor)| actor),
PolicyRequest {
actor_id: actor
.as_ref()
.map(|Extension(actor)| actor.as_str().to_string())
.unwrap_or_default(),
action: PolicyAction::Read,
branch: query.branch.clone(),
target_branch: None,
},
)?;
let commits = {
server: flip AppState to Arc<Omnigraph>, wire admission on /change (PR 2 Step F) The substantive PR 2 change. Removes the global server `RwLock<Omnigraph>` that has serialized every mutating request across all actors. Disjoint `(table, branch)` writes from different actors now run concurrently, guarded only by the engine's per-(table, branch) write queue (PR 1b) and per-actor admission control (PR 2 Step E). AppState changes: - `db: Arc<RwLock<Omnigraph>>` -> `engine: Arc<Omnigraph>` - New field: `workload: Arc<workload::WorkloadController>` initialized from env (`OMNIGRAPH_PER_ACTOR_INFLIGHT_MAX=16`, `OMNIGRAPH_PER_ACTOR_BYTES_MAX=4GiB`, `OMNIGRAPH_GLOBAL_REWRITE_MAX=4`). - `tokio::sync::RwLock` import dropped. Handler updates (16 sites): - All `Arc::clone(&state.db).read_owned().await` and `write_owned()` calls replaced with `let db = &state.engine`. Engine APIs are now `&self` (Step C) so this works directly. - `/export` clones `Arc<Omnigraph>` once and moves into the spawned task instead of acquiring a long-held read lock. - `/change` handler additionally wires `state.workload.try_admit(&actor_arc, est_bytes)`. Cedar runs FIRST so denied requests don't consume admission slots; admission runs SECOND before the engine call. `est_bytes` uses the request body size as a coarse proxy. API surface additions (`api::ErrorCode`): - `TooManyRequests` -> HTTP 429 (per-actor cap exceeded; respect `Retry-After`) - `ServiceUnavailable` -> HTTP 503 (global rewrite pool exhausted) `ApiError` constructors `too_many_requests` / `service_unavailable` and `from_workload_reject` (maps `RejectReason` variants to HTTP status). Other mutating handlers (`/ingest`, `/branches/*`, `/branches/merge`, `/schema/apply`) currently flow through the Arc<Omnigraph> path without admission gates; wiring those is mechanical and lands as a follow-up. The /change hot path covers the bulk of MR-686's load profile. OpenAPI regenerated to include the new ErrorCode variants. 102 lib + 39 server tests + 5 workload tests pass. The regression sentinel `change_conflict_returns_manifest_conflict_409` continues to pass (revalidation perf opt + per-table queue + publisher CAS preserve manifest_conflict semantics under concurrent writers). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-07 17:08:26 +02:00
let db = &state.engine;
2026-04-10 20:49:41 +03:00
db.list_commits(query.branch.as_deref())
.await
.map_err(ApiError::from_omni)?
};
Ok(Json(CommitListOutput {
commits: commits.iter().map(api::commit_output).collect(),
}))
}
#[utoipa::path(
get,
path = "/commits/{commit_id}",
tag = "commits",
operation_id = "getCommit",
params(
("commit_id" = String, Path, description = "Commit identifier"),
),
responses(
(status = 200, description = "Commit details", body = api::CommitOutput),
(status = 401, description = "Unauthorized", body = ErrorOutput),
(status = 403, description = "Forbidden", body = ErrorOutput),
(status = 404, description = "Commit not found", body = ErrorOutput),
),
security(("bearer_token" = [])),
)]
/// Get a single commit.
///
/// Returns the commit's manifest version, parent commit(s), and creation
/// metadata. Read-only.
2026-04-10 20:49:41 +03:00
async fn server_commit_show(
State(state): State<AppState>,
actor: Option<Extension<AuthenticatedActor>>,
Path(commit_id): Path<String>,
) -> std::result::Result<Json<api::CommitOutput>, ApiError> {
authorize_request(
&state,
actor.as_ref().map(|Extension(actor)| actor),
PolicyRequest {
actor_id: actor
.as_ref()
.map(|Extension(actor)| actor.as_str().to_string())
.unwrap_or_default(),
action: PolicyAction::Read,
branch: None,
target_branch: None,
},
)?;
let commit = {
server: flip AppState to Arc<Omnigraph>, wire admission on /change (PR 2 Step F) The substantive PR 2 change. Removes the global server `RwLock<Omnigraph>` that has serialized every mutating request across all actors. Disjoint `(table, branch)` writes from different actors now run concurrently, guarded only by the engine's per-(table, branch) write queue (PR 1b) and per-actor admission control (PR 2 Step E). AppState changes: - `db: Arc<RwLock<Omnigraph>>` -> `engine: Arc<Omnigraph>` - New field: `workload: Arc<workload::WorkloadController>` initialized from env (`OMNIGRAPH_PER_ACTOR_INFLIGHT_MAX=16`, `OMNIGRAPH_PER_ACTOR_BYTES_MAX=4GiB`, `OMNIGRAPH_GLOBAL_REWRITE_MAX=4`). - `tokio::sync::RwLock` import dropped. Handler updates (16 sites): - All `Arc::clone(&state.db).read_owned().await` and `write_owned()` calls replaced with `let db = &state.engine`. Engine APIs are now `&self` (Step C) so this works directly. - `/export` clones `Arc<Omnigraph>` once and moves into the spawned task instead of acquiring a long-held read lock. - `/change` handler additionally wires `state.workload.try_admit(&actor_arc, est_bytes)`. Cedar runs FIRST so denied requests don't consume admission slots; admission runs SECOND before the engine call. `est_bytes` uses the request body size as a coarse proxy. API surface additions (`api::ErrorCode`): - `TooManyRequests` -> HTTP 429 (per-actor cap exceeded; respect `Retry-After`) - `ServiceUnavailable` -> HTTP 503 (global rewrite pool exhausted) `ApiError` constructors `too_many_requests` / `service_unavailable` and `from_workload_reject` (maps `RejectReason` variants to HTTP status). Other mutating handlers (`/ingest`, `/branches/*`, `/branches/merge`, `/schema/apply`) currently flow through the Arc<Omnigraph> path without admission gates; wiring those is mechanical and lands as a follow-up. The /change hot path covers the bulk of MR-686's load profile. OpenAPI regenerated to include the new ErrorCode variants. 102 lib + 39 server tests + 5 workload tests pass. The regression sentinel `change_conflict_returns_manifest_conflict_409` continues to pass (revalidation perf opt + per-table queue + publisher CAS preserve manifest_conflict semantics under concurrent writers). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-07 17:08:26 +02:00
let db = &state.engine;
2026-04-10 20:49:41 +03:00
db.get_commit(&commit_id)
.await
.map_err(ApiError::from_omni)?
};
Ok(Json(api::commit_output(&commit)))
}
fn read_target_from_request(branch: Option<String>, snapshot: Option<String>) -> ReadTarget {
if let Some(snapshot) = snapshot {
ReadTarget::snapshot(omnigraph::db::SnapshotId::new(snapshot))
} else {
ReadTarget::branch(branch.unwrap_or_else(|| "main".to_string()))
}
}
fn select_named_query(
query_source: &str,
requested_name: Option<&str>,
) -> Result<(String, Vec<omnigraph_compiler::query::ast::Param>)> {
let parsed = parse_query(query_source)?;
let query = if let Some(name) = requested_name {
parsed
.queries
.into_iter()
.find(|query| query.name == name)
.ok_or_else(|| color_eyre::eyre::eyre!("query '{}' not found", name))?
} else if parsed.queries.len() == 1 {
parsed.queries.into_iter().next().unwrap()
} else {
bail!("query file contains multiple queries; pass --name");
};
Ok((query.name, query.params))
}
fn query_params_from_json(
query_params: &[omnigraph_compiler::query::ast::Param],
params_json: Option<&Value>,
) -> Result<ParamMap> {
json_params_to_param_map(params_json, query_params, JsonParamMode::Standard)
.map_err(|err| color_eyre::eyre::eyre!(err.to_string()))
}
fn normalize_bearer_token(value: Option<String>) -> Option<String> {
value
.map(|value| value.trim().to_string())
.filter(|value| !value.is_empty())
}
fn normalize_bearer_actor(value: String) -> Result<String> {
let value = value.trim().to_string();
if value.is_empty() {
bail!("bearer token actor names must not be blank");
}
Ok(value)
}
fn parse_bearer_tokens_json(value: &str) -> Result<Vec<(String, String)>> {
let entries: HashMap<String, String> = serde_json::from_str(value)
.wrap_err("OMNIGRAPH_SERVER_BEARER_TOKENS_JSON must be a JSON object of actor->token")?;
Ok(entries.into_iter().collect())
}
fn read_bearer_tokens_file(path: &str) -> Result<Vec<(String, String)>> {
let contents = fs::read_to_string(path)
.wrap_err_with(|| format!("failed to read bearer tokens file at {path}"))?;
parse_bearer_tokens_json(&contents)
.wrap_err_with(|| format!("failed to parse bearer tokens file at {path}"))
}
fn validate_bearer_tokens(entries: Vec<(String, String)>) -> Result<Vec<(String, String)>> {
let mut seen_actors = HashSet::new();
let mut seen_tokens = HashSet::new();
let mut normalized = Vec::with_capacity(entries.len());
for (actor, token) in entries {
let actor = normalize_bearer_actor(actor)?;
let Some(token) = normalize_bearer_token(Some(token)) else {
bail!("bearer token for actor '{actor}' must not be blank");
};
if !seen_actors.insert(actor.clone()) {
bail!("duplicate bearer token actor '{actor}'");
}
if !seen_tokens.insert(token.clone()) {
bail!("duplicate bearer token value configured");
}
normalized.push((actor, token));
}
normalized.sort_by(|(left, _), (right, _)| left.cmp(right));
Ok(normalized)
}
fn server_bearer_tokens_from_env() -> Result<Vec<(String, String)>> {
let mut entries = Vec::new();
if let Some(token) = normalize_bearer_token(std::env::var("OMNIGRAPH_SERVER_BEARER_TOKEN").ok())
{
entries.push(("default".to_string(), token));
}
if let Some(path) =
normalize_bearer_token(std::env::var("OMNIGRAPH_SERVER_BEARER_TOKENS_FILE").ok())
{
entries.extend(read_bearer_tokens_file(&path)?);
} else if let Some(json) =
normalize_bearer_token(std::env::var("OMNIGRAPH_SERVER_BEARER_TOKENS_JSON").ok())
{
entries.extend(parse_bearer_tokens_json(&json)?);
}
validate_bearer_tokens(entries)
}
#[cfg(test)]
mod tests {
use super::{
hash_bearer_token, load_server_settings, normalize_bearer_token, parse_bearer_tokens_json,
2026-04-10 20:49:41 +03:00
server_bearer_tokens_from_env,
};
use std::env;
use std::fs;
use tempfile::tempdir;
#[test]
fn hash_bearer_token_produces_32_byte_output() {
let hash = hash_bearer_token("any-token");
assert_eq!(hash.len(), 32);
}
#[test]
fn hash_bearer_token_is_deterministic() {
assert_eq!(
hash_bearer_token("stable-input"),
hash_bearer_token("stable-input"),
);
}
#[test]
fn hash_bearer_token_differs_for_different_inputs() {
assert_ne!(hash_bearer_token("token-a"), hash_bearer_token("token-b"));
}
#[test]
fn hash_bearer_token_matches_known_sha256_vector() {
// SHA-256("abc"). If this ever fails, the hash function was swapped.
let hash = hash_bearer_token("abc");
let hex: String = hash.iter().map(|b| format!("{:02x}", b)).collect();
assert_eq!(
hex,
"ba7816bf8f01cfea414140de5dae2223b00361a396177a9cb410ff61f20015ad"
);
}
2026-04-10 20:49:41 +03:00
#[test]
fn server_settings_load_from_yaml_config() {
let temp = tempdir().unwrap();
let config = temp.path().join("omnigraph.yaml");
fs::write(
&config,
r#"
2026-04-14 04:12:14 +03:00
graphs:
2026-04-10 20:49:41 +03:00
local:
uri: /tmp/demo.omni
server:
2026-04-14 04:12:14 +03:00
graph: local
2026-04-10 20:49:41 +03:00
bind: 0.0.0.0:9090
"#,
)
.unwrap();
let settings = load_server_settings(Some(&config), None, None, None).unwrap();
assert_eq!(settings.uri, "/tmp/demo.omni");
assert_eq!(settings.bind, "0.0.0.0:9090");
}
#[test]
fn server_settings_cli_flags_override_yaml_config() {
let temp = tempdir().unwrap();
let config = temp.path().join("omnigraph.yaml");
fs::write(
&config,
r#"
2026-04-14 04:12:14 +03:00
graphs:
2026-04-10 20:49:41 +03:00
local:
uri: /tmp/demo.omni
server:
2026-04-14 04:12:14 +03:00
graph: local
2026-04-10 20:49:41 +03:00
bind: 127.0.0.1:8080
"#,
)
.unwrap();
let settings = load_server_settings(
Some(&config),
Some("/tmp/override.omni".to_string()),
None,
Some("0.0.0.0:9999".to_string()),
)
.unwrap();
assert_eq!(settings.uri, "/tmp/override.omni");
assert_eq!(settings.bind, "0.0.0.0:9999");
}
#[test]
fn server_settings_can_resolve_named_target() {
let temp = tempdir().unwrap();
let config = temp.path().join("omnigraph.yaml");
fs::write(
&config,
r#"
2026-04-14 04:12:14 +03:00
graphs:
2026-04-10 20:49:41 +03:00
local:
uri: ./demo.omni
dev:
uri: http://127.0.0.1:8080
server:
2026-04-14 04:12:14 +03:00
graph: local
2026-04-10 20:49:41 +03:00
bind: 127.0.0.1:8080
"#,
)
.unwrap();
let settings =
load_server_settings(Some(&config), None, Some("dev".to_string()), None).unwrap();
assert_eq!(settings.uri, "http://127.0.0.1:8080");
}
#[test]
fn server_settings_require_uri_from_cli_or_config() {
let error = load_server_settings(None, None, None, None).unwrap_err();
assert!(error.to_string().contains("URI must be provided"));
}
#[test]
fn normalize_bearer_token_trims_and_filters_blank_values() {
assert_eq!(normalize_bearer_token(None), None);
assert_eq!(normalize_bearer_token(Some(" ".to_string())), None);
assert_eq!(
normalize_bearer_token(Some(" demo-token ".to_string())).as_deref(),
Some("demo-token")
);
}
struct EnvGuard {
saved: Vec<(&'static str, Option<String>)>,
}
impl EnvGuard {
fn set(vars: &[(&'static str, Option<&str>)]) -> Self {
let saved = vars
.iter()
.map(|(name, _)| (*name, env::var(name).ok()))
.collect::<Vec<_>>();
for (name, value) in vars {
unsafe {
match value {
Some(value) => env::set_var(name, value),
None => env::remove_var(name),
}
}
}
Self { saved }
}
}
impl Drop for EnvGuard {
fn drop(&mut self) {
for (name, value) in self.saved.drain(..) {
unsafe {
match value {
Some(value) => env::set_var(name, value),
None => env::remove_var(name),
}
}
}
}
}
#[test]
fn parse_bearer_tokens_json_reads_actor_token_map() {
let tokens = parse_bearer_tokens_json(r#"{"alice":" token-a ","bob":"token-b"}"#).unwrap();
assert_eq!(tokens.len(), 2);
assert!(tokens.contains(&("alice".to_string(), " token-a ".to_string())));
assert!(tokens.contains(&("bob".to_string(), "token-b".to_string())));
}
#[test]
fn server_bearer_tokens_from_env_reads_legacy_token_and_token_file() {
let temp = tempdir().unwrap();
let tokens_path = temp.path().join("tokens.json");
fs::write(
&tokens_path,
r#"{"team-01":"token-one","team-02":"token-two"}"#,
)
.unwrap();
let _guard = EnvGuard::set(&[
("OMNIGRAPH_SERVER_BEARER_TOKEN", Some(" legacy-token ")),
(
"OMNIGRAPH_SERVER_BEARER_TOKENS_FILE",
Some(tokens_path.to_str().unwrap()),
),
("OMNIGRAPH_SERVER_BEARER_TOKENS_JSON", None),
]);
let tokens = server_bearer_tokens_from_env().unwrap();
assert_eq!(
tokens,
vec![
("default".to_string(), "legacy-token".to_string()),
("team-01".to_string(), "token-one".to_string()),
("team-02".to_string(), "token-two".to_string()),
]
);
}
}