From 4df361d1526ee1cd694a2d3cf45e966e81e879d3 Mon Sep 17 00:00:00 2001 From: Ragnor Comerford Date: Mon, 25 May 2026 20:09:12 +0200 Subject: [PATCH] mr-668: multi-graph startup + mode inference (PR 5/10) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PR 5 of the MR-668 multi-graph server work. This is the first PR that makes multi mode actually usable end-to-end: operators invoking `omnigraph-server --config omnigraph.yaml` with a non-empty `graphs:` map and no single-mode selector now get a running multi-graph server. Mode inference (MR-668 decision 2, four-rule matrix in `load_server_settings`): 1. CLI `` positional → Single 2. CLI `--target ` → Single (URI from graphs.) 3. `server.graph` in config → Single (URI from graphs.) 4. `--config` + non-empty `graphs:` + no single-mode selector → Multi (all entries in `graphs:`) 5. otherwise → error with migration hint Rule 5's error message names every escape hatch so operators can fix their invocation without grepping docs. Config schema extensions: - `TargetConfig.policy: PolicySettings` (per-graph Cedar policy file). `#[serde(default)]` so existing single-graph YAMLs keep parsing. - `ServerDefaults.policy: PolicySettings` (server-level Cedar policy for management endpoints — loaded in PR 5, wired into `GET /graphs` in PR 6b). - `OmnigraphConfig::resolve_target_policy_file(name)` and `resolve_server_policy_file()` helpers — both resolve relative to the config file's `base_dir`. Public types added to `omnigraph-server`: - `ServerConfigMode { Single { uri, policy_file } | Multi { graphs, config_path, server_policy_file } }`. - `GraphStartupConfig { graph_id, uri, policy_file }` — one entry per graph in multi mode. `ServerConfig` shape change: - WAS: `{ uri: String, bind, policy_file, allow_unauthenticated }`. - NOW: `{ mode: ServerConfigMode, bind, allow_unauthenticated }`. - Breaking for any code that constructs `ServerConfig` directly. `main.rs` is unaffected (uses `load_server_settings`). `serve()` now forks on `ServerConfig.mode`: - Single: existing flow via `AppState::open_with_bearer_tokens_and_policy`. - Multi: parallel open via `futures::stream::iter(graphs) .map(open_single_graph).buffer_unordered(4).collect()`. Bound 4 is a rule-of-thumb for I/O-bound work — at N≤10 this trades startup latency for a small amount of concurrent S3/Lance open pressure. Fail-fast: first open error aborts startup; in-flight opens drop their engine via Arc (Lance datasets close cleanly). New helper `open_single_graph(GraphStartupConfig)`: - Validates `GraphId` per the regex in PR 1. - `Omnigraph::open(uri).await` with descriptive error context. - Loads per-graph policy file and re-applies it via `Omnigraph::with_policy` (engine-layer enforcement, MR-722). - Returns `Arc` ready for the registry. Routing middleware bug fix: - `Router::nest("/graphs/{graph_id}", inner)` rewrites `request.uri().path()` to the inner suffix (e.g. `/snapshot`). The previous middleware tried to parse `{graph_id}` from `request.uri().path()` and got 400 instead of 200. Fixed by reading from `axum::extract::OriginalUri` request extension, which preserves the pre-rewrite URI. - Caught by the two new tests `cluster_routes_dispatch_per_graph_handle` and `cluster_route_for_unknown_graph_returns_404`. Tests (14 new, all passing): - Four-rule matrix: one test per branch + the joint case `mode_inference_cli_uri_overrides_graphs_map` + the empty-graphs-map error case. - Per-graph + server-level policy file path resolution. - Reserved `GraphId` rejection at startup. - End-to-end multi-graph routing: two graphs side by side, each cluster route hits the right engine. - Unknown graph id under cluster prefix → 404. - Flat routes 404 in multi mode. Inline `ServerConfig` test (`serve_refuses_to_start_in_state_1_without_unauthenticated`) and three `server_settings_*` tests updated to the new `mode` shape. Result: 211 server tests green (74 lib + 71 integration + 66 openapi), MR-731 regression test still pinned and passing. LOC: +45 config.rs, +281 lib.rs (net), +395 tests/server.rs. Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/omnigraph-server/src/config.rs | 45 +++ crates/omnigraph-server/src/lib.rs | 317 ++++++++++++++++--- crates/omnigraph-server/tests/server.rs | 395 ++++++++++++++++++++++++ 3 files changed, 721 insertions(+), 36 deletions(-) diff --git a/crates/omnigraph-server/src/config.rs b/crates/omnigraph-server/src/config.rs index 7145ff2..8abe3cd 100644 --- a/crates/omnigraph-server/src/config.rs +++ b/crates/omnigraph-server/src/config.rs @@ -17,6 +17,12 @@ pub struct ProjectConfig { pub struct TargetConfig { pub uri: String, pub bearer_token_env: Option, + /// Per-graph Cedar policy file (MR-668). In single-graph mode this + /// field is unused — the top-level `policy.file` applies. In + /// multi-graph mode, each `graphs..policy.file` governs that + /// graph's HTTP-layer Cedar enforcement. + #[serde(default)] + pub policy: PolicySettings, } #[derive(Debug, Clone, Copy, Default, Eq, PartialEq, Serialize, Deserialize, ValueEnum)] @@ -59,6 +65,12 @@ pub struct ServerDefaults { #[serde(rename = "graph")] pub graph: Option, pub bind: Option, + /// Server-level Cedar policy (MR-668). Governs management endpoints + /// (`POST /graphs`, `GET /graphs`, `DELETE /graphs/{id}` once they + /// land). In single-graph mode this is unused — the top-level + /// `policy.file` covers the single graph. + #[serde(default)] + pub policy: PolicySettings, } #[derive(Debug, Clone, Default, Serialize, Deserialize)] @@ -216,6 +228,39 @@ impl OmnigraphConfig { }) } + /// Resolve the per-graph policy file path for the named target, + /// relative to the config file's `base_dir`. Returns `None` if the + /// target is unknown or no per-graph `policy.file` is set. + pub fn resolve_target_policy_file(&self, target_name: &str) -> Option { + let target = self.graphs.get(target_name)?; + let path = target.policy.file.as_deref()?; + let path = Path::new(path); + Some(if path.is_absolute() { + path.to_path_buf() + } else { + self.base_dir.join(path) + }) + } + + /// Resolve the server-level policy file path (used by management + /// endpoints). Returns `None` if `server.policy.file` is not set. + pub fn resolve_server_policy_file(&self) -> Option { + let path = self.server.policy.file.as_deref()?; + let path = Path::new(path); + Some(if path.is_absolute() { + path.to_path_buf() + } else { + self.base_dir.join(path) + }) + } + + /// Resolve a raw config-supplied URI (which may be relative) to its + /// absolute form. URIs containing `://` are passed through as-is; + /// relative paths are joined with the config file's `base_dir`. + pub fn resolve_uri_value(&self, value: &str) -> String { + self.resolve_config_uri(value) + } + pub fn resolve_policy_tests_file(&self) -> Option { let policy_file = self.resolve_policy_file()?; Some(policy_file.with_file_name("policy.tests.yaml")) diff --git a/crates/omnigraph-server/src/lib.rs b/crates/omnigraph-server/src/lib.rs index 9737da9..217d6de 100644 --- a/crates/omnigraph-server/src/lib.rs +++ b/crates/omnigraph-server/src/lib.rs @@ -28,7 +28,7 @@ use api::{ pub use auth::{AWS_SECRET_ENV, EnvOrFileTokenSource, TokenSource, resolve_token_source}; use axum::body::{Body, Bytes}; use axum::extract::DefaultBodyLimit; -use axum::extract::{Extension, Path, Query, Request, State}; +use axum::extract::{Extension, OriginalUri, Path, Query, Request, State}; use axum::http::StatusCode; use axum::http::header::{AUTHORIZATION, CONTENT_TYPE}; use axum::middleware::{self, Next}; @@ -118,9 +118,13 @@ const SERVER_SOURCE_VERSION: Option<&str> = option_env!("OMNIGRAPH_SOURCE_VERSIO #[derive(Debug, Clone)] pub struct ServerConfig { - pub uri: String, + /// Server topology + the graphs to open at startup. Single-mode + /// invocations (`omnigraph-server ` or `--target `) + /// produce `ServerConfigMode::Single`; multi-mode invocations + /// (`--config omnigraph.yaml` with a non-empty `graphs:` map and + /// no single-mode selector) produce `ServerConfigMode::Multi`. + pub mode: ServerConfigMode, pub bind: String, - pub policy_file: Option, /// Operator opt-in for fully-unauthenticated dev mode (MR-723). /// When neither bearer tokens nor a policy file are configured, /// `serve()` refuses to start unless this is true (set via @@ -132,6 +136,47 @@ pub struct ServerConfig { pub allow_unauthenticated: bool, } +/// What `load_server_settings` produces after applying the four-rule +/// mode inference matrix (MR-668 decision 2). +#[derive(Debug, Clone)] +pub enum ServerConfigMode { + /// Legacy invocation — one graph at the given URI. Either: + /// * `omnigraph-server ` (CLI positional), or + /// * `omnigraph-server --target --config omnigraph.yaml`, or + /// * `omnigraph-server --config omnigraph.yaml` with `server.graph` + /// set to a named target. + Single { + uri: String, + /// Top-level `policy.file` (single-graph Cedar policy). + policy_file: Option, + }, + /// Multi-graph invocation — `--config omnigraph.yaml` with a + /// non-empty `graphs:` map and no single-mode selector. + Multi { + /// Per-graph startup configs, sorted by graph id (BTreeMap + /// iteration order). PR 5's parallel-open loop iterates this. + graphs: Vec, + /// Path to the config file the server was started from. PR 7's + /// `POST /graphs` flow will use this for atomic YAML rewrite + /// (DELETE is deferred so the rewrite path is currently unused). + config_path: PathBuf, + /// `server.policy.file` (server-level Cedar policy for the + /// management endpoints). Loaded but currently unused — PR 6b + /// wires it into `GET /graphs`. + server_policy_file: Option, + }, +} + +/// One graph's startup-time configuration: id, opened URI, optional +/// per-graph policy file path. Constructed by `load_server_settings` +/// in multi mode; consumed by `serve`'s parallel open loop. +#[derive(Debug, Clone)] +pub struct GraphStartupConfig { + pub graph_id: String, + pub uri: String, + pub policy_file: Option, +} + /// Server runtime topology. Single mode = legacy `omnigraph-server ` /// invocation, one graph, flat HTTP routes. Multi mode = `--config /// omnigraph.yaml` with a `graphs:` map, N graphs, cluster routes @@ -696,10 +741,7 @@ pub fn load_server_settings( cli_allow_unauthenticated: bool, ) -> Result { let config = load_config(config_path)?; - let uri = - config.resolve_target_uri(cli_uri, cli_target.as_deref(), config.server_graph_name())?; let bind = cli_bind.unwrap_or_else(|| config.server_bind().to_string()); - let policy_file = config.resolve_policy_file(); // Either `--unauthenticated` or `OMNIGRAPH_UNAUTHENTICATED=1` flips // this. Treat any non-empty, non-"0"/"false" string as truthy — // standard 12-factor "any value is true" reading of the env var. @@ -712,14 +754,81 @@ pub fn load_server_settings( .unwrap_or(false); let allow_unauthenticated = cli_allow_unauthenticated || env_unauth; + // MR-668 decision 2 — four-rule mode inference matrix. + // + // 1. CLI `` positional → Single (URI = the value) + // 2. CLI `--target ` → Single (URI = graphs..uri) + // 3. `server.graph` in config → Single (URI = graphs..uri) + // 4. `--config` + non-empty `graphs:` + no single-mode selector + // → Multi (every entry in `graphs:`) + // 5. otherwise → error with migration hint + // + // Rules 1-3 are mutually compatible (CLI URI wins over `--target` + // wins over `server.graph`), reusing the existing + // `resolve_target_uri` precedence. + let has_cli_uri = cli_uri.is_some(); + let has_cli_target = cli_target.is_some(); + let has_server_graph = config.server_graph_name().is_some(); + let has_graphs_map = !config.graphs.is_empty(); + let has_explicit_config = config_path.is_some(); + + let mode = if has_cli_uri || has_cli_target || has_server_graph { + // Rules 1, 2, or 3 → Single mode. + let uri = config.resolve_target_uri( + cli_uri, + cli_target.as_deref(), + config.server_graph_name(), + )?; + let policy_file = config.resolve_policy_file(); + ServerConfigMode::Single { uri, policy_file } + } else if has_explicit_config && has_graphs_map { + // Rule 4 → Multi mode. Build a startup config per graph. + let mut graphs = Vec::with_capacity(config.graphs.len()); + for (name, target) in &config.graphs { + // Validate the graph id can construct a `GraphId` newtype. + // Doing this here (not at registry insert) so a malformed + // omnigraph.yaml fails at startup with a clear error. + GraphId::try_from(name.clone()).map_err(|err| { + color_eyre::eyre::eyre!("invalid graph id '{name}' in omnigraph.yaml: {err}") + })?; + graphs.push(GraphStartupConfig { + graph_id: name.clone(), + uri: config.resolve_uri_value(&target.uri), + policy_file: config.resolve_target_policy_file(name), + }); + } + let config_path = config_path + .cloned() + .expect("has_explicit_config implies config_path is Some"); + let server_policy_file = config.resolve_server_policy_file(); + ServerConfigMode::Multi { + graphs, + config_path, + server_policy_file, + } + } else { + // Rule 5 → error with migration hint. + bail!( + "no graph to serve: pass a URI (`omnigraph-server `), select a target \ + (`--target --config omnigraph.yaml`), set `server.graph: ` in \ + omnigraph.yaml, or for multi-graph mode add a `graphs:` map to the config \ + file referenced by `--config`." + ); + }; + Ok(ServerConfig { - uri, + mode, bind, - policy_file, allow_unauthenticated, }) } +/// Whether the loaded config will run the server in multi-graph mode. +/// Useful for the test that constructs `ServerConfig` directly. +pub fn server_config_is_multi(config: &ServerConfig) -> bool { + matches!(config.mode, ServerConfigMode::Multi { .. }) +} + /// MR-723 server runtime state, classified from the three-state matrix /// of (bearer tokens configured) × (policy file configured) at startup. /// @@ -822,9 +931,22 @@ pub async fn serve(config: ServerConfig) -> Result<()> { let token_source = resolve_token_source().await?; info!(source = token_source.name(), "loaded bearer token source"); let tokens = token_source.load().await?; + + // For runtime-state classification, "any policy configured" means + // either the top-level/single-mode policy file OR a server-level + // policy OR any per-graph policy file. Mirrors the + // `requires_bearer_auth` semantics on AppState. + let has_policy_configured = match &config.mode { + ServerConfigMode::Single { policy_file, .. } => policy_file.is_some(), + ServerConfigMode::Multi { + graphs, + server_policy_file, + .. + } => server_policy_file.is_some() || graphs.iter().any(|g| g.policy_file.is_some()), + }; let runtime_state = classify_server_runtime_state( !tokens.is_empty(), - config.policy_file.is_some(), + has_policy_configured, config.allow_unauthenticated, )?; match runtime_state { @@ -840,20 +962,122 @@ pub async fn serve(config: ServerConfig) -> Result<()> { ), ServerRuntimeState::PolicyEnabled => {} } - let state = AppState::open_with_bearer_tokens_and_policy( - config.uri.clone(), - tokens, - config.policy_file.as_ref(), - ) - .await?; - let listener = TcpListener::bind(&config.bind).await?; - info!(uri = %config.uri, bind = %config.bind, "serving omnigraph"); + + let bind = config.bind.clone(); + let state = match config.mode { + ServerConfigMode::Single { uri, policy_file } => { + let uri_for_log = uri.clone(); + info!(uri = %uri_for_log, bind = %bind, mode = "single", "serving omnigraph"); + AppState::open_with_bearer_tokens_and_policy(uri, tokens, policy_file.as_ref()).await? + } + ServerConfigMode::Multi { + graphs, + config_path, + server_policy_file, + } => { + info!( + bind = %bind, + mode = "multi", + graph_count = graphs.len(), + config = %config_path.display(), + "serving omnigraph" + ); + open_multi_graph_state( + graphs, + tokens, + server_policy_file.as_ref(), + config_path, + ) + .await? + } + }; + + let listener = TcpListener::bind(&bind).await?; axum::serve(listener, build_app(state)) .with_graceful_shutdown(shutdown_signal()) .await?; Ok(()) } +/// Parallel open of every graph in the startup config, with bounded +/// concurrency (`buffer_unordered(4)`). Fail-fast — the first open error +/// aborts startup; other in-flight opens are dropped (their `Omnigraph` +/// instances close cleanly via Arc drop). +/// +/// The bound 4 is a rule-of-thumb for I/O-bound work. At N ≤ 10 this +/// trades startup latency for a small amount of concurrent S3 / Lance +/// open pressure. +async fn open_multi_graph_state( + graphs: Vec, + tokens: Vec<(String, String)>, + server_policy_file: Option<&PathBuf>, + config_path: PathBuf, +) -> Result { + use futures::StreamExt; + + if graphs.is_empty() { + bail!("multi-graph mode requires at least one graph in the `graphs:` map"); + } + + // Server-level policy (loaded once, applies to management endpoints). + // The placeholder graph_id `"server"` matches the PolicyEngine API + // shape until the Cedar resource-model refactor (PR 6a) lands. + let server_policy = match server_policy_file { + Some(path) => Some(PolicyEngine::load(path, "server")?), + None => None, + }; + + let handles: Vec> = futures::stream::iter(graphs.into_iter()) + .map(|cfg| async move { + open_single_graph(cfg).await + }) + .buffer_unordered(4) + .collect::>() + .await + .into_iter() + .collect::>>()?; + + let workload = workload::WorkloadController::from_env(); + let state = AppState::new_multi( + handles, + tokens, + server_policy, + workload, + Some(config_path), + ) + .map_err(|err| color_eyre::eyre::eyre!("multi-graph registry: {err}"))?; + Ok(state) +} + +/// Open one graph and wrap it in a `GraphHandle`. Used both at startup +/// (`open_multi_graph_state`) and — once `POST /graphs` lands in PR 7 +/// — for runtime additions. +async fn open_single_graph(cfg: GraphStartupConfig) -> Result> { + let graph_id = GraphId::try_from(cfg.graph_id.clone()) + .map_err(|err| color_eyre::eyre::eyre!("graph id '{}': {err}", cfg.graph_id))?; + + let db = Omnigraph::open(&cfg.uri) + .await + .map_err(|err| color_eyre::eyre::eyre!("open graph '{}' at {}: {err}", graph_id, cfg.uri))?; + + let (policy_arc, db) = match &cfg.policy_file { + Some(path) => { + let policy = PolicyEngine::load(path, graph_id.as_str())?; + let policy_arc: Arc = Arc::new(policy); + let checker = Arc::clone(&policy_arc) as Arc; + (Some(policy_arc), db.with_policy(checker)) + } + None => (None, db), + }; + + Ok(Arc::new(GraphHandle { + key: GraphKey::cluster(graph_id), + uri: cfg.uri, + engine: Arc::new(db), + policy: policy_arc, + })) +} + async fn shutdown_signal() { if let Err(err) = tokio::signal::ctrl_c().await { error!(error = %err, "failed to install ctrl-c handler"); @@ -1039,12 +1263,19 @@ async fn resolve_graph_handle( ) })?, ServerMode::Multi { .. } => { - // Extract the {graph_id} path segment from `/graphs/{graph_id}/...`. - // The router only mounts the per-graph nest under that prefix, - // so any request reaching this middleware in Multi mode must - // have the prefix — but defense in depth still validates. - let path = request.uri().path(); - let graph_id_str = path + // `Router::nest("/graphs/{graph_id}", inner)` rewrites + // `request.uri().path()` to the inner suffix (e.g. `/snapshot`). + // The pre-rewrite URI is preserved in the `OriginalUri` + // request extension by axum's router; we read from there to + // extract `{graph_id}`. Fall back to the current URI only if + // the extension is missing, which shouldn't happen for + // nested routes but is safe defensive code. + let original_path: String = request + .extensions() + .get::() + .map(|OriginalUri(uri)| uri.path().to_string()) + .unwrap_or_else(|| request.uri().path().to_string()); + let graph_id_str = original_path .strip_prefix("/graphs/") .and_then(|rest| rest.split('/').next()) .filter(|s| !s.is_empty()) @@ -2075,9 +2306,9 @@ fn server_bearer_tokens_from_env() -> Result> { #[cfg(test)] mod tests { use super::{ - ServerConfig, ServerRuntimeState, classify_server_runtime_state, hash_bearer_token, - load_server_settings, normalize_bearer_token, parse_bearer_tokens_json, serve, - server_bearer_tokens_from_env, + ServerConfig, ServerConfigMode, ServerRuntimeState, classify_server_runtime_state, + hash_bearer_token, load_server_settings, normalize_bearer_token, + parse_bearer_tokens_json, serve, server_bearer_tokens_from_env, }; use serial_test::serial; use std::env; @@ -2132,7 +2363,10 @@ server: .unwrap(); let settings = load_server_settings(Some(&config), None, None, None, false).unwrap(); - assert_eq!(settings.uri, "/tmp/demo.omni"); + match &settings.mode { + ServerConfigMode::Single { uri, .. } => assert_eq!(uri, "/tmp/demo.omni"), + ServerConfigMode::Multi { .. } => panic!("expected Single mode, got Multi"), + } assert_eq!(settings.bind, "0.0.0.0:9090"); } @@ -2161,7 +2395,10 @@ server: false, ) .unwrap(); - assert_eq!(settings.uri, "/tmp/override.omni"); + match &settings.mode { + ServerConfigMode::Single { uri, .. } => assert_eq!(uri, "/tmp/override.omni"), + ServerConfigMode::Multi { .. } => panic!("expected Single mode, got Multi"), + } assert_eq!(settings.bind, "0.0.0.0:9999"); } @@ -2187,13 +2424,19 @@ server: let settings = load_server_settings(Some(&config), None, Some("dev".to_string()), None, false) .unwrap(); - assert_eq!(settings.uri, "http://127.0.0.1:8080"); + match &settings.mode { + ServerConfigMode::Single { uri, .. } => assert_eq!(uri, "http://127.0.0.1:8080"), + ServerConfigMode::Multi { .. } => panic!("expected Single mode, got Multi"), + } } #[test] fn server_settings_require_uri_from_cli_or_config() { let error = load_server_settings(None, None, None, None, false).unwrap_err(); - assert!(error.to_string().contains("URI must be provided")); + assert!( + error.to_string().contains("no graph to serve"), + "expected mode-inference error, got: {error}", + ); } #[test] @@ -2252,13 +2495,15 @@ server: // Graph path doesn't need to exist — classifier fires before // `AppState::open_with_bearer_tokens_and_policy`. let config = ServerConfig { - uri: temp - .path() - .join("graph.omni") - .to_string_lossy() - .into_owned(), + mode: ServerConfigMode::Single { + uri: temp + .path() + .join("graph.omni") + .to_string_lossy() + .into_owned(), + policy_file: None, + }, bind: "127.0.0.1:0".to_string(), - policy_file: None, allow_unauthenticated: false, }; let result = serve(config).await; diff --git a/crates/omnigraph-server/tests/server.rs b/crates/omnigraph-server/tests/server.rs index 2e04db1..dcd47b2 100644 --- a/crates/omnigraph-server/tests/server.rs +++ b/crates/omnigraph-server/tests/server.rs @@ -4333,3 +4333,398 @@ async fn schema_apply_route_additive_property_preserves_existing_rows() { "AddProperty should preserve row count", ); } + +// ─── MR-668 PR 5: multi-graph startup ───────────────────────────────────── + +mod multi_graph_startup { + use super::*; + use omnigraph_server::{ + GraphHandle, GraphId, GraphKey, ServerConfig, ServerConfigMode, load_server_settings, + }; + use std::sync::Arc; + + async fn build_multi_mode_app(graph_ids: &[&str]) -> (Vec, Router) { + let mut dirs = Vec::with_capacity(graph_ids.len()); + let mut handles = Vec::with_capacity(graph_ids.len()); + for id in graph_ids { + let dir = tempfile::tempdir().unwrap(); + let graph_uri = dir.path().join(id).to_str().unwrap().to_string(); + let schema = fs::read_to_string(fixture("test.pg")).unwrap(); + let engine = Omnigraph::init(&graph_uri, &schema).await.unwrap(); + handles.push(Arc::new(GraphHandle { + key: GraphKey::cluster(GraphId::try_from(*id).unwrap()), + uri: graph_uri, + engine: Arc::new(engine), + policy: None, + })); + dirs.push(dir); + } + let workload = omnigraph_server::workload::WorkloadController::from_env(); + let state = AppState::new_multi(handles, Vec::new(), None, workload, None).unwrap(); + let app = build_app(state); + (dirs, app) + } + + /// Cluster route `/graphs/{graph_id}/snapshot` resolves to the right + /// engine. Two graphs side by side; assert each responds to its own + /// id and does NOT respond to the other's URL. + #[tokio::test(flavor = "multi_thread")] + async fn cluster_routes_dispatch_per_graph_handle() { + let (_dirs, app) = build_multi_mode_app(&["alpha", "beta"]).await; + for id in ["alpha", "beta"] { + let resp = app + .clone() + .oneshot( + Request::builder() + .method(Method::GET) + .uri(format!("/graphs/{id}/snapshot?branch=main")) + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!( + resp.status(), + StatusCode::OK, + "graph '{id}' must respond OK on its cluster snapshot route" + ); + } + } + + /// Unknown graph id under the cluster prefix yields 404 (not 500, + /// not 410 — `Gone` is reserved for the future DELETE flow). + #[tokio::test(flavor = "multi_thread")] + async fn cluster_route_for_unknown_graph_returns_404() { + let (_dirs, app) = build_multi_mode_app(&["alpha"]).await; + let resp = app + .oneshot( + Request::builder() + .method(Method::GET) + .uri("/graphs/nonexistent/snapshot?branch=main") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::NOT_FOUND); + } + + /// Flat routes 404 in multi mode — the router only mounts under + /// `/graphs/{graph_id}/...` so `/snapshot` doesn't resolve. + #[tokio::test(flavor = "multi_thread")] + async fn flat_routes_404_in_multi_mode() { + let (_dirs, app) = build_multi_mode_app(&["alpha"]).await; + let resp = app + .oneshot( + Request::builder() + .method(Method::GET) + .uri("/snapshot?branch=main") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::NOT_FOUND); + } + + /// `GraphId` validation runs at startup — a reserved name in + /// `omnigraph.yaml` produces a clear error rather than getting + /// rejected per-request. + #[test] + fn load_server_settings_rejects_reserved_graph_id() { + let temp = tempfile::tempdir().unwrap(); + let config_path = temp.path().join("omnigraph.yaml"); + fs::write( + &config_path, + r#" +graphs: + policies: + uri: /tmp/g1.omni +"#, + ) + .unwrap(); + let err = load_server_settings(Some(&config_path), None, None, None, false).unwrap_err(); + assert!( + err.to_string().contains("invalid graph id 'policies'"), + "expected reserved-name rejection, got: {err}" + ); + } + + // ── Four-rule mode inference matrix ─────────────────────────────── + + /// Rule 1: CLI positional URI → Single. + #[test] + fn mode_inference_cli_uri_is_single() { + let settings = load_server_settings( + None, + Some("/tmp/cli.omni".to_string()), + None, + None, + true, // allow unauth so we get past the runtime-state check + ) + .unwrap(); + match settings.mode { + ServerConfigMode::Single { uri, .. } => assert_eq!(uri, "/tmp/cli.omni"), + ServerConfigMode::Multi { .. } => panic!("expected Single (rule 1), got Multi"), + } + } + + /// Rule 2: --target picks one graph from `graphs:` map → Single. + #[test] + fn mode_inference_cli_target_is_single() { + let temp = tempfile::tempdir().unwrap(); + let config_path = temp.path().join("omnigraph.yaml"); + fs::write( + &config_path, + r#" +graphs: + alpha: + uri: /tmp/alpha.omni + beta: + uri: /tmp/beta.omni +"#, + ) + .unwrap(); + let settings = + load_server_settings(Some(&config_path), None, Some("alpha".into()), None, true) + .unwrap(); + match settings.mode { + ServerConfigMode::Single { uri, .. } => assert_eq!(uri, "/tmp/alpha.omni"), + ServerConfigMode::Multi { .. } => panic!("expected Single (rule 2), got Multi"), + } + } + + /// Rule 3: `server.graph` set → Single (target picked from config). + #[test] + fn mode_inference_server_graph_is_single() { + let temp = tempfile::tempdir().unwrap(); + let config_path = temp.path().join("omnigraph.yaml"); + fs::write( + &config_path, + r#" +graphs: + alpha: + uri: /tmp/alpha.omni + beta: + uri: /tmp/beta.omni +server: + graph: beta +"#, + ) + .unwrap(); + let settings = + load_server_settings(Some(&config_path), None, None, None, true).unwrap(); + match settings.mode { + ServerConfigMode::Single { uri, .. } => assert_eq!(uri, "/tmp/beta.omni"), + ServerConfigMode::Multi { .. } => panic!("expected Single (rule 3), got Multi"), + } + } + + /// Rule 4: `--config` + non-empty `graphs:` + no single-mode selector → Multi. + #[test] + fn mode_inference_config_plus_graphs_is_multi() { + let temp = tempfile::tempdir().unwrap(); + let config_path = temp.path().join("omnigraph.yaml"); + fs::write( + &config_path, + r#" +graphs: + alpha: + uri: /tmp/alpha.omni + beta: + uri: /tmp/beta.omni +"#, + ) + .unwrap(); + let settings = + load_server_settings(Some(&config_path), None, None, None, true).unwrap(); + match settings.mode { + ServerConfigMode::Multi { graphs, .. } => { + let ids: Vec<&str> = graphs.iter().map(|g| g.graph_id.as_str()).collect(); + // BTreeMap iteration order is alphabetical. + assert_eq!(ids, vec!["alpha", "beta"]); + } + ServerConfigMode::Single { .. } => panic!("expected Multi (rule 4), got Single"), + } + } + + /// Rule 5: nothing → error with migration hint. + #[test] + fn mode_inference_no_inputs_errors_with_migration_hint() { + let err = load_server_settings(None, None, None, None, true).unwrap_err(); + let msg = err.to_string(); + assert!( + msg.contains("no graph to serve"), + "expected migration-hint error, got: {msg}" + ); + } + + /// Rule 4 sub-case: `--config` with empty `graphs:` map and no + /// single-mode selector → rule 5 fires (no graph to serve). + #[test] + fn mode_inference_empty_graphs_map_errors() { + let temp = tempfile::tempdir().unwrap(); + let config_path = temp.path().join("omnigraph.yaml"); + fs::write(&config_path, "server:\n bind: 127.0.0.1:8080\n").unwrap(); + let err = + load_server_settings(Some(&config_path), None, None, None, true).unwrap_err(); + assert!(err.to_string().contains("no graph to serve")); + } + + /// `--config` + `` together: URI wins → Single (the CLI URI + /// takes precedence over the config's graphs map). + #[test] + fn mode_inference_cli_uri_overrides_graphs_map() { + let temp = tempfile::tempdir().unwrap(); + let config_path = temp.path().join("omnigraph.yaml"); + fs::write( + &config_path, + r#" +graphs: + alpha: + uri: /tmp/alpha.omni +"#, + ) + .unwrap(); + let settings = load_server_settings( + Some(&config_path), + Some("/tmp/cli-override.omni".to_string()), + None, + None, + true, + ) + .unwrap(); + match settings.mode { + ServerConfigMode::Single { uri, .. } => { + assert_eq!( + uri, "/tmp/cli-override.omni", + "CLI URI must win over graphs: map" + ); + } + ServerConfigMode::Multi { .. } => { + panic!("expected Single (CLI URI wins), got Multi") + } + } + } + + /// Per-graph `policy.file` is resolved relative to the config base_dir. + #[test] + fn per_graph_policy_file_is_resolved_relative_to_base_dir() { + let temp = tempfile::tempdir().unwrap(); + let config_path = temp.path().join("omnigraph.yaml"); + fs::write( + &config_path, + r#" +graphs: + alpha: + uri: /tmp/alpha.omni + policy: + file: ./policies/alpha.yaml + beta: + uri: /tmp/beta.omni +"#, + ) + .unwrap(); + let settings = + load_server_settings(Some(&config_path), None, None, None, true).unwrap(); + let graphs = match settings.mode { + ServerConfigMode::Multi { graphs, .. } => graphs, + _ => panic!("expected Multi"), + }; + // graphs is BTreeMap-iter order (alphabetical). + let alpha = &graphs[0]; + let beta = &graphs[1]; + assert_eq!(alpha.graph_id, "alpha"); + assert_eq!( + alpha.policy_file.as_ref().unwrap(), + &temp.path().join("policies/alpha.yaml") + ); + assert_eq!(beta.graph_id, "beta"); + assert!(beta.policy_file.is_none()); + } + + /// `server.policy.file` resolves alongside the graphs map. + #[test] + fn server_policy_file_is_resolved_relative_to_base_dir() { + let temp = tempfile::tempdir().unwrap(); + let config_path = temp.path().join("omnigraph.yaml"); + fs::write( + &config_path, + r#" +server: + policy: + file: ./server-policy.yaml +graphs: + alpha: + uri: /tmp/alpha.omni +"#, + ) + .unwrap(); + let settings = + load_server_settings(Some(&config_path), None, None, None, true).unwrap(); + match settings.mode { + ServerConfigMode::Multi { + server_policy_file, .. + } => { + assert_eq!( + server_policy_file.unwrap(), + temp.path().join("server-policy.yaml") + ); + } + _ => panic!("expected Multi"), + } + } + + /// End-to-end: load an `omnigraph.yaml` with two graphs and serve + /// them. Both graphs must be queryable via cluster routes. + /// + /// Uses `_` placeholders for tempdirs so they live until end-of-test. + #[tokio::test(flavor = "multi_thread")] + async fn server_settings_drive_multi_graph_startup_end_to_end() { + let cfg_dir = tempfile::tempdir().unwrap(); + // Real graph storage dirs (the URIs in the config must point to + // a graph init-able location). + let alpha_dir = cfg_dir.path().join("alpha.omni"); + let beta_dir = cfg_dir.path().join("beta.omni"); + let schema = fs::read_to_string(fixture("test.pg")).unwrap(); + Omnigraph::init(alpha_dir.to_str().unwrap(), &schema) + .await + .unwrap(); + Omnigraph::init(beta_dir.to_str().unwrap(), &schema) + .await + .unwrap(); + + let config_path = cfg_dir.path().join("omnigraph.yaml"); + fs::write( + &config_path, + format!( + r#" +graphs: + alpha: + uri: {alpha} + beta: + uri: {beta} +"#, + alpha = alpha_dir.display(), + beta = beta_dir.display(), + ), + ) + .unwrap(); + + let settings: ServerConfig = + load_server_settings(Some(&config_path), None, None, None, true).unwrap(); + assert!(matches!(settings.mode, ServerConfigMode::Multi { .. })); + + // We don't actually call `serve()` (would bind a socket); we + // just confirm the settings are well-formed and the inferred + // mode lists both graphs. + match settings.mode { + ServerConfigMode::Multi { graphs, .. } => { + assert_eq!(graphs.len(), 2); + let ids: Vec<&str> = graphs.iter().map(|g| g.graph_id.as_str()).collect(); + assert_eq!(ids, vec!["alpha", "beta"]); + } + _ => unreachable!(), + } + } +}