feat(server): boot from cluster state via --cluster

RFC-005 §D1/§D2: omnigraph-server --cluster <dir> is rule 0 of the mode
inference — an exclusive boot source (hard error when combined with a graph
URI, --target, or --config) that never opens omnigraph.yaml, not even the
implicit current-directory search. The cluster branch reads the applied
revision through omnigraph-cluster's serving-snapshot API and feeds the
EXISTING multi-graph pipeline: GraphStartupConfig per recorded graph at its
derived root, stored queries built via QueryRegistry::from_specs from
verified blob content (expose-all — the §D5 bridge until Phase 6
policy-owned exposure), cluster-bound policy bundles as the server-level
Cedar engine and graph-bound bundles per graph, straight from the
content-addressed blob paths. Multiple bundles binding one scope refuse boot
(one-bundle-per-scope is the serving pipeline's shape; stacking is a later
slice). Everything downstream — parallel opens, query type-checking,
registry, routing, auth, OpenAPI — is reused unchanged; cluster mode is a
new source, not a new pipeline.

First server->cluster crate dependency: read-only types + one fn;
omnigraph-cluster stays HTTP-free. open_multi_graph_state goes pub for
integration tests.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
aaltshuler 2026-06-10 17:48:10 +03:00
parent f5b43164b8
commit 948a54daa7
5 changed files with 378 additions and 24 deletions

View file

@ -14,7 +14,7 @@ pub use registry::{GraphHandle, GraphRegistry, InsertError, RegistryLookup, Regi
use crate::queries::{QueryRegistry, check, format_check_breakages};
use std::collections::{HashMap, HashSet};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::fs;
use std::io;
use std::io::Write;
@ -40,7 +40,7 @@ use axum::middleware::{self, Next};
use axum::response::{IntoResponse, Response};
use axum::routing::{delete, get, post};
use axum::{Json, Router};
use color_eyre::eyre::{Result, WrapErr, bail};
use color_eyre::eyre::{Result, WrapErr, bail, eyre};
pub use config::{
AliasCommand, AliasConfig, CliDefaults, DEFAULT_CONFIG_FILE, OmnigraphConfig, PolicySettings,
ProjectConfig, QueryDefaults, ReadOutputFormat, ServerDefaults, TableCellLayout, TargetConfig,
@ -888,13 +888,125 @@ fn format_registry_load_errors(label: &str, errors: &[queries::LoadError]) -> St
format!("graph '{label}': stored-query registry failed to load:\n {joined}")
}
/// Build serving settings from a cluster directory's applied revision
/// (RFC-005 §D2): graphs at derived roots, stored queries from verified
/// catalog blob content, policy bundles from blob paths with their applied
/// bindings. Always multi-graph routing. The unauthenticated/env handling
/// matches the omnigraph.yaml path.
fn load_cluster_settings(
cluster_dir: &PathBuf,
cli_bind: Option<String>,
cli_allow_unauthenticated: bool,
) -> Result<ServerConfig> {
let snapshot = omnigraph_cluster::read_serving_snapshot(cluster_dir).map_err(|diagnostics| {
let details = diagnostics
.iter()
.map(|diagnostic| format!("[{}] {}: {}", diagnostic.code, diagnostic.path, diagnostic.message))
.collect::<Vec<_>>()
.join("\n ");
eyre!("the cluster at '{}' is not ready to serve:\n {details}", cluster_dir.display())
})?;
// Bindings -> Cedar slots. The serving pipeline loads one bundle per
// graph plus one server-level bundle; stacked bundles per scope are a
// later slice — refuse loudly rather than silently merging policy.
let mut server_policy_file: Option<PathBuf> = None;
let mut graph_policy_files: BTreeMap<String, PathBuf> = BTreeMap::new();
for policy in &snapshot.policies {
for binding in &policy.applies_to {
if binding == "cluster" {
if server_policy_file.replace(policy.blob_path.clone()).is_some() {
bail!(
"multiple policy bundles bind the cluster scope; cluster-mode serving supports one bundle per scope — split or merge bundles (multi-bundle scopes are a later slice)"
);
}
} else if let Some(graph_id) = binding.strip_prefix("graph.") {
if graph_policy_files
.insert(graph_id.to_string(), policy.blob_path.clone())
.is_some()
{
bail!(
"multiple policy bundles bind graph '{graph_id}'; cluster-mode serving supports one bundle per scope — split or merge bundles (multi-bundle scopes are a later slice)"
);
}
} else {
bail!("unrecognized policy binding '{binding}' in the applied revision");
}
}
}
let mut graphs = Vec::new();
for graph in &snapshot.graphs {
let specs: Vec<queries::RegistrySpec> = snapshot
.queries
.iter()
.filter(|query| query.graph_id == graph.graph_id)
.map(|query| queries::RegistrySpec {
name: query.name.clone(),
source: query.source.clone(),
// The §D5 bridge: the cluster registry has no expose flag
// (exposure becomes a policy decision in Phase 6) — cluster
// mode lists every stored query.
expose: true,
tool_name: None,
})
.collect();
let registry = QueryRegistry::from_specs(specs).map_err(|errors| {
let details = errors
.iter()
.map(|error| error.to_string())
.collect::<Vec<_>>()
.join("\n ");
eyre!(
"stored queries in the applied revision failed to parse:\n {details}\nrun `cluster refresh` then `cluster apply`, and restart"
)
})?;
graphs.push(GraphStartupConfig {
graph_id: graph.graph_id.clone(),
uri: graph.root.to_string_lossy().to_string(),
policy_file: graph_policy_files.get(&graph.graph_id).cloned(),
queries: registry,
});
}
let env_unauth = std::env::var("OMNIGRAPH_UNAUTHENTICATED")
.ok()
.map(|v| {
let trimmed = v.trim();
!trimmed.is_empty() && trimmed != "0" && !trimmed.eq_ignore_ascii_case("false")
})
.unwrap_or(false);
Ok(ServerConfig {
mode: ServerConfigMode::Multi {
graphs,
config_path: cluster_dir.clone(),
server_policy_file,
},
bind: cli_bind.unwrap_or_else(|| "127.0.0.1:8080".to_string()),
allow_unauthenticated: cli_allow_unauthenticated || env_unauth,
})
}
pub fn load_server_settings(
config_path: Option<&PathBuf>,
cli_cluster: Option<&PathBuf>,
cli_uri: Option<String>,
cli_target: Option<String>,
cli_bind: Option<String>,
cli_allow_unauthenticated: bool,
) -> Result<ServerConfig> {
// Rule 0 (RFC-005): --cluster is an exclusive boot source. It is checked
// before anything reads omnigraph.yaml — in cluster mode that file is
// never opened, not even the implicit current-directory search.
if let Some(cluster_dir) = cli_cluster {
if cli_uri.is_some() || cli_target.is_some() || config_path.is_some() {
bail!(
"--cluster is an exclusive boot source; it cannot combine with a graph URI, --target, or --config (axiom 15: a deployment serves from one source)"
);
}
return load_cluster_settings(cluster_dir, cli_bind, cli_allow_unauthenticated);
}
let config = load_config(config_path)?;
let bind = cli_bind.unwrap_or_else(|| config.server_bind().to_string());
// Either `--unauthenticated` or `OMNIGRAPH_UNAUTHENTICATED=1` flips
@ -1275,7 +1387,7 @@ pub async fn serve(config: ServerConfig) -> Result<()> {
/// The bound 4 is a rule-of-thumb for I/O-bound work. At N ≤ 10 this
/// trades startup latency for a small amount of concurrent S3 / Lance
/// open pressure.
async fn open_multi_graph_state(
pub async fn open_multi_graph_state(
graphs: Vec<GraphStartupConfig>,
tokens: Vec<(String, String)>,
server_policy_file: Option<&PathBuf>,
@ -3255,7 +3367,7 @@ server:
)
.unwrap();
let settings = load_server_settings(Some(&config), None, None, None, false).unwrap();
let settings = load_server_settings(Some(&config), None, None, None, None, false).unwrap();
match &settings.mode {
ServerConfigMode::Single { uri, graph_id, .. } => {
assert_eq!(uri, "/tmp/demo.omni");
@ -3285,6 +3397,7 @@ server:
let settings = load_server_settings(
Some(&config),
None,
Some("/tmp/override.omni".to_string()),
None,
Some("0.0.0.0:9999".to_string()),
@ -3321,7 +3434,7 @@ server:
.unwrap();
let settings =
load_server_settings(Some(&config), None, Some("dev".to_string()), None, false)
load_server_settings(Some(&config), None, None, Some("dev".to_string()), None, false)
.unwrap();
match &settings.mode {
ServerConfigMode::Single { uri, graph_id, .. } => {
@ -3334,7 +3447,7 @@ server:
#[test]
fn server_settings_require_uri_from_cli_or_config() {
let error = load_server_settings(None, None, None, None, false).unwrap_err();
let error = load_server_settings(None, None, None, None, None, false).unwrap_err();
assert!(
error.to_string().contains("no graph to serve"),
"expected mode-inference error, got: {error}",
@ -3501,7 +3614,7 @@ server:
// Truthy values flip Open mode on, even with CLI flag off.
for value in ["1", "true", "yes", "TRUE", "anything"] {
let _guard = EnvGuard::set(&[("OMNIGRAPH_UNAUTHENTICATED", Some(value))]);
let settings = load_server_settings(Some(&config_path), None, None, None, false)
let settings = load_server_settings(Some(&config_path), None, None, None, None, false)
.expect("settings load should succeed");
assert!(
settings.allow_unauthenticated,
@ -3512,7 +3625,7 @@ server:
// Falsy values keep refusal behavior, even with CLI flag off.
for value in ["0", "false", "FALSE", ""] {
let _guard = EnvGuard::set(&[("OMNIGRAPH_UNAUTHENTICATED", Some(value))]);
let settings = load_server_settings(Some(&config_path), None, None, None, false)
let settings = load_server_settings(Some(&config_path), None, None, None, None, false)
.expect("settings load should succeed");
assert!(
!settings.allow_unauthenticated,
@ -3522,7 +3635,7 @@ server:
// Unset env var: also false.
let _guard = EnvGuard::set(&[("OMNIGRAPH_UNAUTHENTICATED", None)]);
let settings = load_server_settings(Some(&config_path), None, None, None, false)
let settings = load_server_settings(Some(&config_path), None, None, None, None, false)
.expect("settings load should succeed");
assert!(
!settings.allow_unauthenticated,
@ -3533,7 +3646,7 @@ server:
// CLI flag wins even when env is falsy — `serve()` honors the
// OR of both inputs.
let _guard = EnvGuard::set(&[("OMNIGRAPH_UNAUTHENTICATED", Some("0"))]);
let settings = load_server_settings(Some(&config_path), None, None, None, true)
let settings = load_server_settings(Some(&config_path), None, None, None, None, true)
.expect("settings load should succeed");
assert!(
settings.allow_unauthenticated,

View file

@ -14,6 +14,12 @@ struct Cli {
target: Option<String>,
#[arg(long)]
config: Option<PathBuf>,
/// Boot from a cluster directory (the applied revision in
/// __cluster/state.json + content-addressed catalog blobs) instead of
/// omnigraph.yaml. Exclusive: cannot combine with <URI>, --target, or
/// --config.
#[arg(long)]
cluster: Option<PathBuf>,
#[arg(long)]
bind: Option<String>,
/// Run without bearer tokens and without a policy file (MR-723).
@ -32,6 +38,7 @@ async fn main() -> Result<()> {
let cli = Cli::parse();
let settings: ServerConfig = load_server_settings(
cli.config.as_ref(),
cli.cluster.as_ref(),
cli.uri,
cli.target,
cli.bind,