mirror of
https://github.com/ModernRelay/omnigraph.git
synced 2026-06-12 01:45:14 +02:00
mr-668: multi-graph startup + mode inference (PR 5/10)
PR 5 of the MR-668 multi-graph server work. This is the first PR that
makes multi mode actually usable end-to-end: operators invoking
`omnigraph-server --config omnigraph.yaml` with a non-empty `graphs:`
map and no single-mode selector now get a running multi-graph server.
Mode inference (MR-668 decision 2, four-rule matrix in
`load_server_settings`):
1. CLI `<URI>` positional → Single
2. CLI `--target <name>` → Single (URI from graphs.<name>)
3. `server.graph` in config → Single (URI from graphs.<name>)
4. `--config` + non-empty `graphs:` + no single-mode selector
→ Multi (all entries in `graphs:`)
5. otherwise → error with migration hint
Rule 5's error message names every escape hatch so operators can fix
their invocation without grepping docs.
Config schema extensions:
- `TargetConfig.policy: PolicySettings` (per-graph Cedar policy file).
`#[serde(default)]` so existing single-graph YAMLs keep parsing.
- `ServerDefaults.policy: PolicySettings` (server-level Cedar policy
for management endpoints — loaded in PR 5, wired into `GET /graphs`
in PR 6b).
- `OmnigraphConfig::resolve_target_policy_file(name)` and
`resolve_server_policy_file()` helpers — both resolve relative to
the config file's `base_dir`.
Public types added to `omnigraph-server`:
- `ServerConfigMode { Single { uri, policy_file } | Multi { graphs,
config_path, server_policy_file } }`.
- `GraphStartupConfig { graph_id, uri, policy_file }` — one entry
per graph in multi mode.
`ServerConfig` shape change:
- WAS: `{ uri: String, bind, policy_file, allow_unauthenticated }`.
- NOW: `{ mode: ServerConfigMode, bind, allow_unauthenticated }`.
- Breaking for any code that constructs `ServerConfig` directly.
`main.rs` is unaffected (uses `load_server_settings`).
`serve()` now forks on `ServerConfig.mode`:
- Single: existing flow via `AppState::open_with_bearer_tokens_and_policy`.
- Multi: parallel open via `futures::stream::iter(graphs)
.map(open_single_graph).buffer_unordered(4).collect()`. Bound 4 is
a rule-of-thumb for I/O-bound work — at N≤10 this trades startup
latency for a small amount of concurrent S3/Lance open pressure.
Fail-fast: first open error aborts startup; in-flight opens drop
their engine via Arc (Lance datasets close cleanly).
New helper `open_single_graph(GraphStartupConfig)`:
- Validates `GraphId` per the regex in PR 1.
- `Omnigraph::open(uri).await` with descriptive error context.
- Loads per-graph policy file and re-applies it via
`Omnigraph::with_policy` (engine-layer enforcement, MR-722).
- Returns `Arc<GraphHandle>` ready for the registry.
Routing middleware bug fix:
- `Router::nest("/graphs/{graph_id}", inner)` rewrites
`request.uri().path()` to the inner suffix (e.g. `/snapshot`).
The previous middleware tried to parse `{graph_id}` from
`request.uri().path()` and got 400 instead of 200. Fixed by reading
from `axum::extract::OriginalUri` request extension, which preserves
the pre-rewrite URI.
- Caught by the two new tests
`cluster_routes_dispatch_per_graph_handle` and
`cluster_route_for_unknown_graph_returns_404`.
Tests (14 new, all passing):
- Four-rule matrix: one test per branch + the joint case
`mode_inference_cli_uri_overrides_graphs_map` + the empty-graphs-map
error case.
- Per-graph + server-level policy file path resolution.
- Reserved `GraphId` rejection at startup.
- End-to-end multi-graph routing: two graphs side by side, each
cluster route hits the right engine.
- Unknown graph id under cluster prefix → 404.
- Flat routes 404 in multi mode.
Inline `ServerConfig` test (`serve_refuses_to_start_in_state_1_without_unauthenticated`)
and three `server_settings_*` tests updated to the new `mode` shape.
Result: 211 server tests green (74 lib + 71 integration + 66 openapi),
MR-731 regression test still pinned and passing.
LOC: +45 config.rs, +281 lib.rs (net), +395 tests/server.rs.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
ecf01ef3fe
commit
4df361d152
3 changed files with 721 additions and 36 deletions
|
|
@ -17,6 +17,12 @@ pub struct ProjectConfig {
|
|||
pub struct TargetConfig {
|
||||
pub uri: String,
|
||||
pub bearer_token_env: Option<String>,
|
||||
/// Per-graph Cedar policy file (MR-668). In single-graph mode this
|
||||
/// field is unused — the top-level `policy.file` applies. In
|
||||
/// multi-graph mode, each `graphs.<id>.policy.file` governs that
|
||||
/// graph's HTTP-layer Cedar enforcement.
|
||||
#[serde(default)]
|
||||
pub policy: PolicySettings,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, Default, Eq, PartialEq, Serialize, Deserialize, ValueEnum)]
|
||||
|
|
@ -59,6 +65,12 @@ pub struct ServerDefaults {
|
|||
#[serde(rename = "graph")]
|
||||
pub graph: Option<String>,
|
||||
pub bind: Option<String>,
|
||||
/// Server-level Cedar policy (MR-668). Governs management endpoints
|
||||
/// (`POST /graphs`, `GET /graphs`, `DELETE /graphs/{id}` once they
|
||||
/// land). In single-graph mode this is unused — the top-level
|
||||
/// `policy.file` covers the single graph.
|
||||
#[serde(default)]
|
||||
pub policy: PolicySettings,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
|
||||
|
|
@ -216,6 +228,39 @@ impl OmnigraphConfig {
|
|||
})
|
||||
}
|
||||
|
||||
/// Resolve the per-graph policy file path for the named target,
|
||||
/// relative to the config file's `base_dir`. Returns `None` if the
|
||||
/// target is unknown or no per-graph `policy.file` is set.
|
||||
pub fn resolve_target_policy_file(&self, target_name: &str) -> Option<PathBuf> {
|
||||
let target = self.graphs.get(target_name)?;
|
||||
let path = target.policy.file.as_deref()?;
|
||||
let path = Path::new(path);
|
||||
Some(if path.is_absolute() {
|
||||
path.to_path_buf()
|
||||
} else {
|
||||
self.base_dir.join(path)
|
||||
})
|
||||
}
|
||||
|
||||
/// Resolve the server-level policy file path (used by management
|
||||
/// endpoints). Returns `None` if `server.policy.file` is not set.
|
||||
pub fn resolve_server_policy_file(&self) -> Option<PathBuf> {
|
||||
let path = self.server.policy.file.as_deref()?;
|
||||
let path = Path::new(path);
|
||||
Some(if path.is_absolute() {
|
||||
path.to_path_buf()
|
||||
} else {
|
||||
self.base_dir.join(path)
|
||||
})
|
||||
}
|
||||
|
||||
/// Resolve a raw config-supplied URI (which may be relative) to its
|
||||
/// absolute form. URIs containing `://` are passed through as-is;
|
||||
/// relative paths are joined with the config file's `base_dir`.
|
||||
pub fn resolve_uri_value(&self, value: &str) -> String {
|
||||
self.resolve_config_uri(value)
|
||||
}
|
||||
|
||||
pub fn resolve_policy_tests_file(&self) -> Option<PathBuf> {
|
||||
let policy_file = self.resolve_policy_file()?;
|
||||
Some(policy_file.with_file_name("policy.tests.yaml"))
|
||||
|
|
|
|||
|
|
@ -28,7 +28,7 @@ use api::{
|
|||
pub use auth::{AWS_SECRET_ENV, EnvOrFileTokenSource, TokenSource, resolve_token_source};
|
||||
use axum::body::{Body, Bytes};
|
||||
use axum::extract::DefaultBodyLimit;
|
||||
use axum::extract::{Extension, Path, Query, Request, State};
|
||||
use axum::extract::{Extension, OriginalUri, Path, Query, Request, State};
|
||||
use axum::http::StatusCode;
|
||||
use axum::http::header::{AUTHORIZATION, CONTENT_TYPE};
|
||||
use axum::middleware::{self, Next};
|
||||
|
|
@ -118,9 +118,13 @@ const SERVER_SOURCE_VERSION: Option<&str> = option_env!("OMNIGRAPH_SOURCE_VERSIO
|
|||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct ServerConfig {
|
||||
pub uri: String,
|
||||
/// Server topology + the graphs to open at startup. Single-mode
|
||||
/// invocations (`omnigraph-server <URI>` or `--target <name>`)
|
||||
/// produce `ServerConfigMode::Single`; multi-mode invocations
|
||||
/// (`--config omnigraph.yaml` with a non-empty `graphs:` map and
|
||||
/// no single-mode selector) produce `ServerConfigMode::Multi`.
|
||||
pub mode: ServerConfigMode,
|
||||
pub bind: String,
|
||||
pub policy_file: Option<PathBuf>,
|
||||
/// Operator opt-in for fully-unauthenticated dev mode (MR-723).
|
||||
/// When neither bearer tokens nor a policy file are configured,
|
||||
/// `serve()` refuses to start unless this is true (set via
|
||||
|
|
@ -132,6 +136,47 @@ pub struct ServerConfig {
|
|||
pub allow_unauthenticated: bool,
|
||||
}
|
||||
|
||||
/// What `load_server_settings` produces after applying the four-rule
|
||||
/// mode inference matrix (MR-668 decision 2).
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum ServerConfigMode {
|
||||
/// Legacy invocation — one graph at the given URI. Either:
|
||||
/// * `omnigraph-server <URI>` (CLI positional), or
|
||||
/// * `omnigraph-server --target <name> --config omnigraph.yaml`, or
|
||||
/// * `omnigraph-server --config omnigraph.yaml` with `server.graph`
|
||||
/// set to a named target.
|
||||
Single {
|
||||
uri: String,
|
||||
/// Top-level `policy.file` (single-graph Cedar policy).
|
||||
policy_file: Option<PathBuf>,
|
||||
},
|
||||
/// Multi-graph invocation — `--config omnigraph.yaml` with a
|
||||
/// non-empty `graphs:` map and no single-mode selector.
|
||||
Multi {
|
||||
/// Per-graph startup configs, sorted by graph id (BTreeMap
|
||||
/// iteration order). PR 5's parallel-open loop iterates this.
|
||||
graphs: Vec<GraphStartupConfig>,
|
||||
/// Path to the config file the server was started from. PR 7's
|
||||
/// `POST /graphs` flow will use this for atomic YAML rewrite
|
||||
/// (DELETE is deferred so the rewrite path is currently unused).
|
||||
config_path: PathBuf,
|
||||
/// `server.policy.file` (server-level Cedar policy for the
|
||||
/// management endpoints). Loaded but currently unused — PR 6b
|
||||
/// wires it into `GET /graphs`.
|
||||
server_policy_file: Option<PathBuf>,
|
||||
},
|
||||
}
|
||||
|
||||
/// One graph's startup-time configuration: id, opened URI, optional
|
||||
/// per-graph policy file path. Constructed by `load_server_settings`
|
||||
/// in multi mode; consumed by `serve`'s parallel open loop.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct GraphStartupConfig {
|
||||
pub graph_id: String,
|
||||
pub uri: String,
|
||||
pub policy_file: Option<PathBuf>,
|
||||
}
|
||||
|
||||
/// Server runtime topology. Single mode = legacy `omnigraph-server <URI>`
|
||||
/// invocation, one graph, flat HTTP routes. Multi mode = `--config
|
||||
/// omnigraph.yaml` with a `graphs:` map, N graphs, cluster routes
|
||||
|
|
@ -696,10 +741,7 @@ pub fn load_server_settings(
|
|||
cli_allow_unauthenticated: bool,
|
||||
) -> Result<ServerConfig> {
|
||||
let config = load_config(config_path)?;
|
||||
let uri =
|
||||
config.resolve_target_uri(cli_uri, cli_target.as_deref(), config.server_graph_name())?;
|
||||
let bind = cli_bind.unwrap_or_else(|| config.server_bind().to_string());
|
||||
let policy_file = config.resolve_policy_file();
|
||||
// Either `--unauthenticated` or `OMNIGRAPH_UNAUTHENTICATED=1` flips
|
||||
// this. Treat any non-empty, non-"0"/"false" string as truthy —
|
||||
// standard 12-factor "any value is true" reading of the env var.
|
||||
|
|
@ -712,14 +754,81 @@ pub fn load_server_settings(
|
|||
.unwrap_or(false);
|
||||
let allow_unauthenticated = cli_allow_unauthenticated || env_unauth;
|
||||
|
||||
// MR-668 decision 2 — four-rule mode inference matrix.
|
||||
//
|
||||
// 1. CLI `<URI>` positional → Single (URI = the value)
|
||||
// 2. CLI `--target <name>` → Single (URI = graphs.<name>.uri)
|
||||
// 3. `server.graph` in config → Single (URI = graphs.<server.graph>.uri)
|
||||
// 4. `--config` + non-empty `graphs:` + no single-mode selector
|
||||
// → Multi (every entry in `graphs:`)
|
||||
// 5. otherwise → error with migration hint
|
||||
//
|
||||
// Rules 1-3 are mutually compatible (CLI URI wins over `--target`
|
||||
// wins over `server.graph`), reusing the existing
|
||||
// `resolve_target_uri` precedence.
|
||||
let has_cli_uri = cli_uri.is_some();
|
||||
let has_cli_target = cli_target.is_some();
|
||||
let has_server_graph = config.server_graph_name().is_some();
|
||||
let has_graphs_map = !config.graphs.is_empty();
|
||||
let has_explicit_config = config_path.is_some();
|
||||
|
||||
let mode = if has_cli_uri || has_cli_target || has_server_graph {
|
||||
// Rules 1, 2, or 3 → Single mode.
|
||||
let uri = config.resolve_target_uri(
|
||||
cli_uri,
|
||||
cli_target.as_deref(),
|
||||
config.server_graph_name(),
|
||||
)?;
|
||||
let policy_file = config.resolve_policy_file();
|
||||
ServerConfigMode::Single { uri, policy_file }
|
||||
} else if has_explicit_config && has_graphs_map {
|
||||
// Rule 4 → Multi mode. Build a startup config per graph.
|
||||
let mut graphs = Vec::with_capacity(config.graphs.len());
|
||||
for (name, target) in &config.graphs {
|
||||
// Validate the graph id can construct a `GraphId` newtype.
|
||||
// Doing this here (not at registry insert) so a malformed
|
||||
// omnigraph.yaml fails at startup with a clear error.
|
||||
GraphId::try_from(name.clone()).map_err(|err| {
|
||||
color_eyre::eyre::eyre!("invalid graph id '{name}' in omnigraph.yaml: {err}")
|
||||
})?;
|
||||
graphs.push(GraphStartupConfig {
|
||||
graph_id: name.clone(),
|
||||
uri: config.resolve_uri_value(&target.uri),
|
||||
policy_file: config.resolve_target_policy_file(name),
|
||||
});
|
||||
}
|
||||
let config_path = config_path
|
||||
.cloned()
|
||||
.expect("has_explicit_config implies config_path is Some");
|
||||
let server_policy_file = config.resolve_server_policy_file();
|
||||
ServerConfigMode::Multi {
|
||||
graphs,
|
||||
config_path,
|
||||
server_policy_file,
|
||||
}
|
||||
} else {
|
||||
// Rule 5 → error with migration hint.
|
||||
bail!(
|
||||
"no graph to serve: pass a URI (`omnigraph-server <URI>`), select a target \
|
||||
(`--target <name> --config omnigraph.yaml`), set `server.graph: <name>` in \
|
||||
omnigraph.yaml, or for multi-graph mode add a `graphs:` map to the config \
|
||||
file referenced by `--config`."
|
||||
);
|
||||
};
|
||||
|
||||
Ok(ServerConfig {
|
||||
uri,
|
||||
mode,
|
||||
bind,
|
||||
policy_file,
|
||||
allow_unauthenticated,
|
||||
})
|
||||
}
|
||||
|
||||
/// Whether the loaded config will run the server in multi-graph mode.
|
||||
/// Useful for the test that constructs `ServerConfig` directly.
|
||||
pub fn server_config_is_multi(config: &ServerConfig) -> bool {
|
||||
matches!(config.mode, ServerConfigMode::Multi { .. })
|
||||
}
|
||||
|
||||
/// MR-723 server runtime state, classified from the three-state matrix
|
||||
/// of (bearer tokens configured) × (policy file configured) at startup.
|
||||
///
|
||||
|
|
@ -822,9 +931,22 @@ pub async fn serve(config: ServerConfig) -> Result<()> {
|
|||
let token_source = resolve_token_source().await?;
|
||||
info!(source = token_source.name(), "loaded bearer token source");
|
||||
let tokens = token_source.load().await?;
|
||||
|
||||
// For runtime-state classification, "any policy configured" means
|
||||
// either the top-level/single-mode policy file OR a server-level
|
||||
// policy OR any per-graph policy file. Mirrors the
|
||||
// `requires_bearer_auth` semantics on AppState.
|
||||
let has_policy_configured = match &config.mode {
|
||||
ServerConfigMode::Single { policy_file, .. } => policy_file.is_some(),
|
||||
ServerConfigMode::Multi {
|
||||
graphs,
|
||||
server_policy_file,
|
||||
..
|
||||
} => server_policy_file.is_some() || graphs.iter().any(|g| g.policy_file.is_some()),
|
||||
};
|
||||
let runtime_state = classify_server_runtime_state(
|
||||
!tokens.is_empty(),
|
||||
config.policy_file.is_some(),
|
||||
has_policy_configured,
|
||||
config.allow_unauthenticated,
|
||||
)?;
|
||||
match runtime_state {
|
||||
|
|
@ -840,20 +962,122 @@ pub async fn serve(config: ServerConfig) -> Result<()> {
|
|||
),
|
||||
ServerRuntimeState::PolicyEnabled => {}
|
||||
}
|
||||
let state = AppState::open_with_bearer_tokens_and_policy(
|
||||
config.uri.clone(),
|
||||
tokens,
|
||||
config.policy_file.as_ref(),
|
||||
)
|
||||
.await?;
|
||||
let listener = TcpListener::bind(&config.bind).await?;
|
||||
info!(uri = %config.uri, bind = %config.bind, "serving omnigraph");
|
||||
|
||||
let bind = config.bind.clone();
|
||||
let state = match config.mode {
|
||||
ServerConfigMode::Single { uri, policy_file } => {
|
||||
let uri_for_log = uri.clone();
|
||||
info!(uri = %uri_for_log, bind = %bind, mode = "single", "serving omnigraph");
|
||||
AppState::open_with_bearer_tokens_and_policy(uri, tokens, policy_file.as_ref()).await?
|
||||
}
|
||||
ServerConfigMode::Multi {
|
||||
graphs,
|
||||
config_path,
|
||||
server_policy_file,
|
||||
} => {
|
||||
info!(
|
||||
bind = %bind,
|
||||
mode = "multi",
|
||||
graph_count = graphs.len(),
|
||||
config = %config_path.display(),
|
||||
"serving omnigraph"
|
||||
);
|
||||
open_multi_graph_state(
|
||||
graphs,
|
||||
tokens,
|
||||
server_policy_file.as_ref(),
|
||||
config_path,
|
||||
)
|
||||
.await?
|
||||
}
|
||||
};
|
||||
|
||||
let listener = TcpListener::bind(&bind).await?;
|
||||
axum::serve(listener, build_app(state))
|
||||
.with_graceful_shutdown(shutdown_signal())
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Parallel open of every graph in the startup config, with bounded
|
||||
/// concurrency (`buffer_unordered(4)`). Fail-fast — the first open error
|
||||
/// aborts startup; other in-flight opens are dropped (their `Omnigraph`
|
||||
/// instances close cleanly via Arc drop).
|
||||
///
|
||||
/// The bound 4 is a rule-of-thumb for I/O-bound work. At N ≤ 10 this
|
||||
/// trades startup latency for a small amount of concurrent S3 / Lance
|
||||
/// open pressure.
|
||||
async fn open_multi_graph_state(
|
||||
graphs: Vec<GraphStartupConfig>,
|
||||
tokens: Vec<(String, String)>,
|
||||
server_policy_file: Option<&PathBuf>,
|
||||
config_path: PathBuf,
|
||||
) -> Result<AppState> {
|
||||
use futures::StreamExt;
|
||||
|
||||
if graphs.is_empty() {
|
||||
bail!("multi-graph mode requires at least one graph in the `graphs:` map");
|
||||
}
|
||||
|
||||
// Server-level policy (loaded once, applies to management endpoints).
|
||||
// The placeholder graph_id `"server"` matches the PolicyEngine API
|
||||
// shape until the Cedar resource-model refactor (PR 6a) lands.
|
||||
let server_policy = match server_policy_file {
|
||||
Some(path) => Some(PolicyEngine::load(path, "server")?),
|
||||
None => None,
|
||||
};
|
||||
|
||||
let handles: Vec<Arc<GraphHandle>> = futures::stream::iter(graphs.into_iter())
|
||||
.map(|cfg| async move {
|
||||
open_single_graph(cfg).await
|
||||
})
|
||||
.buffer_unordered(4)
|
||||
.collect::<Vec<_>>()
|
||||
.await
|
||||
.into_iter()
|
||||
.collect::<Result<Vec<_>>>()?;
|
||||
|
||||
let workload = workload::WorkloadController::from_env();
|
||||
let state = AppState::new_multi(
|
||||
handles,
|
||||
tokens,
|
||||
server_policy,
|
||||
workload,
|
||||
Some(config_path),
|
||||
)
|
||||
.map_err(|err| color_eyre::eyre::eyre!("multi-graph registry: {err}"))?;
|
||||
Ok(state)
|
||||
}
|
||||
|
||||
/// Open one graph and wrap it in a `GraphHandle`. Used both at startup
|
||||
/// (`open_multi_graph_state`) and — once `POST /graphs` lands in PR 7
|
||||
/// — for runtime additions.
|
||||
async fn open_single_graph(cfg: GraphStartupConfig) -> Result<Arc<GraphHandle>> {
|
||||
let graph_id = GraphId::try_from(cfg.graph_id.clone())
|
||||
.map_err(|err| color_eyre::eyre::eyre!("graph id '{}': {err}", cfg.graph_id))?;
|
||||
|
||||
let db = Omnigraph::open(&cfg.uri)
|
||||
.await
|
||||
.map_err(|err| color_eyre::eyre::eyre!("open graph '{}' at {}: {err}", graph_id, cfg.uri))?;
|
||||
|
||||
let (policy_arc, db) = match &cfg.policy_file {
|
||||
Some(path) => {
|
||||
let policy = PolicyEngine::load(path, graph_id.as_str())?;
|
||||
let policy_arc: Arc<PolicyEngine> = Arc::new(policy);
|
||||
let checker = Arc::clone(&policy_arc) as Arc<dyn omnigraph_policy::PolicyChecker>;
|
||||
(Some(policy_arc), db.with_policy(checker))
|
||||
}
|
||||
None => (None, db),
|
||||
};
|
||||
|
||||
Ok(Arc::new(GraphHandle {
|
||||
key: GraphKey::cluster(graph_id),
|
||||
uri: cfg.uri,
|
||||
engine: Arc::new(db),
|
||||
policy: policy_arc,
|
||||
}))
|
||||
}
|
||||
|
||||
async fn shutdown_signal() {
|
||||
if let Err(err) = tokio::signal::ctrl_c().await {
|
||||
error!(error = %err, "failed to install ctrl-c handler");
|
||||
|
|
@ -1039,12 +1263,19 @@ async fn resolve_graph_handle(
|
|||
)
|
||||
})?,
|
||||
ServerMode::Multi { .. } => {
|
||||
// Extract the {graph_id} path segment from `/graphs/{graph_id}/...`.
|
||||
// The router only mounts the per-graph nest under that prefix,
|
||||
// so any request reaching this middleware in Multi mode must
|
||||
// have the prefix — but defense in depth still validates.
|
||||
let path = request.uri().path();
|
||||
let graph_id_str = path
|
||||
// `Router::nest("/graphs/{graph_id}", inner)` rewrites
|
||||
// `request.uri().path()` to the inner suffix (e.g. `/snapshot`).
|
||||
// The pre-rewrite URI is preserved in the `OriginalUri`
|
||||
// request extension by axum's router; we read from there to
|
||||
// extract `{graph_id}`. Fall back to the current URI only if
|
||||
// the extension is missing, which shouldn't happen for
|
||||
// nested routes but is safe defensive code.
|
||||
let original_path: String = request
|
||||
.extensions()
|
||||
.get::<OriginalUri>()
|
||||
.map(|OriginalUri(uri)| uri.path().to_string())
|
||||
.unwrap_or_else(|| request.uri().path().to_string());
|
||||
let graph_id_str = original_path
|
||||
.strip_prefix("/graphs/")
|
||||
.and_then(|rest| rest.split('/').next())
|
||||
.filter(|s| !s.is_empty())
|
||||
|
|
@ -2075,9 +2306,9 @@ fn server_bearer_tokens_from_env() -> Result<Vec<(String, String)>> {
|
|||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::{
|
||||
ServerConfig, ServerRuntimeState, classify_server_runtime_state, hash_bearer_token,
|
||||
load_server_settings, normalize_bearer_token, parse_bearer_tokens_json, serve,
|
||||
server_bearer_tokens_from_env,
|
||||
ServerConfig, ServerConfigMode, ServerRuntimeState, classify_server_runtime_state,
|
||||
hash_bearer_token, load_server_settings, normalize_bearer_token,
|
||||
parse_bearer_tokens_json, serve, server_bearer_tokens_from_env,
|
||||
};
|
||||
use serial_test::serial;
|
||||
use std::env;
|
||||
|
|
@ -2132,7 +2363,10 @@ server:
|
|||
.unwrap();
|
||||
|
||||
let settings = load_server_settings(Some(&config), None, None, None, false).unwrap();
|
||||
assert_eq!(settings.uri, "/tmp/demo.omni");
|
||||
match &settings.mode {
|
||||
ServerConfigMode::Single { uri, .. } => assert_eq!(uri, "/tmp/demo.omni"),
|
||||
ServerConfigMode::Multi { .. } => panic!("expected Single mode, got Multi"),
|
||||
}
|
||||
assert_eq!(settings.bind, "0.0.0.0:9090");
|
||||
}
|
||||
|
||||
|
|
@ -2161,7 +2395,10 @@ server:
|
|||
false,
|
||||
)
|
||||
.unwrap();
|
||||
assert_eq!(settings.uri, "/tmp/override.omni");
|
||||
match &settings.mode {
|
||||
ServerConfigMode::Single { uri, .. } => assert_eq!(uri, "/tmp/override.omni"),
|
||||
ServerConfigMode::Multi { .. } => panic!("expected Single mode, got Multi"),
|
||||
}
|
||||
assert_eq!(settings.bind, "0.0.0.0:9999");
|
||||
}
|
||||
|
||||
|
|
@ -2187,13 +2424,19 @@ server:
|
|||
let settings =
|
||||
load_server_settings(Some(&config), None, Some("dev".to_string()), None, false)
|
||||
.unwrap();
|
||||
assert_eq!(settings.uri, "http://127.0.0.1:8080");
|
||||
match &settings.mode {
|
||||
ServerConfigMode::Single { uri, .. } => assert_eq!(uri, "http://127.0.0.1:8080"),
|
||||
ServerConfigMode::Multi { .. } => panic!("expected Single mode, got Multi"),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn server_settings_require_uri_from_cli_or_config() {
|
||||
let error = load_server_settings(None, None, None, None, false).unwrap_err();
|
||||
assert!(error.to_string().contains("URI must be provided"));
|
||||
assert!(
|
||||
error.to_string().contains("no graph to serve"),
|
||||
"expected mode-inference error, got: {error}",
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
|
@ -2252,13 +2495,15 @@ server:
|
|||
// Graph path doesn't need to exist — classifier fires before
|
||||
// `AppState::open_with_bearer_tokens_and_policy`.
|
||||
let config = ServerConfig {
|
||||
uri: temp
|
||||
.path()
|
||||
.join("graph.omni")
|
||||
.to_string_lossy()
|
||||
.into_owned(),
|
||||
mode: ServerConfigMode::Single {
|
||||
uri: temp
|
||||
.path()
|
||||
.join("graph.omni")
|
||||
.to_string_lossy()
|
||||
.into_owned(),
|
||||
policy_file: None,
|
||||
},
|
||||
bind: "127.0.0.1:0".to_string(),
|
||||
policy_file: None,
|
||||
allow_unauthenticated: false,
|
||||
};
|
||||
let result = serve(config).await;
|
||||
|
|
|
|||
|
|
@ -4333,3 +4333,398 @@ async fn schema_apply_route_additive_property_preserves_existing_rows() {
|
|||
"AddProperty should preserve row count",
|
||||
);
|
||||
}
|
||||
|
||||
// ─── MR-668 PR 5: multi-graph startup ─────────────────────────────────────
|
||||
|
||||
mod multi_graph_startup {
|
||||
use super::*;
|
||||
use omnigraph_server::{
|
||||
GraphHandle, GraphId, GraphKey, ServerConfig, ServerConfigMode, load_server_settings,
|
||||
};
|
||||
use std::sync::Arc;
|
||||
|
||||
async fn build_multi_mode_app(graph_ids: &[&str]) -> (Vec<tempfile::TempDir>, Router) {
|
||||
let mut dirs = Vec::with_capacity(graph_ids.len());
|
||||
let mut handles = Vec::with_capacity(graph_ids.len());
|
||||
for id in graph_ids {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let graph_uri = dir.path().join(id).to_str().unwrap().to_string();
|
||||
let schema = fs::read_to_string(fixture("test.pg")).unwrap();
|
||||
let engine = Omnigraph::init(&graph_uri, &schema).await.unwrap();
|
||||
handles.push(Arc::new(GraphHandle {
|
||||
key: GraphKey::cluster(GraphId::try_from(*id).unwrap()),
|
||||
uri: graph_uri,
|
||||
engine: Arc::new(engine),
|
||||
policy: None,
|
||||
}));
|
||||
dirs.push(dir);
|
||||
}
|
||||
let workload = omnigraph_server::workload::WorkloadController::from_env();
|
||||
let state = AppState::new_multi(handles, Vec::new(), None, workload, None).unwrap();
|
||||
let app = build_app(state);
|
||||
(dirs, app)
|
||||
}
|
||||
|
||||
/// Cluster route `/graphs/{graph_id}/snapshot` resolves to the right
|
||||
/// engine. Two graphs side by side; assert each responds to its own
|
||||
/// id and does NOT respond to the other's URL.
|
||||
#[tokio::test(flavor = "multi_thread")]
|
||||
async fn cluster_routes_dispatch_per_graph_handle() {
|
||||
let (_dirs, app) = build_multi_mode_app(&["alpha", "beta"]).await;
|
||||
for id in ["alpha", "beta"] {
|
||||
let resp = app
|
||||
.clone()
|
||||
.oneshot(
|
||||
Request::builder()
|
||||
.method(Method::GET)
|
||||
.uri(format!("/graphs/{id}/snapshot?branch=main"))
|
||||
.body(Body::empty())
|
||||
.unwrap(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
resp.status(),
|
||||
StatusCode::OK,
|
||||
"graph '{id}' must respond OK on its cluster snapshot route"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/// Unknown graph id under the cluster prefix yields 404 (not 500,
|
||||
/// not 410 — `Gone` is reserved for the future DELETE flow).
|
||||
#[tokio::test(flavor = "multi_thread")]
|
||||
async fn cluster_route_for_unknown_graph_returns_404() {
|
||||
let (_dirs, app) = build_multi_mode_app(&["alpha"]).await;
|
||||
let resp = app
|
||||
.oneshot(
|
||||
Request::builder()
|
||||
.method(Method::GET)
|
||||
.uri("/graphs/nonexistent/snapshot?branch=main")
|
||||
.body(Body::empty())
|
||||
.unwrap(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(resp.status(), StatusCode::NOT_FOUND);
|
||||
}
|
||||
|
||||
/// Flat routes 404 in multi mode — the router only mounts under
|
||||
/// `/graphs/{graph_id}/...` so `/snapshot` doesn't resolve.
|
||||
#[tokio::test(flavor = "multi_thread")]
|
||||
async fn flat_routes_404_in_multi_mode() {
|
||||
let (_dirs, app) = build_multi_mode_app(&["alpha"]).await;
|
||||
let resp = app
|
||||
.oneshot(
|
||||
Request::builder()
|
||||
.method(Method::GET)
|
||||
.uri("/snapshot?branch=main")
|
||||
.body(Body::empty())
|
||||
.unwrap(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(resp.status(), StatusCode::NOT_FOUND);
|
||||
}
|
||||
|
||||
/// `GraphId` validation runs at startup — a reserved name in
|
||||
/// `omnigraph.yaml` produces a clear error rather than getting
|
||||
/// rejected per-request.
|
||||
#[test]
|
||||
fn load_server_settings_rejects_reserved_graph_id() {
|
||||
let temp = tempfile::tempdir().unwrap();
|
||||
let config_path = temp.path().join("omnigraph.yaml");
|
||||
fs::write(
|
||||
&config_path,
|
||||
r#"
|
||||
graphs:
|
||||
policies:
|
||||
uri: /tmp/g1.omni
|
||||
"#,
|
||||
)
|
||||
.unwrap();
|
||||
let err = load_server_settings(Some(&config_path), None, None, None, false).unwrap_err();
|
||||
assert!(
|
||||
err.to_string().contains("invalid graph id 'policies'"),
|
||||
"expected reserved-name rejection, got: {err}"
|
||||
);
|
||||
}
|
||||
|
||||
// ── Four-rule mode inference matrix ───────────────────────────────
|
||||
|
||||
/// Rule 1: CLI positional URI → Single.
|
||||
#[test]
|
||||
fn mode_inference_cli_uri_is_single() {
|
||||
let settings = load_server_settings(
|
||||
None,
|
||||
Some("/tmp/cli.omni".to_string()),
|
||||
None,
|
||||
None,
|
||||
true, // allow unauth so we get past the runtime-state check
|
||||
)
|
||||
.unwrap();
|
||||
match settings.mode {
|
||||
ServerConfigMode::Single { uri, .. } => assert_eq!(uri, "/tmp/cli.omni"),
|
||||
ServerConfigMode::Multi { .. } => panic!("expected Single (rule 1), got Multi"),
|
||||
}
|
||||
}
|
||||
|
||||
/// Rule 2: --target picks one graph from `graphs:` map → Single.
|
||||
#[test]
|
||||
fn mode_inference_cli_target_is_single() {
|
||||
let temp = tempfile::tempdir().unwrap();
|
||||
let config_path = temp.path().join("omnigraph.yaml");
|
||||
fs::write(
|
||||
&config_path,
|
||||
r#"
|
||||
graphs:
|
||||
alpha:
|
||||
uri: /tmp/alpha.omni
|
||||
beta:
|
||||
uri: /tmp/beta.omni
|
||||
"#,
|
||||
)
|
||||
.unwrap();
|
||||
let settings =
|
||||
load_server_settings(Some(&config_path), None, Some("alpha".into()), None, true)
|
||||
.unwrap();
|
||||
match settings.mode {
|
||||
ServerConfigMode::Single { uri, .. } => assert_eq!(uri, "/tmp/alpha.omni"),
|
||||
ServerConfigMode::Multi { .. } => panic!("expected Single (rule 2), got Multi"),
|
||||
}
|
||||
}
|
||||
|
||||
/// Rule 3: `server.graph` set → Single (target picked from config).
|
||||
#[test]
|
||||
fn mode_inference_server_graph_is_single() {
|
||||
let temp = tempfile::tempdir().unwrap();
|
||||
let config_path = temp.path().join("omnigraph.yaml");
|
||||
fs::write(
|
||||
&config_path,
|
||||
r#"
|
||||
graphs:
|
||||
alpha:
|
||||
uri: /tmp/alpha.omni
|
||||
beta:
|
||||
uri: /tmp/beta.omni
|
||||
server:
|
||||
graph: beta
|
||||
"#,
|
||||
)
|
||||
.unwrap();
|
||||
let settings =
|
||||
load_server_settings(Some(&config_path), None, None, None, true).unwrap();
|
||||
match settings.mode {
|
||||
ServerConfigMode::Single { uri, .. } => assert_eq!(uri, "/tmp/beta.omni"),
|
||||
ServerConfigMode::Multi { .. } => panic!("expected Single (rule 3), got Multi"),
|
||||
}
|
||||
}
|
||||
|
||||
/// Rule 4: `--config` + non-empty `graphs:` + no single-mode selector → Multi.
|
||||
#[test]
|
||||
fn mode_inference_config_plus_graphs_is_multi() {
|
||||
let temp = tempfile::tempdir().unwrap();
|
||||
let config_path = temp.path().join("omnigraph.yaml");
|
||||
fs::write(
|
||||
&config_path,
|
||||
r#"
|
||||
graphs:
|
||||
alpha:
|
||||
uri: /tmp/alpha.omni
|
||||
beta:
|
||||
uri: /tmp/beta.omni
|
||||
"#,
|
||||
)
|
||||
.unwrap();
|
||||
let settings =
|
||||
load_server_settings(Some(&config_path), None, None, None, true).unwrap();
|
||||
match settings.mode {
|
||||
ServerConfigMode::Multi { graphs, .. } => {
|
||||
let ids: Vec<&str> = graphs.iter().map(|g| g.graph_id.as_str()).collect();
|
||||
// BTreeMap iteration order is alphabetical.
|
||||
assert_eq!(ids, vec!["alpha", "beta"]);
|
||||
}
|
||||
ServerConfigMode::Single { .. } => panic!("expected Multi (rule 4), got Single"),
|
||||
}
|
||||
}
|
||||
|
||||
/// Rule 5: nothing → error with migration hint.
|
||||
#[test]
|
||||
fn mode_inference_no_inputs_errors_with_migration_hint() {
|
||||
let err = load_server_settings(None, None, None, None, true).unwrap_err();
|
||||
let msg = err.to_string();
|
||||
assert!(
|
||||
msg.contains("no graph to serve"),
|
||||
"expected migration-hint error, got: {msg}"
|
||||
);
|
||||
}
|
||||
|
||||
/// Rule 4 sub-case: `--config` with empty `graphs:` map and no
|
||||
/// single-mode selector → rule 5 fires (no graph to serve).
|
||||
#[test]
|
||||
fn mode_inference_empty_graphs_map_errors() {
|
||||
let temp = tempfile::tempdir().unwrap();
|
||||
let config_path = temp.path().join("omnigraph.yaml");
|
||||
fs::write(&config_path, "server:\n bind: 127.0.0.1:8080\n").unwrap();
|
||||
let err =
|
||||
load_server_settings(Some(&config_path), None, None, None, true).unwrap_err();
|
||||
assert!(err.to_string().contains("no graph to serve"));
|
||||
}
|
||||
|
||||
/// `--config` + `<URI>` together: URI wins → Single (the CLI URI
|
||||
/// takes precedence over the config's graphs map).
|
||||
#[test]
|
||||
fn mode_inference_cli_uri_overrides_graphs_map() {
|
||||
let temp = tempfile::tempdir().unwrap();
|
||||
let config_path = temp.path().join("omnigraph.yaml");
|
||||
fs::write(
|
||||
&config_path,
|
||||
r#"
|
||||
graphs:
|
||||
alpha:
|
||||
uri: /tmp/alpha.omni
|
||||
"#,
|
||||
)
|
||||
.unwrap();
|
||||
let settings = load_server_settings(
|
||||
Some(&config_path),
|
||||
Some("/tmp/cli-override.omni".to_string()),
|
||||
None,
|
||||
None,
|
||||
true,
|
||||
)
|
||||
.unwrap();
|
||||
match settings.mode {
|
||||
ServerConfigMode::Single { uri, .. } => {
|
||||
assert_eq!(
|
||||
uri, "/tmp/cli-override.omni",
|
||||
"CLI URI must win over graphs: map"
|
||||
);
|
||||
}
|
||||
ServerConfigMode::Multi { .. } => {
|
||||
panic!("expected Single (CLI URI wins), got Multi")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Per-graph `policy.file` is resolved relative to the config base_dir.
|
||||
#[test]
|
||||
fn per_graph_policy_file_is_resolved_relative_to_base_dir() {
|
||||
let temp = tempfile::tempdir().unwrap();
|
||||
let config_path = temp.path().join("omnigraph.yaml");
|
||||
fs::write(
|
||||
&config_path,
|
||||
r#"
|
||||
graphs:
|
||||
alpha:
|
||||
uri: /tmp/alpha.omni
|
||||
policy:
|
||||
file: ./policies/alpha.yaml
|
||||
beta:
|
||||
uri: /tmp/beta.omni
|
||||
"#,
|
||||
)
|
||||
.unwrap();
|
||||
let settings =
|
||||
load_server_settings(Some(&config_path), None, None, None, true).unwrap();
|
||||
let graphs = match settings.mode {
|
||||
ServerConfigMode::Multi { graphs, .. } => graphs,
|
||||
_ => panic!("expected Multi"),
|
||||
};
|
||||
// graphs is BTreeMap-iter order (alphabetical).
|
||||
let alpha = &graphs[0];
|
||||
let beta = &graphs[1];
|
||||
assert_eq!(alpha.graph_id, "alpha");
|
||||
assert_eq!(
|
||||
alpha.policy_file.as_ref().unwrap(),
|
||||
&temp.path().join("policies/alpha.yaml")
|
||||
);
|
||||
assert_eq!(beta.graph_id, "beta");
|
||||
assert!(beta.policy_file.is_none());
|
||||
}
|
||||
|
||||
/// `server.policy.file` resolves alongside the graphs map.
|
||||
#[test]
|
||||
fn server_policy_file_is_resolved_relative_to_base_dir() {
|
||||
let temp = tempfile::tempdir().unwrap();
|
||||
let config_path = temp.path().join("omnigraph.yaml");
|
||||
fs::write(
|
||||
&config_path,
|
||||
r#"
|
||||
server:
|
||||
policy:
|
||||
file: ./server-policy.yaml
|
||||
graphs:
|
||||
alpha:
|
||||
uri: /tmp/alpha.omni
|
||||
"#,
|
||||
)
|
||||
.unwrap();
|
||||
let settings =
|
||||
load_server_settings(Some(&config_path), None, None, None, true).unwrap();
|
||||
match settings.mode {
|
||||
ServerConfigMode::Multi {
|
||||
server_policy_file, ..
|
||||
} => {
|
||||
assert_eq!(
|
||||
server_policy_file.unwrap(),
|
||||
temp.path().join("server-policy.yaml")
|
||||
);
|
||||
}
|
||||
_ => panic!("expected Multi"),
|
||||
}
|
||||
}
|
||||
|
||||
/// End-to-end: load an `omnigraph.yaml` with two graphs and serve
|
||||
/// them. Both graphs must be queryable via cluster routes.
|
||||
///
|
||||
/// Uses `_` placeholders for tempdirs so they live until end-of-test.
|
||||
#[tokio::test(flavor = "multi_thread")]
|
||||
async fn server_settings_drive_multi_graph_startup_end_to_end() {
|
||||
let cfg_dir = tempfile::tempdir().unwrap();
|
||||
// Real graph storage dirs (the URIs in the config must point to
|
||||
// a graph init-able location).
|
||||
let alpha_dir = cfg_dir.path().join("alpha.omni");
|
||||
let beta_dir = cfg_dir.path().join("beta.omni");
|
||||
let schema = fs::read_to_string(fixture("test.pg")).unwrap();
|
||||
Omnigraph::init(alpha_dir.to_str().unwrap(), &schema)
|
||||
.await
|
||||
.unwrap();
|
||||
Omnigraph::init(beta_dir.to_str().unwrap(), &schema)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let config_path = cfg_dir.path().join("omnigraph.yaml");
|
||||
fs::write(
|
||||
&config_path,
|
||||
format!(
|
||||
r#"
|
||||
graphs:
|
||||
alpha:
|
||||
uri: {alpha}
|
||||
beta:
|
||||
uri: {beta}
|
||||
"#,
|
||||
alpha = alpha_dir.display(),
|
||||
beta = beta_dir.display(),
|
||||
),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let settings: ServerConfig =
|
||||
load_server_settings(Some(&config_path), None, None, None, true).unwrap();
|
||||
assert!(matches!(settings.mode, ServerConfigMode::Multi { .. }));
|
||||
|
||||
// We don't actually call `serve()` (would bind a socket); we
|
||||
// just confirm the settings are well-formed and the inferred
|
||||
// mode lists both graphs.
|
||||
match settings.mode {
|
||||
ServerConfigMode::Multi { graphs, .. } => {
|
||||
assert_eq!(graphs.len(), 2);
|
||||
let ids: Vec<&str> = graphs.iter().map(|g| g.graph_id.as_str()).collect();
|
||||
assert_eq!(ids, vec!["alpha", "beta"]);
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue