diff --git a/crates/omnigraph-server/src/lib.rs b/crates/omnigraph-server/src/lib.rs index ade2ed9..8ee9f6d 100644 --- a/crates/omnigraph-server/src/lib.rs +++ b/crates/omnigraph-server/src/lib.rs @@ -132,23 +132,58 @@ pub struct ServerConfig { pub allow_unauthenticated: bool, } -#[derive(Clone)] -pub struct AppState { - uri: String, - /// PR 2 (MR-686): the engine is now `Arc` — no global - /// write lock. Concurrent handlers call `&self` engine APIs - /// directly. Per-(table, branch) write queues inside the engine - /// serialize same-key writers; per-actor admission control on - /// `workload` isolates noisy actors. - engine: Arc, - /// Per-actor admission control. See `workload::WorkloadController`. - workload: Arc, - bearer_tokens: Arc<[(BearerTokenHash, Arc)]>, - policy_engine: Option>, +/// Server runtime topology. Single mode = legacy `omnigraph-server ` +/// invocation, one graph, flat HTTP routes. Multi mode = `--config +/// omnigraph.yaml` with a `graphs:` map, N graphs, cluster routes +/// (`/graphs/{graph_id}/...`). Mode is determined at startup by +/// `load_server_settings`. +/// +/// Both modes share the same handler bodies — the routing middleware +/// (`resolve_graph_handle`) injects `Arc` as a request +/// extension so handlers never see the mode discriminator directly. +#[derive(Clone, Debug)] +pub enum ServerMode { + /// Single-graph invocation. The `uri` is the only graph's URI. + /// Backward compatible with v0.6.0 deployments. + Single { uri: String }, + /// Multi-graph invocation (MR-668). `config_path` is the + /// `omnigraph.yaml` the server reads at startup and (in PR 7, + /// deferred) would rewrite on `POST /graphs`. With DELETE deferred, + /// the current scope does not rewrite the config file — `POST` + /// will (PR 7). + Multi { config_path: Option }, } -#[derive(Debug, Clone)] -struct AuthenticatedActor(Arc); +/// Sentinel `GraphId` for single-graph mode. The single graph is +/// registered under this key in the registry; the routing middleware +/// uses it to inject the handle on every flat-route request. The +/// value is operator-invisible — it does NOT need to be unique with +/// any user-facing graph id since single-mode never serves cluster +/// routes. +pub(crate) const SINGLE_GRAPH_KEY_ID: &str = "default"; + +#[derive(Clone)] +pub struct AppState { + /// Topology + (single mode only) the single graph's URI for + /// startup wiring. The registry below is the runtime source of truth. + mode: ServerMode, + /// PR 2 (MR-686) + PR 4a (MR-668): the engine and per-graph policy + /// now live inside `GraphHandle`s in the registry. Reads via + /// `ArcSwap` are lock-free; mutations (currently only `insert`) + /// serialize through the registry's internal mutex. + registry: Arc, + /// Per-actor admission control. Process-wide (not per-graph) — + /// see MR-668 decision Q6. + workload: Arc, + bearer_tokens: Arc<[(BearerTokenHash, Arc)]>, + /// Server-level Cedar policy. Used by management endpoints (`POST + /// /graphs`, `GET /graphs`) which act on the registry resource, + /// not on a per-graph resource. Loaded from `server.policy.file` + /// in `omnigraph.yaml`. `None` outside multi mode and when no + /// server policy is configured. Per-graph policies live on each + /// `GraphHandle.policy`. + server_policy: Option>, +} struct ExportStreamWriter { sender: mpsc::UnboundedSender>, @@ -167,12 +202,6 @@ impl Write for ExportStreamWriter { } } -impl AuthenticatedActor { - fn as_str(&self) -> &str { - &self.0 - } -} - #[derive(Debug)] pub struct ApiError { status: StatusCode, @@ -209,34 +238,10 @@ impl AppState { bearer_tokens: Vec<(String, String)>, policy_engine: Option, ) -> Self { - let bearer_tokens: Vec<(BearerTokenHash, Arc)> = bearer_tokens - .into_iter() - .map(|(actor, token)| (hash_bearer_token(&token), Arc::::from(actor))) - .collect(); - let policy_engine: Option> = policy_engine.map(Arc::new); - // MR-722 chassis: inject the policy checker into the engine so - // `Omnigraph::apply_schema_as` (and PR #3's fan-out of the - // remaining writers) gates at engine-layer too. HTTP-layer - // `authorize_request` still fires first; the engine-layer gate - // is the redundant-but-correct backstop, plus the only path - // that protects SDK / embedded callers. PR #3 removes the HTTP - // redundancy once we're confident the engine gate covers it. - let db = if let Some(engine) = policy_engine.as_ref() { - // Unsizing coercion: Arc → Arc. - // Needs the explicit `as` cast — Rust 2024 doesn't infer it through - // `Arc::clone`. - let checker = Arc::clone(engine) as Arc; - db.with_policy(checker) - } else { - db - }; - Self { - uri, - engine: Arc::new(db), - workload: Arc::new(workload::WorkloadController::from_env()), - bearer_tokens: Arc::from(bearer_tokens), - policy_engine, - } + let bearer_tokens = hash_bearer_tokens(bearer_tokens); + let per_graph_policy: Option> = policy_engine.map(Arc::new); + let workload = Arc::new(workload::WorkloadController::from_env()); + Self::build_single_mode(uri, db, bearer_tokens, per_graph_policy, workload) } /// Construct with a caller-provided [`workload::WorkloadController`]. @@ -249,17 +254,8 @@ impl AppState { bearer_tokens: Vec<(String, String)>, workload: workload::WorkloadController, ) -> Self { - let bearer_tokens: Vec<(BearerTokenHash, Arc)> = bearer_tokens - .into_iter() - .map(|(actor, token)| (hash_bearer_token(&token), Arc::::from(actor))) - .collect(); - Self { - uri, - engine: Arc::new(db), - workload: Arc::new(workload), - bearer_tokens: Arc::from(bearer_tokens), - policy_engine: None, - } + let bearer_tokens = hash_bearer_tokens(bearer_tokens); + Self::build_single_mode(uri, db, bearer_tokens, None, Arc::new(workload)) } /// Install a `PolicyEngine` post-construction (MR-723). Used by @@ -267,10 +263,37 @@ impl AppState { /// alongside a permit-all policy — the existing `new_with_*` and /// `new_with_workload` constructors don't compose. Production /// callers should use `open_with_bearer_tokens_and_policy` which - /// installs the policy on both the HTTP state and the engine. - pub fn with_policy_engine(mut self, engine: PolicyEngine) -> Self { - self.policy_engine = Some(Arc::new(engine)); - self + /// also installs the policy on the engine. + /// + /// PR 4a: rebuilds the single-mode handle with the policy attached + /// on the HTTP-layer (`handle.policy`). The engine inside the handle + /// is reused as-is — engine-layer policy enforcement is NOT + /// reinstalled by this path (the old single-field `policy_engine` + /// API had the same semantics — engine-layer enforcement was only + /// applied by constructors that took `policy_engine: Option<...>` + /// at build time). Tests that depend on engine-layer enforcement + /// should use `open_with_bearer_tokens_and_policy` instead. + pub fn with_policy_engine(self, engine: PolicyEngine) -> Self { + let policy_arc: Arc = Arc::new(engine); + let existing = single_mode_handle(&self.registry) + .expect("with_policy_engine called on a non-single-mode AppState"); + let new_handle = Arc::new(GraphHandle { + key: existing.key.clone(), + uri: existing.uri.clone(), + engine: Arc::clone(&existing.engine), + policy: Some(policy_arc), + }); + let registry = Arc::new( + GraphRegistry::from_handles(vec![new_handle]) + .expect("rebuilt single-mode registry must accept one handle"), + ); + Self { + mode: self.mode, + registry, + workload: self.workload, + bearer_tokens: self.bearer_tokens, + server_policy: self.server_policy, + } } pub async fn open(uri: impl Into) -> Result { @@ -319,15 +342,108 @@ impl AppState { )) } - pub fn uri(&self) -> &str { - &self.uri + /// Single-mode shared construction: wraps the bare engine + per-graph + /// policy in a `GraphHandle` registered under the sentinel + /// `SINGLE_GRAPH_KEY_ID` key. Per-graph policy enforcement on the + /// engine (MR-722) is re-applied via `Omnigraph::with_policy`. + fn build_single_mode( + uri: String, + db: Omnigraph, + bearer_tokens: Arc<[(BearerTokenHash, Arc)]>, + policy_engine: Option>, + workload: Arc, + ) -> Self { + // Engine-layer policy gate (MR-722). With a per-graph policy + // installed, every `_as` writer on `Omnigraph` calls into the + // PolicyChecker. HTTP-layer `authorize_request` is the first + // gate; engine-layer is the redundant-but-correct backstop. + let db = if let Some(policy) = policy_engine.as_ref() { + let checker = Arc::clone(policy) as Arc; + db.with_policy(checker) + } else { + db + }; + let key = GraphKey::cluster( + GraphId::try_from(SINGLE_GRAPH_KEY_ID) + .expect("single-graph sentinel key must validate"), + ); + let handle = Arc::new(GraphHandle { + key, + uri: uri.clone(), + engine: Arc::new(db), + policy: policy_engine, + }); + let registry = Arc::new( + GraphRegistry::from_handles(vec![handle]) + .expect("single-mode registry construction is infallible"), + ); + Self { + mode: ServerMode::Single { uri }, + registry, + workload, + bearer_tokens, + server_policy: None, + } + } + + /// Multi-mode constructor — used by PR 5's startup loop. Operators + /// reach this by invoking `omnigraph-server --config omnigraph.yaml` + /// with a non-empty `graphs:` map. + /// + /// Caller supplies the already-opened `GraphHandle`s and (optionally) + /// the path to the source config file. `server_policy` is loaded + /// from `server.policy.file` if configured. + pub fn new_multi( + handles: Vec>, + bearer_tokens: Vec<(String, String)>, + server_policy: Option, + workload: workload::WorkloadController, + config_path: Option, + ) -> std::result::Result { + let bearer_tokens = hash_bearer_tokens(bearer_tokens); + let registry = Arc::new(GraphRegistry::from_handles(handles)?); + Ok(Self { + mode: ServerMode::Multi { config_path }, + registry, + workload: Arc::new(workload), + bearer_tokens, + server_policy: server_policy.map(Arc::new), + }) + } + + /// Topology accessor. Handlers don't typically inspect this — they + /// extract `Arc` via the routing middleware — but + /// `build_app` does to decide flat vs nested route mounting. + pub fn mode(&self) -> &ServerMode { + &self.mode + } + + /// The configured URI in single mode. Returns `None` in multi mode — + /// each graph has its own URI on `handle.uri` instead. + pub fn uri(&self) -> Option<&str> { + match &self.mode { + ServerMode::Single { uri } => Some(uri.as_str()), + ServerMode::Multi { .. } => None, + } + } + + pub fn registry(&self) -> &Arc { + &self.registry } fn requires_bearer_auth(&self) -> bool { - !self.bearer_tokens.is_empty() || self.policy_engine.is_some() + if !self.bearer_tokens.is_empty() { + return true; + } + if self.server_policy.is_some() { + return true; + } + // Any per-graph policy also requires auth — otherwise the + // policy gate would receive unauthenticated requests. + self.registry.list().iter().any(|h| h.policy.is_some()) } - fn authenticate_bearer_token(&self, provided_token: &str) -> Option> { + fn authenticate_bearer_token(&self, provided_token: &str) -> Option { // Hash the incoming token and compare against every stored digest in // constant time. Iterate all entries unconditionally so total work — // and therefore response timing — doesn't depend on which slot matches. @@ -338,11 +454,29 @@ impl AppState { matched = Some(Arc::clone(actor)); } } - matched + matched.map(ResolvedActor::cluster_static) } +} - fn policy_engine(&self) -> Option<&PolicyEngine> { - self.policy_engine.as_deref() +fn hash_bearer_tokens( + bearer_tokens: Vec<(String, String)>, +) -> Arc<[(BearerTokenHash, Arc)]> { + let tokens: Vec<(BearerTokenHash, Arc)> = bearer_tokens + .into_iter() + .map(|(actor, token)| (hash_bearer_token(&token), Arc::::from(actor))) + .collect(); + Arc::from(tokens) +} + +/// Look up the single-mode handle from the registry. Returns `None` +/// if the registry is empty or has multiple entries (the latter would +/// indicate a constructor bug). +fn single_mode_handle(registry: &GraphRegistry) -> Option> { + let list = registry.list(); + if list.len() == 1 { + list.into_iter().next() + } else { + None } } @@ -631,7 +765,14 @@ pub fn classify_server_runtime_state( } pub fn build_app(state: AppState) -> Router { - let protected = Router::new() + // The per-graph protected routes, identical in single + multi mode. + // Two middleware layers wrap them (outer first, inner last): + // 1. `require_bearer_auth` — extracts the bearer token and injects + // `ResolvedActor` (or rejects 401). + // 2. `resolve_graph_handle` — injects `Arc` based on + // the active mode (single: the only handle; multi: lookup by + // `{graph_id}` in the URI path). + let per_graph_protected = Router::new() .route("/snapshot", get(server_snapshot)) .route("/export", post(server_export)) .route("/read", post(server_read)) @@ -650,11 +791,24 @@ pub fn build_app(state: AppState) -> Router { .route("/branches/merge", post(server_branch_merge)) .route("/commits", get(server_commit_list)) .route("/commits/{commit_id}", get(server_commit_show)) + .route_layer(middleware::from_fn_with_state( + state.clone(), + resolve_graph_handle, + )) .route_layer(middleware::from_fn_with_state( state.clone(), require_bearer_auth, )); + // Mount the protected routes differently per mode: + // * Single → flat routes (legacy: `/snapshot`, `/read`, etc.) + // * Multi → nested under `/graphs/{graph_id}/...` + // Mode is inferred at startup; the same router code branches once. + let protected: Router = match state.mode() { + ServerMode::Single { .. } => per_graph_protected, + ServerMode::Multi { .. } => Router::new().nest("/graphs/{graph_id}", per_graph_protected), + }; + Router::new() .route("/healthz", get(server_health)) .route("/openapi.json", get(server_openapi)) @@ -785,11 +939,76 @@ async fn require_bearer_auth( let Some(actor) = state.authenticate_bearer_token(provided_token) else { return Err(ApiError::unauthorized("invalid bearer token")); }; - request.extensions_mut().insert(AuthenticatedActor(actor)); + request.extensions_mut().insert(actor); Ok(next.run(request).await) } +/// Routing middleware (MR-668 PR 4a). Resolves the active graph for the +/// request and injects `Arc` as an extension so handlers can +/// extract it via `Extension>`. +/// +/// **Single mode**: the registry has exactly one handle (keyed by the +/// `SINGLE_GRAPH_KEY_ID` sentinel). Routes are flat — every request +/// resolves to that single handle, regardless of the URI path. +/// +/// **Multi mode**: routes are nested under `/graphs/{graph_id}/...`. The +/// middleware extracts `{graph_id}` from the URI path and looks it up in +/// the registry. Returns 404 if the graph is not registered. +/// +/// The middleware fires AFTER `require_bearer_auth`, so the actor is +/// already in the request extensions (or auth was off entirely). +async fn resolve_graph_handle( + State(state): State, + mut request: Request, + next: Next, +) -> std::result::Result { + let handle = match &state.mode { + ServerMode::Single { .. } => single_mode_handle(&state.registry).ok_or_else(|| { + ApiError::internal( + "single-mode registry is empty or has multiple handles \ + (programmer error in AppState constructor)" + .to_string(), + ) + })?, + ServerMode::Multi { .. } => { + // Extract the {graph_id} path segment from `/graphs/{graph_id}/...`. + // The router only mounts the per-graph nest under that prefix, + // so any request reaching this middleware in Multi mode must + // have the prefix — but defense in depth still validates. + let path = request.uri().path(); + let graph_id_str = path + .strip_prefix("/graphs/") + .and_then(|rest| rest.split('/').next()) + .filter(|s| !s.is_empty()) + .ok_or_else(|| { + ApiError::bad_request( + "cluster route missing /graphs/{graph_id} prefix".to_string(), + ) + })?; + let graph_id = GraphId::try_from(graph_id_str.to_string()) + .map_err(|err| ApiError::bad_request(err.to_string()))?; + let key = GraphKey::cluster(graph_id.clone()); + match state.registry.get(&key) { + RegistryLookup::Ready(handle) => handle, + RegistryLookup::Gone => { + return Err(ApiError::not_found(format!( + "graph '{graph_id}' not found" + ))); + } + } + } + }; + + // Record the graph id on the current tracing span for per-request + // observability. Operators correlating logs across requests reach for + // this field; in single mode it's the sentinel `default`. + tracing::Span::current().record("graph_id", handle.key.graph_id.as_str()); + + request.extensions_mut().insert(handle); + Ok(next.run(request).await) +} + fn log_policy_decision(actor_id: &str, request: &PolicyRequest, decision: &PolicyDecision) { info!( actor_id = actor_id, @@ -802,12 +1021,24 @@ fn log_policy_decision(actor_id: &str, request: &PolicyRequest, decision: &Polic ); } +/// HTTP-layer Cedar policy gate. Two sources of the policy engine: +/// * Per-graph handler — passes `handle.policy.as_deref()` so the +/// graph's Cedar rules govern read/change/branch_*/schema_apply. +/// * Management handler (PR 6b, deferred for now) — passes +/// `state.server_policy.as_deref()` so server-level Cedar rules +/// govern `graph_create`/`graph_list`/`graph_delete`. +/// +/// The MR-731 invariant lives inside this function: actor identity is +/// overwritten from the resolved bearer match, never trusted from the +/// caller-built `PolicyRequest.actor_id`. See +/// `actor_id_resolves_from_bearer_token_ignoring_client_supplied_headers` +/// at `tests/server.rs:1114-1216`. fn authorize_request( - state: &AppState, - actor: Option<&AuthenticatedActor>, + actor: Option<&ResolvedActor>, + policy: Option<&PolicyEngine>, mut request: PolicyRequest, ) -> std::result::Result<(), ApiError> { - let Some(engine) = state.policy_engine() else { + let Some(engine) = policy else { // MR-723 default-deny path. We're here when no PolicyEngine is // installed. Two startup-validated shapes can reach this: // @@ -849,11 +1080,11 @@ fn authorize_request( // // Side effect: also prevents an empty-string default at any handler // call site from ever reaching the engine as a policy subject. - request.actor_id = actor.as_str().to_string(); + request.actor_id = actor.actor_id.as_ref().to_string(); let decision = engine .authorize(&request) .map_err(|err| ApiError::internal(format!("policy: {err}")))?; - log_policy_decision(actor.as_str(), &request, &decision); + log_policy_decision(actor.actor_id.as_ref(), &request, &decision); if decision.allowed { Ok(()) } else { @@ -880,18 +1111,19 @@ fn authorize_request( /// count) for every table on the branch. Defaults to `main` when `branch` is /// omitted. Read-only. async fn server_snapshot( - State(state): State, - actor: Option>, + State(_state): State, + Extension(handle): Extension>, + actor: Option>, Query(query): Query, ) -> std::result::Result, ApiError> { let branch = query.branch.unwrap_or_else(|| "main".to_string()); authorize_request( - &state, actor.as_ref().map(|Extension(actor)| actor), + handle.policy.as_deref(), PolicyRequest { actor_id: actor .as_ref() - .map(|Extension(actor)| actor.as_str().to_string()) + .map(|Extension(actor)| actor.actor_id.as_ref().to_string()) .unwrap_or_default(), action: PolicyAction::Read, branch: Some(branch.clone()), @@ -899,7 +1131,7 @@ async fn server_snapshot( }, )?; let snapshot = { - let db = &state.engine; + let db = &handle.engine; db.snapshot_of(ReadTarget::branch(branch.as_str())) .await .map_err(ApiError::from_omni)? @@ -929,8 +1161,9 @@ async fn server_snapshot( /// match the parameters declared by the query. Returns rows as a JSON array /// plus a `columns` list. Read-only. async fn server_read( - State(state): State, - actor: Option>, + State(_state): State, + Extension(handle): Extension>, + actor: Option>, Json(request): Json, ) -> std::result::Result, ApiError> { if request.branch.is_some() && request.snapshot.is_some() { @@ -942,8 +1175,8 @@ async fn server_read( let target = read_target_from_request(request.branch, request.snapshot); let policy_branch = match &target { ReadTarget::Branch(branch) => Some(branch.clone()), - ReadTarget::Snapshot(_) if state.policy_engine().is_some() && actor.is_some() => { - let db = &state.engine; + ReadTarget::Snapshot(_) if handle.policy.is_some() && actor.is_some() => { + let db = &handle.engine; db.resolved_branch_of(target.clone()) .await .map(|branch| branch.or_else(|| Some("main".to_string()))) @@ -952,12 +1185,12 @@ async fn server_read( ReadTarget::Snapshot(_) => None, }; authorize_request( - &state, actor.as_ref().map(|Extension(actor)| actor), + handle.policy.as_deref(), PolicyRequest { actor_id: actor .as_ref() - .map(|Extension(actor)| actor.as_str().to_string()) + .map(|Extension(actor)| actor.actor_id.as_ref().to_string()) .unwrap_or_default(), action: PolicyAction::Read, branch: policy_branch, @@ -971,7 +1204,7 @@ async fn server_read( .map_err(|err| ApiError::bad_request(err.to_string()))?; let result = { - let db = &state.engine; + let db = &handle.engine; db.query( target.clone(), &request.query_source, @@ -1005,25 +1238,26 @@ async fn server_read( /// streams the entire branch. Suitable for large exports — the response is /// streamed, not buffered. Read-only. async fn server_export( - State(state): State, - actor: Option>, + State(_state): State, + Extension(handle): Extension>, + actor: Option>, Json(request): Json, ) -> std::result::Result { let branch = request.branch.unwrap_or_else(|| "main".to_string()); authorize_request( - &state, actor.as_ref().map(|Extension(actor)| actor), + handle.policy.as_deref(), PolicyRequest { actor_id: actor .as_ref() - .map(|Extension(actor)| actor.as_str().to_string()) + .map(|Extension(actor)| actor.actor_id.as_ref().to_string()) .unwrap_or_default(), action: PolicyAction::Export, branch: Some(branch.clone()), target_branch: None, }, )?; - let engine = Arc::clone(&state.engine); + let engine = Arc::clone(&handle.engine); let type_names = request.type_names.clone(); let table_keys = request.table_keys.clone(); let (tx, rx) = mpsc::unbounded_channel::>(); @@ -1073,18 +1307,19 @@ async fn server_export( /// mutations may still acquire locks briefly. Returns 409 on merge conflict. async fn server_change( State(state): State, - actor: Option>, + Extension(handle): Extension>, + actor: Option>, Json(request): Json, ) -> std::result::Result, ApiError> { let branch = request.branch.unwrap_or_else(|| "main".to_string()); let actor_arc = actor .as_ref() - .map(|Extension(actor)| Arc::clone(&actor.0)) + .map(|Extension(actor)| Arc::clone(&actor.actor_id)) .unwrap_or_else(|| Arc::::from("anonymous")); - let actor_id = actor.as_ref().map(|Extension(actor)| actor.as_str()); + let actor_id = actor.as_ref().map(|Extension(actor)| actor.actor_id.as_ref()); authorize_request( - &state, actor.as_ref().map(|Extension(actor)| actor), + handle.policy.as_deref(), PolicyRequest { actor_id: actor_id.map(str::to_string).unwrap_or_default(), action: PolicyAction::Change, @@ -1113,7 +1348,7 @@ async fn server_change( .map_err(|err| ApiError::bad_request(err.to_string()))?; let result = { - let db = &state.engine; + let db = &handle.engine; db.mutate_as( &branch, &request.query_source, @@ -1151,16 +1386,17 @@ async fn server_change( /// Useful for clients that want to introspect available types and tables /// before constructing GQ queries. Read-only. async fn server_schema_get( - State(state): State, - actor: Option>, + State(_state): State, + Extension(handle): Extension>, + actor: Option>, ) -> std::result::Result, ApiError> { authorize_request( - &state, actor.as_ref().map(|Extension(actor)| actor), + handle.policy.as_deref(), PolicyRequest { actor_id: actor .as_ref() - .map(|Extension(actor)| actor.as_str().to_string()) + .map(|Extension(actor)| actor.actor_id.as_ref().to_string()) .unwrap_or_default(), action: PolicyAction::Read, branch: None, @@ -1168,7 +1404,7 @@ async fn server_schema_get( }, )?; let schema_source = { - let db = &state.engine; + let db = &handle.engine; db.schema_source().to_string() }; Ok(Json(SchemaOutput { schema_source })) @@ -1197,17 +1433,18 @@ async fn server_schema_get( /// false the diff was unsupported and no changes were made. async fn server_schema_apply( State(state): State, - actor: Option>, + Extension(handle): Extension>, + actor: Option>, Json(request): Json, ) -> std::result::Result, ApiError> { let actor_arc = actor .as_ref() - .map(|Extension(actor)| Arc::clone(&actor.0)) + .map(|Extension(actor)| Arc::clone(&actor.actor_id)) .unwrap_or_else(|| Arc::::from("anonymous")); - let actor_id = actor.as_ref().map(|Extension(actor)| actor.as_str()); + let actor_id = actor.as_ref().map(|Extension(actor)| actor.actor_id.as_ref()); authorize_request( - &state, actor.as_ref().map(|Extension(actor)| actor), + handle.policy.as_deref(), PolicyRequest { actor_id: actor_id.map(str::to_string).unwrap_or_default(), action: PolicyAction::SchemaApply, @@ -1221,7 +1458,7 @@ async fn server_schema_apply( .try_admit(&actor_arc, est_bytes) .map_err(ApiError::from_workload_reject)?; let result = { - let db = &state.engine; + let db = &handle.engine; // Engine-layer policy enforcement (MR-722): pass the resolved // actor through so apply_schema_as can call enforce() with the // authoritative identity. With a policy installed in AppState, @@ -1238,7 +1475,7 @@ async fn server_schema_apply( .await .map_err(ApiError::from_omni)? }; - Ok(Json(schema_apply_output(state.uri(), result))) + Ok(Json(schema_apply_output(handle.uri.as_str(), result))) } #[utoipa::path( @@ -1265,7 +1502,8 @@ async fn server_schema_apply( /// `overwrite` or when ingest produces conflicting writes. async fn server_ingest( State(state): State, - actor: Option>, + Extension(handle): Extension>, + actor: Option>, Json(request): Json, ) -> std::result::Result, ApiError> { let branch = request.branch.unwrap_or_else(|| "main".to_string()); @@ -1273,12 +1511,12 @@ async fn server_ingest( let mode = request.mode.unwrap_or(omnigraph::loader::LoadMode::Merge); let actor_arc = actor .as_ref() - .map(|Extension(actor)| Arc::clone(&actor.0)) + .map(|Extension(actor)| Arc::clone(&actor.actor_id)) .unwrap_or_else(|| Arc::::from("anonymous")); - let actor_id = actor.as_ref().map(|Extension(actor)| actor.as_str()); + let actor_id = actor.as_ref().map(|Extension(actor)| actor.actor_id.as_ref()); let branch_exists = { - let db = &state.engine; + let db = &handle.engine; db.branch_list() .await .map_err(ApiError::from_omni)? @@ -1288,8 +1526,8 @@ async fn server_ingest( if !branch_exists { authorize_request( - &state, actor.as_ref().map(|Extension(actor)| actor), + handle.policy.as_deref(), PolicyRequest { actor_id: actor_id.map(str::to_string).unwrap_or_default(), action: PolicyAction::BranchCreate, @@ -1299,8 +1537,8 @@ async fn server_ingest( )?; } authorize_request( - &state, actor.as_ref().map(|Extension(actor)| actor), + handle.policy.as_deref(), PolicyRequest { actor_id: actor_id.map(str::to_string).unwrap_or_default(), action: PolicyAction::Change, @@ -1315,14 +1553,14 @@ async fn server_ingest( .map_err(ApiError::from_workload_reject)?; let result = { - let db = &state.engine; + let db = &handle.engine; db.ingest_as(&branch, Some(&from), &request.data, mode, actor_id) .await .map_err(ApiError::from_omni)? }; Ok(Json(ingest_output( - state.uri(), + handle.uri.as_str(), &result, actor_id.map(str::to_string), ))) @@ -1344,16 +1582,17 @@ async fn server_ingest( /// /// Returns branch names sorted alphabetically. Read-only. async fn server_branch_list( - State(state): State, - actor: Option>, + State(_state): State, + Extension(handle): Extension>, + actor: Option>, ) -> std::result::Result, ApiError> { authorize_request( - &state, actor.as_ref().map(|Extension(actor)| actor), + handle.policy.as_deref(), PolicyRequest { actor_id: actor .as_ref() - .map(|Extension(actor)| actor.as_str().to_string()) + .map(|Extension(actor)| actor.actor_id.as_ref().to_string()) .unwrap_or_default(), action: PolicyAction::Read, branch: None, @@ -1361,7 +1600,7 @@ async fn server_branch_list( }, )?; let mut branches = { - let db = &state.engine; + let db = &handle.engine; db.branch_list().await.map_err(ApiError::from_omni)? }; branches.sort(); @@ -1391,21 +1630,22 @@ async fn server_branch_list( /// already exists. async fn server_branch_create( State(state): State, - actor: Option>, + Extension(handle): Extension>, + actor: Option>, Json(request): Json, ) -> std::result::Result, ApiError> { let from = request.from.unwrap_or_else(|| "main".to_string()); let actor_arc = actor .as_ref() - .map(|Extension(actor)| Arc::clone(&actor.0)) + .map(|Extension(actor)| Arc::clone(&actor.actor_id)) .unwrap_or_else(|| Arc::::from("anonymous")); authorize_request( - &state, actor.as_ref().map(|Extension(actor)| actor), + handle.policy.as_deref(), PolicyRequest { actor_id: actor .as_ref() - .map(|Extension(actor)| actor.as_str().to_string()) + .map(|Extension(actor)| actor.actor_id.as_ref().to_string()) .unwrap_or_default(), action: PolicyAction::BranchCreate, branch: Some(from.clone()), @@ -1420,20 +1660,20 @@ async fn server_branch_create( .try_admit(&actor_arc, 256) .map_err(ApiError::from_workload_reject)?; { - let db = &state.engine; + let db = &handle.engine; db.branch_create_from_as( ReadTarget::branch(&from), &request.name, - actor.as_ref().map(|Extension(a)| a.as_str()), + actor.as_ref().map(|Extension(a)| a.actor_id.as_ref()), ) .await .map_err(ApiError::from_omni)?; } Ok(Json(BranchCreateOutput { - uri: state.uri().to_string(), + uri: handle.uri.clone(), from, name: request.name, - actor_id: actor.map(|Extension(actor)| actor.as_str().to_string()), + actor_id: actor.map(|Extension(actor)| actor.actor_id.as_ref().to_string()), })) } @@ -1461,17 +1701,18 @@ async fn server_branch_create( /// exist. async fn server_branch_delete( State(state): State, - actor: Option>, + Extension(handle): Extension>, + actor: Option>, Path(branch): Path, ) -> std::result::Result, ApiError> { let actor_arc = actor .as_ref() - .map(|Extension(actor)| Arc::clone(&actor.0)) + .map(|Extension(actor)| Arc::clone(&actor.actor_id)) .unwrap_or_else(|| Arc::::from("anonymous")); - let actor_id = actor.as_ref().map(|Extension(actor)| actor.as_str()); + let actor_id = actor.as_ref().map(|Extension(actor)| actor.actor_id.as_ref()); authorize_request( - &state, actor.as_ref().map(|Extension(actor)| actor), + handle.policy.as_deref(), PolicyRequest { actor_id: actor_id.map(str::to_string).unwrap_or_default(), action: PolicyAction::BranchDelete, @@ -1485,13 +1726,13 @@ async fn server_branch_delete( .try_admit(&actor_arc, 256) .map_err(ApiError::from_workload_reject)?; { - let db = &state.engine; + let db = &handle.engine; db.branch_delete_as(&branch, actor_id) .await .map_err(ApiError::from_omni)?; } Ok(Json(BranchDeleteOutput { - uri: state.uri().to_string(), + uri: handle.uri.clone(), name: branch, actor_id: actor_id.map(str::to_string), })) @@ -1521,18 +1762,19 @@ async fn server_branch_delete( /// unchanged in that case. **Destructive** to `target` on success. async fn server_branch_merge( State(state): State, - actor: Option>, + Extension(handle): Extension>, + actor: Option>, Json(request): Json, ) -> std::result::Result, ApiError> { let target = request.target.unwrap_or_else(|| "main".to_string()); let actor_arc = actor .as_ref() - .map(|Extension(actor)| Arc::clone(&actor.0)) + .map(|Extension(actor)| Arc::clone(&actor.actor_id)) .unwrap_or_else(|| Arc::::from("anonymous")); - let actor_id = actor.as_ref().map(|Extension(actor)| actor.as_str()); + let actor_id = actor.as_ref().map(|Extension(actor)| actor.actor_id.as_ref()); authorize_request( - &state, actor.as_ref().map(|Extension(actor)| actor), + handle.policy.as_deref(), PolicyRequest { actor_id: actor_id.map(str::to_string).unwrap_or_default(), action: PolicyAction::BranchMerge, @@ -1548,7 +1790,7 @@ async fn server_branch_merge( .try_admit(&actor_arc, 256) .map_err(ApiError::from_workload_reject)?; let outcome = { - let db = &state.engine; + let db = &handle.engine; db.branch_merge_as(&request.source, &target, actor_id) .await .map_err(ApiError::from_omni)? @@ -1579,17 +1821,18 @@ async fn server_branch_merge( /// Filter by `branch` to get the commits on a single branch (most recent /// first); omit to list across all branches. Read-only. async fn server_commit_list( - State(state): State, - actor: Option>, + State(_state): State, + Extension(handle): Extension>, + actor: Option>, Query(query): Query, ) -> std::result::Result, ApiError> { authorize_request( - &state, actor.as_ref().map(|Extension(actor)| actor), + handle.policy.as_deref(), PolicyRequest { actor_id: actor .as_ref() - .map(|Extension(actor)| actor.as_str().to_string()) + .map(|Extension(actor)| actor.actor_id.as_ref().to_string()) .unwrap_or_default(), action: PolicyAction::Read, branch: query.branch.clone(), @@ -1597,7 +1840,7 @@ async fn server_commit_list( }, )?; let commits = { - let db = &state.engine; + let db = &handle.engine; db.list_commits(query.branch.as_deref()) .await .map_err(ApiError::from_omni)? @@ -1628,17 +1871,18 @@ async fn server_commit_list( /// Returns the commit's manifest version, parent commit(s), and creation /// metadata. Read-only. async fn server_commit_show( - State(state): State, - actor: Option>, + State(_state): State, + Extension(handle): Extension>, + actor: Option>, Path(commit_id): Path, ) -> std::result::Result, ApiError> { authorize_request( - &state, actor.as_ref().map(|Extension(actor)| actor), + handle.policy.as_deref(), PolicyRequest { actor_id: actor .as_ref() - .map(|Extension(actor)| actor.as_str().to_string()) + .map(|Extension(actor)| actor.actor_id.as_ref().to_string()) .unwrap_or_default(), action: PolicyAction::Read, branch: None, @@ -1646,7 +1890,7 @@ async fn server_commit_show( }, )?; let commit = { - let db = &state.engine; + let db = &handle.engine; db.get_commit(&commit_id) .await .map_err(ApiError::from_omni)?