mr-668: router restructure + handler refactor for multi-graph (PR 4a/10)

PR 4a of the MR-668 multi-graph server work. The heaviest single PR —
rewires every handler to extract `Arc<GraphHandle>` from a routing
middleware, replaces `AuthenticatedActor(Arc<str>)` with `ResolvedActor`
everywhere, and adds the `ServerMode` discriminator.

Behavior changes:
- **Single mode** (legacy `omnigraph-server <URI>`): flat routes
  (`/snapshot`, `/read`, `/branches`, …) continue to work exactly as
  v0.6.0. Internally, the registry holds a single handle keyed by the
  sentinel `SINGLE_GRAPH_KEY_ID = "default"`; routing middleware injects
  that handle on every request. No HTTP-visible change.
- **Multi mode** (new): routes nest under `/graphs/{graph_id}/...`.
  Routing middleware extracts the graph id from the path, looks it up
  in the registry, and injects the handle. 404 if not found.
  (Multi-mode startup itself lands in PR 5; this PR provides the
  router-side wiring.)

AppState refactor:
- `engine: Arc<Omnigraph>` and `policy_engine: Option<Arc<PolicyEngine>>`
  fields removed — both now live inside `GraphHandle` in the registry.
- `mode: ServerMode { Single { uri } | Multi { config_path } }` added.
- `registry: Arc<GraphRegistry>` added.
- `server_policy: Option<Arc<PolicyEngine>>` added (placeholder for
  management endpoints in PR 6b; unused today).
- Existing constructors (`new`, `new_with_bearer_token{s,_and_policy}`,
  `new_with_workload`, `open*`) build a single-mode AppState
  internally and remain source-compatible. Tests that constructed
  AppState via these constructors continue to work.
- `with_policy_engine` post-construction setter — rebuilds the
  single-mode handle with the policy attached. Engine-layer
  enforcement is NOT reinstalled (matches the old single-field
  semantics; `open_with_bearer_tokens_and_policy` is the path that
  installs both layers).
- `new_multi` constructor added for PR 5's startup loop.
- `uri()` now returns `Option<&str>` (Some in single, None in multi).

Routing middleware:
- `resolve_graph_handle` injects `Arc<GraphHandle>` as a request
  extension. Mode-aware: single returns the only handle; multi parses
  `/graphs/{graph_id}/...` from the URI. Returns 404 in multi mode
  when the graph id is unregistered. Records `graph_id` on the
  current tracing span.
- `require_bearer_auth` updated to insert `ResolvedActor` (was
  `AuthenticatedActor`).

Handler refactor — every protected handler:
- Gains `Extension(handle): Extension<Arc<GraphHandle>>` param.
- Replaces `state.engine` → `handle.engine`.
- Replaces `state.policy_engine()` → `handle.policy.as_deref()`.
- Replaces `state.uri()` → `handle.uri.as_str()` (or `.clone()`
  where String is needed).
- Replaces `Arc::clone(&state.engine)` → `Arc::clone(&handle.engine)`
  (the spawn-and-clone pattern in `server_export` — proof that a
  long-running export survives the registry being mutated later).

authorize_request signature:
- Was: `(state: &AppState, actor: Option<&AuthenticatedActor>, request: PolicyRequest)`.
- Now: `(actor: Option<&ResolvedActor>, policy: Option<&PolicyEngine>, request: PolicyRequest)`.
- Per-graph callers pass `handle.policy.as_deref()`. The (future PR 6b)
  management endpoints will pass `state.server_policy.as_deref()`.

MR-731 invariant preserved:
- The single chokepoint `request.actor_id = actor.actor_id.as_ref().to_string()`
  inside `authorize_request` still overwrites any client-supplied
  actor identity. Regression test
  `actor_id_resolves_from_bearer_token_ignoring_client_supplied_headers`
  at `tests/server.rs:1114-1216` passes unchanged.

Tests: 0 new (the registry race tests in PR 3 already cover the
data structure; this PR exercises them indirectly via the existing
test suite). 74 lib + 57 server integration + 60 openapi = 191 tests
green. Clippy clean.

LOC: +397 insertions, -153 deletions in `crates/omnigraph-server/src/lib.rs`.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Ragnor Comerford 2026-05-25 19:45:57 +02:00
parent 662cf67ff8
commit 57f5f191ab
No known key found for this signature in database

View file

@ -132,23 +132,58 @@ pub struct ServerConfig {
pub allow_unauthenticated: bool,
}
#[derive(Clone)]
pub struct AppState {
uri: String,
/// PR 2 (MR-686): the engine is now `Arc<Omnigraph>` — no global
/// write lock. Concurrent handlers call `&self` engine APIs
/// directly. Per-(table, branch) write queues inside the engine
/// serialize same-key writers; per-actor admission control on
/// `workload` isolates noisy actors.
engine: Arc<Omnigraph>,
/// Per-actor admission control. See `workload::WorkloadController`.
workload: Arc<workload::WorkloadController>,
bearer_tokens: Arc<[(BearerTokenHash, Arc<str>)]>,
policy_engine: Option<Arc<PolicyEngine>>,
/// Server runtime topology. Single mode = legacy `omnigraph-server <URI>`
/// invocation, one graph, flat HTTP routes. Multi mode = `--config
/// omnigraph.yaml` with a `graphs:` map, N graphs, cluster routes
/// (`/graphs/{graph_id}/...`). Mode is determined at startup by
/// `load_server_settings`.
///
/// Both modes share the same handler bodies — the routing middleware
/// (`resolve_graph_handle`) injects `Arc<GraphHandle>` as a request
/// extension so handlers never see the mode discriminator directly.
#[derive(Clone, Debug)]
pub enum ServerMode {
/// Single-graph invocation. The `uri` is the only graph's URI.
/// Backward compatible with v0.6.0 deployments.
Single { uri: String },
/// Multi-graph invocation (MR-668). `config_path` is the
/// `omnigraph.yaml` the server reads at startup and (in PR 7,
/// deferred) would rewrite on `POST /graphs`. With DELETE deferred,
/// the current scope does not rewrite the config file — `POST`
/// will (PR 7).
Multi { config_path: Option<PathBuf> },
}
#[derive(Debug, Clone)]
struct AuthenticatedActor(Arc<str>);
/// Sentinel `GraphId` for single-graph mode. The single graph is
/// registered under this key in the registry; the routing middleware
/// uses it to inject the handle on every flat-route request. The
/// value is operator-invisible — it does NOT need to be unique with
/// any user-facing graph id since single-mode never serves cluster
/// routes.
pub(crate) const SINGLE_GRAPH_KEY_ID: &str = "default";
#[derive(Clone)]
pub struct AppState {
/// Topology + (single mode only) the single graph's URI for
/// startup wiring. The registry below is the runtime source of truth.
mode: ServerMode,
/// PR 2 (MR-686) + PR 4a (MR-668): the engine and per-graph policy
/// now live inside `GraphHandle`s in the registry. Reads via
/// `ArcSwap` are lock-free; mutations (currently only `insert`)
/// serialize through the registry's internal mutex.
registry: Arc<GraphRegistry>,
/// Per-actor admission control. Process-wide (not per-graph) —
/// see MR-668 decision Q6.
workload: Arc<workload::WorkloadController>,
bearer_tokens: Arc<[(BearerTokenHash, Arc<str>)]>,
/// Server-level Cedar policy. Used by management endpoints (`POST
/// /graphs`, `GET /graphs`) which act on the registry resource,
/// not on a per-graph resource. Loaded from `server.policy.file`
/// in `omnigraph.yaml`. `None` outside multi mode and when no
/// server policy is configured. Per-graph policies live on each
/// `GraphHandle.policy`.
server_policy: Option<Arc<PolicyEngine>>,
}
struct ExportStreamWriter {
sender: mpsc::UnboundedSender<std::result::Result<Bytes, io::Error>>,
@ -167,12 +202,6 @@ impl Write for ExportStreamWriter {
}
}
impl AuthenticatedActor {
fn as_str(&self) -> &str {
&self.0
}
}
#[derive(Debug)]
pub struct ApiError {
status: StatusCode,
@ -209,34 +238,10 @@ impl AppState {
bearer_tokens: Vec<(String, String)>,
policy_engine: Option<PolicyEngine>,
) -> Self {
let bearer_tokens: Vec<(BearerTokenHash, Arc<str>)> = bearer_tokens
.into_iter()
.map(|(actor, token)| (hash_bearer_token(&token), Arc::<str>::from(actor)))
.collect();
let policy_engine: Option<Arc<PolicyEngine>> = policy_engine.map(Arc::new);
// MR-722 chassis: inject the policy checker into the engine so
// `Omnigraph::apply_schema_as` (and PR #3's fan-out of the
// remaining writers) gates at engine-layer too. HTTP-layer
// `authorize_request` still fires first; the engine-layer gate
// is the redundant-but-correct backstop, plus the only path
// that protects SDK / embedded callers. PR #3 removes the HTTP
// redundancy once we're confident the engine gate covers it.
let db = if let Some(engine) = policy_engine.as_ref() {
// Unsizing coercion: Arc<PolicyEngine> → Arc<dyn PolicyChecker>.
// Needs the explicit `as` cast — Rust 2024 doesn't infer it through
// `Arc::clone`.
let checker = Arc::clone(engine) as Arc<dyn omnigraph_policy::PolicyChecker>;
db.with_policy(checker)
} else {
db
};
Self {
uri,
engine: Arc::new(db),
workload: Arc::new(workload::WorkloadController::from_env()),
bearer_tokens: Arc::from(bearer_tokens),
policy_engine,
}
let bearer_tokens = hash_bearer_tokens(bearer_tokens);
let per_graph_policy: Option<Arc<PolicyEngine>> = policy_engine.map(Arc::new);
let workload = Arc::new(workload::WorkloadController::from_env());
Self::build_single_mode(uri, db, bearer_tokens, per_graph_policy, workload)
}
/// Construct with a caller-provided [`workload::WorkloadController`].
@ -249,17 +254,8 @@ impl AppState {
bearer_tokens: Vec<(String, String)>,
workload: workload::WorkloadController,
) -> Self {
let bearer_tokens: Vec<(BearerTokenHash, Arc<str>)> = bearer_tokens
.into_iter()
.map(|(actor, token)| (hash_bearer_token(&token), Arc::<str>::from(actor)))
.collect();
Self {
uri,
engine: Arc::new(db),
workload: Arc::new(workload),
bearer_tokens: Arc::from(bearer_tokens),
policy_engine: None,
}
let bearer_tokens = hash_bearer_tokens(bearer_tokens);
Self::build_single_mode(uri, db, bearer_tokens, None, Arc::new(workload))
}
/// Install a `PolicyEngine` post-construction (MR-723). Used by
@ -267,10 +263,37 @@ impl AppState {
/// alongside a permit-all policy — the existing `new_with_*` and
/// `new_with_workload` constructors don't compose. Production
/// callers should use `open_with_bearer_tokens_and_policy` which
/// installs the policy on both the HTTP state and the engine.
pub fn with_policy_engine(mut self, engine: PolicyEngine) -> Self {
self.policy_engine = Some(Arc::new(engine));
self
/// also installs the policy on the engine.
///
/// PR 4a: rebuilds the single-mode handle with the policy attached
/// on the HTTP-layer (`handle.policy`). The engine inside the handle
/// is reused as-is — engine-layer policy enforcement is NOT
/// reinstalled by this path (the old single-field `policy_engine`
/// API had the same semantics — engine-layer enforcement was only
/// applied by constructors that took `policy_engine: Option<...>`
/// at build time). Tests that depend on engine-layer enforcement
/// should use `open_with_bearer_tokens_and_policy` instead.
pub fn with_policy_engine(self, engine: PolicyEngine) -> Self {
let policy_arc: Arc<PolicyEngine> = Arc::new(engine);
let existing = single_mode_handle(&self.registry)
.expect("with_policy_engine called on a non-single-mode AppState");
let new_handle = Arc::new(GraphHandle {
key: existing.key.clone(),
uri: existing.uri.clone(),
engine: Arc::clone(&existing.engine),
policy: Some(policy_arc),
});
let registry = Arc::new(
GraphRegistry::from_handles(vec![new_handle])
.expect("rebuilt single-mode registry must accept one handle"),
);
Self {
mode: self.mode,
registry,
workload: self.workload,
bearer_tokens: self.bearer_tokens,
server_policy: self.server_policy,
}
}
pub async fn open(uri: impl Into<String>) -> Result<Self> {
@ -319,15 +342,108 @@ impl AppState {
))
}
pub fn uri(&self) -> &str {
&self.uri
/// Single-mode shared construction: wraps the bare engine + per-graph
/// policy in a `GraphHandle` registered under the sentinel
/// `SINGLE_GRAPH_KEY_ID` key. Per-graph policy enforcement on the
/// engine (MR-722) is re-applied via `Omnigraph::with_policy`.
fn build_single_mode(
uri: String,
db: Omnigraph,
bearer_tokens: Arc<[(BearerTokenHash, Arc<str>)]>,
policy_engine: Option<Arc<PolicyEngine>>,
workload: Arc<workload::WorkloadController>,
) -> Self {
// Engine-layer policy gate (MR-722). With a per-graph policy
// installed, every `_as` writer on `Omnigraph` calls into the
// PolicyChecker. HTTP-layer `authorize_request` is the first
// gate; engine-layer is the redundant-but-correct backstop.
let db = if let Some(policy) = policy_engine.as_ref() {
let checker = Arc::clone(policy) as Arc<dyn omnigraph_policy::PolicyChecker>;
db.with_policy(checker)
} else {
db
};
let key = GraphKey::cluster(
GraphId::try_from(SINGLE_GRAPH_KEY_ID)
.expect("single-graph sentinel key must validate"),
);
let handle = Arc::new(GraphHandle {
key,
uri: uri.clone(),
engine: Arc::new(db),
policy: policy_engine,
});
let registry = Arc::new(
GraphRegistry::from_handles(vec![handle])
.expect("single-mode registry construction is infallible"),
);
Self {
mode: ServerMode::Single { uri },
registry,
workload,
bearer_tokens,
server_policy: None,
}
}
/// Multi-mode constructor — used by PR 5's startup loop. Operators
/// reach this by invoking `omnigraph-server --config omnigraph.yaml`
/// with a non-empty `graphs:` map.
///
/// Caller supplies the already-opened `GraphHandle`s and (optionally)
/// the path to the source config file. `server_policy` is loaded
/// from `server.policy.file` if configured.
pub fn new_multi(
handles: Vec<Arc<GraphHandle>>,
bearer_tokens: Vec<(String, String)>,
server_policy: Option<PolicyEngine>,
workload: workload::WorkloadController,
config_path: Option<PathBuf>,
) -> std::result::Result<Self, InsertError> {
let bearer_tokens = hash_bearer_tokens(bearer_tokens);
let registry = Arc::new(GraphRegistry::from_handles(handles)?);
Ok(Self {
mode: ServerMode::Multi { config_path },
registry,
workload: Arc::new(workload),
bearer_tokens,
server_policy: server_policy.map(Arc::new),
})
}
/// Topology accessor. Handlers don't typically inspect this — they
/// extract `Arc<GraphHandle>` via the routing middleware — but
/// `build_app` does to decide flat vs nested route mounting.
pub fn mode(&self) -> &ServerMode {
&self.mode
}
/// The configured URI in single mode. Returns `None` in multi mode —
/// each graph has its own URI on `handle.uri` instead.
pub fn uri(&self) -> Option<&str> {
match &self.mode {
ServerMode::Single { uri } => Some(uri.as_str()),
ServerMode::Multi { .. } => None,
}
}
pub fn registry(&self) -> &Arc<GraphRegistry> {
&self.registry
}
fn requires_bearer_auth(&self) -> bool {
!self.bearer_tokens.is_empty() || self.policy_engine.is_some()
if !self.bearer_tokens.is_empty() {
return true;
}
if self.server_policy.is_some() {
return true;
}
// Any per-graph policy also requires auth — otherwise the
// policy gate would receive unauthenticated requests.
self.registry.list().iter().any(|h| h.policy.is_some())
}
fn authenticate_bearer_token(&self, provided_token: &str) -> Option<Arc<str>> {
fn authenticate_bearer_token(&self, provided_token: &str) -> Option<ResolvedActor> {
// Hash the incoming token and compare against every stored digest in
// constant time. Iterate all entries unconditionally so total work —
// and therefore response timing — doesn't depend on which slot matches.
@ -338,11 +454,29 @@ impl AppState {
matched = Some(Arc::clone(actor));
}
}
matched
matched.map(ResolvedActor::cluster_static)
}
}
fn policy_engine(&self) -> Option<&PolicyEngine> {
self.policy_engine.as_deref()
fn hash_bearer_tokens(
bearer_tokens: Vec<(String, String)>,
) -> Arc<[(BearerTokenHash, Arc<str>)]> {
let tokens: Vec<(BearerTokenHash, Arc<str>)> = bearer_tokens
.into_iter()
.map(|(actor, token)| (hash_bearer_token(&token), Arc::<str>::from(actor)))
.collect();
Arc::from(tokens)
}
/// Look up the single-mode handle from the registry. Returns `None`
/// if the registry is empty or has multiple entries (the latter would
/// indicate a constructor bug).
fn single_mode_handle(registry: &GraphRegistry) -> Option<Arc<GraphHandle>> {
let list = registry.list();
if list.len() == 1 {
list.into_iter().next()
} else {
None
}
}
@ -631,7 +765,14 @@ pub fn classify_server_runtime_state(
}
pub fn build_app(state: AppState) -> Router {
let protected = Router::new()
// The per-graph protected routes, identical in single + multi mode.
// Two middleware layers wrap them (outer first, inner last):
// 1. `require_bearer_auth` — extracts the bearer token and injects
// `ResolvedActor` (or rejects 401).
// 2. `resolve_graph_handle` — injects `Arc<GraphHandle>` based on
// the active mode (single: the only handle; multi: lookup by
// `{graph_id}` in the URI path).
let per_graph_protected = Router::new()
.route("/snapshot", get(server_snapshot))
.route("/export", post(server_export))
.route("/read", post(server_read))
@ -650,11 +791,24 @@ pub fn build_app(state: AppState) -> Router {
.route("/branches/merge", post(server_branch_merge))
.route("/commits", get(server_commit_list))
.route("/commits/{commit_id}", get(server_commit_show))
.route_layer(middleware::from_fn_with_state(
state.clone(),
resolve_graph_handle,
))
.route_layer(middleware::from_fn_with_state(
state.clone(),
require_bearer_auth,
));
// Mount the protected routes differently per mode:
// * Single → flat routes (legacy: `/snapshot`, `/read`, etc.)
// * Multi → nested under `/graphs/{graph_id}/...`
// Mode is inferred at startup; the same router code branches once.
let protected: Router<AppState> = match state.mode() {
ServerMode::Single { .. } => per_graph_protected,
ServerMode::Multi { .. } => Router::new().nest("/graphs/{graph_id}", per_graph_protected),
};
Router::new()
.route("/healthz", get(server_health))
.route("/openapi.json", get(server_openapi))
@ -785,11 +939,76 @@ async fn require_bearer_auth(
let Some(actor) = state.authenticate_bearer_token(provided_token) else {
return Err(ApiError::unauthorized("invalid bearer token"));
};
request.extensions_mut().insert(AuthenticatedActor(actor));
request.extensions_mut().insert(actor);
Ok(next.run(request).await)
}
/// Routing middleware (MR-668 PR 4a). Resolves the active graph for the
/// request and injects `Arc<GraphHandle>` as an extension so handlers can
/// extract it via `Extension<Arc<GraphHandle>>`.
///
/// **Single mode**: the registry has exactly one handle (keyed by the
/// `SINGLE_GRAPH_KEY_ID` sentinel). Routes are flat — every request
/// resolves to that single handle, regardless of the URI path.
///
/// **Multi mode**: routes are nested under `/graphs/{graph_id}/...`. The
/// middleware extracts `{graph_id}` from the URI path and looks it up in
/// the registry. Returns 404 if the graph is not registered.
///
/// The middleware fires AFTER `require_bearer_auth`, so the actor is
/// already in the request extensions (or auth was off entirely).
async fn resolve_graph_handle(
State(state): State<AppState>,
mut request: Request,
next: Next,
) -> std::result::Result<Response, ApiError> {
let handle = match &state.mode {
ServerMode::Single { .. } => single_mode_handle(&state.registry).ok_or_else(|| {
ApiError::internal(
"single-mode registry is empty or has multiple handles \
(programmer error in AppState constructor)"
.to_string(),
)
})?,
ServerMode::Multi { .. } => {
// Extract the {graph_id} path segment from `/graphs/{graph_id}/...`.
// The router only mounts the per-graph nest under that prefix,
// so any request reaching this middleware in Multi mode must
// have the prefix — but defense in depth still validates.
let path = request.uri().path();
let graph_id_str = path
.strip_prefix("/graphs/")
.and_then(|rest| rest.split('/').next())
.filter(|s| !s.is_empty())
.ok_or_else(|| {
ApiError::bad_request(
"cluster route missing /graphs/{graph_id} prefix".to_string(),
)
})?;
let graph_id = GraphId::try_from(graph_id_str.to_string())
.map_err(|err| ApiError::bad_request(err.to_string()))?;
let key = GraphKey::cluster(graph_id.clone());
match state.registry.get(&key) {
RegistryLookup::Ready(handle) => handle,
RegistryLookup::Gone => {
return Err(ApiError::not_found(format!(
"graph '{graph_id}' not found"
)));
}
}
}
};
// Record the graph id on the current tracing span for per-request
// observability. Operators correlating logs across requests reach for
// this field; in single mode it's the sentinel `default`.
tracing::Span::current().record("graph_id", handle.key.graph_id.as_str());
request.extensions_mut().insert(handle);
Ok(next.run(request).await)
}
fn log_policy_decision(actor_id: &str, request: &PolicyRequest, decision: &PolicyDecision) {
info!(
actor_id = actor_id,
@ -802,12 +1021,24 @@ fn log_policy_decision(actor_id: &str, request: &PolicyRequest, decision: &Polic
);
}
/// HTTP-layer Cedar policy gate. Two sources of the policy engine:
/// * Per-graph handler — passes `handle.policy.as_deref()` so the
/// graph's Cedar rules govern read/change/branch_*/schema_apply.
/// * Management handler (PR 6b, deferred for now) — passes
/// `state.server_policy.as_deref()` so server-level Cedar rules
/// govern `graph_create`/`graph_list`/`graph_delete`.
///
/// The MR-731 invariant lives inside this function: actor identity is
/// overwritten from the resolved bearer match, never trusted from the
/// caller-built `PolicyRequest.actor_id`. See
/// `actor_id_resolves_from_bearer_token_ignoring_client_supplied_headers`
/// at `tests/server.rs:1114-1216`.
fn authorize_request(
state: &AppState,
actor: Option<&AuthenticatedActor>,
actor: Option<&ResolvedActor>,
policy: Option<&PolicyEngine>,
mut request: PolicyRequest,
) -> std::result::Result<(), ApiError> {
let Some(engine) = state.policy_engine() else {
let Some(engine) = policy else {
// MR-723 default-deny path. We're here when no PolicyEngine is
// installed. Two startup-validated shapes can reach this:
//
@ -849,11 +1080,11 @@ fn authorize_request(
//
// Side effect: also prevents an empty-string default at any handler
// call site from ever reaching the engine as a policy subject.
request.actor_id = actor.as_str().to_string();
request.actor_id = actor.actor_id.as_ref().to_string();
let decision = engine
.authorize(&request)
.map_err(|err| ApiError::internal(format!("policy: {err}")))?;
log_policy_decision(actor.as_str(), &request, &decision);
log_policy_decision(actor.actor_id.as_ref(), &request, &decision);
if decision.allowed {
Ok(())
} else {
@ -880,18 +1111,19 @@ fn authorize_request(
/// count) for every table on the branch. Defaults to `main` when `branch` is
/// omitted. Read-only.
async fn server_snapshot(
State(state): State<AppState>,
actor: Option<Extension<AuthenticatedActor>>,
State(_state): State<AppState>,
Extension(handle): Extension<Arc<GraphHandle>>,
actor: Option<Extension<ResolvedActor>>,
Query(query): Query<SnapshotQuery>,
) -> std::result::Result<Json<api::SnapshotOutput>, ApiError> {
let branch = query.branch.unwrap_or_else(|| "main".to_string());
authorize_request(
&state,
actor.as_ref().map(|Extension(actor)| actor),
handle.policy.as_deref(),
PolicyRequest {
actor_id: actor
.as_ref()
.map(|Extension(actor)| actor.as_str().to_string())
.map(|Extension(actor)| actor.actor_id.as_ref().to_string())
.unwrap_or_default(),
action: PolicyAction::Read,
branch: Some(branch.clone()),
@ -899,7 +1131,7 @@ async fn server_snapshot(
},
)?;
let snapshot = {
let db = &state.engine;
let db = &handle.engine;
db.snapshot_of(ReadTarget::branch(branch.as_str()))
.await
.map_err(ApiError::from_omni)?
@ -929,8 +1161,9 @@ async fn server_snapshot(
/// match the parameters declared by the query. Returns rows as a JSON array
/// plus a `columns` list. Read-only.
async fn server_read(
State(state): State<AppState>,
actor: Option<Extension<AuthenticatedActor>>,
State(_state): State<AppState>,
Extension(handle): Extension<Arc<GraphHandle>>,
actor: Option<Extension<ResolvedActor>>,
Json(request): Json<ReadRequest>,
) -> std::result::Result<Json<ReadOutput>, ApiError> {
if request.branch.is_some() && request.snapshot.is_some() {
@ -942,8 +1175,8 @@ async fn server_read(
let target = read_target_from_request(request.branch, request.snapshot);
let policy_branch = match &target {
ReadTarget::Branch(branch) => Some(branch.clone()),
ReadTarget::Snapshot(_) if state.policy_engine().is_some() && actor.is_some() => {
let db = &state.engine;
ReadTarget::Snapshot(_) if handle.policy.is_some() && actor.is_some() => {
let db = &handle.engine;
db.resolved_branch_of(target.clone())
.await
.map(|branch| branch.or_else(|| Some("main".to_string())))
@ -952,12 +1185,12 @@ async fn server_read(
ReadTarget::Snapshot(_) => None,
};
authorize_request(
&state,
actor.as_ref().map(|Extension(actor)| actor),
handle.policy.as_deref(),
PolicyRequest {
actor_id: actor
.as_ref()
.map(|Extension(actor)| actor.as_str().to_string())
.map(|Extension(actor)| actor.actor_id.as_ref().to_string())
.unwrap_or_default(),
action: PolicyAction::Read,
branch: policy_branch,
@ -971,7 +1204,7 @@ async fn server_read(
.map_err(|err| ApiError::bad_request(err.to_string()))?;
let result = {
let db = &state.engine;
let db = &handle.engine;
db.query(
target.clone(),
&request.query_source,
@ -1005,25 +1238,26 @@ async fn server_read(
/// streams the entire branch. Suitable for large exports — the response is
/// streamed, not buffered. Read-only.
async fn server_export(
State(state): State<AppState>,
actor: Option<Extension<AuthenticatedActor>>,
State(_state): State<AppState>,
Extension(handle): Extension<Arc<GraphHandle>>,
actor: Option<Extension<ResolvedActor>>,
Json(request): Json<ExportRequest>,
) -> std::result::Result<Response, ApiError> {
let branch = request.branch.unwrap_or_else(|| "main".to_string());
authorize_request(
&state,
actor.as_ref().map(|Extension(actor)| actor),
handle.policy.as_deref(),
PolicyRequest {
actor_id: actor
.as_ref()
.map(|Extension(actor)| actor.as_str().to_string())
.map(|Extension(actor)| actor.actor_id.as_ref().to_string())
.unwrap_or_default(),
action: PolicyAction::Export,
branch: Some(branch.clone()),
target_branch: None,
},
)?;
let engine = Arc::clone(&state.engine);
let engine = Arc::clone(&handle.engine);
let type_names = request.type_names.clone();
let table_keys = request.table_keys.clone();
let (tx, rx) = mpsc::unbounded_channel::<std::result::Result<Bytes, io::Error>>();
@ -1073,18 +1307,19 @@ async fn server_export(
/// mutations may still acquire locks briefly. Returns 409 on merge conflict.
async fn server_change(
State(state): State<AppState>,
actor: Option<Extension<AuthenticatedActor>>,
Extension(handle): Extension<Arc<GraphHandle>>,
actor: Option<Extension<ResolvedActor>>,
Json(request): Json<ChangeRequest>,
) -> std::result::Result<Json<ChangeOutput>, ApiError> {
let branch = request.branch.unwrap_or_else(|| "main".to_string());
let actor_arc = actor
.as_ref()
.map(|Extension(actor)| Arc::clone(&actor.0))
.map(|Extension(actor)| Arc::clone(&actor.actor_id))
.unwrap_or_else(|| Arc::<str>::from("anonymous"));
let actor_id = actor.as_ref().map(|Extension(actor)| actor.as_str());
let actor_id = actor.as_ref().map(|Extension(actor)| actor.actor_id.as_ref());
authorize_request(
&state,
actor.as_ref().map(|Extension(actor)| actor),
handle.policy.as_deref(),
PolicyRequest {
actor_id: actor_id.map(str::to_string).unwrap_or_default(),
action: PolicyAction::Change,
@ -1113,7 +1348,7 @@ async fn server_change(
.map_err(|err| ApiError::bad_request(err.to_string()))?;
let result = {
let db = &state.engine;
let db = &handle.engine;
db.mutate_as(
&branch,
&request.query_source,
@ -1151,16 +1386,17 @@ async fn server_change(
/// Useful for clients that want to introspect available types and tables
/// before constructing GQ queries. Read-only.
async fn server_schema_get(
State(state): State<AppState>,
actor: Option<Extension<AuthenticatedActor>>,
State(_state): State<AppState>,
Extension(handle): Extension<Arc<GraphHandle>>,
actor: Option<Extension<ResolvedActor>>,
) -> std::result::Result<Json<SchemaOutput>, ApiError> {
authorize_request(
&state,
actor.as_ref().map(|Extension(actor)| actor),
handle.policy.as_deref(),
PolicyRequest {
actor_id: actor
.as_ref()
.map(|Extension(actor)| actor.as_str().to_string())
.map(|Extension(actor)| actor.actor_id.as_ref().to_string())
.unwrap_or_default(),
action: PolicyAction::Read,
branch: None,
@ -1168,7 +1404,7 @@ async fn server_schema_get(
},
)?;
let schema_source = {
let db = &state.engine;
let db = &handle.engine;
db.schema_source().to_string()
};
Ok(Json(SchemaOutput { schema_source }))
@ -1197,17 +1433,18 @@ async fn server_schema_get(
/// false the diff was unsupported and no changes were made.
async fn server_schema_apply(
State(state): State<AppState>,
actor: Option<Extension<AuthenticatedActor>>,
Extension(handle): Extension<Arc<GraphHandle>>,
actor: Option<Extension<ResolvedActor>>,
Json(request): Json<SchemaApplyRequest>,
) -> std::result::Result<Json<SchemaApplyOutput>, ApiError> {
let actor_arc = actor
.as_ref()
.map(|Extension(actor)| Arc::clone(&actor.0))
.map(|Extension(actor)| Arc::clone(&actor.actor_id))
.unwrap_or_else(|| Arc::<str>::from("anonymous"));
let actor_id = actor.as_ref().map(|Extension(actor)| actor.as_str());
let actor_id = actor.as_ref().map(|Extension(actor)| actor.actor_id.as_ref());
authorize_request(
&state,
actor.as_ref().map(|Extension(actor)| actor),
handle.policy.as_deref(),
PolicyRequest {
actor_id: actor_id.map(str::to_string).unwrap_or_default(),
action: PolicyAction::SchemaApply,
@ -1221,7 +1458,7 @@ async fn server_schema_apply(
.try_admit(&actor_arc, est_bytes)
.map_err(ApiError::from_workload_reject)?;
let result = {
let db = &state.engine;
let db = &handle.engine;
// Engine-layer policy enforcement (MR-722): pass the resolved
// actor through so apply_schema_as can call enforce() with the
// authoritative identity. With a policy installed in AppState,
@ -1238,7 +1475,7 @@ async fn server_schema_apply(
.await
.map_err(ApiError::from_omni)?
};
Ok(Json(schema_apply_output(state.uri(), result)))
Ok(Json(schema_apply_output(handle.uri.as_str(), result)))
}
#[utoipa::path(
@ -1265,7 +1502,8 @@ async fn server_schema_apply(
/// `overwrite` or when ingest produces conflicting writes.
async fn server_ingest(
State(state): State<AppState>,
actor: Option<Extension<AuthenticatedActor>>,
Extension(handle): Extension<Arc<GraphHandle>>,
actor: Option<Extension<ResolvedActor>>,
Json(request): Json<IngestRequest>,
) -> std::result::Result<Json<IngestOutput>, ApiError> {
let branch = request.branch.unwrap_or_else(|| "main".to_string());
@ -1273,12 +1511,12 @@ async fn server_ingest(
let mode = request.mode.unwrap_or(omnigraph::loader::LoadMode::Merge);
let actor_arc = actor
.as_ref()
.map(|Extension(actor)| Arc::clone(&actor.0))
.map(|Extension(actor)| Arc::clone(&actor.actor_id))
.unwrap_or_else(|| Arc::<str>::from("anonymous"));
let actor_id = actor.as_ref().map(|Extension(actor)| actor.as_str());
let actor_id = actor.as_ref().map(|Extension(actor)| actor.actor_id.as_ref());
let branch_exists = {
let db = &state.engine;
let db = &handle.engine;
db.branch_list()
.await
.map_err(ApiError::from_omni)?
@ -1288,8 +1526,8 @@ async fn server_ingest(
if !branch_exists {
authorize_request(
&state,
actor.as_ref().map(|Extension(actor)| actor),
handle.policy.as_deref(),
PolicyRequest {
actor_id: actor_id.map(str::to_string).unwrap_or_default(),
action: PolicyAction::BranchCreate,
@ -1299,8 +1537,8 @@ async fn server_ingest(
)?;
}
authorize_request(
&state,
actor.as_ref().map(|Extension(actor)| actor),
handle.policy.as_deref(),
PolicyRequest {
actor_id: actor_id.map(str::to_string).unwrap_or_default(),
action: PolicyAction::Change,
@ -1315,14 +1553,14 @@ async fn server_ingest(
.map_err(ApiError::from_workload_reject)?;
let result = {
let db = &state.engine;
let db = &handle.engine;
db.ingest_as(&branch, Some(&from), &request.data, mode, actor_id)
.await
.map_err(ApiError::from_omni)?
};
Ok(Json(ingest_output(
state.uri(),
handle.uri.as_str(),
&result,
actor_id.map(str::to_string),
)))
@ -1344,16 +1582,17 @@ async fn server_ingest(
///
/// Returns branch names sorted alphabetically. Read-only.
async fn server_branch_list(
State(state): State<AppState>,
actor: Option<Extension<AuthenticatedActor>>,
State(_state): State<AppState>,
Extension(handle): Extension<Arc<GraphHandle>>,
actor: Option<Extension<ResolvedActor>>,
) -> std::result::Result<Json<BranchListOutput>, ApiError> {
authorize_request(
&state,
actor.as_ref().map(|Extension(actor)| actor),
handle.policy.as_deref(),
PolicyRequest {
actor_id: actor
.as_ref()
.map(|Extension(actor)| actor.as_str().to_string())
.map(|Extension(actor)| actor.actor_id.as_ref().to_string())
.unwrap_or_default(),
action: PolicyAction::Read,
branch: None,
@ -1361,7 +1600,7 @@ async fn server_branch_list(
},
)?;
let mut branches = {
let db = &state.engine;
let db = &handle.engine;
db.branch_list().await.map_err(ApiError::from_omni)?
};
branches.sort();
@ -1391,21 +1630,22 @@ async fn server_branch_list(
/// already exists.
async fn server_branch_create(
State(state): State<AppState>,
actor: Option<Extension<AuthenticatedActor>>,
Extension(handle): Extension<Arc<GraphHandle>>,
actor: Option<Extension<ResolvedActor>>,
Json(request): Json<BranchCreateRequest>,
) -> std::result::Result<Json<BranchCreateOutput>, ApiError> {
let from = request.from.unwrap_or_else(|| "main".to_string());
let actor_arc = actor
.as_ref()
.map(|Extension(actor)| Arc::clone(&actor.0))
.map(|Extension(actor)| Arc::clone(&actor.actor_id))
.unwrap_or_else(|| Arc::<str>::from("anonymous"));
authorize_request(
&state,
actor.as_ref().map(|Extension(actor)| actor),
handle.policy.as_deref(),
PolicyRequest {
actor_id: actor
.as_ref()
.map(|Extension(actor)| actor.as_str().to_string())
.map(|Extension(actor)| actor.actor_id.as_ref().to_string())
.unwrap_or_default(),
action: PolicyAction::BranchCreate,
branch: Some(from.clone()),
@ -1420,20 +1660,20 @@ async fn server_branch_create(
.try_admit(&actor_arc, 256)
.map_err(ApiError::from_workload_reject)?;
{
let db = &state.engine;
let db = &handle.engine;
db.branch_create_from_as(
ReadTarget::branch(&from),
&request.name,
actor.as_ref().map(|Extension(a)| a.as_str()),
actor.as_ref().map(|Extension(a)| a.actor_id.as_ref()),
)
.await
.map_err(ApiError::from_omni)?;
}
Ok(Json(BranchCreateOutput {
uri: state.uri().to_string(),
uri: handle.uri.clone(),
from,
name: request.name,
actor_id: actor.map(|Extension(actor)| actor.as_str().to_string()),
actor_id: actor.map(|Extension(actor)| actor.actor_id.as_ref().to_string()),
}))
}
@ -1461,17 +1701,18 @@ async fn server_branch_create(
/// exist.
async fn server_branch_delete(
State(state): State<AppState>,
actor: Option<Extension<AuthenticatedActor>>,
Extension(handle): Extension<Arc<GraphHandle>>,
actor: Option<Extension<ResolvedActor>>,
Path(branch): Path<String>,
) -> std::result::Result<Json<BranchDeleteOutput>, ApiError> {
let actor_arc = actor
.as_ref()
.map(|Extension(actor)| Arc::clone(&actor.0))
.map(|Extension(actor)| Arc::clone(&actor.actor_id))
.unwrap_or_else(|| Arc::<str>::from("anonymous"));
let actor_id = actor.as_ref().map(|Extension(actor)| actor.as_str());
let actor_id = actor.as_ref().map(|Extension(actor)| actor.actor_id.as_ref());
authorize_request(
&state,
actor.as_ref().map(|Extension(actor)| actor),
handle.policy.as_deref(),
PolicyRequest {
actor_id: actor_id.map(str::to_string).unwrap_or_default(),
action: PolicyAction::BranchDelete,
@ -1485,13 +1726,13 @@ async fn server_branch_delete(
.try_admit(&actor_arc, 256)
.map_err(ApiError::from_workload_reject)?;
{
let db = &state.engine;
let db = &handle.engine;
db.branch_delete_as(&branch, actor_id)
.await
.map_err(ApiError::from_omni)?;
}
Ok(Json(BranchDeleteOutput {
uri: state.uri().to_string(),
uri: handle.uri.clone(),
name: branch,
actor_id: actor_id.map(str::to_string),
}))
@ -1521,18 +1762,19 @@ async fn server_branch_delete(
/// unchanged in that case. **Destructive** to `target` on success.
async fn server_branch_merge(
State(state): State<AppState>,
actor: Option<Extension<AuthenticatedActor>>,
Extension(handle): Extension<Arc<GraphHandle>>,
actor: Option<Extension<ResolvedActor>>,
Json(request): Json<BranchMergeRequest>,
) -> std::result::Result<Json<BranchMergeOutput>, ApiError> {
let target = request.target.unwrap_or_else(|| "main".to_string());
let actor_arc = actor
.as_ref()
.map(|Extension(actor)| Arc::clone(&actor.0))
.map(|Extension(actor)| Arc::clone(&actor.actor_id))
.unwrap_or_else(|| Arc::<str>::from("anonymous"));
let actor_id = actor.as_ref().map(|Extension(actor)| actor.as_str());
let actor_id = actor.as_ref().map(|Extension(actor)| actor.actor_id.as_ref());
authorize_request(
&state,
actor.as_ref().map(|Extension(actor)| actor),
handle.policy.as_deref(),
PolicyRequest {
actor_id: actor_id.map(str::to_string).unwrap_or_default(),
action: PolicyAction::BranchMerge,
@ -1548,7 +1790,7 @@ async fn server_branch_merge(
.try_admit(&actor_arc, 256)
.map_err(ApiError::from_workload_reject)?;
let outcome = {
let db = &state.engine;
let db = &handle.engine;
db.branch_merge_as(&request.source, &target, actor_id)
.await
.map_err(ApiError::from_omni)?
@ -1579,17 +1821,18 @@ async fn server_branch_merge(
/// Filter by `branch` to get the commits on a single branch (most recent
/// first); omit to list across all branches. Read-only.
async fn server_commit_list(
State(state): State<AppState>,
actor: Option<Extension<AuthenticatedActor>>,
State(_state): State<AppState>,
Extension(handle): Extension<Arc<GraphHandle>>,
actor: Option<Extension<ResolvedActor>>,
Query(query): Query<CommitListQuery>,
) -> std::result::Result<Json<CommitListOutput>, ApiError> {
authorize_request(
&state,
actor.as_ref().map(|Extension(actor)| actor),
handle.policy.as_deref(),
PolicyRequest {
actor_id: actor
.as_ref()
.map(|Extension(actor)| actor.as_str().to_string())
.map(|Extension(actor)| actor.actor_id.as_ref().to_string())
.unwrap_or_default(),
action: PolicyAction::Read,
branch: query.branch.clone(),
@ -1597,7 +1840,7 @@ async fn server_commit_list(
},
)?;
let commits = {
let db = &state.engine;
let db = &handle.engine;
db.list_commits(query.branch.as_deref())
.await
.map_err(ApiError::from_omni)?
@ -1628,17 +1871,18 @@ async fn server_commit_list(
/// Returns the commit's manifest version, parent commit(s), and creation
/// metadata. Read-only.
async fn server_commit_show(
State(state): State<AppState>,
actor: Option<Extension<AuthenticatedActor>>,
State(_state): State<AppState>,
Extension(handle): Extension<Arc<GraphHandle>>,
actor: Option<Extension<ResolvedActor>>,
Path(commit_id): Path<String>,
) -> std::result::Result<Json<api::CommitOutput>, ApiError> {
authorize_request(
&state,
actor.as_ref().map(|Extension(actor)| actor),
handle.policy.as_deref(),
PolicyRequest {
actor_id: actor
.as_ref()
.map(|Extension(actor)| actor.as_str().to_string())
.map(|Extension(actor)| actor.actor_id.as_ref().to_string())
.unwrap_or_default(),
action: PolicyAction::Read,
branch: None,
@ -1646,7 +1890,7 @@ async fn server_commit_show(
},
)?;
let commit = {
let db = &state.engine;
let db = &handle.engine;
db.get_commit(&commit_id)
.await
.map_err(ApiError::from_omni)?