Merge remote-tracking branch 'origin/main' into ragnorc/shaping-config-integration

# Conflicts:
#	crates/omnigraph-cluster/src/lib.rs
#	crates/omnigraph-cluster/src/serve.rs
#	crates/omnigraph-server/src/lib.rs
#	crates/omnigraph-server/src/settings.rs
#	docs/user/clusters/config.md
This commit is contained in:
aaltshuler 2026-06-16 04:13:00 +03:00
commit 4f8c71fa23
75 changed files with 6557 additions and 6879 deletions

View file

@ -1,14 +1,15 @@
//! Server-level concurrent HTTP benchmark for MR-686 (PR 0 baseline).
//!
//! Drives concurrent `/change` requests against an in-process Omnigraph HTTP
//! server. Measures the global `Arc<RwLock<Omnigraph>>` lock penalty on
//! current `main` so PR 1 + PR 2 can be evaluated against a real baseline.
//! server. Originally written to measure the global `Arc<RwLock<Omnigraph>>`
//! lock penalty as an MR-686 baseline; that lock has since been removed
//! (engine write APIs are `&self`, the server holds a lockless
//! `Arc<Omnigraph>`), so this now measures the concurrent write path itself
//! (per-`(table, branch)` queue contention + Lance I/O).
//!
//! Per the MR-686 plan: this is the load-bearing bench. `Omnigraph::mutate_as`
//! is `&mut self`, so an engine-level concurrent bench either serializes on the
//! borrow checker (measures nothing) or drives multiple handles (measures Lance
//! contention, not the server bottleneck). Driving the HTTP server is the only
//! way to measure the actual `RwLock<Omnigraph>` contention this work removes.
//! Driving the HTTP server is still the right level: an engine-level bench on
//! a single handle measures Lance contention, not the server's request-path
//! concurrency.
//!
//! Usage:
//! ```sh

File diff suppressed because it is too large Load diff

View file

@ -51,25 +51,15 @@ pub(crate) async fn server_graphs_list(
State(state): State<AppState>,
actor: Option<Extension<ResolvedActor>>,
) -> std::result::Result<Json<GraphListResponse>, ApiError> {
// 405 in single mode — there's no registry to enumerate, and the
// legacy URL surface didn't expose this endpoint.
let registry = match state.routing() {
GraphRouting::Single { .. } => {
return Err(ApiError::method_not_allowed(
"GET /graphs is only available in multi-graph mode",
));
}
GraphRouting::Multi { registry, .. } => registry,
};
let registry = &state.routing().registry;
// Server-level Cedar gate. `state.server_policy` is loaded from
// `server.policy.file` in `omnigraph.yaml` at startup. When no
// server policy is configured, `authorize_request_server` falls
// through to the MR-723 default-deny semantics (every non-Read
// action denied for an authenticated actor). `GraphList` is not
// `Read`, so without a server policy the request gets 403 — which
// is the right default (don't leak the registry until the operator
// explicitly authorizes it).
// Server-level Cedar gate. `state.server_policy` is loaded from the
// cluster-scoped policy bundle at startup. When no server policy is
// configured, `authorize_request_server` falls through to the MR-723
// default-deny semantics (every non-Read action denied for an
// authenticated actor). `GraphList` is not `Read`, so without a server
// policy the request gets 403 — which is the right default (don't leak
// the registry until the operator explicitly authorizes it).
authorize_request(
actor.as_ref().map(|Extension(actor)| actor),
state.server_policy.as_deref(),
@ -93,17 +83,15 @@ pub(crate) async fn server_graphs_list(
}
pub(crate) async fn server_openapi(State(state): State<AppState>) -> Json<utoipa::openapi::OpenApi> {
let mut doc = ApiDoc::openapi();
// `served_openapi` is the single nesting source — the protected
// routes always live under `/graphs/{graph_id}/...` (public/management
// paths `/healthz`, `/graphs` stay flat). Building from it here means
// the runtime spec and the committed `openapi.json` share one nesting
// pass and can't drift.
let mut doc = crate::served_openapi();
if !state.requires_bearer_auth() {
strip_security(&mut doc);
}
// MR-668: in multi mode, the protected routes live under
// `/graphs/{graph_id}/...`. Rewrite the doc so the spec matches
// the routes the router actually serves. Public paths (`/healthz`)
// stay flat in both modes.
if matches!(state.routing(), GraphRouting::Multi { .. }) {
nest_paths_under_cluster_prefix(&mut doc);
}
Json(doc)
}
@ -248,16 +236,11 @@ pub(crate) async fn require_bearer_auth(
Ok(next.run(request).await)
}
/// Routing middleware (MR-668). Resolves the active graph for the
/// request and injects `Arc<GraphHandle>` as an extension so handlers can
/// extract it via `Extension<Arc<GraphHandle>>`.
/// Routing middleware (RFC-011 cluster-only). Resolves the active graph
/// for the request and injects `Arc<GraphHandle>` as an extension so
/// handlers can extract it via `Extension<Arc<GraphHandle>>`.
///
/// **Single mode**: the routing field holds the single handle directly.
/// Routes are flat; every request resolves to that handle, regardless
/// of the URI path. No registry walk, no sentinel key, no
/// programmer-error guard.
///
/// **Multi mode**: routes are nested under `/graphs/{graph_id}/...`. The
/// Routes are always nested under `/graphs/{graph_id}/...`. The
/// middleware extracts `{graph_id}` from the URI path and looks it up in
/// the registry. Returns 404 if the graph is not registered.
///
@ -268,39 +251,33 @@ pub(crate) async fn resolve_graph_handle(
mut request: Request,
next: Next,
) -> std::result::Result<Response, ApiError> {
let handle = match &state.routing {
GraphRouting::Single { handle } => Arc::clone(handle),
GraphRouting::Multi { registry, .. } => {
// `Router::nest("/graphs/{graph_id}", inner)` rewrites
// `request.uri().path()` to the inner suffix (e.g. `/snapshot`).
// The pre-rewrite URI is preserved in the `OriginalUri`
// request extension by axum's router; we read from there to
// extract `{graph_id}`. Fall back to the current URI only if
// the extension is missing, which shouldn't happen for
// nested routes but is safe defensive code.
let original_path: String = request
.extensions()
.get::<OriginalUri>()
.map(|OriginalUri(uri)| uri.path().to_string())
.unwrap_or_else(|| request.uri().path().to_string());
let graph_id_str = original_path
.strip_prefix("/graphs/")
.and_then(|rest| rest.split('/').next())
.filter(|s| !s.is_empty())
.ok_or_else(|| {
ApiError::bad_request(
"cluster route missing /graphs/{graph_id} prefix".to_string(),
)
})?;
let graph_id = GraphId::try_from(graph_id_str.to_string())
.map_err(|err| ApiError::bad_request(err.to_string()))?;
let key = GraphKey::cluster(graph_id.clone());
match registry.get(&key) {
RegistryLookup::Ready(handle) => handle,
RegistryLookup::Gone => {
return Err(ApiError::not_found(format!("graph '{graph_id}' not found")));
}
}
let registry = &state.routing.registry;
// `Router::nest("/graphs/{graph_id}", inner)` rewrites
// `request.uri().path()` to the inner suffix (e.g. `/snapshot`).
// The pre-rewrite URI is preserved in the `OriginalUri`
// request extension by axum's router; we read from there to
// extract `{graph_id}`. Fall back to the current URI only if
// the extension is missing, which shouldn't happen for
// nested routes but is safe defensive code.
let original_path: String = request
.extensions()
.get::<OriginalUri>()
.map(|OriginalUri(uri)| uri.path().to_string())
.unwrap_or_else(|| request.uri().path().to_string());
let graph_id_str = original_path
.strip_prefix("/graphs/")
.and_then(|rest| rest.split('/').next())
.filter(|s| !s.is_empty())
.ok_or_else(|| {
ApiError::bad_request("cluster route missing /graphs/{graph_id} prefix".to_string())
})?;
let graph_id = GraphId::try_from(graph_id_str.to_string())
.map_err(|err| ApiError::bad_request(err.to_string()))?;
let key = GraphKey::cluster(graph_id.clone());
let handle = match registry.get(&key) {
RegistryLookup::Ready(handle) => handle,
RegistryLookup::Gone => {
return Err(ApiError::not_found(format!("graph '{graph_id}' not found")));
}
};
@ -382,22 +359,25 @@ pub(crate) fn authorize(
// runtime state means the docstring contract on
// `server_graphs_list` ("don't leak the registry until the
// operator explicitly authorizes it") holds uniformly; the
// operator's only path to enabling it is configuring an
// explicit `server.policy.file` in omnigraph.yaml.
// operator's only path to enabling it is configuring a
// cluster-scoped policy bundle, applying the cluster, and
// restarting the server.
if request.action.resource_kind() == PolicyResourceKind::Server {
return Ok(Authz::Denied(
"server-scoped actions require an explicit `server.policy.file` \
configured in omnigraph.yaml the management surface is closed \
by default in every runtime state, including --unauthenticated, \
so that server topology is never exposed without operator opt-in."
"server-scoped actions require an explicit cluster policy bundle \
applied with `omnigraph cluster apply` and served after restart \
the management surface is closed by default in every runtime state, \
including --unauthenticated, so that server topology is never exposed \
without operator opt-in."
.to_string(),
));
}
if actor.is_some() && request.action != PolicyAction::Read {
return Ok(Authz::Denied(
"server runs in default-deny mode (bearer tokens configured but no \
policy file). Only `read` actions are permitted; configure \
`policy.file` in omnigraph.yaml to enable other actions."
applied policy bundle). Only `read` actions are permitted; configure \
a graph or cluster policy bundle in the cluster config, run \
`omnigraph cluster apply`, and restart the server to enable other actions."
.to_string(),
));
}
@ -510,7 +490,7 @@ pub(crate) fn deprecation_headers(successor_link: &'static str) -> [(HeaderName,
operation_id = "read",
request_body = ReadRequest,
responses(
(status = 200, description = "Query results (response includes `Deprecation: true` + `Link: </query>; rel=\"successor-version\"`)", body = ReadOutput),
(status = 200, description = "Query results (response includes `Deprecation: true` + `Link: <query>; rel=\"successor-version\"`)", body = ReadOutput),
(status = 400, description = "Bad request", body = ErrorOutput),
(status = 401, description = "Unauthorized", body = ErrorOutput),
(status = 403, description = "Forbidden", body = ErrorOutput),
@ -524,7 +504,7 @@ pub(crate) fn deprecation_headers(successor_link: &'static str) -> [(HeaderName,
/// route is kept indefinitely for byte-stable back-compat. New integrations
/// should target `POST /query`, which has clean field names (`query` /
/// `name`) and a 400-on-mutation guard. Responses from this route include
/// `Deprecation: true` and `Link: </query>; rel="successor-version"`
/// `Deprecation: true` and `Link: <query>; rel="successor-version"`
/// headers per RFC 9745 / RFC 8288 so SDKs and proxies can surface the
/// signal.
pub(crate) async fn server_read(
@ -544,7 +524,7 @@ pub(crate) async fn server_read(
)
.await?;
Ok((
deprecation_headers("</query>; rel=\"successor-version\""),
deprecation_headers("<query>; rel=\"successor-version\""),
Json(api::read_output(selected_name, &target, result)),
))
}
@ -793,7 +773,7 @@ pub(crate) async fn run_query(
operation_id = "change",
request_body = ChangeRequest,
responses(
(status = 200, description = "Mutation results (response includes `Deprecation: true` + `Link: </mutate>; rel=\"successor-version\"`)", body = ChangeOutput),
(status = 200, description = "Mutation results (response includes `Deprecation: true` + `Link: <mutate>; rel=\"successor-version\"`)", body = ChangeOutput),
(status = 400, description = "Bad request", body = ErrorOutput),
(status = 401, description = "Unauthorized", body = ErrorOutput),
(status = 403, description = "Forbidden", body = ErrorOutput),
@ -809,7 +789,7 @@ pub(crate) async fn run_query(
/// kept indefinitely for back-compat. New integrations should target
/// `POST /mutate`, which has identical semantics and a name that pairs
/// cleanly with `POST /query`. Responses from this route include
/// `Deprecation: true` and `Link: </mutate>; rel="successor-version"`
/// `Deprecation: true` and `Link: <mutate>; rel="successor-version"`
/// headers per RFC 9745 / RFC 8288 so SDKs and proxies can surface the
/// signal.
pub(crate) async fn server_change(
@ -830,7 +810,7 @@ pub(crate) async fn server_change(
)
.await?;
Ok((
deprecation_headers("</mutate>; rel=\"successor-version\""),
deprecation_headers("<mutate>; rel=\"successor-version\""),
Json(output),
))
}
@ -980,6 +960,22 @@ pub(crate) async fn server_invoke_query(
let query_name = stored.name.clone();
let is_mutation = stored.is_mutation();
// RFC-011 D3: the CLI verb asserts the stored query's kind. `query <name>`
// sends `expect_mutation: false`, `mutate <name>` sends `true`; a mismatch
// is rejected here so the wrong verb errors instead of silently running.
if let Some(expected) = req.expect_mutation {
if expected != is_mutation {
let (actual, verb) = if is_mutation {
("mutation", "mutate")
} else {
("read", "query")
};
return Err(ApiError::bad_request(format!(
"'{query_name}' is a {actual} — use omnigraph {verb} {query_name}"
)));
}
}
info!(
graph = %handle.uri,
actor = ?actor_ref.map(|a| a.actor_id.as_ref()),
@ -1117,12 +1113,16 @@ pub(crate) async fn server_schema_get(
(status = 400, description = "Bad request", body = ErrorOutput),
(status = 401, description = "Unauthorized", body = ErrorOutput),
(status = 403, description = "Forbidden", body = ErrorOutput),
(status = 409, description = "Schema apply is disabled for cluster-backed serving; use `omnigraph cluster apply` and restart", body = ErrorOutput),
(status = 429, description = "Per-actor admission cap exceeded; honor `Retry-After` header", body = ErrorOutput),
),
security(("bearer_token" = [])),
)]
/// Apply a schema migration.
///
/// Cluster-backed servers reject this route with `409 Conflict`; operators
/// must apply schema changes through `omnigraph cluster apply` and restart.
///
/// Diffs `schema_source` against the current schema and applies the resulting
/// migration steps (add/drop type, add/drop column, etc.). **Destructive**:
/// some steps drop data. Returns the list of steps applied; if `applied` is
@ -1149,6 +1149,17 @@ pub(crate) async fn server_schema_apply(
target_branch: Some("main".to_string()),
},
)?;
// Disable HTTP schema apply on cluster-backed serving AFTER the Cedar gate,
// so an unauthorized actor gets a 403 (not a 409 that would disclose the
// server is cluster-backed): 401 → 403 → 409, never leak topology before
// authorization. An authorized actor gets the actionable 409 signpost.
if state.routing().config_path.is_some() {
return Err(ApiError::conflict(
"server-side schema apply is disabled for cluster-backed serving; \
update the cluster config, run `omnigraph cluster apply`, and restart \
the server.",
));
}
let est_bytes = request.schema_source.len() as u64;
let _admission = state
.workload
@ -1180,6 +1191,25 @@ pub(crate) async fn server_schema_apply(
.await
.map_err(ApiError::from_omni)?
};
// Prompt index convergence (iss-848): schema apply records `@index` intent
// but defers the physical build. On a long-lived server, materialize it
// promptly rather than waiting for the next `optimize` cron — spawned
// detached so it never blocks or fails the apply response. Best-effort: a
// failure is logged and the index still converges on the next optimize.
// The CLI is one-shot, so it has no equivalent; its convergence path is the
// operator's optimize cadence.
if result.applied {
let engine = Arc::clone(&handle.engine);
tokio::spawn(async move {
if let Err(err) = engine.ensure_indices().await {
tracing::warn!(
target: "omnigraph::server",
error = %err,
"post-apply ensure_indices failed; indexes will converge on the next optimize",
);
}
});
}
Ok(Json(schema_apply_output(handle.uri.as_str(), result)))
}
@ -1311,7 +1341,7 @@ pub(crate) async fn server_load(
operation_id = "ingest",
request_body = IngestRequest,
responses(
(status = 200, description = "Load results (response includes `Deprecation: true` + `Link: </load>; rel=\"successor-version\"`)", body = IngestOutput),
(status = 200, description = "Load results (response includes `Deprecation: true` + `Link: <load>; rel=\"successor-version\"`)", body = IngestOutput),
(status = 400, description = "Bad request", body = ErrorOutput),
(status = 401, description = "Unauthorized", body = ErrorOutput),
(status = 403, description = "Forbidden", body = ErrorOutput),
@ -1325,7 +1355,7 @@ pub(crate) async fn server_load(
/// Bulk-load NDJSON data into a branch. Behavior is unchanged; the route is
/// kept indefinitely for back-compat. New integrations should target
/// `POST /load`, which has identical semantics. Responses from this route
/// include `Deprecation: true` and `Link: </load>; rel="successor-version"`
/// include `Deprecation: true` and `Link: <load>; rel="successor-version"`
/// headers per RFC 9745 / RFC 8288 so SDKs and proxies can surface the signal.
pub(crate) async fn server_ingest(
State(state): State<AppState>,
@ -1341,7 +1371,7 @@ pub(crate) async fn server_ingest(
)
.await?;
Ok((
deprecation_headers("</load>; rel=\"successor-version\""),
deprecation_headers("<load>; rel=\"successor-version\""),
Json(output),
))
}
@ -1725,4 +1755,3 @@ pub(crate) fn query_params_from_json(
json_params_to_param_map(params_json, query_params, JsonParamMode::Standard)
.map_err(|err| color_eyre::eyre::eyre!(err.to_string()))
}

View file

@ -1,11 +1,10 @@
pub mod api;
mod handlers;
mod settings;
pub use settings::{load_server_settings, classify_server_runtime_state, server_config_is_multi, ServerRuntimeState};
pub use settings::{load_server_settings, classify_server_runtime_state, ServerRuntimeState};
use settings::*;
use handlers::*;
pub mod auth;
pub mod config;
pub mod graph_id;
pub mod identity;
pub mod policy;
@ -46,11 +45,6 @@ use axum::response::{IntoResponse, Response};
use axum::routing::{delete, get, post};
use axum::{Json, Router};
use color_eyre::eyre::{Result, WrapErr, bail, eyre};
pub use config::{
AliasCommand, AliasConfig, CliDefaults, DEFAULT_CONFIG_FILE, OmnigraphConfig, PolicySettings,
ProjectConfig, QueryDefaults, ReadOutputFormat, ServerDefaults, TableCellLayout, TargetConfig,
graph_resource_id_for_selection, load_config,
};
use futures::stream;
use omnigraph::db::{Omnigraph, ReadTarget};
use omnigraph::error::{ManifestConflictDetails, ManifestErrorKind, OmniError};
@ -122,6 +116,20 @@ fn hash_bearer_token(token: &str) -> BearerTokenHash {
)]
pub struct ApiDoc;
/// The canonical served OpenAPI shape (RFC-011 cluster-only): the static
/// `ApiDoc` with every protected path nested under `/graphs/{graph_id}/…`
/// and `cluster_`-prefixed operation ids. `/healthz` and `/graphs` stay
/// flat. This is the single source of nesting — both the runtime
/// `server_openapi` handler and the committed `openapi.json` derive from
/// it, so the published spec can never describe routes the server does
/// not serve. The handler additionally strips security in open mode; the
/// committed spec retains it.
pub fn served_openapi() -> utoipa::openapi::OpenApi {
let mut doc = ApiDoc::openapi();
handlers::nest_paths_under_cluster_prefix(&mut doc);
doc
}
struct SecurityAddon;
impl utoipa::Modify for SecurityAddon {
@ -143,11 +151,10 @@ const SERVER_SOURCE_VERSION: Option<&str> = option_env!("OMNIGRAPH_SOURCE_VERSIO
#[derive(Debug, Clone)]
pub struct ServerConfig {
/// Server topology + the graphs to open at startup. Single-mode
/// invocations (`omnigraph-server <URI>` or `--target <name>`)
/// produce `ServerConfigMode::Single`; multi-mode invocations
/// (`--config omnigraph.yaml` with a non-empty `graphs:` map and
/// no single-mode selector) produce `ServerConfigMode::Multi`.
/// Server topology + the graphs to open at startup. RFC-011
/// cluster-only: the server always boots from a cluster
/// (`--cluster <dir | s3://…>`) and serves N graphs under cluster
/// routes.
pub mode: ServerConfigMode,
pub bind: String,
/// Operator opt-in for fully-unauthenticated dev mode (MR-723).
@ -161,49 +168,33 @@ pub struct ServerConfig {
pub allow_unauthenticated: bool,
}
/// What `load_server_settings` produces after applying the four-rule
/// mode inference matrix (MR-668 decision 2).
/// What `load_server_settings` produces. RFC-011 cluster-only: the
/// server always boots from a cluster's applied revision into a
/// multi-graph deployment (N ≥ 1 graphs).
#[derive(Debug, Clone)]
pub enum ServerConfigMode {
/// Legacy invocation — one graph at the given URI. Either:
/// * `omnigraph-server <URI>` (CLI positional), or
/// * `omnigraph-server --target <name> --config omnigraph.yaml`, or
/// * `omnigraph-server --config omnigraph.yaml` with `server.graph`
/// set to a named target.
Single {
uri: String,
/// Cedar graph resource id for the single graph. A named selection
/// uses the graph name; an anonymous URI uses the normalized URI to
/// preserve legacy single-graph policy identity.
graph_id: String,
/// Top-level `policy.file` (single-graph Cedar policy).
policy_file: Option<PathBuf>,
/// Top-level stored-query registry, loaded and identity-checked
/// at settings-build time; type-checked against the schema when
/// the engine opens.
queries: QueryRegistry,
},
/// Multi-graph invocation — `--config omnigraph.yaml` with a
/// non-empty `graphs:` map and no single-mode selector.
/// Cluster boot — `--cluster <dir | s3://…>` resolves the applied
/// revision into per-graph startup configs plus an optional
/// server-level policy.
Multi {
/// Per-graph startup configs, sorted by graph id (BTreeMap
/// iteration order). The parallel-open loop iterates this.
graphs: Vec<GraphStartupConfig>,
/// Path to the config file the server was started from. Kept on
/// the mode so future runtime mutation (deferred — see release
/// notes) can locate the source of truth without re-parsing CLI
/// args.
/// The cluster boot source (config directory or storage root).
/// Kept on the mode so future runtime mutation (deferred — see
/// release notes) can locate the source of truth without
/// re-parsing CLI args.
config_path: PathBuf,
/// `server.policy.file` (server-level Cedar policy for the
/// management endpoints). Wired into `GET /graphs` authorization.
/// Server-level Cedar policy for the management endpoints
/// (`GET /graphs`). Wired into `GET /graphs` authorization.
server_policy: Option<PolicySource>,
},
}
/// Where a Cedar policy bundle comes from at startup. File-based for
/// omnigraph.yaml deployments; inline (digest-verified catalog content)
/// for cluster-mode boots, where the catalog may live on object storage
/// and the server must not re-read mutable state after the snapshot.
/// Where a Cedar policy bundle comes from at startup. Cluster-local files are
/// used during config application; inline digest-verified catalog content is
/// used for serving, where the catalog may live on object storage and the
/// server must not re-read mutable state after the snapshot.
#[derive(Debug, Clone)]
pub enum PolicySource {
File(PathBuf),
@ -227,36 +218,25 @@ pub struct GraphStartupConfig {
pub queries: QueryRegistry,
}
/// Runtime routing for the server. Single mode = legacy
/// `omnigraph-server <URI>` invocation, one graph, flat HTTP routes.
/// Multi mode = `--config omnigraph.yaml` with a non-empty `graphs:`
/// map, N graphs, cluster routes (`/graphs/{graph_id}/...`). Mode is
/// determined at startup by `load_server_settings`.
/// Runtime routing for the server (RFC-011 cluster-only). Every
/// deployment serves cluster routes (`/graphs/{graph_id}/...`) backed by
/// a registry of N graphs (N ≥ 1). The single-graph convenience
/// constructors build a one-graph registry keyed by `default`; the
/// cluster boot path builds an N-graph registry. There is no longer a
/// flat-route mode.
///
/// In single mode the handle lives here directly — there is no
/// registry, no sentinel key, no walk-and-assert. In multi mode the
/// registry carries N handles and the middleware dispatches on the
/// URL's `{graph_id}` segment.
/// `config_path` is the boot source (the cluster directory or storage
/// root); preserved here so future runtime mutation (deferred) can find
/// the source of truth without re-parsing CLI args. The server treats
/// the source as operator-owned and never writes it.
///
/// Both modes share the same handler bodies — the routing middleware
/// All handler bodies are mode-agnostic — the routing middleware
/// (`resolve_graph_handle`) injects `Arc<GraphHandle>` as a request
/// extension so handlers never see the routing discriminator.
/// extension by looking up the `{graph_id}` URL segment in the registry.
#[derive(Clone)]
pub enum GraphRouting {
/// Single-graph deployment: one handle, flat routes (`/snapshot`,
/// `/read`, …). The `handle.uri` field carries the URI the engine
/// was opened from. Backward compatible with v0.6.0 deployments.
Single { handle: Arc<GraphHandle> },
/// Multi-graph deployment: many handles, cluster routes
/// (`/graphs/{graph_id}/...`). `config_path` is the `omnigraph.yaml`
/// the server reads at startup; preserved here so future runtime
/// mutation (deferred) can find the source of truth without
/// re-parsing CLI args. The server treats the file as
/// operator-owned and never writes it.
Multi {
registry: Arc<GraphRegistry>,
config_path: Option<PathBuf>,
},
pub struct GraphRouting {
pub registry: Arc<GraphRegistry>,
pub config_path: Option<PathBuf>,
}
#[derive(Clone)]
@ -272,12 +252,10 @@ pub struct AppState {
/// see MR-668 decision Q6.
workload: Arc<workload::WorkloadController>,
bearer_tokens: Arc<[(BearerTokenHash, Arc<str>)]>,
/// Server-level Cedar policy. Used by management endpoints (`POST
/// /graphs`, `GET /graphs`) which act on the registry resource,
/// not on a per-graph resource. Loaded from `server.policy.file`
/// in `omnigraph.yaml`. `None` outside multi mode and when no
/// server policy is configured. Per-graph policies live on each
/// `GraphHandle.policy`.
/// Server-level Cedar policy. Used by management endpoints (`GET
/// /graphs`) which act on the registry resource, not on a per-graph
/// resource. Loaded from the cluster-scoped policy binding when
/// configured. Per-graph policies live on each `GraphHandle.policy`.
server_policy: Option<Arc<PolicyEngine>>,
}
@ -502,11 +480,13 @@ impl AppState {
))
}
/// Single-mode shared construction: wraps the bare engine + per-graph
/// policy in a `GraphHandle` carried directly by `GraphRouting::Single`.
/// Per-graph policy enforcement on the engine (MR-722) is re-applied
/// via `Omnigraph::with_policy` so HTTP and engine layers can never
/// diverge.
/// Single-graph convenience construction (RFC-011 cluster-only):
/// wraps the bare engine + per-graph policy in a `GraphHandle` keyed
/// by `default`, then builds a one-graph registry so the deployment
/// serves the same `/graphs/{graph_id}/...` cluster routes as any
/// other. Per-graph policy enforcement on the engine (MR-722) is
/// re-applied via `Omnigraph::with_policy` so HTTP and engine layers
/// can never diverge.
fn build_single_mode(
uri: String,
db: Omnigraph,
@ -525,18 +505,13 @@ impl AppState {
} else {
db
};
// `GraphHandle.key` is required by the struct, but in single
// mode it is never a registry key (there's no registry) and
// never compared against user input (routes are flat, no
// `{graph_id}` parameter). The label appears only in tracing
// output from `resolve_graph_handle`. The literal below is a
// log label, not a routing key — when the future cluster
// catalog ships, single mode may carry the catalog-assigned
// id here instead.
// The convenience constructors address the single graph by the
// reserved id `default` — both the registry key and the URL
// segment (`/graphs/default/...`).
let uri = normalize_root_uri(&uri).unwrap_or(uri);
let key = GraphKey::cluster(
GraphId::try_from("default").expect("'default' is a valid GraphId log label"),
);
let graph_id =
GraphId::try_from("default").expect("'default' is a valid GraphId");
let key = GraphKey::cluster(graph_id);
let handle = Arc::new(GraphHandle {
key,
uri,
@ -544,8 +519,15 @@ impl AppState {
policy: policy_engine,
queries,
});
let registry = Arc::new(
GraphRegistry::from_handles(vec![handle])
.expect("a single handle never collides on graph id"),
);
Self {
routing: GraphRouting::Single { handle },
routing: GraphRouting {
registry,
config_path: None,
},
workload,
bearer_tokens,
server_policy: None,
@ -553,12 +535,11 @@ impl AppState {
}
/// Multi-mode constructor — used by the startup loop. Operators
/// reach this by invoking `omnigraph-server --config omnigraph.yaml`
/// with a non-empty `graphs:` map.
/// reach this by invoking `omnigraph-server --cluster <dir|s3://...>`.
///
/// Caller supplies the already-opened `GraphHandle`s and (optionally)
/// the path to the source config file. `server_policy` is loaded
/// from `server.policy.file` if configured.
/// the path to the source cluster. `server_policy` is loaded from the
/// cluster-scoped policy binding if configured.
pub fn new_multi(
handles: Vec<Arc<GraphHandle>>,
bearer_tokens: Vec<(String, String)>,
@ -569,7 +550,7 @@ impl AppState {
let bearer_tokens = hash_bearer_tokens(bearer_tokens);
let registry = Arc::new(GraphRegistry::from_handles(handles)?);
Ok(Self {
routing: GraphRouting::Multi {
routing: GraphRouting {
registry,
config_path,
},
@ -581,9 +562,7 @@ impl AppState {
/// Runtime routing accessor. Handlers don't typically inspect this —
/// they extract `Arc<GraphHandle>` via the routing middleware — but
/// `build_app` matches on it to decide flat vs nested route
/// mounting, and a handful of management endpoints (`GET /graphs`,
/// the OpenAPI cluster rewrite) match on the discriminant.
/// `server_graphs_list` reads the registry through it.
pub fn routing(&self) -> &GraphRouting {
&self.routing
}
@ -597,13 +576,9 @@ impl AppState {
}
// Any per-graph policy also requires auth — otherwise the
// policy gate would receive unauthenticated requests. Reading
// from `routing` is O(1) in both arms: single mode is a direct
// `handle.policy.is_some()` check, multi mode reads the
// cached `any_per_graph_policy` flag on the registry snapshot.
match &self.routing {
GraphRouting::Single { handle } => handle.policy.is_some(),
GraphRouting::Multi { registry, .. } => registry.snapshot_ref().any_per_graph_policy,
}
// the cached `any_per_graph_policy` flag off the registry
// snapshot is O(1).
self.routing.registry.snapshot_ref().any_per_graph_policy
}
fn authenticate_bearer_token(&self, provided_token: &str) -> Option<ResolvedActor> {
@ -898,18 +873,6 @@ fn validate_and_attach(
})
}
/// Format every load error (parse / identity failure) into a multi-line
/// boot-abort message.
fn format_registry_load_errors(label: &str, errors: &[queries::LoadError]) -> String {
let joined = errors
.iter()
.map(|e| e.to_string())
.collect::<Vec<_>>()
.join("\n ");
format!("graph '{label}': stored-query registry failed to load:\n {joined}")
}
pub fn build_app(state: AppState) -> Router {
// The per-graph protected routes, identical in single + multi mode.
// Two middleware layers wrap them (outer first, inner last):
@ -975,13 +938,9 @@ pub fn build_app(state: AppState) -> Router {
// Management endpoints (`GET /graphs`) live alongside the per-graph
// router. They go through bearer auth but NOT through
// `resolve_graph_handle` — they operate on the registry directly.
// The endpoint is mounted in both modes; in single mode the handler
// returns 405 so clients see "resource exists, wrong context"
// rather than 404 "no such resource."
//
// Runtime add/remove (`POST /graphs`, `DELETE /graphs/{id}`) is not
// exposed in v0.6.0 — operators add graphs by editing
// `omnigraph.yaml` and restarting.
// exposed — operators run `cluster apply` and restart.
let management = Router::new()
.route("/graphs", get(server_graphs_list))
.route_layer(middleware::from_fn_with_state(
@ -989,15 +948,11 @@ pub fn build_app(state: AppState) -> Router {
require_bearer_auth,
));
// Mount the protected routes differently per mode:
// * Single → flat routes (legacy: `/snapshot`, `/read`, etc.)
// * Multi → nested under `/graphs/{graph_id}/...`
let protected: Router<AppState> = match state.routing() {
GraphRouting::Single { .. } => per_graph_protected.merge(management),
GraphRouting::Multi { .. } => Router::new()
.nest("/graphs/{graph_id}", per_graph_protected)
.merge(management),
};
// RFC-011 cluster-only: per-graph routes always nest under
// `/graphs/{graph_id}/...`; there are no flat single-graph routes.
let protected: Router<AppState> = Router::new()
.nest("/graphs/{graph_id}", per_graph_protected)
.merge(management);
Router::new()
.route("/healthz", get(server_health))
@ -1018,7 +973,6 @@ pub async fn serve(config: ServerConfig) -> Result<()> {
// policy OR any per-graph policy file. Mirrors the
// `requires_bearer_auth` semantics on AppState.
let has_policy_configured = match &config.mode {
ServerConfigMode::Single { policy_file, .. } => policy_file.is_some(),
ServerConfigMode::Multi {
graphs,
server_policy,
@ -1039,36 +993,14 @@ pub async fn serve(config: ServerConfig) -> Result<()> {
ServerRuntimeState::DefaultDeny => warn!(
"bearer tokens are configured but no policy file is set — running in \
default-deny mode (only `read` actions are permitted for authenticated \
actors). Configure `policy.file` in omnigraph.yaml to enable Cedar rules."
actors). Configure a graph or cluster policy bundle in the cluster config, \
run `omnigraph cluster apply`, and restart to enable Cedar rules."
),
ServerRuntimeState::PolicyEnabled => {}
}
let bind = config.bind.clone();
let state = match config.mode {
ServerConfigMode::Single {
uri,
graph_id,
policy_file,
queries,
} => {
let uri_for_log = uri.clone();
info!(
uri = %uri_for_log,
graph_id = %graph_id,
bind = %bind,
mode = "single",
"serving omnigraph"
);
AppState::open_single_with_queries_for_graph_id(
uri,
tokens,
policy_file.as_ref(),
queries,
Some(graph_id),
)
.await?
}
ServerConfigMode::Multi {
graphs,
config_path,
@ -1076,7 +1008,7 @@ pub async fn serve(config: ServerConfig) -> Result<()> {
} => {
info!(
bind = %bind,
mode = "multi",
mode = "cluster",
graph_count = graphs.len(),
config = %config_path.display(),
"serving omnigraph"
@ -1197,4 +1129,3 @@ async fn shutdown_signal() {
}
info!("shutdown signal received");
}

View file

@ -8,16 +8,10 @@ use omnigraph_server::{ServerConfig, init_tracing, load_server_settings, serve};
#[command(name = "omnigraph-server")]
#[command(about = "HTTP server for the Omnigraph graph database")]
struct Cli {
/// Graph URI
uri: Option<String>,
#[arg(long)]
target: Option<String>,
#[arg(long)]
config: Option<PathBuf>,
/// Boot from a cluster: either a config directory (storage resolved
/// through cluster.yaml) or a storage-root URI directly
/// (s3://bucket/prefix — config-free serving from the bucket).
/// Exclusive: cannot combine with <URI>, --target, or --config.
/// The server's only boot source (RFC-011 cluster-only).
#[arg(long)]
cluster: Option<PathBuf>,
#[arg(long)]
@ -36,14 +30,7 @@ async fn main() -> Result<()> {
init_tracing();
let cli = Cli::parse();
let settings: ServerConfig = load_server_settings(
cli.config.as_ref(),
cli.cluster.as_ref(),
cli.uri,
cli.target,
cli.bind,
cli.unauthenticated,
)
.await?;
let settings: ServerConfig =
load_server_settings(cli.cluster.as_ref(), cli.bind, cli.unauthenticated).await?;
serve(settings).await
}

View file

@ -13,7 +13,6 @@
//! Renaming either is a breaking change to callers, by design.
use std::collections::BTreeMap;
use std::fs;
use std::sync::Arc;
use omnigraph_compiler::catalog::Catalog;
@ -22,8 +21,6 @@ use omnigraph_compiler::query::parser::parse_query;
use omnigraph_compiler::query::typecheck::typecheck_query_decl;
use omnigraph_compiler::types::{PropType, ScalarType};
use crate::config::{OmnigraphConfig, QueryEntry};
/// One loaded stored query. `source` is the full `.gq` file text — the
/// invocation handler hands it to `run_query` / `run_mutate` verbatim,
/// which reuse the same parse/IR/exec path as the inline routes (no
@ -68,8 +65,9 @@ pub struct QueryRegistry {
by_name: BTreeMap<String, StoredQuery>,
}
/// In-memory registry entry before file I/O. Used by [`QueryRegistry::load`]
/// (after reading each `.gq` from disk) and directly by tests.
/// In-memory registry spec: a query's name + already-read `.gq` source. The
/// input to [`QueryRegistry::from_specs`] — built by the server's cluster boot
/// and by the CLI's `queries` tooling from a cluster serving snapshot.
#[derive(Debug, Clone)]
pub struct RegistrySpec {
pub name: String,
@ -169,47 +167,6 @@ impl QueryRegistry {
}
}
/// Read each registry entry's `.gq` file from disk and build the
/// registry. `entries` is either the top-level `queries` map (single
/// mode) or a graph's `queries` map (multi mode); `config` resolves
/// each entry's relative `file:` path against `base_dir`.
pub fn load(
config: &OmnigraphConfig,
entries: &BTreeMap<String, QueryEntry>,
) -> Result<Self, Vec<LoadError>> {
let mut specs = Vec::with_capacity(entries.len());
let mut errors = Vec::new();
for (name, entry) in entries {
let path = config.resolve_query_file(&entry.file);
match fs::read_to_string(&path) {
Ok(source) => specs.push(RegistrySpec {
name: name.clone(),
source,
expose: entry.mcp.expose,
tool_name: entry.mcp.tool_name.clone(),
}),
Err(err) => errors.push(LoadError {
query: Some(name.clone()),
message: format!("cannot read '{}': {err}", path.display()),
}),
}
}
// Parse/identity/uniqueness-check the readable specs even when some
// files failed to read, so every broken entry (I/O, parse, identity,
// tool-name collision) surfaces in one pass rather than one per
// restart. I/O errors come first (in `entries` key order), then the
// spec errors. A non-empty `errors` always fails the load.
match Self::from_specs(specs) {
Ok(registry) if errors.is_empty() => Ok(registry),
Ok(_) => Err(errors),
Err(spec_errors) => {
errors.extend(spec_errors);
Err(errors)
}
}
}
pub fn lookup(&self, name: &str) -> Option<&StoredQuery> {
self.by_name.get(name)
}
@ -653,36 +610,4 @@ embedding: Vector(4)
assert!(entry2.params.is_empty(), "no declared params → empty list");
}
// --- load() error collection (file I/O + parse in one pass) ---
#[test]
fn load_collects_io_and_parse_errors_in_one_pass() {
use crate::config::load_config;
let temp = tempfile::tempdir().unwrap();
std::fs::write(
temp.path().join("good.gq"),
"query good() { match { $u: User } return { $u.name } }",
)
.unwrap();
std::fs::write(temp.path().join("broken.gq"), "query broken( {{ not valid").unwrap();
// `missing.gq` is deliberately not written (an I/O failure).
std::fs::write(
temp.path().join("omnigraph.yaml"),
"queries:\n good:\n file: ./good.gq\n \
missing:\n file: ./missing.gq\n broken:\n file: ./broken.gq\n",
)
.unwrap();
let config = load_config(Some(&temp.path().join("omnigraph.yaml"))).unwrap();
let errors = QueryRegistry::load(&config, config.query_entries()).unwrap_err();
let joined = errors.iter().map(|e| e.to_string()).collect::<Vec<_>>().join("\n");
// Both the missing file AND the parse error surface in one pass —
// the I/O failure must not mask the parse failure.
assert!(joined.contains("missing"), "I/O error must surface: {joined}");
assert!(
joined.contains("broken") && joined.contains("parse error"),
"the parse error in a readable file must surface in the same pass: {joined}"
);
assert!(!joined.contains("'good'"), "the valid entry is not an error: {joined}");
}
}

View file

@ -1,14 +1,13 @@
//! Server settings: omnigraph.yaml/CLI/env resolution, mode inference
//! (single vs multi vs cluster), bearer-token sources, and runtime-state
//! classification (moved verbatim from lib.rs in the modularization).
//! Server settings: cluster/CLI/env resolution, bearer-token sources, and
//! runtime-state classification (moved verbatim from lib.rs in the
//! modularization).
use super::*;
/// Build serving settings from a cluster directory's applied revision
/// (RFC-005 §D2): graphs at derived roots, stored queries from verified
/// catalog blob content, policy bundles from blob paths with their applied
/// bindings. Always multi-graph routing. The unauthenticated/env handling
/// matches the omnigraph.yaml path.
/// bindings. Always multi-graph routing.
pub(crate) async fn load_cluster_settings(
cluster_dir: &PathBuf,
cli_bind: Option<String>,
@ -131,163 +130,24 @@ pub(crate) async fn load_cluster_settings(
})
}
/// RFC-011 cluster-only boot: the server serves exclusively from a
/// cluster's applied revision (`--cluster <dir | s3://…>`). The legacy
/// omnigraph.yaml / `--target` / positional-URI single-graph boot paths
/// were removed — a deployment serves from exactly one source.
pub async fn load_server_settings(
config_path: Option<&PathBuf>,
cli_cluster: Option<&PathBuf>,
cli_uri: Option<String>,
cli_target: Option<String>,
cli_bind: Option<String>,
cli_allow_unauthenticated: bool,
) -> Result<ServerConfig> {
// Rule 0 (RFC-005): --cluster is an exclusive boot source. It is checked
// before anything reads omnigraph.yaml — in cluster mode that file is
// never opened, not even the implicit current-directory search.
if let Some(cluster_dir) = cli_cluster {
if cli_uri.is_some() || cli_target.is_some() || config_path.is_some() {
bail!(
"--cluster is an exclusive boot source; it cannot combine with a graph URI, --target, or --config (axiom 15: a deployment serves from one source)"
);
}
return load_cluster_settings(cluster_dir, cli_bind, cli_allow_unauthenticated).await;
}
let config = load_config(config_path)?;
let bind = cli_bind.unwrap_or_else(|| config.server_bind().to_string());
// Either `--unauthenticated` or `OMNIGRAPH_UNAUTHENTICATED=1` flips
// this. Treat any non-empty, non-"0"/"false" string as truthy —
// standard 12-factor "any value is true" reading of the env var.
let env_unauth = std::env::var("OMNIGRAPH_UNAUTHENTICATED")
.ok()
.map(|v| {
let trimmed = v.trim();
!trimmed.is_empty() && trimmed != "0" && !trimmed.eq_ignore_ascii_case("false")
})
.unwrap_or(false);
let allow_unauthenticated = cli_allow_unauthenticated || env_unauth;
// MR-668 decision 2 — four-rule mode inference matrix.
//
// 1. CLI `<URI>` positional → Single (URI = the value)
// 2. CLI `--target <name>` → Single (URI = graphs.<name>.uri)
// 3. `server.graph` in config → Single (URI = graphs.<server.graph>.uri)
// 4. `--config` + non-empty `graphs:` + no single-mode selector
// → Multi (every entry in `graphs:`)
// 5. otherwise → error with migration hint
//
// Rules 1-3 are mutually compatible (CLI URI wins over `--target`
// wins over `server.graph`), reusing the existing
// `resolve_target_uri` precedence.
let has_cli_uri = cli_uri.is_some();
let has_cli_target = cli_target.is_some();
let has_server_graph = config.server_graph_name().is_some();
let has_graphs_map = !config.graphs.is_empty();
let has_explicit_config = config_path.is_some();
let mode = if has_cli_uri || has_cli_target || has_server_graph {
// Rules 1, 2, or 3 → Single mode.
let raw_uri = config.resolve_target_uri(
cli_uri,
cli_target.as_deref(),
config.server_graph_name(),
)?;
let uri = normalize_root_uri(&raw_uri).wrap_err_with(|| {
format!("normalize single-graph URI '{raw_uri}' from server settings")
})?;
// Config follows graph IDENTITY, not mode: a bare URI is anonymous
// (top-level config); a graph chosen by name uses its per-graph
// `graphs.<name>.{policy,queries}`. `resolve_target_uri` already
// errored on an unknown name, so a `Some(name)` here is a known graph.
let selected: Option<&str> = if has_cli_uri {
None
} else {
cli_target.as_deref().or_else(|| config.server_graph_name())
};
// A named selection must not leave a populated top-level block
// silently unused — refuse boot and point at the per-graph block. The
// same rule the CLI selection gate enforces, shared via one helper so
// the boot check and `omnigraph queries validate`/`list` can't drift.
config.ensure_top_level_blocks_honored(selected)?;
// Load + identity-check now (no engine needed); the schema
// type-check happens when the engine opens.
let policy_file = config.resolve_policy_file_for(selected);
let queries = QueryRegistry::load(&config, config.query_entries_for(selected))
.map_err(|errs| color_eyre::eyre::eyre!(format_registry_load_errors(&uri, &errs)))?;
let graph_id = graph_resource_id_for_selection(selected, &uri);
ServerConfigMode::Single {
uri,
graph_id,
policy_file,
queries,
}
} else if has_explicit_config && has_graphs_map {
// Multi mode: every graph uses its per-graph block; top-level
// policy/queries are never honored, so a populated one is an error.
let unhonored = config.populated_top_level_blocks();
if !unhonored.is_empty() {
bail!(
"multi-graph mode: top-level {} {} not honored — each graph uses its own \
`graphs.<graph_id>.` block. Move per-graph rules there (and any \
`graph_list` policy to `server.policy.file`).",
unhonored.join(" and "),
if unhonored.len() == 1 { "is" } else { "are" },
);
}
// Rule 4 → Multi mode. Build a startup config per graph.
let mut graphs = Vec::with_capacity(config.graphs.len());
for (name, target) in &config.graphs {
// Validate the graph id can construct a `GraphId` newtype.
// Doing this here (not at registry insert) so a malformed
// omnigraph.yaml fails at startup with a clear error.
GraphId::try_from(name.clone()).map_err(|err| {
color_eyre::eyre::eyre!("invalid graph id '{name}' in omnigraph.yaml: {err}")
})?;
let raw_uri = config.resolve_uri_value(&target.uri);
let uri = normalize_root_uri(&raw_uri).wrap_err_with(|| {
format!("normalize URI '{raw_uri}' for graph '{name}' in omnigraph.yaml")
})?;
// Per-graph `queries:`, selected through the shared
// `query_entries_for` so server and CLI resolve identically.
// Load + identity-check now; the schema type-check happens
// when this graph's engine opens.
let queries = QueryRegistry::load(&config, config.query_entries_for(Some(name.as_str())))
.map_err(|errs| color_eyre::eyre::eyre!(format_registry_load_errors(name, &errs)))?;
graphs.push(GraphStartupConfig {
graph_id: name.clone(),
uri,
policy: config.resolve_target_policy_file(name).map(PolicySource::File),
embedding: None,
queries,
});
}
let config_path = config_path
.cloned()
.expect("has_explicit_config implies config_path is Some");
let server_policy = config.resolve_server_policy_file().map(PolicySource::File);
ServerConfigMode::Multi {
graphs,
config_path,
server_policy,
}
} else {
// Rule 5 → error with migration hint.
let Some(cluster_dir) = cli_cluster else {
bail!(
"no graph to serve: pass a URI (`omnigraph-server <URI>`), select a target \
(`--target <name> --config omnigraph.yaml`), set `server.graph: <name>` in \
omnigraph.yaml, or for multi-graph mode add a `graphs:` map to the config \
file referenced by `--config`."
"omnigraph-server boots from a cluster: pass --cluster <dir|s3://…> \
(the cluster's applied revision is the deployment artifact). The legacy \
single-graph boot (positional <URI>, --target, --config omnigraph.yaml) \
was removed in RFC-011."
);
};
Ok(ServerConfig {
mode,
bind,
allow_unauthenticated,
})
}
/// Whether the loaded config will run the server in multi-graph mode.
/// Useful for the test that constructs `ServerConfig` directly.
pub fn server_config_is_multi(config: &ServerConfig) -> bool {
matches!(config.mode, ServerConfigMode::Multi { .. })
load_cluster_settings(cluster_dir, cli_bind, cli_allow_unauthenticated).await
}
/// MR-723 server runtime state, classified from the three-state matrix
@ -337,7 +197,8 @@ pub fn classify_server_runtime_state(
"server has no bearer tokens and no policy file configured. This is a fully \
open server pass `--unauthenticated` (or set OMNIGRAPH_UNAUTHENTICATED=1) \
if you actually want that, otherwise configure bearer tokens (see \
docs/user/operations/server.md) and/or `policy.file` in omnigraph.yaml."
docs/user/operations/server.md) and a graph or cluster policy bundle in \
the cluster config, then run `omnigraph cluster apply` and restart."
),
(false, false, true) => Ok(ServerRuntimeState::Open),
(true, false, _) => Ok(ServerRuntimeState::DefaultDeny),
@ -427,8 +288,8 @@ pub(crate) fn server_bearer_tokens_from_env() -> Result<Vec<(String, String)>> {
mod tests {
use super::{
GraphStartupConfig, ServerConfig, ServerConfigMode, ServerRuntimeState,
classify_server_runtime_state, hash_bearer_token, load_server_settings,
normalize_bearer_token, parse_bearer_tokens_json, serve, server_bearer_tokens_from_env,
classify_server_runtime_state, hash_bearer_token, normalize_bearer_token,
parse_bearer_tokens_json, serve, server_bearer_tokens_from_env,
};
use serial_test::serial;
use std::env;
@ -587,108 +448,15 @@ mod tests {
}
#[tokio::test]
async fn server_settings_load_from_yaml_config() {
let temp = tempdir().unwrap();
let config = temp.path().join("omnigraph.yaml");
fs::write(
&config,
r#"
graphs:
local:
uri: /tmp/demo.omni
server:
graph: local
bind: 0.0.0.0:9090
"#,
)
.unwrap();
let settings = load_server_settings(Some(&config), None, None, None, None, false).await.unwrap();
match &settings.mode {
ServerConfigMode::Single { uri, graph_id, .. } => {
assert_eq!(uri, "/tmp/demo.omni");
assert_eq!(graph_id, "local");
}
ServerConfigMode::Multi { .. } => panic!("expected Single mode, got Multi"),
}
assert_eq!(settings.bind, "0.0.0.0:9090");
}
#[tokio::test]
async fn server_settings_cli_flags_override_yaml_config() {
let temp = tempdir().unwrap();
let config = temp.path().join("omnigraph.yaml");
fs::write(
&config,
r#"
graphs:
local:
uri: /tmp/demo.omni
server:
graph: local
bind: 127.0.0.1:8080
"#,
)
.unwrap();
let settings = load_server_settings(
Some(&config),
None,
Some("/tmp/override.omni".to_string()),
None,
Some("0.0.0.0:9999".to_string()),
false,
)
.await
.unwrap();
match &settings.mode {
ServerConfigMode::Single { uri, graph_id, .. } => {
assert_eq!(uri, "/tmp/override.omni");
assert_eq!(graph_id, "/tmp/override.omni");
}
ServerConfigMode::Multi { .. } => panic!("expected Single mode, got Multi"),
}
assert_eq!(settings.bind, "0.0.0.0:9999");
}
#[tokio::test]
async fn server_settings_can_resolve_named_target() {
let temp = tempdir().unwrap();
let config = temp.path().join("omnigraph.yaml");
fs::write(
&config,
r#"
graphs:
local:
uri: ./demo.omni
dev:
uri: http://127.0.0.1:8080
server:
graph: local
bind: 127.0.0.1:8080
"#,
)
.unwrap();
let settings =
load_server_settings(Some(&config), None, None, Some("dev".to_string()), None, false)
.await
.unwrap();
match &settings.mode {
ServerConfigMode::Single { uri, graph_id, .. } => {
assert_eq!(uri, "http://127.0.0.1:8080");
assert_eq!(graph_id, "dev");
}
ServerConfigMode::Multi { .. } => panic!("expected Single mode, got Multi"),
}
}
#[tokio::test]
async fn server_settings_require_uri_from_cli_or_config() {
let error = load_server_settings(None, None, None, None, None, false).await.unwrap_err();
async fn server_settings_require_cluster_boot_source() {
// RFC-011 cluster-only: with no --cluster the server refuses to
// start and names the cluster-required remedy.
let error = super::load_server_settings(None, None, false)
.await
.unwrap_err();
assert!(
error.to_string().contains("no graph to serve"),
"expected mode-inference error, got: {error}",
error.to_string().contains("boots from a cluster"),
"expected cluster-required error, got: {error}",
);
}
@ -799,17 +567,21 @@ server:
]);
let temp = tempdir().unwrap();
// Graph path doesn't need to exist — classifier fires before
// `AppState::open_with_bearer_tokens_and_policy`.
// any engine open.
let config = ServerConfig {
mode: ServerConfigMode::Single {
uri: temp
.path()
.join("graph.omni")
.to_string_lossy()
.into_owned(),
graph_id: "default".to_string(),
policy_file: None,
queries: crate::queries::QueryRegistry::default(),
mode: ServerConfigMode::Multi {
graphs: vec![GraphStartupConfig {
graph_id: "default".to_string(),
uri: temp
.path()
.join("graph.omni")
.to_string_lossy()
.into_owned(),
policy: None,
queries: crate::queries::QueryRegistry::default(),
}],
config_path: temp.path().join("cluster"),
server_policy: None,
},
bind: "127.0.0.1:0".to_string(),
allow_unauthenticated: false,
@ -824,75 +596,6 @@ server:
);
}
#[tokio::test]
#[serial]
async fn unauthenticated_env_var_classification() {
// MR-723 PR A: closes the gap where the env-var read path inside
// `load_server_settings` was structurally implemented but not
// exercised by any test. Three properties to pin, all in one
// sequential test because `cargo test` runs the mod test suite
// in parallel and `OMNIGRAPH_UNAUTHENTICATED` is process-global
// — interleaving with another test that sets the same env var
// (concurrent classifier tests, even the bearer-token suite
// sharing `EnvGuard`) corrupts the read. Sequential within one
// test fn is the simplest race-free shape.
let temp = tempdir().unwrap();
let config_path = temp.path().join("omnigraph.yaml");
fs::write(
&config_path,
r#"
graphs:
local:
uri: /tmp/demo-unauth.omni
server:
graph: local
"#,
)
.unwrap();
// Truthy values flip Open mode on, even with CLI flag off.
for value in ["1", "true", "yes", "TRUE", "anything"] {
let _guard = EnvGuard::set(&[("OMNIGRAPH_UNAUTHENTICATED", Some(value))]);
let settings = load_server_settings(Some(&config_path), None, None, None, None, false).await
.expect("settings load should succeed");
assert!(
settings.allow_unauthenticated,
"OMNIGRAPH_UNAUTHENTICATED={value:?} should enable Open mode",
);
}
// Falsy values keep refusal behavior, even with CLI flag off.
for value in ["0", "false", "FALSE", ""] {
let _guard = EnvGuard::set(&[("OMNIGRAPH_UNAUTHENTICATED", Some(value))]);
let settings = load_server_settings(Some(&config_path), None, None, None, None, false).await
.expect("settings load should succeed");
assert!(
!settings.allow_unauthenticated,
"OMNIGRAPH_UNAUTHENTICATED={value:?} should NOT enable Open mode",
);
}
// Unset env var: also false.
let _guard = EnvGuard::set(&[("OMNIGRAPH_UNAUTHENTICATED", None)]);
let settings = load_server_settings(Some(&config_path), None, None, None, None, false).await
.expect("settings load should succeed");
assert!(
!settings.allow_unauthenticated,
"OMNIGRAPH_UNAUTHENTICATED unset should NOT enable Open mode",
);
drop(_guard);
// CLI flag wins even when env is falsy — `serve()` honors the
// OR of both inputs.
let _guard = EnvGuard::set(&[("OMNIGRAPH_UNAUTHENTICATED", Some("0"))]);
let settings = load_server_settings(Some(&config_path), None, None, None, None, true).await
.expect("settings load should succeed");
assert!(
settings.allow_unauthenticated,
"--unauthenticated CLI flag should win even when env is falsy",
);
}
#[test]
fn classify_policy_enabled_requires_tokens() {
// State 3: tokens + policy → PolicyEnabled, regardless of the

View file

@ -50,7 +50,7 @@ async fn protected_routes_require_bearer_token() {
let (status, body) = json_response(
&app,
Request::builder()
.uri("/branches")
.uri(g("/branches"))
.method(Method::GET)
.body(Body::empty())
.unwrap(),
@ -85,7 +85,7 @@ async fn protected_routes_accept_valid_bearer_token_while_healthz_stays_open() {
let (status, body) = json_response(
&app,
Request::builder()
.uri("/branches")
.uri(g("/branches"))
.method(Method::GET)
.header("authorization", "Bearer demo-token")
.body(Body::empty())
@ -108,7 +108,7 @@ async fn protected_routes_accept_any_configured_team_bearer_token() {
let (status, body) = json_response(
&app,
Request::builder()
.uri("/branches")
.uri(g("/branches"))
.method(Method::GET)
.header("authorization", "Bearer token-two")
.body(Body::empty())
@ -158,7 +158,7 @@ rules:
let (ok_status, _) = json_response(
&app,
Request::builder()
.uri("/snapshot?branch=main")
.uri(g("/snapshot?branch=main"))
.method(Method::GET)
.header("authorization", "Bearer token-a")
.body(Body::empty())
@ -172,7 +172,7 @@ rules:
let (denied_status, denied_body) = json_response(
&app,
Request::builder()
.uri("/snapshot?branch=main")
.uri(g("/snapshot?branch=main"))
.method(Method::GET)
.header("authorization", "Bearer token-b")
.body(Body::empty())
@ -190,7 +190,7 @@ rules:
let (bad_status, _) = json_response(
&app,
Request::builder()
.uri("/snapshot?branch=main")
.uri(g("/snapshot?branch=main"))
.method(Method::GET)
.header("authorization", "Bearer wrong-token")
.body(Body::empty())
@ -245,7 +245,7 @@ rules:
let (spoof_up_status, spoof_up_body) = json_response(
&app,
Request::builder()
.uri("/snapshot?branch=main")
.uri(g("/snapshot?branch=main"))
.method(Method::GET)
.header("authorization", "Bearer token-b")
.header("x-actor-id", "act-a")
@ -270,7 +270,7 @@ rules:
let (spoof_down_status, _) = json_response(
&app,
Request::builder()
.uri("/snapshot?branch=main")
.uri(g("/snapshot?branch=main"))
.method(Method::GET)
.header("authorization", "Bearer token-a")
.header("x-actor-id", "act-b")
@ -290,7 +290,7 @@ rules:
let (empty_spoof_status, _) = json_response(
&app,
Request::builder()
.uri("/snapshot?branch=main")
.uri(g("/snapshot?branch=main"))
.method(Method::GET)
.header("authorization", "Bearer token-b")
.header("x-actor-id", "")
@ -316,7 +316,7 @@ async fn policy_allows_read_but_distinguishes_401_from_403() {
let (missing_status, missing_body) = json_response(
&app,
Request::builder()
.uri("/snapshot?branch=main")
.uri(g("/snapshot?branch=main"))
.method(Method::GET)
.body(Body::empty())
.unwrap(),
@ -332,7 +332,7 @@ async fn policy_allows_read_but_distinguishes_401_from_403() {
let (snapshot_status, snapshot_body) = json_response(
&app,
Request::builder()
.uri("/snapshot?branch=main")
.uri(g("/snapshot?branch=main"))
.method(Method::GET)
.header("authorization", "Bearer team-token")
.body(Body::empty())
@ -350,7 +350,7 @@ async fn policy_allows_read_but_distinguishes_401_from_403() {
let (forbidden_status, forbidden_body) = json_response(
&app,
Request::builder()
.uri("/export")
.uri(g("/export"))
.method(Method::POST)
.header("authorization", "Bearer team-token")
.header("content-type", "application/json")
@ -369,7 +369,7 @@ async fn policy_allows_read_but_distinguishes_401_from_403() {
.clone()
.oneshot(
Request::builder()
.uri("/export")
.uri(g("/export"))
.method(Method::POST)
.header("authorization", "Bearer admin-token")
.header("content-type", "application/json")
@ -410,7 +410,7 @@ async fn policy_uses_resolved_branch_for_snapshot_reads() {
let (status, body) = json_response(
&app,
Request::builder()
.uri("/read")
.uri(g("/read"))
.method(Method::POST)
.header("authorization", "Bearer team-token")
.header("content-type", "application/json")
@ -458,7 +458,7 @@ async fn policy_blocks_change_on_protected_main_but_allows_unprotected_branch()
let (main_status, main_body) = json_response(
&app,
Request::builder()
.uri("/change")
.uri(g("/change"))
.method(Method::POST)
.header("authorization", "Bearer team-token")
.header("content-type", "application/json")
@ -482,7 +482,7 @@ async fn policy_blocks_change_on_protected_main_but_allows_unprotected_branch()
let (feature_status, feature_body) = json_response(
&app,
Request::builder()
.uri("/change")
.uri(g("/change"))
.method(Method::POST)
.header("authorization", "Bearer team-token")
.header("content-type", "application/json")
@ -533,7 +533,7 @@ async fn policy_blocks_non_admin_merge_to_main_and_allows_admin() {
let (deny_status, deny_body) = json_response(
&app,
Request::builder()
.uri("/branches/merge")
.uri(g("/branches/merge"))
.method(Method::POST)
.header("authorization", "Bearer team-token")
.header("content-type", "application/json")
@ -551,7 +551,7 @@ async fn policy_blocks_non_admin_merge_to_main_and_allows_admin() {
let (allow_status, allow_body) = json_response(
&app,
Request::builder()
.uri("/branches/merge")
.uri(g("/branches/merge"))
.method(Method::POST)
.header("authorization", "Bearer admin-token")
.header("content-type", "application/json")
@ -578,7 +578,7 @@ async fn authenticated_change_stamps_actor_on_commits() {
let (change_status, change_body) = json_response(
&app,
Request::builder()
.uri("/change")
.uri(g("/change"))
.method(Method::POST)
.header("authorization", "Bearer token-one")
.header("content-type", "application/json")
@ -592,7 +592,7 @@ async fn authenticated_change_stamps_actor_on_commits() {
let (commits_status, commits_body) = json_response(
&app,
Request::builder()
.uri("/commits?branch=main")
.uri(g("/commits?branch=main"))
.method(Method::GET)
.header("authorization", "Bearer token-one")
.body(Body::empty())
@ -623,7 +623,7 @@ async fn authenticated_branch_merge_stamps_merge_actor_on_head_commit() {
let (create_status, _) = json_response(
&app,
Request::builder()
.uri("/branches")
.uri(g("/branches"))
.method(Method::POST)
.header("authorization", "Bearer token-one")
.header("content-type", "application/json")
@ -642,7 +642,7 @@ async fn authenticated_branch_merge_stamps_merge_actor_on_head_commit() {
let (change_status, _) = json_response(
&app,
Request::builder()
.uri("/change")
.uri(g("/change"))
.method(Method::POST)
.header("authorization", "Bearer token-one")
.header("content-type", "application/json")
@ -659,7 +659,7 @@ async fn authenticated_branch_merge_stamps_merge_actor_on_head_commit() {
let (merge_status, merge_body) = json_response(
&app,
Request::builder()
.uri("/branches/merge")
.uri(g("/branches/merge"))
.method(Method::POST)
.header("authorization", "Bearer token-two")
.header("content-type", "application/json")
@ -673,7 +673,7 @@ async fn authenticated_branch_merge_stamps_merge_actor_on_head_commit() {
let (commit_status, commit_body) = json_response(
&app,
Request::builder()
.uri("/commits?branch=main")
.uri(g("/commits?branch=main"))
.method(Method::GET)
.header("authorization", "Bearer token-two")
.body(Body::empty())
@ -691,7 +691,6 @@ async fn authenticated_branch_merge_stamps_merge_actor_on_head_commit() {
#[tokio::test(flavor = "multi_thread")]
async fn engine_layer_policy_fires_via_direct_arc_omnigraph_from_new_single() {
use omnigraph_server::GraphRouting;
let temp = init_loaded_graph().await;
let graph = graph_path(temp.path());
let db = Omnigraph::open(graph.to_str().unwrap()).await.unwrap();
@ -717,9 +716,14 @@ async fn engine_layer_policy_fires_via_direct_arc_omnigraph_from_new_single() {
// embedded consumer holding `Arc<Omnigraph>` would. If `new_single`
// failed to apply `with_policy` to the engine, this `mutate_as`
// would succeed — the HTTP-layer is bypassed entirely.
let handle = match state.routing() {
GraphRouting::Single { handle } => Arc::clone(handle),
GraphRouting::Multi { .. } => panic!("expected single-mode routing"),
// RFC-011 cluster-only: the single-graph convenience constructor
// registers the graph under the reserved id `default`.
let key = omnigraph_server::GraphKey::cluster(
omnigraph_server::GraphId::try_from("default").unwrap(),
);
let handle = match state.routing().registry.get(&key) {
omnigraph_server::RegistryLookup::Ready(handle) => handle,
omnigraph_server::RegistryLookup::Gone => panic!("default graph must be registered"),
};
let engine = Arc::clone(&handle.engine);
@ -758,7 +762,7 @@ async fn oversized_request_body_returns_payload_too_large() {
.clone()
.oneshot(
Request::builder()
.uri("/read")
.uri(g("/read"))
.method(Method::POST)
.header("content-type", "application/json")
.body(Body::from(oversized))
@ -781,7 +785,7 @@ async fn default_deny_mode_allows_read_for_authenticated_actor() {
let (status, _body) = json_response(
&app,
Request::builder()
.uri("/snapshot")
.uri(g("/snapshot"))
.method(Method::GET)
.header(AUTHORIZATION, "Bearer demo-token")
.body(Body::empty())
@ -808,7 +812,7 @@ async fn default_deny_mode_rejects_change_with_forbidden() {
let (status, body) = json_response(
&app,
Request::builder()
.uri("/change")
.uri(g("/change"))
.method(Method::POST)
.header(AUTHORIZATION, "Bearer demo-token")
.header("content-type", "application/json")
@ -840,7 +844,7 @@ async fn default_deny_mode_rejects_schema_apply_with_forbidden() {
let (status, body) = json_response(
&app,
Request::builder()
.uri("/schema/apply")
.uri(g("/schema/apply"))
.method(Method::POST)
.header(AUTHORIZATION, "Bearer demo-token")
.header("content-type", "application/json")

View file

@ -18,10 +18,7 @@ use support::*;
mod multi_graph_startup {
use super::*;
use omnigraph::storage::normalize_root_uri;
use omnigraph_server::{
GraphHandle, GraphId, GraphKey, GraphRegistry, InsertError, ServerConfig, ServerConfigMode,
load_server_settings,
};
use omnigraph_server::{GraphHandle, GraphId, GraphKey, GraphRegistry, InsertError};
use std::sync::Arc;
async fn build_multi_mode_app(graph_ids: &[&str]) -> (Vec<tempfile::TempDir>, Router) {
@ -280,10 +277,11 @@ mod multi_graph_startup {
);
}
/// Flat routes 404 in multi mode — the router only mounts under
/// `/graphs/{graph_id}/...` so `/snapshot` doesn't resolve.
/// RFC-011 cluster-only: flat per-graph routes never resolve — the
/// router only mounts under `/graphs/{graph_id}/...` so a root
/// `/snapshot` returns 404.
#[tokio::test(flavor = "multi_thread")]
async fn flat_routes_404_in_multi_mode() {
async fn flat_routes_404_at_root() {
let (_dirs, app) = build_multi_mode_app(&["alpha"]).await;
let resp = app
.oneshot(
@ -298,28 +296,6 @@ mod multi_graph_startup {
assert_eq!(resp.status(), StatusCode::NOT_FOUND);
}
/// `GraphId` validation runs at startup — a reserved name in
/// `omnigraph.yaml` produces a clear error rather than getting
/// rejected per-request.
#[tokio::test]
async fn load_server_settings_rejects_reserved_graph_id() {
let temp = tempfile::tempdir().unwrap();
let config_path = temp.path().join("omnigraph.yaml");
fs::write(
&config_path,
r#"
graphs:
policies:
uri: /tmp/g1.omni
"#,
)
.unwrap();
let err = load_server_settings(Some(&config_path), None, None, None, None, false).await.unwrap_err();
assert!(
err.to_string().contains("invalid graph id 'policies'"),
"expected reserved-name rejection, got: {err}"
);
}
#[tokio::test(flavor = "multi_thread")]
async fn registry_rejects_duplicate_normalized_graph_uris() {
@ -375,372 +351,6 @@ graphs:
assert_eq!(listed[0].uri, graph_uri);
}
// ── Four-rule mode inference matrix ───────────────────────────────
/// Rule 1: CLI positional URI → Single.
#[tokio::test]
async fn mode_inference_cli_uri_is_single() {
let settings = load_server_settings(
None,
None,
Some("/tmp/cli.omni".to_string()),
None,
None,
true, // allow unauth so we get past the runtime-state check
)
.await
.unwrap();
match settings.mode {
ServerConfigMode::Single { uri, .. } => assert_eq!(uri, "/tmp/cli.omni"),
ServerConfigMode::Multi { .. } => panic!("expected Single (rule 1), got Multi"),
}
}
/// Rule 2: --target picks one graph from `graphs:` map → Single.
#[tokio::test]
async fn mode_inference_cli_target_is_single() {
let temp = tempfile::tempdir().unwrap();
let config_path = temp.path().join("omnigraph.yaml");
fs::write(
&config_path,
r#"
graphs:
alpha:
uri: /tmp/alpha.omni
beta:
uri: /tmp/beta.omni
"#,
)
.unwrap();
let settings =
load_server_settings(Some(&config_path), None, None, Some("alpha".into()), None, true)
.await
.unwrap();
match settings.mode {
ServerConfigMode::Single { uri, .. } => assert_eq!(uri, "/tmp/alpha.omni"),
ServerConfigMode::Multi { .. } => panic!("expected Single (rule 2), got Multi"),
}
}
/// Rule 3: `server.graph` set → Single (target picked from config).
#[tokio::test]
async fn mode_inference_server_graph_is_single() {
let temp = tempfile::tempdir().unwrap();
let config_path = temp.path().join("omnigraph.yaml");
fs::write(
&config_path,
r#"
graphs:
alpha:
uri: /tmp/alpha.omni
beta:
uri: /tmp/beta.omni
server:
graph: beta
"#,
)
.unwrap();
let settings = load_server_settings(Some(&config_path), None, None, None, None, true).await.unwrap();
match settings.mode {
ServerConfigMode::Single { uri, .. } => assert_eq!(uri, "/tmp/beta.omni"),
ServerConfigMode::Multi { .. } => panic!("expected Single (rule 3), got Multi"),
}
}
/// Rule 4: `--config` + non-empty `graphs:` + no single-mode selector → Multi.
#[tokio::test]
async fn mode_inference_config_plus_graphs_is_multi() {
let temp = tempfile::tempdir().unwrap();
let config_path = temp.path().join("omnigraph.yaml");
fs::write(
&config_path,
r#"
graphs:
alpha:
uri: /tmp/alpha.omni
beta:
uri: /tmp/beta.omni
"#,
)
.unwrap();
let settings = load_server_settings(Some(&config_path), None, None, None, None, true).await.unwrap();
match settings.mode {
ServerConfigMode::Multi { graphs, .. } => {
let ids: Vec<&str> = graphs.iter().map(|g| g.graph_id.as_str()).collect();
// BTreeMap iteration order is alphabetical.
assert_eq!(ids, vec!["alpha", "beta"]);
}
ServerConfigMode::Single { .. } => panic!("expected Multi (rule 4), got Single"),
}
}
#[tokio::test]
async fn mode_inference_multi_rejects_top_level_policy_file() {
let temp = tempfile::tempdir().unwrap();
let config_path = temp.path().join("omnigraph.yaml");
fs::write(
&config_path,
r#"
policy:
file: ./policy.yaml
graphs:
alpha:
uri: /tmp/alpha.omni
"#,
)
.unwrap();
let err = load_server_settings(Some(&config_path), None, None, None, None, true).await.unwrap_err();
let msg = err.to_string();
assert!(
msg.contains("top-level") && msg.contains("policy.file") && msg.contains("not honored"),
"expected top-level-not-honored guidance, got: {msg}"
);
assert!(
msg.contains("graphs.<graph_id>"),
"expected per-graph migration guidance, got: {msg}"
);
assert!(
msg.contains("server.policy.file"),
"expected server policy migration guidance, got: {msg}"
);
}
#[tokio::test]
async fn mode_inference_multi_rejects_top_level_queries() {
// Symmetric to the policy guard: a top-level `queries:` block in
// multi-graph mode is not honored (each graph uses its own), so it
// is a loud error rather than a silent no-op.
let temp = tempfile::tempdir().unwrap();
let config_path = temp.path().join("omnigraph.yaml");
fs::write(
&config_path,
"queries:\n q:\n file: ./q.gq\ngraphs:\n alpha:\n uri: /tmp/alpha.omni\n",
)
.unwrap();
let err = load_server_settings(Some(&config_path), None, None, None, None, true).await.unwrap_err();
let msg = err.to_string();
assert!(
msg.contains("queries") && msg.contains("not honored"),
"top-level queries must be rejected in multi-graph mode: {msg}"
);
}
#[tokio::test]
async fn single_mode_named_graph_rejects_top_level_blocks() {
// Serving a graph by name (`--target`/`server.graph`) uses its
// per-graph block; a populated top-level block would be silently
// shadowed, so boot refuses and names the per-graph location.
let temp = tempfile::tempdir().unwrap();
let config_path = temp.path().join("omnigraph.yaml");
fs::write(
&config_path,
"policy:\n file: ./top.yaml\ngraphs:\n prod:\n uri: /tmp/prod.omni\n",
)
.unwrap();
let err =
load_server_settings(Some(&config_path), None, None, Some("prod".to_string()), None, true)
.await
.unwrap_err();
let msg = err.to_string();
assert!(
msg.contains("prod") && msg.contains("policy.file") && msg.contains("graphs.prod"),
"named single-mode + top-level policy must refuse, naming the graph: {msg}"
);
}
#[tokio::test]
async fn single_mode_named_graph_uses_per_graph_policy_and_queries() {
// The identity rule: `--target prod` attaches `graphs.prod`'s own
// policy + queries, not the top-level ones (which are absent here).
let temp = tempfile::tempdir().unwrap();
fs::write(
temp.path().join("prod.gq"),
"query pq() { match { $u: User } return { $u.name } }",
)
.unwrap();
let config_path = temp.path().join("omnigraph.yaml");
fs::write(
&config_path,
"graphs:\n prod:\n uri: /tmp/prod.omni\n policy:\n file: ./prod-policy.yaml\n \
queries:\n pq:\n file: ./prod.gq\n",
)
.unwrap();
let settings =
load_server_settings(Some(&config_path), None, None, Some("prod".to_string()), None, true)
.await
.unwrap();
match settings.mode {
ServerConfigMode::Single {
graph_id,
policy_file,
queries,
..
} => {
assert_eq!(graph_id, "prod", "named single-mode keeps graph identity");
assert!(
policy_file
.as_ref()
.is_some_and(|p| p.ends_with("prod-policy.yaml")),
"per-graph policy attached: {policy_file:?}"
);
assert!(queries.lookup("pq").is_some(), "per-graph query attached");
}
other => panic!("expected Single mode, got {other:?}"),
}
}
#[tokio::test]
async fn mode_inference_normalizes_multi_graph_uris() {
let temp = tempfile::tempdir().unwrap();
let graph = temp.path().join("alpha.omni");
let config_path = temp.path().join("omnigraph.yaml");
fs::write(
&config_path,
format!(
r#"
graphs:
alpha:
uri: file://{}/
"#,
graph.display()
),
)
.unwrap();
let settings = load_server_settings(Some(&config_path), None, None, None, None, true).await.unwrap();
match settings.mode {
ServerConfigMode::Multi { graphs, .. } => {
assert_eq!(graphs[0].uri, graph.to_string_lossy());
}
ServerConfigMode::Single { .. } => panic!("expected Multi"),
}
}
/// Rule 5: nothing → error with migration hint.
#[tokio::test]
async fn mode_inference_no_inputs_errors_with_migration_hint() {
let err = load_server_settings(None, None, None, None, None, true).await.unwrap_err();
let msg = err.to_string();
assert!(
msg.contains("no graph to serve"),
"expected migration-hint error, got: {msg}"
);
}
/// Rule 4 sub-case: `--config` with empty `graphs:` map and no
/// single-mode selector → rule 5 fires (no graph to serve).
#[tokio::test]
async fn mode_inference_empty_graphs_map_errors() {
let temp = tempfile::tempdir().unwrap();
let config_path = temp.path().join("omnigraph.yaml");
fs::write(&config_path, "server:\n bind: 127.0.0.1:8080\n").unwrap();
let err = load_server_settings(Some(&config_path), None, None, None, None, true).await.unwrap_err();
assert!(err.to_string().contains("no graph to serve"));
}
/// `--config` + `<URI>` together: URI wins → Single (the CLI URI
/// takes precedence over the config's graphs map).
#[tokio::test]
async fn mode_inference_cli_uri_overrides_graphs_map() {
let temp = tempfile::tempdir().unwrap();
let config_path = temp.path().join("omnigraph.yaml");
fs::write(
&config_path,
r#"
graphs:
alpha:
uri: /tmp/alpha.omni
"#,
)
.unwrap();
let settings = load_server_settings(
Some(&config_path),
None,
Some("/tmp/cli-override.omni".to_string()),
None,
None,
true,
)
.await
.unwrap();
match settings.mode {
ServerConfigMode::Single { uri, .. } => {
assert_eq!(
uri, "/tmp/cli-override.omni",
"CLI URI must win over graphs: map"
);
}
ServerConfigMode::Multi { .. } => {
panic!("expected Single (CLI URI wins), got Multi")
}
}
}
/// Per-graph `policy.file` is resolved relative to the config base_dir.
#[tokio::test]
async fn per_graph_policy_file_is_resolved_relative_to_base_dir() {
let temp = tempfile::tempdir().unwrap();
let config_path = temp.path().join("omnigraph.yaml");
fs::write(
&config_path,
r#"
graphs:
alpha:
uri: /tmp/alpha.omni
policy:
file: ./policies/alpha.yaml
beta:
uri: /tmp/beta.omni
"#,
)
.unwrap();
let settings = load_server_settings(Some(&config_path), None, None, None, None, true).await.unwrap();
let graphs = match settings.mode {
ServerConfigMode::Multi { graphs, .. } => graphs,
_ => panic!("expected Multi"),
};
// graphs is BTreeMap-iter order (alphabetical).
let alpha = &graphs[0];
let beta = &graphs[1];
assert_eq!(alpha.graph_id, "alpha");
let omnigraph_server::PolicySource::File(alpha_policy) =
alpha.policy.as_ref().unwrap()
else {
panic!("yaml-configured policy must stay file-based");
};
assert_eq!(alpha_policy, &temp.path().join("policies/alpha.yaml"));
assert_eq!(beta.graph_id, "beta");
assert!(beta.policy.is_none());
}
/// `server.policy.file` resolves alongside the graphs map.
#[tokio::test]
async fn server_policy_file_is_resolved_relative_to_base_dir() {
let temp = tempfile::tempdir().unwrap();
let config_path = temp.path().join("omnigraph.yaml");
fs::write(
&config_path,
r#"
server:
policy:
file: ./server-policy.yaml
graphs:
alpha:
uri: /tmp/alpha.omni
"#,
)
.unwrap();
let settings = load_server_settings(Some(&config_path), None, None, None, None, true).await.unwrap();
match settings.mode {
ServerConfigMode::Multi { server_policy, .. } => {
let omnigraph_server::PolicySource::File(path) = server_policy.unwrap() else {
panic!("yaml-configured server policy must stay file-based");
};
assert_eq!(path, temp.path().join("server-policy.yaml"));
}
_ => panic!("expected Multi"),
}
}
/// `GET /graphs` must NOT leak the registry in Open mode without
/// an explicit server policy. Operators who pass `--unauthenticated`
/// opted into trusting the network for graph DATA, not for leaking
@ -786,28 +396,6 @@ graphs:
);
}
/// `GET /graphs` returns 405 in single mode (resource exists in the
/// API surface, just not operational without a `graphs:` map).
#[tokio::test(flavor = "multi_thread")]
async fn get_graphs_returns_405_in_single_mode() {
let temp = init_loaded_graph().await;
let graph = graph_path(temp.path());
let state = AppState::open(graph.to_string_lossy().to_string())
.await
.unwrap();
let app = build_app(state);
let resp = app
.oneshot(
Request::builder()
.method(Method::GET)
.uri("/graphs")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::METHOD_NOT_ALLOWED);
}
/// `GET /graphs` requires bearer auth when tokens are configured.
#[tokio::test(flavor = "multi_thread")]
@ -971,52 +559,4 @@ rules:
);
}
/// Loads an `omnigraph.yaml` with two graphs and verifies multi-mode
/// inference plus graph entry resolution. Cluster-route dispatch is
/// covered by the route tests above.
#[tokio::test(flavor = "multi_thread")]
async fn server_settings_load_multi_graph_config_entries() {
let cfg_dir = tempfile::tempdir().unwrap();
// Real graph storage dirs (the URIs in the config must point to
// a graph init-able location).
let alpha_dir = cfg_dir.path().join("alpha.omni");
let beta_dir = cfg_dir.path().join("beta.omni");
let schema = fs::read_to_string(fixture("test.pg")).unwrap();
Omnigraph::init(alpha_dir.to_str().unwrap(), &schema)
.await
.unwrap();
Omnigraph::init(beta_dir.to_str().unwrap(), &schema)
.await
.unwrap();
let config_path = cfg_dir.path().join("omnigraph.yaml");
fs::write(
&config_path,
format!(
r#"
graphs:
alpha:
uri: {alpha}
beta:
uri: {beta}
"#,
alpha = alpha_dir.display(),
beta = beta_dir.display(),
),
)
.unwrap();
let settings: ServerConfig =
load_server_settings(Some(&config_path), None, None, None, None, true).await.unwrap();
assert!(matches!(settings.mode, ServerConfigMode::Multi { .. }));
match settings.mode {
ServerConfigMode::Multi { graphs, .. } => {
assert_eq!(graphs.len(), 2);
let ids: Vec<&str> = graphs.iter().map(|g| g.graph_id.as_str()).collect();
assert_eq!(ids, vec!["alpha", "beta"]);
}
_ => unreachable!(),
}
}
}

View file

@ -63,7 +63,7 @@ async fn export_route_returns_jsonl_for_branch_snapshot() {
.clone()
.oneshot(
Request::builder()
.uri("/export")
.uri(g("/export"))
.method(Method::POST)
.header("content-type", "application/json")
.header("authorization", format!("Bearer {}", token))
@ -99,7 +99,7 @@ async fn snapshot_route_returns_manifest_dataset_version() {
let (snapshot_status, snapshot_body) = json_response(
&app,
Request::builder()
.uri("/snapshot?branch=main")
.uri(g("/snapshot?branch=main"))
.method(Method::GET)
.body(Body::empty())
.unwrap(),
@ -131,7 +131,7 @@ async fn ingest_creates_branch_returns_metadata_and_stamps_actor() {
let (status, body) = json_response(
&app,
Request::builder()
.uri("/ingest")
.uri(g("/ingest"))
.method(Method::POST)
.header("authorization", "Bearer token-one")
.header("content-type", "application/json")
@ -195,7 +195,7 @@ async fn ingest_existing_branch_skips_branch_create_policy_check() {
let (status, body) = json_response(
&app,
Request::builder()
.uri("/ingest")
.uri(g("/ingest"))
.method(Method::POST)
.header("authorization", "Bearer team-token")
.header("content-type", "application/json")
@ -223,7 +223,7 @@ async fn ingest_without_from_returns_404_for_missing_branch_and_creates_nothing(
let (status, body) = json_response(
&app,
Request::builder()
.uri("/ingest")
.uri(g("/ingest"))
.method(Method::POST)
.header("content-type", "application/json")
.body(Body::from(serde_json::to_vec(&ingest).unwrap()))
@ -264,7 +264,7 @@ async fn ingest_without_from_loads_into_existing_branch() {
let (status, body) = json_response(
&app,
Request::builder()
.uri("/ingest")
.uri(g("/ingest"))
.method(Method::POST)
.header("content-type", "application/json")
.body(Body::from(serde_json::to_vec(&ingest).unwrap()))
@ -294,7 +294,7 @@ async fn ingest_denies_missing_branch_without_branch_create_permission() {
let (status, body) = json_response(
&app,
Request::builder()
.uri("/ingest")
.uri(g("/ingest"))
.method(Method::POST)
.header("authorization", "Bearer team-token")
.header("content-type", "application/json")
@ -327,7 +327,7 @@ async fn ingest_denies_when_actor_lacks_change_permission() {
let (status, body) = json_response(
&app,
Request::builder()
.uri("/ingest")
.uri(g("/ingest"))
.method(Method::POST)
.header("authorization", "Bearer team-token")
.header("content-type", "application/json")
@ -357,7 +357,7 @@ async fn ingest_rejects_payloads_over_32_mib() {
.clone()
.oneshot(
Request::builder()
.uri("/ingest")
.uri(g("/ingest"))
.method(Method::POST)
.header("content-type", "application/json")
.body(Body::from(serde_json::to_vec(&oversize).unwrap()))
@ -419,7 +419,7 @@ async fn branch_merge_conflict_response_includes_structured_conflicts() {
let (status, body) = json_response(
&app,
Request::builder()
.uri("/branches/merge")
.uri(g("/branches/merge"))
.method(Method::POST)
.header("content-type", "application/json")
.body(Body::from(serde_json::to_vec(&merge).unwrap()))
@ -451,7 +451,7 @@ async fn repeated_read_after_change_sees_updated_state_from_same_app() {
let (change_status, change_body) = json_response(
&app,
Request::builder()
.uri("/change")
.uri(g("/change"))
.method(Method::POST)
.header("content-type", "application/json")
.body(Body::from(serde_json::to_vec(&change).unwrap()))
@ -471,7 +471,7 @@ async fn repeated_read_after_change_sees_updated_state_from_same_app() {
let (read_status, read_body) = json_response(
&app,
Request::builder()
.uri("/read")
.uri(g("/read"))
.method(Method::POST)
.header("content-type", "application/json")
.body(Body::from(serde_json::to_vec(&read).unwrap()))
@ -497,7 +497,7 @@ async fn query_endpoint_runs_inline_read() {
let (status, body) = json_response(
&app,
Request::builder()
.uri("/query")
.uri(g("/query"))
.method(Method::POST)
.header("content-type", "application/json")
.body(Body::from(serde_json::to_vec(&query).unwrap()))
@ -524,7 +524,7 @@ async fn query_endpoint_rejects_mutation_with_400() {
let (status, body) = json_response(
&app,
Request::builder()
.uri("/query")
.uri(g("/query"))
.method(Method::POST)
.header("content-type", "application/json")
.body(Body::from(serde_json::to_vec(&query).unwrap()))
@ -555,7 +555,7 @@ async fn mutate_endpoint_runs_inline_mutation() {
.clone()
.oneshot(
Request::builder()
.uri("/mutate")
.uri(g("/mutate"))
.method(Method::POST)
.header("content-type", "application/json")
.body(Body::from(serde_json::to_vec(&request).unwrap()))
@ -580,7 +580,7 @@ async fn mutate_endpoint_runs_inline_mutation() {
#[tokio::test(flavor = "multi_thread")]
async fn change_endpoint_emits_deprecation_headers() {
// `/change` is kept indefinitely for back-compat but flagged at runtime
// per RFC 9745 (`Deprecation: true`) + RFC 8288 (`Link: </mutate>;
// per RFC 9745 (`Deprecation: true`) + RFC 8288 (`Link: <mutate>;
// rel="successor-version"`). The OpenAPI side is covered by
// `openapi_change_is_deprecated` in tests/openapi.rs.
let (_temp, app) = app_for_loaded_graph().await;
@ -595,7 +595,7 @@ async fn change_endpoint_emits_deprecation_headers() {
.clone()
.oneshot(
Request::builder()
.uri("/change")
.uri(g("/change"))
.method(Method::POST)
.header("content-type", "application/json")
.body(Body::from(serde_json::to_vec(&request).unwrap()))
@ -615,7 +615,7 @@ async fn change_endpoint_emits_deprecation_headers() {
);
assert_eq!(
response.headers().get("link").and_then(|v| v.to_str().ok()),
Some("</mutate>; rel=\"successor-version\""),
Some("<mutate>; rel=\"successor-version\""),
"POST /change must point at /mutate via `Link` rel=successor-version (RFC 8288)"
);
}
@ -635,7 +635,7 @@ async fn load_endpoint_loads_into_existing_branch() {
.clone()
.oneshot(
Request::builder()
.uri("/load")
.uri(g("/load"))
.method(Method::POST)
.header("content-type", "application/json")
.body(Body::from(serde_json::to_vec(&request).unwrap()))
@ -658,7 +658,7 @@ async fn load_endpoint_loads_into_existing_branch() {
#[tokio::test(flavor = "multi_thread")]
async fn ingest_endpoint_emits_deprecation_headers() {
// `/ingest` is the deprecated alias of `/load` (RFC-009 Phase 5): flagged
// at runtime per RFC 9745 (`Deprecation: true`) + RFC 8288 (`Link: </load>;
// at runtime per RFC 9745 (`Deprecation: true`) + RFC 8288 (`Link: <load>;
// rel="successor-version"`). The OpenAPI side is covered by
// `openapi_ingest_is_deprecated` in tests/openapi.rs.
let (_temp, app) = app_for_loaded_graph().await;
@ -672,7 +672,7 @@ async fn ingest_endpoint_emits_deprecation_headers() {
.clone()
.oneshot(
Request::builder()
.uri("/ingest")
.uri(g("/ingest"))
.method(Method::POST)
.header("content-type", "application/json")
.body(Body::from(serde_json::to_vec(&request).unwrap()))
@ -692,7 +692,7 @@ async fn ingest_endpoint_emits_deprecation_headers() {
);
assert_eq!(
response.headers().get("link").and_then(|v| v.to_str().ok()),
Some("</load>; rel=\"successor-version\""),
Some("<load>; rel=\"successor-version\""),
"POST /ingest must point at /load via `Link` rel=successor-version (RFC 8288)"
);
}
@ -714,7 +714,7 @@ async fn read_endpoint_emits_deprecation_headers() {
.clone()
.oneshot(
Request::builder()
.uri("/read")
.uri(g("/read"))
.method(Method::POST)
.header("content-type", "application/json")
.body(Body::from(serde_json::to_vec(&request).unwrap()))
@ -734,7 +734,7 @@ async fn read_endpoint_emits_deprecation_headers() {
);
assert_eq!(
response.headers().get("link").and_then(|v| v.to_str().ok()),
Some("</query>; rel=\"successor-version\""),
Some("<query>; rel=\"successor-version\""),
"POST /read must point at /query via `Link` rel=successor-version (RFC 8288)"
);
}
@ -757,7 +757,7 @@ async fn query_endpoint_does_not_emit_deprecation_headers() {
.clone()
.oneshot(
Request::builder()
.uri("/query")
.uri(g("/query"))
.method(Method::POST)
.header("content-type", "application/json")
.body(Body::from(serde_json::to_vec(&request).unwrap()))
@ -789,7 +789,7 @@ async fn change_endpoint_accepts_legacy_field_names() {
let (status, body) = json_response(
&app,
Request::builder()
.uri("/change")
.uri(g("/change"))
.method(Method::POST)
.header("content-type", "application/json")
.body(Body::from(serde_json::to_vec(&legacy_body).unwrap()))
@ -808,7 +808,7 @@ async fn change_endpoint_accepts_legacy_field_names() {
let (status, body) = json_response(
&app,
Request::builder()
.uri("/change")
.uri(g("/change"))
.method(Method::POST)
.header("content-type", "application/json")
.body(Body::from(serde_json::to_vec(&canonical_body).unwrap()))
@ -826,7 +826,7 @@ async fn remote_branch_list_create_merge_flow_works() {
let (list_status, list_body) = json_response(
&app,
Request::builder()
.uri("/branches")
.uri(g("/branches"))
.method(Method::GET)
.body(Body::empty())
.unwrap(),
@ -842,7 +842,7 @@ async fn remote_branch_list_create_merge_flow_works() {
let (create_status, create_body) = json_response(
&app,
Request::builder()
.uri("/branches")
.uri(g("/branches"))
.method(Method::POST)
.header("content-type", "application/json")
.body(Body::from(serde_json::to_vec(&create).unwrap()))
@ -856,7 +856,7 @@ async fn remote_branch_list_create_merge_flow_works() {
let (list_status, list_body) = json_response(
&app,
Request::builder()
.uri("/branches")
.uri(g("/branches"))
.method(Method::GET)
.body(Body::empty())
.unwrap(),
@ -874,7 +874,7 @@ async fn remote_branch_list_create_merge_flow_works() {
let (change_status, change_body) = json_response(
&app,
Request::builder()
.uri("/change")
.uri(g("/change"))
.method(Method::POST)
.header("content-type", "application/json")
.body(Body::from(serde_json::to_vec(&change).unwrap()))
@ -895,7 +895,7 @@ async fn remote_branch_list_create_merge_flow_works() {
let (read_status, read_body) = json_response(
&app,
Request::builder()
.uri("/read")
.uri(g("/read"))
.method(Method::POST)
.header("content-type", "application/json")
.body(Body::from(serde_json::to_vec(&read_main_before).unwrap()))
@ -912,7 +912,7 @@ async fn remote_branch_list_create_merge_flow_works() {
let (merge_status, merge_body) = json_response(
&app,
Request::builder()
.uri("/branches/merge")
.uri(g("/branches/merge"))
.method(Method::POST)
.header("content-type", "application/json")
.body(Body::from(serde_json::to_vec(&merge).unwrap()))
@ -934,7 +934,7 @@ async fn remote_branch_list_create_merge_flow_works() {
let (read_status, read_body) = json_response(
&app,
Request::builder()
.uri("/read")
.uri(g("/read"))
.method(Method::POST)
.header("content-type", "application/json")
.body(Body::from(serde_json::to_vec(&read_main_after).unwrap()))
@ -957,7 +957,7 @@ async fn remote_branch_delete_flow_works() {
let (create_status, _) = json_response(
&app,
Request::builder()
.uri("/branches")
.uri(g("/branches"))
.method(Method::POST)
.header("content-type", "application/json")
.body(Body::from(serde_json::to_vec(&create).unwrap()))
@ -969,7 +969,7 @@ async fn remote_branch_delete_flow_works() {
let (delete_status, delete_body) = json_response(
&app,
Request::builder()
.uri("/branches/feature")
.uri(g("/branches/feature"))
.method(Method::DELETE)
.body(Body::empty())
.unwrap(),
@ -981,7 +981,7 @@ async fn remote_branch_delete_flow_works() {
let (list_status, list_body) = json_response(
&app,
Request::builder()
.uri("/branches")
.uri(g("/branches"))
.method(Method::GET)
.body(Body::empty())
.unwrap(),
@ -1009,7 +1009,7 @@ async fn branch_delete_denies_without_policy_permission() {
let (status, body) = json_response(
&app,
Request::builder()
.uri("/branches/feature")
.uri(g("/branches/feature"))
.method(Method::DELETE)
.header("authorization", "Bearer token-team")
.body(Body::empty())
@ -1081,7 +1081,7 @@ query vector_search_string($q: String) {
let (status, body) = json_response(
&app,
Request::builder()
.uri("/read")
.uri(g("/read"))
.method(Method::POST)
.header("content-type", "application/json")
.body(Body::from(serde_json::to_vec(&read).unwrap()))
@ -1134,7 +1134,7 @@ async fn change_conflict_returns_manifest_conflict_409() {
let (status, body) = json_response(
&app,
Request::builder()
.uri("/change")
.uri(g("/change"))
.method(Method::POST)
.header("content-type", "application/json")
.body(Body::from(
@ -1206,7 +1206,7 @@ async fn change_concurrent_inserts_same_key_serialize_without_409() {
})
.unwrap();
let req = Request::builder()
.uri("/change")
.uri(g("/change"))
.method(Method::POST)
.header("content-type", "application/json")
.body(Body::from(body))
@ -1238,7 +1238,7 @@ async fn change_concurrent_inserts_same_key_serialize_without_409() {
let (snapshot_status, snapshot_body) = json_response(
&app,
Request::builder()
.uri("/snapshot?branch=main")
.uri(g("/snapshot?branch=main"))
.method(Method::GET)
.body(Body::empty())
.unwrap(),
@ -1319,7 +1319,7 @@ async fn change_concurrent_updates_same_key_serialize_via_publisher_cas() {
})
.unwrap();
let req = Request::builder()
.uri("/change")
.uri(g("/change"))
.method(Method::POST)
.header("content-type", "application/json")
.body(Body::from(body))
@ -1428,7 +1428,7 @@ query insert_c($name: String) {
})
.unwrap();
let req = Request::builder()
.uri("/change")
.uri(g("/change"))
.method(Method::POST)
.header("content-type", "application/json")
.body(Body::from(body))
@ -1445,7 +1445,7 @@ query insert_c($name: String) {
})
.unwrap();
let req = Request::builder()
.uri("/change")
.uri(g("/change"))
.method(Method::POST)
.header("content-type", "application/json")
.body(Body::from(body))
@ -1474,7 +1474,7 @@ query insert_c($name: String) {
let (status, body) = json_response(
&app,
Request::builder()
.uri("/snapshot?branch=main")
.uri(g("/snapshot?branch=main"))
.method(Method::GET)
.body(Body::empty())
.unwrap(),
@ -1582,7 +1582,7 @@ async fn ingest_per_actor_admission_cap_returns_429() {
})
.unwrap();
let req = Request::builder()
.uri("/ingest")
.uri(g("/ingest"))
.method(Method::POST)
.header("authorization", "Bearer flooder-token")
.header("content-type", "application/json")

View file

@ -248,7 +248,7 @@ async fn concurrent_branch_ops_morphological_matrix() {
.clone()
.oneshot(
Request::builder()
.uri("/branches")
.uri(g("/branches"))
.method(Method::GET)
.body(Body::empty())
.unwrap(),
@ -369,7 +369,7 @@ async fn concurrent_branch_ops_morphological_matrix() {
.clone()
.oneshot(
Request::builder()
.uri("/snapshot?branch=main")
.uri(g("/snapshot?branch=main"))
.method(Method::GET)
.body(Body::empty())
.unwrap(),
@ -717,31 +717,15 @@ graphs:
#[tokio::test]
async fn cluster_boot_refusals() {
// Mutual exclusion with --config / URI.
// RFC-011 cluster-only: with no --cluster, boot refuses with the
// cluster-required remedy.
let err = omnigraph_server::load_server_settings(None, None, true)
.await
.unwrap_err();
assert!(err.to_string().contains("boots from a cluster"), "{err}");
let temp = converged_cluster_dir("").await;
let dir = temp.path().to_path_buf();
let err = omnigraph_server::load_server_settings(
Some(&dir.join("omnigraph.yaml")),
Some(&dir),
None,
None,
None,
true,
)
.await
.unwrap_err();
assert!(err.to_string().contains("exclusive boot source"), "{err}");
let err = omnigraph_server::load_server_settings(
None,
Some(&dir),
Some("file:///tmp/x.omni".to_string()),
None,
None,
true,
)
.await
.unwrap_err();
assert!(err.to_string().contains("exclusive boot source"), "{err}");
// Tampered catalog blob refuses boot with the remedy.
let blob_dir = dir.join("__cluster/resources/query/knowledge/find_person");

View file

@ -8,10 +8,9 @@ use axum::body::{Body, to_bytes};
use axum::http::{Method, Request, StatusCode};
use omnigraph::db::Omnigraph;
use omnigraph::loader::{LoadMode, load_jsonl};
use omnigraph_server::{ApiDoc, AppState, build_app};
use omnigraph_server::{AppState, build_app, served_openapi};
use serde_json::Value;
use tower::ServiceExt;
use utoipa::OpenApi;
fn fixture(name: &str) -> PathBuf {
PathBuf::from(env!("CARGO_MANIFEST_DIR"))
@ -71,7 +70,10 @@ async fn json_response(app: &Router, request: Request<Body>) -> (StatusCode, Val
}
fn openapi_doc() -> utoipa::openapi::OpenApi {
ApiDoc::openapi()
// RFC-011 cluster-only: the canonical committed spec is the SERVED
// shape — protected routes nested under `/graphs/{graph_id}/…`,
// `/healthz` and `/graphs` flat. This matches what the server serves.
served_openapi()
}
fn openapi_json() -> Value {
@ -159,26 +161,28 @@ fn openapi_info_contains_version() {
// Path coverage tests
// ---------------------------------------------------------------------------
// The canonical served spec keeps `/healthz` and `/graphs` flat; every
// protected route nests under `/graphs/{graph_id}/…`.
const EXPECTED_PATHS: &[&str] = &[
"/healthz",
"/graphs",
"/snapshot",
"/read",
"/query",
"/export",
"/change",
"/mutate",
"/queries",
"/queries/{name}",
"/schema",
"/schema/apply",
"/load",
"/ingest",
"/branches",
"/branches/{branch}",
"/branches/merge",
"/commits",
"/commits/{commit_id}",
"/graphs/{graph_id}/snapshot",
"/graphs/{graph_id}/read",
"/graphs/{graph_id}/query",
"/graphs/{graph_id}/export",
"/graphs/{graph_id}/change",
"/graphs/{graph_id}/mutate",
"/graphs/{graph_id}/queries",
"/graphs/{graph_id}/queries/{name}",
"/graphs/{graph_id}/schema",
"/graphs/{graph_id}/schema/apply",
"/graphs/{graph_id}/load",
"/graphs/{graph_id}/ingest",
"/graphs/{graph_id}/branches",
"/graphs/{graph_id}/branches/{branch}",
"/graphs/{graph_id}/branches/merge",
"/graphs/{graph_id}/commits",
"/graphs/{graph_id}/commits/{commit_id}",
];
#[test]
@ -222,25 +226,25 @@ fn openapi_healthz_is_get() {
#[test]
fn openapi_read_is_post() {
let doc = openapi_json();
assert!(doc["paths"]["/read"]["post"].is_object());
assert!(doc["paths"]["/graphs/{graph_id}/read"]["post"].is_object());
}
#[test]
fn openapi_export_is_post() {
let doc = openapi_json();
assert!(doc["paths"]["/export"]["post"].is_object());
assert!(doc["paths"]["/graphs/{graph_id}/export"]["post"].is_object());
}
#[test]
fn openapi_change_is_post() {
let doc = openapi_json();
assert!(doc["paths"]["/change"]["post"].is_object());
assert!(doc["paths"]["/graphs/{graph_id}/change"]["post"].is_object());
}
#[test]
fn openapi_mutate_is_post() {
let doc = openapi_json();
assert!(doc["paths"]["/mutate"]["post"].is_object());
assert!(doc["paths"]["/graphs/{graph_id}/mutate"]["post"].is_object());
}
// Deprecation flagging — `/read` and `/change` are kept indefinitely for
@ -253,7 +257,7 @@ fn openapi_mutate_is_post() {
fn openapi_read_is_deprecated() {
let doc = openapi_json();
assert_eq!(
doc["paths"]["/read"]["post"]["deprecated"],
doc["paths"]["/graphs/{graph_id}/read"]["post"]["deprecated"],
serde_json::Value::Bool(true),
"/read must be flagged deprecated in OpenAPI; use /query instead"
);
@ -263,7 +267,7 @@ fn openapi_read_is_deprecated() {
fn openapi_change_is_deprecated() {
let doc = openapi_json();
assert_eq!(
doc["paths"]["/change"]["post"]["deprecated"],
doc["paths"]["/graphs/{graph_id}/change"]["post"]["deprecated"],
serde_json::Value::Bool(true),
"/change must be flagged deprecated in OpenAPI; use /mutate instead"
);
@ -272,7 +276,7 @@ fn openapi_change_is_deprecated() {
#[test]
fn openapi_query_is_not_deprecated() {
let doc = openapi_json();
let deprecated = doc["paths"]["/query"]["post"]
let deprecated = doc["paths"]["/graphs/{graph_id}/query"]["post"]
.get("deprecated")
.and_then(serde_json::Value::as_bool)
.unwrap_or(false);
@ -285,7 +289,7 @@ fn openapi_query_is_not_deprecated() {
#[test]
fn openapi_mutate_is_not_deprecated() {
let doc = openapi_json();
let deprecated = doc["paths"]["/mutate"]["post"]
let deprecated = doc["paths"]["/graphs/{graph_id}/mutate"]["post"]
.get("deprecated")
.and_then(serde_json::Value::as_bool)
.unwrap_or(false);
@ -298,15 +302,15 @@ fn openapi_mutate_is_not_deprecated() {
#[test]
fn openapi_ingest_is_post() {
let doc = openapi_json();
assert!(doc["paths"]["/ingest"]["post"].is_object());
assert!(doc["paths"]["/graphs/{graph_id}/ingest"]["post"].is_object());
}
#[test]
fn openapi_load_is_not_deprecated() {
// RFC-009 Phase 5: /load is the canonical bulk-load endpoint.
let doc = openapi_json();
assert!(doc["paths"]["/load"]["post"].is_object());
let deprecated = doc["paths"]["/load"]["post"]
assert!(doc["paths"]["/graphs/{graph_id}/load"]["post"].is_object());
let deprecated = doc["paths"]["/graphs/{graph_id}/load"]["post"]
.get("deprecated")
.and_then(serde_json::Value::as_bool)
.unwrap_or(false);
@ -321,7 +325,7 @@ fn openapi_ingest_is_deprecated() {
// RFC-009 Phase 5: /ingest is now the deprecated alias of /load.
let doc = openapi_json();
assert_eq!(
doc["paths"]["/ingest"]["post"]["deprecated"],
doc["paths"]["/graphs/{graph_id}/ingest"]["post"]["deprecated"],
serde_json::Value::Bool(true),
"/ingest must be flagged deprecated now that /load is canonical"
);
@ -330,32 +334,32 @@ fn openapi_ingest_is_deprecated() {
#[test]
fn openapi_branches_supports_get_and_post() {
let doc = openapi_json();
assert!(doc["paths"]["/branches"]["get"].is_object());
assert!(doc["paths"]["/branches"]["post"].is_object());
assert!(doc["paths"]["/graphs/{graph_id}/branches"]["get"].is_object());
assert!(doc["paths"]["/graphs/{graph_id}/branches"]["post"].is_object());
}
#[test]
fn openapi_branch_delete_is_delete() {
let doc = openapi_json();
assert!(doc["paths"]["/branches/{branch}"]["delete"].is_object());
assert!(doc["paths"]["/graphs/{graph_id}/branches/{branch}"]["delete"].is_object());
}
#[test]
fn openapi_branch_merge_is_post() {
let doc = openapi_json();
assert!(doc["paths"]["/branches/merge"]["post"].is_object());
assert!(doc["paths"]["/graphs/{graph_id}/branches/merge"]["post"].is_object());
}
#[test]
fn openapi_commits_is_get() {
let doc = openapi_json();
assert!(doc["paths"]["/commits"]["get"].is_object());
assert!(doc["paths"]["/graphs/{graph_id}/commits"]["get"].is_object());
}
#[test]
fn openapi_commit_show_is_get() {
let doc = openapi_json();
assert!(doc["paths"]["/commits/{commit_id}"]["get"].is_object());
assert!(doc["paths"]["/graphs/{graph_id}/commits/{commit_id}"]["get"].is_object());
}
// ---------------------------------------------------------------------------
@ -510,13 +514,13 @@ fn query_request_query_is_required() {
#[test]
fn openapi_query_is_post() {
let doc = openapi_json();
assert!(doc["paths"]["/query"]["post"].is_object());
assert!(doc["paths"]["/graphs/{graph_id}/query"]["post"].is_object());
}
#[test]
fn query_endpoint_documents_mutation_400() {
let doc = openapi_json();
let four_hundred = &doc["paths"]["/query"]["post"]["responses"]["400"];
let four_hundred = &doc["paths"]["/graphs/{graph_id}/query"]["post"]["responses"]["400"];
let description = four_hundred["description"].as_str().unwrap_or_default();
assert!(
description.contains("mutations") || description.contains("POST /mutate"),
@ -727,21 +731,21 @@ fn openapi_defines_bearer_token_security_scheme() {
fn protected_endpoints_reference_bearer_token_security() {
let doc = openapi_json();
let protected_paths = [
("/read", "post"),
("/change", "post"),
("/schema/apply", "post"),
("/queries", "get"),
("/queries/{name}", "post"),
("/load", "post"),
("/ingest", "post"),
("/export", "post"),
("/snapshot", "get"),
("/branches", "get"),
("/branches", "post"),
("/branches/{branch}", "delete"),
("/branches/merge", "post"),
("/commits", "get"),
("/commits/{commit_id}", "get"),
("/graphs/{graph_id}/read", "post"),
("/graphs/{graph_id}/change", "post"),
("/graphs/{graph_id}/schema/apply", "post"),
("/graphs/{graph_id}/queries", "get"),
("/graphs/{graph_id}/queries/{name}", "post"),
("/graphs/{graph_id}/load", "post"),
("/graphs/{graph_id}/ingest", "post"),
("/graphs/{graph_id}/export", "post"),
("/graphs/{graph_id}/snapshot", "get"),
("/graphs/{graph_id}/branches", "get"),
("/graphs/{graph_id}/branches", "post"),
("/graphs/{graph_id}/branches/{branch}", "delete"),
("/graphs/{graph_id}/branches/merge", "post"),
("/graphs/{graph_id}/commits", "get"),
("/graphs/{graph_id}/commits/{commit_id}", "get"),
];
for (path, method) in protected_paths {
@ -773,7 +777,7 @@ fn healthz_does_not_require_security() {
#[test]
fn branch_delete_has_branch_path_parameter() {
let doc = openapi_json();
let params = doc["paths"]["/branches/{branch}"]["delete"]["parameters"]
let params = doc["paths"]["/graphs/{graph_id}/branches/{branch}"]["delete"]["parameters"]
.as_array()
.unwrap();
let has_branch = params
@ -788,7 +792,7 @@ fn branch_delete_has_branch_path_parameter() {
#[test]
fn commit_show_has_commit_id_path_parameter() {
let doc = openapi_json();
let params = doc["paths"]["/commits/{commit_id}"]["get"]["parameters"]
let params = doc["paths"]["/graphs/{graph_id}/commits/{commit_id}"]["get"]["parameters"]
.as_array()
.unwrap();
let has_commit_id = params
@ -803,7 +807,7 @@ fn commit_show_has_commit_id_path_parameter() {
#[test]
fn snapshot_has_branch_query_parameter() {
let doc = openapi_json();
let params = doc["paths"]["/snapshot"]["get"]["parameters"]
let params = doc["paths"]["/graphs/{graph_id}/snapshot"]["get"]["parameters"]
.as_array()
.unwrap();
let has_branch = params
@ -818,7 +822,7 @@ fn snapshot_has_branch_query_parameter() {
#[test]
fn commits_has_branch_query_parameter() {
let doc = openapi_json();
let params = doc["paths"]["/commits"]["get"]["parameters"]
let params = doc["paths"]["/graphs/{graph_id}/commits"]["get"]["parameters"]
.as_array()
.unwrap();
let has_branch = params
@ -858,7 +862,7 @@ fn openapi_operations_have_tags() {
#[test]
fn read_endpoint_200_references_read_output_schema() {
let doc = openapi_json();
let content = &doc["paths"]["/read"]["post"]["responses"]["200"]["content"];
let content = &doc["paths"]["/graphs/{graph_id}/read"]["post"]["responses"]["200"]["content"];
let schema = &content["application/json"]["schema"];
let ref_path = schema["$ref"].as_str().unwrap();
assert!(
@ -870,7 +874,7 @@ fn read_endpoint_200_references_read_output_schema() {
#[test]
fn change_endpoint_200_references_change_output_schema() {
let doc = openapi_json();
let content = &doc["paths"]["/change"]["post"]["responses"]["200"]["content"];
let content = &doc["paths"]["/graphs/{graph_id}/change"]["post"]["responses"]["200"]["content"];
let schema = &content["application/json"]["schema"];
let ref_path = schema["$ref"].as_str().unwrap();
assert!(
@ -895,11 +899,11 @@ fn healthz_200_references_health_output_schema() {
fn error_responses_reference_error_output_schema() {
let doc = openapi_json();
let paths_with_errors = [
("/read", "post", "400"),
("/read", "post", "401"),
("/change", "post", "400"),
("/change", "post", "409"),
("/branches", "post", "409"),
("/graphs/{graph_id}/read", "post", "400"),
("/graphs/{graph_id}/read", "post", "401"),
("/graphs/{graph_id}/change", "post", "400"),
("/graphs/{graph_id}/change", "post", "409"),
("/graphs/{graph_id}/branches", "post", "409"),
];
for (path, method, status) in paths_with_errors {
@ -921,13 +925,13 @@ fn error_responses_reference_error_output_schema() {
fn post_endpoints_have_request_body() {
let doc = openapi_json();
let post_paths = [
("/read", "ReadRequest"),
("/change", "ChangeRequest"),
("/schema/apply", "SchemaApplyRequest"),
("/ingest", "IngestRequest"),
("/export", "ExportRequest"),
("/branches", "BranchCreateRequest"),
("/branches/merge", "BranchMergeRequest"),
("/graphs/{graph_id}/read", "ReadRequest"),
("/graphs/{graph_id}/change", "ChangeRequest"),
("/graphs/{graph_id}/schema/apply", "SchemaApplyRequest"),
("/graphs/{graph_id}/ingest", "IngestRequest"),
("/graphs/{graph_id}/export", "ExportRequest"),
("/graphs/{graph_id}/branches", "BranchCreateRequest"),
("/graphs/{graph_id}/branches/merge", "BranchMergeRequest"),
];
for (path, expected_schema) in post_paths {
@ -948,7 +952,7 @@ fn post_endpoints_have_request_body() {
#[test]
fn invoke_stored_query_request_body_is_optional() {
let doc = openapi_json();
let request_body = &doc["paths"]["/queries/{name}"]["post"]["requestBody"];
let request_body = &doc["paths"]["/graphs/{graph_id}/queries/{name}"]["post"]["requestBody"];
assert!(
request_body.is_object(),
"POST /queries/{{name}} should document its optional request body"
@ -1051,12 +1055,14 @@ async fn auth_mode_spec_has_security_on_protected_operations() {
.body(Body::empty())
.unwrap();
let (_, json) = json_response(&app, request).await;
// RFC-011 cluster-only: the served spec always nests protected
// routes under `/graphs/{graph_id}/...`.
let protected_paths = [
("/read", "post"),
("/change", "post"),
("/snapshot", "get"),
("/branches", "get"),
("/commits", "get"),
("/graphs/{graph_id}/read", "post"),
("/graphs/{graph_id}/change", "post"),
("/graphs/{graph_id}/snapshot", "get"),
("/graphs/{graph_id}/branches", "get"),
("/graphs/{graph_id}/commits", "get"),
];
for (path, method) in protected_paths {
let security = &json["paths"][path][method]["security"];
@ -1073,22 +1079,6 @@ async fn auth_mode_spec_has_security_on_protected_operations() {
}
}
#[tokio::test]
async fn auth_mode_spec_matches_static_generation() {
let (_temp, app) = app_for_loaded_graph_with_auth("secret").await;
let request = Request::builder()
.method(Method::GET)
.uri("/openapi.json")
.body(Body::empty())
.unwrap();
let (_, served) = json_response(&app, request).await;
let static_doc = openapi_json();
assert_eq!(
served, static_doc,
"auth-mode served spec must match static generation"
);
}
#[tokio::test]
async fn auth_mode_healthz_still_has_no_security() {
let (_temp, app) = app_for_loaded_graph_with_auth("secret").await;
@ -1394,8 +1384,9 @@ async fn multi_mode_operation_ids_are_unique() {
}
#[tokio::test]
async fn single_mode_openapi_unchanged_by_cluster_filter() {
// Regression: single mode still emits the legacy flat surface.
async fn served_spec_always_nests_under_cluster_prefix() {
// RFC-011 cluster-only: even a one-graph convenience app serves the
// nested cluster surface and never the flat protected routes.
let (_temp, app) = app_for_loaded_graph().await;
let request = Request::builder()
.method(Method::GET)
@ -1405,16 +1396,37 @@ async fn single_mode_openapi_unchanged_by_cluster_filter() {
let (_, json) = json_response(&app, request).await;
let paths = json["paths"].as_object().unwrap();
let path_keys: HashSet<&str> = paths.keys().map(|k| k.as_str()).collect();
for expected in EXPECTED_PATHS {
assert!(
path_keys.contains(expected),
"single mode must still emit flat path: {expected}"
);
}
for cluster in EXPECTED_CLUSTER_PATHS {
assert!(
!path_keys.contains(cluster),
"single mode must NOT emit cluster path: {cluster}"
path_keys.contains(cluster),
"served spec must emit cluster path: {cluster}. Found: {path_keys:?}"
);
}
// The flat protected routes must NOT appear — only the nested
// cluster surface plus the always-flat `/healthz` and `/graphs`.
let flat_protected = [
"/snapshot",
"/read",
"/query",
"/export",
"/change",
"/mutate",
"/queries",
"/queries/{name}",
"/schema",
"/schema/apply",
"/load",
"/ingest",
"/branches",
"/branches/{branch}",
"/branches/merge",
"/commits",
"/commits/{commit_id}",
];
for flat in flat_protected {
assert!(
!path_keys.contains(flat),
"served spec must NOT emit flat protected path: {flat}"
);
}
}

View file

@ -43,7 +43,7 @@ async fn server_opens_s3_graph_directly_and_serves_snapshot_and_read() {
let (snapshot_status, snapshot_body) = json_response(
&app,
Request::builder()
.uri("/snapshot")
.uri(g("/snapshot"))
.method(Method::GET)
.header("authorization", "Bearer s3-token")
.body(Body::empty())
@ -63,7 +63,7 @@ async fn server_opens_s3_graph_directly_and_serves_snapshot_and_read() {
let (read_status, read_body) = json_response(
&app,
Request::builder()
.uri("/read")
.uri(g("/read"))
.method(Method::POST)
.header("authorization", "Bearer s3-token")
.header("content-type", "application/json")
@ -134,11 +134,8 @@ async fn server_boots_cluster_from_bare_storage_uri_and_serves_query() {
}
let settings = omnigraph_server::load_server_settings(
None,
Some(&std::path::PathBuf::from(&root)),
None,
None,
None,
true,
)
.await

View file

@ -2,6 +2,7 @@
//! Moved verbatim from tests/server.rs in the modularization.
use std::fs;
use std::sync::Arc;
use axum::body::Body;
use axum::http::{Method, Request, StatusCode};
@ -11,7 +12,9 @@ use omnigraph::loader::LoadMode;
use omnigraph_server::api::{
ChangeRequest, ErrorOutput, ReadRequest, SchemaApplyRequest, SchemaOutput,
};
use omnigraph_server::{AppState, build_app};
use omnigraph_server::{
AppState, GraphHandle, GraphId, GraphKey, PolicyEngine, build_app, workload,
};
use serde_json::json;
@ -30,7 +33,7 @@ async fn schema_apply_route_updates_graph_for_authorized_admin() {
let request = Request::builder()
.method(Method::POST)
.uri("/schema/apply")
.uri(g("/schema/apply"))
.header("content-type", "application/json")
.header("authorization", "Bearer admin-token")
.body(Body::from(
@ -54,6 +57,111 @@ async fn schema_apply_route_updates_graph_for_authorized_admin() {
);
}
#[tokio::test]
async fn schema_apply_route_refuses_cluster_backed_server_mode() {
let temp = init_graph_with_schema(&fs::read_to_string(fixture("test.pg")).unwrap()).await;
let graph = graph_path(temp.path());
let graph_uri = graph.to_string_lossy().to_string();
let engine = Omnigraph::open(&graph_uri).await.unwrap();
let handle = Arc::new(GraphHandle {
key: GraphKey::cluster(GraphId::try_from("default").unwrap()),
uri: graph_uri.clone(),
engine: Arc::new(engine),
policy: None,
queries: None,
});
let state = AppState::new_multi(
vec![handle],
Vec::new(),
None,
workload::WorkloadController::from_env(),
Some(temp.path().join("cluster.yaml")),
)
.unwrap();
let app = build_app(state);
let request = Request::builder()
.method(Method::POST)
.uri(g("/schema/apply"))
.header("content-type", "application/json")
.body(Body::from(
serde_json::to_vec(&SchemaApplyRequest {
schema_source: additive_schema_with_nickname(),
..Default::default()
})
.unwrap(),
))
.unwrap();
let (status, payload) = json_response(&app, request).await;
assert_eq!(status, StatusCode::CONFLICT, "body: {payload}");
assert!(
payload["error"]
.as_str()
.unwrap_or_default()
.contains("cluster apply"),
"body: {payload}"
);
let reopened = Omnigraph::open(&graph_uri).await.unwrap();
assert!(
!reopened.catalog().node_types["Person"]
.properties
.contains_key("nickname"),
"cluster-backed schema apply must not mutate the graph"
);
}
#[tokio::test]
async fn schema_apply_route_cluster_backed_denies_unauthorized_actor_before_409() {
// The cluster-backed 409 is reported AFTER the Cedar gate, so an actor
// without `schema_apply` permission gets a 403 — never a 409 that would
// disclose the server is cluster-backed (401 → 403 → 409, no topology leak
// before authorization). POLICY_YAML grants read/export but not schema_apply,
// so act-ragnor is denied.
let temp = init_graph_with_schema(&fs::read_to_string(fixture("test.pg")).unwrap()).await;
let graph = graph_path(temp.path());
let graph_uri = graph.to_string_lossy().to_string();
let engine = Omnigraph::open(&graph_uri).await.unwrap();
let policy = PolicyEngine::load_graph_from_source(POLICY_YAML, "default").unwrap();
let handle = Arc::new(GraphHandle {
key: GraphKey::cluster(GraphId::try_from("default").unwrap()),
uri: graph_uri,
engine: Arc::new(engine),
policy: Some(Arc::new(policy)),
queries: None,
});
let state = AppState::new_multi(
vec![handle],
vec![("act-ragnor".to_string(), "admin-token".to_string())],
None,
workload::WorkloadController::from_env(),
Some(temp.path().join("cluster.yaml")),
)
.unwrap();
let app = build_app(state);
let request = Request::builder()
.method(Method::POST)
.uri(g("/schema/apply"))
.header("content-type", "application/json")
.header("authorization", "Bearer admin-token")
.body(Body::from(
serde_json::to_vec(&SchemaApplyRequest {
schema_source: additive_schema_with_nickname(),
..Default::default()
})
.unwrap(),
))
.unwrap();
let (status, payload) = json_response(&app, request).await;
assert_eq!(
status,
StatusCode::FORBIDDEN,
"an unauthorized actor must get 403 before the cluster-backed 409: {payload}"
);
}
#[tokio::test(flavor = "multi_thread")]
async fn schema_apply_route_rejects_stored_query_breakage_before_publish() {
let (temp, app) = app_with_stored_queries(
@ -65,7 +173,7 @@ async fn schema_apply_route_rejects_stored_query_breakage_before_publish() {
let request = Request::builder()
.method(Method::POST)
.uri("/schema/apply")
.uri(g("/schema/apply"))
.header("content-type", "application/json")
.header("authorization", "Bearer admin-token")
.body(Body::from(
@ -115,7 +223,7 @@ async fn schema_apply_route_noop_keeps_valid_stored_query_registry() {
let request = Request::builder()
.method(Method::POST)
.uri("/schema/apply")
.uri(g("/schema/apply"))
.header("content-type", "application/json")
.header("authorization", "Bearer admin-token")
.body(Body::from(
@ -142,7 +250,7 @@ async fn schema_apply_route_requires_schema_apply_policy_permission() {
let request = Request::builder()
.method(Method::POST)
.uri("/schema/apply")
.uri(g("/schema/apply"))
.header("content-type", "application/json")
.header("authorization", "Bearer admin-token")
.body(Body::from(
@ -173,7 +281,7 @@ async fn schema_apply_route_requires_bearer_token_when_policy_enabled() {
let request = Request::builder()
.method(Method::POST)
.uri("/schema/apply")
.uri(g("/schema/apply"))
.header("content-type", "application/json")
.body(Body::from(
serde_json::to_vec(&SchemaApplyRequest {
@ -203,7 +311,7 @@ async fn schema_apply_route_can_rename_type() {
let request = Request::builder()
.method(Method::POST)
.uri("/schema/apply")
.uri(g("/schema/apply"))
.header("content-type", "application/json")
.header("authorization", "Bearer admin-token")
.body(Body::from(
@ -239,7 +347,7 @@ async fn schema_apply_route_can_rename_property() {
let request = Request::builder()
.method(Method::POST)
.uri("/schema/apply")
.uri(g("/schema/apply"))
.header("content-type", "application/json")
.header("authorization", "Bearer admin-token")
.body(Body::from(
@ -279,7 +387,7 @@ async fn schema_apply_route_can_add_index() {
let request = Request::builder()
.method(Method::POST)
.uri("/schema/apply")
.uri(g("/schema/apply"))
.header("content-type", "application/json")
.header("authorization", "Bearer admin-token")
.body(Body::from(
@ -294,6 +402,11 @@ async fn schema_apply_route_can_add_index() {
assert_eq!(status, StatusCode::OK);
assert_eq!(payload["applied"], true);
// iss-848: the /schema/apply route accepts the index-add and applies it as a
// metadata change — it records the `@index` intent in the catalog/IR but does
// NOT build the physical index inline (the build is deferred to
// ensure_indices/optimize; on this empty table nothing would build anyway).
// So the physical index count is unchanged by the apply.
let reopened = Omnigraph::open(graph.to_str().unwrap()).await.unwrap();
let snapshot = reopened
.snapshot_of(ReadTarget::branch("main"))
@ -301,7 +414,10 @@ async fn schema_apply_route_can_add_index() {
.unwrap();
let dataset = snapshot.open("node:Person").await.unwrap();
let after_index_count = dataset.load_indices().await.unwrap().len();
assert!(after_index_count > before_index_count);
assert_eq!(
after_index_count, before_index_count,
"schema apply records @index intent but defers the physical build (iss-848)"
);
}
#[tokio::test]
@ -315,7 +431,7 @@ async fn schema_apply_route_rejects_unsupported_plan() {
let request = Request::builder()
.method(Method::POST)
.uri("/schema/apply")
.uri(g("/schema/apply"))
.header("content-type", "application/json")
.header("authorization", "Bearer admin-token")
.body(Body::from(
@ -356,7 +472,7 @@ async fn schema_apply_route_rejects_when_non_main_branch_exists() {
let request = Request::builder()
.method(Method::POST)
.uri("/schema/apply")
.uri(g("/schema/apply"))
.header("content-type", "application/json")
.header("authorization", "Bearer admin-token")
.body(Body::from(
@ -385,7 +501,7 @@ async fn schema_drift_returns_conflict_for_snapshot_read_and_change() {
let (snapshot_status, snapshot_body) = json_response(
&app,
Request::builder()
.uri("/snapshot?branch=main")
.uri(g("/snapshot?branch=main"))
.method(Method::GET)
.body(Body::empty())
.unwrap(),
@ -413,7 +529,7 @@ async fn schema_drift_returns_conflict_for_snapshot_read_and_change() {
let (read_status, read_body) = json_response(
&app,
Request::builder()
.uri("/read")
.uri(g("/read"))
.method(Method::POST)
.header("content-type", "application/json")
.body(Body::from(serde_json::to_vec(&read).unwrap()))
@ -441,7 +557,7 @@ async fn schema_drift_returns_conflict_for_snapshot_read_and_change() {
let (change_status, change_body) = json_response(
&app,
Request::builder()
.uri("/change")
.uri(g("/change"))
.method(Method::POST)
.header("content-type", "application/json")
.body(Body::from(serde_json::to_vec(&change).unwrap()))
@ -467,7 +583,7 @@ async fn schema_route_returns_current_source() {
let (status, body) = json_response(
&app,
Request::builder()
.uri("/schema")
.uri(g("/schema"))
.method(Method::GET)
.body(Body::empty())
.unwrap(),
@ -486,7 +602,7 @@ async fn schema_route_requires_bearer_token_when_auth_configured() {
let (missing_status, missing_body) = json_response(
&app,
Request::builder()
.uri("/schema")
.uri(g("/schema"))
.method(Method::GET)
.body(Body::empty())
.unwrap(),
@ -502,7 +618,7 @@ async fn schema_route_requires_bearer_token_when_auth_configured() {
let (ok_status, ok_body) = json_response(
&app,
Request::builder()
.uri("/schema")
.uri(g("/schema"))
.method(Method::GET)
.header("authorization", "Bearer demo-token")
.body(Body::empty())
@ -533,7 +649,7 @@ async fn schema_route_denied_when_actor_lacks_read_permission() {
let (status, body) = json_response(
&app,
Request::builder()
.uri("/schema")
.uri(g("/schema"))
.method(Method::GET)
.header("authorization", "Bearer team-token")
.body(Body::empty())
@ -574,7 +690,7 @@ async fn schema_apply_route_soft_drops_property_via_http() {
&app,
Request::builder()
.method(Method::POST)
.uri("/schema/apply")
.uri(g("/schema/apply"))
.header("content-type", "application/json")
.header("authorization", "Bearer admin-token")
.body(Body::from(
@ -631,7 +747,7 @@ async fn schema_apply_route_soft_drops_node_type_via_http() {
&app,
Request::builder()
.method(Method::POST)
.uri("/schema/apply")
.uri(g("/schema/apply"))
.header("content-type", "application/json")
.header("authorization", "Bearer admin-token")
.body(Body::from(
@ -683,7 +799,7 @@ async fn schema_apply_route_hard_drops_property_with_allow_data_loss() {
&app,
Request::builder()
.method(Method::POST)
.uri("/schema/apply")
.uri(g("/schema/apply"))
.header("content-type", "application/json")
.header("authorization", "Bearer admin-token")
.body(Body::from(
@ -738,7 +854,7 @@ async fn schema_apply_route_keeps_drops_soft_without_flag() {
&app,
Request::builder()
.method(Method::POST)
.uri("/schema/apply")
.uri(g("/schema/apply"))
.header("content-type", "application/json")
.header("authorization", "Bearer admin-token")
.body(Body::from(
@ -770,29 +886,27 @@ async fn schema_apply_route_additive_property_preserves_existing_rows() {
// AddProperty wasn't pinned with a row-count check anywhere.
// Load N rows, apply schema adding nullable property, verify
// every row is still readable and the new column is null.
let (temp, app) = app_for_graph_with_auth_tokens_and_policy(
&fs::read_to_string(fixture("test.pg")).unwrap(),
let (temp, app) = app_for_loaded_graph_with_auth_tokens_and_policy(
&[("act-ragnor", "admin-token")],
SCHEMA_APPLY_POLICY_YAML,
)
.await;
let graph = graph_path(temp.path());
// Standard fixture data: 4 Persons + 1 Company. Load it.
// Standard fixture data is loaded before the app is built, so the server
// handle applies schema from the same manifest it is serving.
let pre_count = {
let db = Omnigraph::open(graph.to_str().unwrap()).await.unwrap();
db.load(
"main",
&fs::read_to_string(fixture("test.jsonl")).unwrap(),
LoadMode::Append,
)
.await
.unwrap();
let snap = db
.snapshot_of(omnigraph::db::ReadTarget::branch("main"))
.await
.unwrap();
snap.entry("node:Person").expect("Person").row_count
snap.open("node:Person")
.await
.expect("Person")
.count_rows(None)
.await
.unwrap()
};
assert!(pre_count > 0, "fixture should have loaded Person rows");
@ -800,7 +914,7 @@ async fn schema_apply_route_additive_property_preserves_existing_rows() {
&app,
Request::builder()
.method(Method::POST)
.uri("/schema/apply")
.uri(g("/schema/apply"))
.header("content-type", "application/json")
.header("authorization", "Bearer admin-token")
.body(Body::from(
@ -822,7 +936,13 @@ async fn schema_apply_route_additive_property_preserves_existing_rows() {
.snapshot_of(omnigraph::db::ReadTarget::branch("main"))
.await
.unwrap();
let post_count = snap.entry("node:Person").expect("Person").row_count;
let post_count = snap
.open("node:Person")
.await
.expect("Person")
.count_rows(None)
.await
.unwrap();
assert_eq!(
post_count, pre_count,
"AddProperty should preserve row count",

View file

@ -82,6 +82,58 @@ async fn invoke_stored_read_returns_rows() {
assert!(body["rows"].is_array(), "read envelope shape; body: {body}");
}
#[tokio::test(flavor = "multi_thread")]
async fn invoke_with_mismatched_expected_kind_is_rejected() {
// RFC-011 D3: the CLI verb asserts the stored query's kind via
// `expect_mutation`. Invoking a read with `expect_mutation: true`
// (i.e. `omnigraph mutate <a-read>`) is a 400 naming the right verb.
let (_temp, app) = app_with_stored_queries(
&[("find_person", FIND_PERSON_GQ, false)],
&[("act-invoke", "t-invoke")],
INVOKE_POLICY_YAML,
)
.await;
let (status, body) = json_response(
&app,
invoke_request(
"find_person",
"t-invoke",
json!({ "expect_mutation": true, "params": { "name": "Alice" } }),
),
)
.await;
assert_eq!(status, StatusCode::BAD_REQUEST, "body: {body}");
assert!(
body["error"]
.as_str()
.unwrap_or_default()
.contains("'find_person' is a read — use omnigraph query find_person"),
"expected a kind-mismatch error; body: {body}"
);
}
#[tokio::test(flavor = "multi_thread")]
async fn invoke_with_matching_expected_kind_runs() {
// The matching assertion (`omnigraph query <a-read>`) passes through.
let (_temp, app) = app_with_stored_queries(
&[("find_person", FIND_PERSON_GQ, false)],
&[("act-invoke", "t-invoke")],
INVOKE_POLICY_YAML,
)
.await;
let (status, body) = json_response(
&app,
invoke_request(
"find_person",
"t-invoke",
json!({ "expect_mutation": false, "params": { "name": "Alice" } }),
),
)
.await;
assert_eq!(status, StatusCode::OK, "matching kind should run; body: {body}");
assert_eq!(body["query_name"], "find_person");
}
#[tokio::test(flavor = "multi_thread")]
async fn invoke_stored_read_accepts_absent_or_empty_body() {
let no_param_query = "query list_people() { match { $p: Person } return { $p.name } }";
@ -272,7 +324,7 @@ async fn list_queries_returns_only_exposed_with_typed_params() {
INVOKE_POLICY_YAML,
)
.await;
let (status, body) = json_response(&app, get_request("/queries", "t-invoke")).await;
let (status, body) = json_response(&app, get_request(&g("/queries"), "t-invoke")).await;
assert_eq!(status, StatusCode::OK, "body: {body}");
let entries = body["queries"].as_array().unwrap();
@ -303,7 +355,7 @@ async fn list_queries_is_read_gated_so_a_non_invoker_can_list() {
INVOKE_POLICY_YAML,
)
.await;
let (status, body) = json_response(&app, get_request("/queries", "t-noinvoke")).await;
let (status, body) = json_response(&app, get_request(&g("/queries"), "t-noinvoke")).await;
assert_eq!(status, StatusCode::OK, "read-gated catalog; body: {body}");
let names: Vec<&str> = body["queries"]
.as_array()
@ -320,7 +372,7 @@ async fn list_queries_is_read_gated_so_a_non_invoker_can_list() {
#[tokio::test(flavor = "multi_thread")]
async fn list_queries_is_empty_when_no_registry() {
let (_temp, app) = app_for_loaded_graph_with_auth("demo-token").await;
let (status, body) = json_response(&app, get_request("/queries", "demo-token")).await;
let (status, body) = json_response(&app, get_request(&g("/queries"), "demo-token")).await;
assert_eq!(status, StatusCode::OK, "body: {body}");
assert!(
body["queries"].as_array().unwrap().is_empty(),

View file

@ -248,9 +248,17 @@ rules:
pub const FIND_PERSON_GQ: &str =
"query find_person($name: String) { match { $p: Person { name: $name } } return { $p.age } }";
/// RFC-011 cluster-only: the single-graph convenience apps built by the
/// `app_for_loaded_graph*` helpers serve the graph under the reserved id
/// `default`. This prefixes a flat per-graph path (e.g. `/snapshot`) with
/// the cluster route prefix so tests address `/graphs/default/snapshot`.
pub fn g(path: &str) -> String {
format!("/graphs/default{path}")
}
pub fn invoke_request(name: &str, token: &str, body: Value) -> Request<Body> {
Request::builder()
.uri(format!("/queries/{name}"))
.uri(g(&format!("/queries/{name}")))
.method(Method::POST)
.header("content-type", "application/json")
.header("authorization", format!("Bearer {token}"))
@ -265,7 +273,7 @@ pub fn invoke_request_bytes(
content_type: Option<&str>,
) -> Request<Body> {
let mut builder = Request::builder()
.uri(format!("/queries/{name}"))
.uri(g(&format!("/queries/{name}")))
.method(Method::POST)
.header("authorization", format!("Bearer {token}"));
if let Some(content_type) = content_type {
@ -656,7 +664,7 @@ pub mod matrix {
.clone()
.oneshot(
Request::builder()
.uri("/branches")
.uri(g("/branches"))
.method(Method::POST)
.header("content-type", "application/json")
.body(Body::from(body))
@ -686,7 +694,7 @@ pub mod matrix {
.clone()
.oneshot(
Request::builder()
.uri("/change")
.uri(g("/change"))
.method(Method::POST)
.header("content-type", "application/json")
.body(Body::from(body))
@ -728,7 +736,7 @@ pub mod matrix {
.clone()
.oneshot(
Request::builder()
.uri(format!("/snapshot?branch={}", branch))
.uri(g(&format!("/snapshot?branch={}", branch)))
.method(Method::GET)
.body(Body::empty())
.unwrap(),
@ -766,7 +774,7 @@ pub mod matrix {
.clone()
.oneshot(
Request::builder()
.uri("/read")
.uri(g("/read"))
.method(Method::POST)
.header("content-type", "application/json")
.body(Body::from(body))
@ -833,7 +841,7 @@ pub mod matrix {
.clone()
.oneshot(
Request::builder()
.uri("/change")
.uri(g("/change"))
.method(Method::POST)
.header("content-type", "application/json")
.body(Body::from(body))
@ -874,7 +882,7 @@ pub mod matrix {
let response = app
.oneshot(
Request::builder()
.uri("/branches/merge")
.uri(g("/branches/merge"))
.method(Method::POST)
.header("content-type", "application/json")
.body(Body::from(body))
@ -910,7 +918,7 @@ pub mod matrix {
let response = app
.oneshot(
Request::builder()
.uri("/change")
.uri(g("/change"))
.method(Method::POST)
.header("content-type", "application/json")
.body(Body::from(body))
@ -943,7 +951,7 @@ pub mod matrix {
let response = app
.oneshot(
Request::builder()
.uri("/branches")
.uri(g("/branches"))
.method(Method::POST)
.header("content-type", "application/json")
.body(Body::from(body))
@ -970,7 +978,7 @@ pub mod matrix {
let response = app
.oneshot(
Request::builder()
.uri(format!("/branches/{}", name))
.uri(g(&format!("/branches/{}", name)))
.method(Method::DELETE)
.body(Body::empty())
.unwrap(),
@ -1091,7 +1099,7 @@ pub async fn http_change_decision(
let (status, _body) = json_response(
&app,
Request::builder()
.uri("/change")
.uri(g("/change"))
.method(Method::POST)
.header(AUTHORIZATION, format!("Bearer {token}"))
.header("content-type", "application/json")
@ -1141,7 +1149,7 @@ pub async fn http_merge_decision(
let (status, _body) = json_response(
&app,
Request::builder()
.uri("/branches/merge")
.uri(g("/branches/merge"))
.method(Method::POST)
.header(AUTHORIZATION, format!("Bearer {token}"))
.header("content-type", "application/json")
@ -1191,5 +1199,5 @@ graphs:
}
pub async fn cluster_settings(dir: &Path) -> color_eyre::eyre::Result<omnigraph_server::ServerConfig> {
omnigraph_server::load_server_settings(None, Some(&dir.to_path_buf()), None, None, None, true).await
omnigraph_server::load_server_settings(Some(&dir.to_path_buf()), None, true).await
}