feat(cluster,server): inline policy content + config-free --cluster URI boot

Two serving changes that complete RFC-006's read side:

ServingPolicy carries the policy bundle CONTENT (digest-verified at
snapshot read) instead of a blob path — the catalog may live on object
storage, and the server must not re-read mutable state after the
snapshot. The server grows a PolicySource enum: File for omnigraph.yaml
deployments (unchanged), Inline for cluster boots, wired through
PolicyEngine::load_{graph,server}_from_source.

read_serving_snapshot_from_storage(uri) reads the applied revision
straight from a storage root, and --cluster accepts a scheme-qualified
URI (s3://bucket/prefix): config-free serving — a serving box needs only
the URI and credentials; the ledger and catalog on the bucket ARE the
deployment artifact. Bare paths keep the config-directory behavior.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
aaltshuler 2026-06-11 15:56:22 +03:00
parent 7af3697397
commit 58855c0a7c
8 changed files with 127 additions and 69 deletions

View file

@ -193,18 +193,28 @@ pub enum ServerConfigMode {
config_path: PathBuf,
/// `server.policy.file` (server-level Cedar policy for the
/// management endpoints). Wired into `GET /graphs` authorization.
server_policy_file: Option<PathBuf>,
server_policy: Option<PolicySource>,
},
}
/// Where a Cedar policy bundle comes from at startup. File-based for
/// omnigraph.yaml deployments; inline (digest-verified catalog content)
/// for cluster-mode boots, where the catalog may live on object storage
/// and the server must not re-read mutable state after the snapshot.
#[derive(Debug, Clone)]
pub enum PolicySource {
File(PathBuf),
Inline(String),
}
/// One graph's startup-time configuration: id, opened URI, optional
/// per-graph policy file path. Constructed by `load_server_settings`
/// per-graph policy source. Constructed by `load_server_settings`
/// in multi mode; consumed by `serve`'s parallel open loop.
#[derive(Debug, Clone)]
pub struct GraphStartupConfig {
pub graph_id: String,
pub uri: String,
pub policy_file: Option<PathBuf>,
pub policy: Option<PolicySource>,
/// Per-graph stored-query registry, loaded and identity-checked at
/// settings-build time; type-checked against the schema when this
/// graph's engine opens.
@ -994,9 +1004,9 @@ pub async fn serve(config: ServerConfig) -> Result<()> {
ServerConfigMode::Single { policy_file, .. } => policy_file.is_some(),
ServerConfigMode::Multi {
graphs,
server_policy_file,
server_policy,
..
} => server_policy_file.is_some() || graphs.iter().any(|g| g.policy_file.is_some()),
} => server_policy.is_some() || graphs.iter().any(|g| g.policy.is_some()),
};
let runtime_state = classify_server_runtime_state(
!tokens.is_empty(),
@ -1045,7 +1055,7 @@ pub async fn serve(config: ServerConfig) -> Result<()> {
ServerConfigMode::Multi {
graphs,
config_path,
server_policy_file,
server_policy,
} => {
info!(
bind = %bind,
@ -1054,7 +1064,7 @@ pub async fn serve(config: ServerConfig) -> Result<()> {
config = %config_path.display(),
"serving omnigraph"
);
open_multi_graph_state(graphs, tokens, server_policy_file.as_ref(), config_path).await?
open_multi_graph_state(graphs, tokens, server_policy.as_ref(), config_path).await?
}
};
@ -1065,6 +1075,14 @@ pub async fn serve(config: ServerConfig) -> Result<()> {
Ok(())
}
/// Load a graph-scoped policy bundle from either source kind.
fn load_graph_policy(source: &PolicySource, graph_id: &str) -> Result<PolicyEngine> {
match source {
PolicySource::File(path) => Ok(PolicyEngine::load_graph(path, graph_id)?),
PolicySource::Inline(text) => Ok(PolicyEngine::load_graph_from_source(text, graph_id)?),
}
}
/// Parallel open of every graph in the startup config, with bounded
/// concurrency (`buffer_unordered(4)`). Fail-fast — the first open error
/// aborts startup; other in-flight opens are dropped (their `Omnigraph`
@ -1076,7 +1094,7 @@ pub async fn serve(config: ServerConfig) -> Result<()> {
pub async fn open_multi_graph_state(
graphs: Vec<GraphStartupConfig>,
tokens: Vec<(String, String)>,
server_policy_file: Option<&PathBuf>,
server_policy_source: Option<&PolicySource>,
config_path: PathBuf,
) -> Result<AppState> {
use futures::{StreamExt, TryStreamExt};
@ -1089,8 +1107,11 @@ pub async fn open_multi_graph_state(
// The placeholder graph_id `"server"` is the sentinel the Cedar
// resource-model refactor maps to the singleton
// `Omnigraph::Server::"root"` entity at evaluation time.
let server_policy = match server_policy_file {
Some(path) => Some(PolicyEngine::load_server(path)?),
let server_policy = match server_policy_source {
Some(PolicySource::File(path)) => Some(PolicyEngine::load_server(path)?),
Some(PolicySource::Inline(source)) => {
Some(PolicyEngine::load_server_from_source(source)?)
}
None => None,
};
@ -1128,9 +1149,9 @@ async fn open_single_graph(cfg: GraphStartupConfig) -> Result<Arc<GraphHandle>>
// owned `Arc`, so no borrow of `db` survives into the match.
let queries = validate_and_attach(cfg.queries, &db.catalog(), graph_id.as_str())?;
let (policy_arc, db) = match &cfg.policy_file {
Some(path) => {
let policy = PolicyEngine::load_graph(path, graph_id.as_str())?;
let (policy_arc, db) = match &cfg.policy {
Some(source) => {
let policy = load_graph_policy(source, graph_id.as_str())?;
let policy_arc: Arc<PolicyEngine> = Arc::new(policy);
let checker = Arc::clone(&policy_arc) as Arc<dyn omnigraph_policy::PolicyChecker>;
(Some(policy_arc), db.with_policy(checker))

View file

@ -14,10 +14,10 @@ struct Cli {
target: Option<String>,
#[arg(long)]
config: Option<PathBuf>,
/// Boot from a cluster directory (the applied revision in
/// __cluster/state.json + content-addressed catalog blobs) instead of
/// omnigraph.yaml. Exclusive: cannot combine with <URI>, --target, or
/// --config.
/// Boot from a cluster: either a config directory (storage resolved
/// through cluster.yaml) or a storage-root URI directly
/// (s3://bucket/prefix — config-free serving from the bucket).
/// Exclusive: cannot combine with <URI>, --target, or --config.
#[arg(long)]
cluster: Option<PathBuf>,
#[arg(long)]

View file

@ -14,7 +14,19 @@ pub(crate) async fn load_cluster_settings(
cli_bind: Option<String>,
cli_allow_unauthenticated: bool,
) -> Result<ServerConfig> {
let snapshot = omnigraph_cluster::read_serving_snapshot(cluster_dir).await.map_err(|diagnostics| {
// `--cluster` accepts either a config directory (the ledger location is
// resolved through cluster.yaml's `storage:` key) or a storage-root URI
// directly (`s3://bucket/prefix`) — config-free serving: the ledger and
// catalog on the bucket ARE the deployment artifact.
// Any scheme-qualified argument (s3://, file://) is a storage root; a
// bare path is a config directory.
let cluster_arg = cluster_dir.to_string_lossy();
let snapshot = if cluster_arg.contains("://") {
omnigraph_cluster::read_serving_snapshot_from_storage(cluster_arg.as_ref()).await
} else {
omnigraph_cluster::read_serving_snapshot(cluster_dir).await
}
.map_err(|diagnostics| {
let details = diagnostics
.iter()
.map(|diagnostic| format!("[{}] {}: {}", diagnostic.code, diagnostic.path, diagnostic.message))
@ -26,19 +38,25 @@ pub(crate) async fn load_cluster_settings(
// Bindings -> Cedar slots. The serving pipeline loads one bundle per
// graph plus one server-level bundle; stacked bundles per scope are a
// later slice — refuse loudly rather than silently merging policy.
let mut server_policy_file: Option<PathBuf> = None;
let mut graph_policy_files: BTreeMap<String, PathBuf> = BTreeMap::new();
let mut server_policy: Option<PolicySource> = None;
let mut graph_policies: BTreeMap<String, PolicySource> = BTreeMap::new();
for policy in &snapshot.policies {
for binding in &policy.applies_to {
if binding == "cluster" {
if server_policy_file.replace(policy.blob_path.clone()).is_some() {
if server_policy
.replace(PolicySource::Inline(policy.source.clone()))
.is_some()
{
bail!(
"multiple policy bundles bind the cluster scope; cluster-mode serving supports one bundle per scope — split or merge bundles (multi-bundle scopes are a later slice)"
);
}
} else if let Some(graph_id) = binding.strip_prefix("graph.") {
if graph_policy_files
.insert(graph_id.to_string(), policy.blob_path.clone())
if graph_policies
.insert(
graph_id.to_string(),
PolicySource::Inline(policy.source.clone()),
)
.is_some()
{
bail!(
@ -80,7 +98,7 @@ pub(crate) async fn load_cluster_settings(
graphs.push(GraphStartupConfig {
graph_id: graph.graph_id.clone(),
uri: graph.root.to_string_lossy().to_string(),
policy_file: graph_policy_files.get(&graph.graph_id).cloned(),
policy: graph_policies.get(&graph.graph_id).cloned(),
queries: registry,
});
}
@ -97,7 +115,7 @@ pub(crate) async fn load_cluster_settings(
mode: ServerConfigMode::Multi {
graphs,
config_path: cluster_dir.clone(),
server_policy_file,
server_policy,
},
bind: cli_bind.unwrap_or_else(|| "127.0.0.1:8080".to_string()),
allow_unauthenticated: cli_allow_unauthenticated || env_unauth,
@ -226,18 +244,18 @@ pub async fn load_server_settings(
graphs.push(GraphStartupConfig {
graph_id: name.clone(),
uri,
policy_file: config.resolve_target_policy_file(name),
policy: config.resolve_target_policy_file(name).map(PolicySource::File),
queries,
});
}
let config_path = config_path
.cloned()
.expect("has_explicit_config implies config_path is Some");
let server_policy_file = config.resolve_server_policy_file();
let server_policy = config.resolve_server_policy_file().map(PolicySource::File);
ServerConfigMode::Multi {
graphs,
config_path,
server_policy_file,
server_policy,
}
} else {
// Rule 5 → error with migration hint.
@ -729,11 +747,11 @@ server:
.join("alpha.omni")
.to_string_lossy()
.into_owned(),
policy_file: None,
policy: None,
queries: crate::queries::QueryRegistry::default(),
}],
config_path: temp.path().join("omnigraph.yaml"),
server_policy_file: Some(policy_path),
server_policy: Some(crate::PolicySource::File(policy_path)),
},
bind: "127.0.0.1:0".to_string(),
allow_unauthenticated: false,