diff --git a/Cargo.lock b/Cargo.lock index f6a1b8a..675fad7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4673,6 +4673,7 @@ dependencies = [ "futures", "lance", "lance-index", + "omnigraph-cluster", "omnigraph-compiler", "omnigraph-engine", "omnigraph-policy", diff --git a/crates/omnigraph-server/Cargo.toml b/crates/omnigraph-server/Cargo.toml index 5f87082..5393221 100644 --- a/crates/omnigraph-server/Cargo.toml +++ b/crates/omnigraph-server/Cargo.toml @@ -22,6 +22,7 @@ aws = ["dep:aws-config", "dep:aws-sdk-secretsmanager"] omnigraph = { package = "omnigraph-engine", path = "../omnigraph", version = "0.6.2" } omnigraph-compiler = { path = "../omnigraph-compiler", version = "0.6.2" } omnigraph-policy = { path = "../omnigraph-policy", version = "0.6.2" } +omnigraph-cluster = { path = "../omnigraph-cluster", version = "0.6.2" } axum = { workspace = true } clap = { workspace = true } color-eyre = { workspace = true } diff --git a/crates/omnigraph-server/src/lib.rs b/crates/omnigraph-server/src/lib.rs index 60ebef3..3b9ff1d 100644 --- a/crates/omnigraph-server/src/lib.rs +++ b/crates/omnigraph-server/src/lib.rs @@ -14,7 +14,7 @@ pub use registry::{GraphHandle, GraphRegistry, InsertError, RegistryLookup, Regi use crate::queries::{QueryRegistry, check, format_check_breakages}; -use std::collections::{HashMap, HashSet}; +use std::collections::{BTreeMap, HashMap, HashSet}; use std::fs; use std::io; use std::io::Write; @@ -40,7 +40,7 @@ use axum::middleware::{self, Next}; use axum::response::{IntoResponse, Response}; use axum::routing::{delete, get, post}; use axum::{Json, Router}; -use color_eyre::eyre::{Result, WrapErr, bail}; +use color_eyre::eyre::{Result, WrapErr, bail, eyre}; pub use config::{ AliasCommand, AliasConfig, CliDefaults, DEFAULT_CONFIG_FILE, OmnigraphConfig, PolicySettings, ProjectConfig, QueryDefaults, ReadOutputFormat, ServerDefaults, TableCellLayout, TargetConfig, @@ -888,13 +888,125 @@ fn format_registry_load_errors(label: &str, errors: &[queries::LoadError]) -> St format!("graph '{label}': stored-query registry failed to load:\n {joined}") } +/// Build serving settings from a cluster directory's applied revision +/// (RFC-005 §D2): graphs at derived roots, stored queries from verified +/// catalog blob content, policy bundles from blob paths with their applied +/// bindings. Always multi-graph routing. The unauthenticated/env handling +/// matches the omnigraph.yaml path. +fn load_cluster_settings( + cluster_dir: &PathBuf, + cli_bind: Option, + cli_allow_unauthenticated: bool, +) -> Result { + let snapshot = omnigraph_cluster::read_serving_snapshot(cluster_dir).map_err(|diagnostics| { + let details = diagnostics + .iter() + .map(|diagnostic| format!("[{}] {}: {}", diagnostic.code, diagnostic.path, diagnostic.message)) + .collect::>() + .join("\n "); + eyre!("the cluster at '{}' is not ready to serve:\n {details}", cluster_dir.display()) + })?; + + // Bindings -> Cedar slots. The serving pipeline loads one bundle per + // graph plus one server-level bundle; stacked bundles per scope are a + // later slice — refuse loudly rather than silently merging policy. + let mut server_policy_file: Option = None; + let mut graph_policy_files: BTreeMap = BTreeMap::new(); + for policy in &snapshot.policies { + for binding in &policy.applies_to { + if binding == "cluster" { + if server_policy_file.replace(policy.blob_path.clone()).is_some() { + bail!( + "multiple policy bundles bind the cluster scope; cluster-mode serving supports one bundle per scope — split or merge bundles (multi-bundle scopes are a later slice)" + ); + } + } else if let Some(graph_id) = binding.strip_prefix("graph.") { + if graph_policy_files + .insert(graph_id.to_string(), policy.blob_path.clone()) + .is_some() + { + bail!( + "multiple policy bundles bind graph '{graph_id}'; cluster-mode serving supports one bundle per scope — split or merge bundles (multi-bundle scopes are a later slice)" + ); + } + } else { + bail!("unrecognized policy binding '{binding}' in the applied revision"); + } + } + } + + let mut graphs = Vec::new(); + for graph in &snapshot.graphs { + let specs: Vec = snapshot + .queries + .iter() + .filter(|query| query.graph_id == graph.graph_id) + .map(|query| queries::RegistrySpec { + name: query.name.clone(), + source: query.source.clone(), + // The §D5 bridge: the cluster registry has no expose flag + // (exposure becomes a policy decision in Phase 6) — cluster + // mode lists every stored query. + expose: true, + tool_name: None, + }) + .collect(); + let registry = QueryRegistry::from_specs(specs).map_err(|errors| { + let details = errors + .iter() + .map(|error| error.to_string()) + .collect::>() + .join("\n "); + eyre!( + "stored queries in the applied revision failed to parse:\n {details}\nrun `cluster refresh` then `cluster apply`, and restart" + ) + })?; + graphs.push(GraphStartupConfig { + graph_id: graph.graph_id.clone(), + uri: graph.root.to_string_lossy().to_string(), + policy_file: graph_policy_files.get(&graph.graph_id).cloned(), + queries: registry, + }); + } + + let env_unauth = std::env::var("OMNIGRAPH_UNAUTHENTICATED") + .ok() + .map(|v| { + let trimmed = v.trim(); + !trimmed.is_empty() && trimmed != "0" && !trimmed.eq_ignore_ascii_case("false") + }) + .unwrap_or(false); + + Ok(ServerConfig { + mode: ServerConfigMode::Multi { + graphs, + config_path: cluster_dir.clone(), + server_policy_file, + }, + bind: cli_bind.unwrap_or_else(|| "127.0.0.1:8080".to_string()), + allow_unauthenticated: cli_allow_unauthenticated || env_unauth, + }) +} + pub fn load_server_settings( config_path: Option<&PathBuf>, + cli_cluster: Option<&PathBuf>, cli_uri: Option, cli_target: Option, cli_bind: Option, cli_allow_unauthenticated: bool, ) -> Result { + // Rule 0 (RFC-005): --cluster is an exclusive boot source. It is checked + // before anything reads omnigraph.yaml — in cluster mode that file is + // never opened, not even the implicit current-directory search. + if let Some(cluster_dir) = cli_cluster { + if cli_uri.is_some() || cli_target.is_some() || config_path.is_some() { + bail!( + "--cluster is an exclusive boot source; it cannot combine with a graph URI, --target, or --config (axiom 15: a deployment serves from one source)" + ); + } + return load_cluster_settings(cluster_dir, cli_bind, cli_allow_unauthenticated); + } let config = load_config(config_path)?; let bind = cli_bind.unwrap_or_else(|| config.server_bind().to_string()); // Either `--unauthenticated` or `OMNIGRAPH_UNAUTHENTICATED=1` flips @@ -1275,7 +1387,7 @@ pub async fn serve(config: ServerConfig) -> Result<()> { /// The bound 4 is a rule-of-thumb for I/O-bound work. At N ≤ 10 this /// trades startup latency for a small amount of concurrent S3 / Lance /// open pressure. -async fn open_multi_graph_state( +pub async fn open_multi_graph_state( graphs: Vec, tokens: Vec<(String, String)>, server_policy_file: Option<&PathBuf>, @@ -3255,7 +3367,7 @@ server: ) .unwrap(); - let settings = load_server_settings(Some(&config), None, None, None, false).unwrap(); + let settings = load_server_settings(Some(&config), None, None, None, None, false).unwrap(); match &settings.mode { ServerConfigMode::Single { uri, graph_id, .. } => { assert_eq!(uri, "/tmp/demo.omni"); @@ -3285,6 +3397,7 @@ server: let settings = load_server_settings( Some(&config), + None, Some("/tmp/override.omni".to_string()), None, Some("0.0.0.0:9999".to_string()), @@ -3321,7 +3434,7 @@ server: .unwrap(); let settings = - load_server_settings(Some(&config), None, Some("dev".to_string()), None, false) + load_server_settings(Some(&config), None, None, Some("dev".to_string()), None, false) .unwrap(); match &settings.mode { ServerConfigMode::Single { uri, graph_id, .. } => { @@ -3334,7 +3447,7 @@ server: #[test] fn server_settings_require_uri_from_cli_or_config() { - let error = load_server_settings(None, None, None, None, false).unwrap_err(); + let error = load_server_settings(None, None, None, None, None, false).unwrap_err(); assert!( error.to_string().contains("no graph to serve"), "expected mode-inference error, got: {error}", @@ -3501,7 +3614,7 @@ server: // Truthy values flip Open mode on, even with CLI flag off. for value in ["1", "true", "yes", "TRUE", "anything"] { let _guard = EnvGuard::set(&[("OMNIGRAPH_UNAUTHENTICATED", Some(value))]); - let settings = load_server_settings(Some(&config_path), None, None, None, false) + let settings = load_server_settings(Some(&config_path), None, None, None, None, false) .expect("settings load should succeed"); assert!( settings.allow_unauthenticated, @@ -3512,7 +3625,7 @@ server: // Falsy values keep refusal behavior, even with CLI flag off. for value in ["0", "false", "FALSE", ""] { let _guard = EnvGuard::set(&[("OMNIGRAPH_UNAUTHENTICATED", Some(value))]); - let settings = load_server_settings(Some(&config_path), None, None, None, false) + let settings = load_server_settings(Some(&config_path), None, None, None, None, false) .expect("settings load should succeed"); assert!( !settings.allow_unauthenticated, @@ -3522,7 +3635,7 @@ server: // Unset env var: also false. let _guard = EnvGuard::set(&[("OMNIGRAPH_UNAUTHENTICATED", None)]); - let settings = load_server_settings(Some(&config_path), None, None, None, false) + let settings = load_server_settings(Some(&config_path), None, None, None, None, false) .expect("settings load should succeed"); assert!( !settings.allow_unauthenticated, @@ -3533,7 +3646,7 @@ server: // CLI flag wins even when env is falsy — `serve()` honors the // OR of both inputs. let _guard = EnvGuard::set(&[("OMNIGRAPH_UNAUTHENTICATED", Some("0"))]); - let settings = load_server_settings(Some(&config_path), None, None, None, true) + let settings = load_server_settings(Some(&config_path), None, None, None, None, true) .expect("settings load should succeed"); assert!( settings.allow_unauthenticated, diff --git a/crates/omnigraph-server/src/main.rs b/crates/omnigraph-server/src/main.rs index 4e1c256..c71ea2f 100644 --- a/crates/omnigraph-server/src/main.rs +++ b/crates/omnigraph-server/src/main.rs @@ -14,6 +14,12 @@ struct Cli { target: Option, #[arg(long)] config: Option, + /// Boot from a cluster directory (the applied revision in + /// __cluster/state.json + content-addressed catalog blobs) instead of + /// omnigraph.yaml. Exclusive: cannot combine with , --target, or + /// --config. + #[arg(long)] + cluster: Option, #[arg(long)] bind: Option, /// Run without bearer tokens and without a policy file (MR-723). @@ -32,6 +38,7 @@ async fn main() -> Result<()> { let cli = Cli::parse(); let settings: ServerConfig = load_server_settings( cli.config.as_ref(), + cli.cluster.as_ref(), cli.uri, cli.target, cli.bind, diff --git a/crates/omnigraph-server/tests/server.rs b/crates/omnigraph-server/tests/server.rs index 4a49a14..bf99b8d 100644 --- a/crates/omnigraph-server/tests/server.rs +++ b/crates/omnigraph-server/tests/server.rs @@ -5508,7 +5508,7 @@ graphs: "#, ) .unwrap(); - let err = load_server_settings(Some(&config_path), None, None, None, false).unwrap_err(); + let err = load_server_settings(Some(&config_path), None, None, None, None, false).unwrap_err(); assert!( err.to_string().contains("invalid graph id 'policies'"), "expected reserved-name rejection, got: {err}" @@ -5575,6 +5575,7 @@ graphs: #[test] fn mode_inference_cli_uri_is_single() { let settings = load_server_settings( + None, None, Some("/tmp/cli.omni".to_string()), None, @@ -5605,7 +5606,7 @@ graphs: ) .unwrap(); let settings = - load_server_settings(Some(&config_path), None, Some("alpha".into()), None, true) + load_server_settings(Some(&config_path), None, None, Some("alpha".into()), None, true) .unwrap(); match settings.mode { ServerConfigMode::Single { uri, .. } => assert_eq!(uri, "/tmp/alpha.omni"), @@ -5631,7 +5632,7 @@ server: "#, ) .unwrap(); - let settings = load_server_settings(Some(&config_path), None, None, None, true).unwrap(); + let settings = load_server_settings(Some(&config_path), None, None, None, None, true).unwrap(); match settings.mode { ServerConfigMode::Single { uri, .. } => assert_eq!(uri, "/tmp/beta.omni"), ServerConfigMode::Multi { .. } => panic!("expected Single (rule 3), got Multi"), @@ -5654,7 +5655,7 @@ graphs: "#, ) .unwrap(); - let settings = load_server_settings(Some(&config_path), None, None, None, true).unwrap(); + let settings = load_server_settings(Some(&config_path), None, None, None, None, true).unwrap(); match settings.mode { ServerConfigMode::Multi { graphs, .. } => { let ids: Vec<&str> = graphs.iter().map(|g| g.graph_id.as_str()).collect(); @@ -5680,7 +5681,7 @@ graphs: "#, ) .unwrap(); - let err = load_server_settings(Some(&config_path), None, None, None, true).unwrap_err(); + let err = load_server_settings(Some(&config_path), None, None, None, None, true).unwrap_err(); let msg = err.to_string(); assert!( msg.contains("top-level") && msg.contains("policy.file") && msg.contains("not honored"), @@ -5708,7 +5709,7 @@ graphs: "queries:\n q:\n file: ./q.gq\ngraphs:\n alpha:\n uri: /tmp/alpha.omni\n", ) .unwrap(); - let err = load_server_settings(Some(&config_path), None, None, None, true).unwrap_err(); + let err = load_server_settings(Some(&config_path), None, None, None, None, true).unwrap_err(); let msg = err.to_string(); assert!( msg.contains("queries") && msg.contains("not honored"), @@ -5729,7 +5730,7 @@ graphs: ) .unwrap(); let err = - load_server_settings(Some(&config_path), None, Some("prod".to_string()), None, true) + load_server_settings(Some(&config_path), None, None, Some("prod".to_string()), None, true) .unwrap_err(); let msg = err.to_string(); assert!( @@ -5756,7 +5757,7 @@ graphs: ) .unwrap(); let settings = - load_server_settings(Some(&config_path), None, Some("prod".to_string()), None, true) + load_server_settings(Some(&config_path), None, None, Some("prod".to_string()), None, true) .unwrap(); match settings.mode { ServerConfigMode::Single { @@ -5795,7 +5796,7 @@ graphs: ), ) .unwrap(); - let settings = load_server_settings(Some(&config_path), None, None, None, true).unwrap(); + let settings = load_server_settings(Some(&config_path), None, None, None, None, true).unwrap(); match settings.mode { ServerConfigMode::Multi { graphs, .. } => { assert_eq!(graphs[0].uri, graph.to_string_lossy()); @@ -5807,7 +5808,7 @@ graphs: /// Rule 5: nothing → error with migration hint. #[test] fn mode_inference_no_inputs_errors_with_migration_hint() { - let err = load_server_settings(None, None, None, None, true).unwrap_err(); + let err = load_server_settings(None, None, None, None, None, true).unwrap_err(); let msg = err.to_string(); assert!( msg.contains("no graph to serve"), @@ -5822,7 +5823,7 @@ graphs: let temp = tempfile::tempdir().unwrap(); let config_path = temp.path().join("omnigraph.yaml"); fs::write(&config_path, "server:\n bind: 127.0.0.1:8080\n").unwrap(); - let err = load_server_settings(Some(&config_path), None, None, None, true).unwrap_err(); + let err = load_server_settings(Some(&config_path), None, None, None, None, true).unwrap_err(); assert!(err.to_string().contains("no graph to serve")); } @@ -5843,6 +5844,7 @@ graphs: .unwrap(); let settings = load_server_settings( Some(&config_path), + None, Some("/tmp/cli-override.omni".to_string()), None, None, @@ -5880,7 +5882,7 @@ graphs: "#, ) .unwrap(); - let settings = load_server_settings(Some(&config_path), None, None, None, true).unwrap(); + let settings = load_server_settings(Some(&config_path), None, None, None, None, true).unwrap(); let graphs = match settings.mode { ServerConfigMode::Multi { graphs, .. } => graphs, _ => panic!("expected Multi"), @@ -5914,7 +5916,7 @@ graphs: "#, ) .unwrap(); - let settings = load_server_settings(Some(&config_path), None, None, None, true).unwrap(); + let settings = load_server_settings(Some(&config_path), None, None, None, None, true).unwrap(); match settings.mode { ServerConfigMode::Multi { server_policy_file, .. @@ -6194,7 +6196,7 @@ graphs: .unwrap(); let settings: ServerConfig = - load_server_settings(Some(&config_path), None, None, None, true).unwrap(); + load_server_settings(Some(&config_path), None, None, None, None, true).unwrap(); assert!(matches!(settings.mode, ServerConfigMode::Multi { .. })); match settings.mode { @@ -6207,3 +6209,233 @@ graphs: } } } + +// ---- Phase 5: cluster-mode boot (RFC-005) ---- + +/// Build and converge a real cluster directory: cluster.yaml + schema + +/// stored query (+ optional policies), then `import` + `apply` so the +/// catalog and state ledger exist exactly as an operator would have them. +async fn converged_cluster_dir(policies_yaml: &str) -> tempfile::TempDir { + let temp = tempfile::tempdir().unwrap(); + fs::write( + temp.path().join("people.pg"), + "\nnode Person {\n name: String @key\n}\n", + ) + .unwrap(); + fs::write( + temp.path().join("people.gq"), + "\nquery find_person($name: String) {\n match { $p: Person { name: $name } }\n return { $p.name }\n}\n", + ) + .unwrap(); + fs::write( + temp.path().join("cluster.yaml"), + format!( + r#" +version: 1 +graphs: + knowledge: + schema: ./people.pg + queries: + find_person: + file: ./people.gq +{policies_yaml}"# + ), + ) + .unwrap(); + let import = omnigraph_cluster::import_config_dir(temp.path()).await; + assert!(import.ok, "{:?}", import.diagnostics); + let apply = omnigraph_cluster::apply_config_dir(temp.path()).await; + assert!(apply.ok && apply.converged, "{:?}", apply.diagnostics); + temp +} + +fn cluster_settings(dir: &Path) -> color_eyre::eyre::Result { + omnigraph_server::load_server_settings(None, Some(&dir.to_path_buf()), None, None, None, true) +} + +#[tokio::test] +async fn cluster_boot_serves_applied_state() { + let temp = converged_cluster_dir("").await; + let settings = cluster_settings(temp.path()).unwrap(); + let omnigraph_server::ServerConfigMode::Multi { + graphs, + config_path, + server_policy_file, + } = settings.mode + else { + panic!("cluster boot must select multi-graph routing"); + }; + assert_eq!(graphs.len(), 1); + assert_eq!(graphs[0].graph_id, "knowledge"); + assert!(server_policy_file.is_none()); + + let state = + omnigraph_server::open_multi_graph_state(graphs, Vec::new(), None, config_path) + .await + .unwrap(); + let app = build_app(state); + + // The management surface keeps its closed-by-default contract: without a + // cluster-scoped policy bundle there is no server-level Cedar engine, so + // GET /graphs refuses even in cluster mode. + let (status, body) = json_response( + &app, + Request::builder().uri("/graphs").body(Body::empty()).unwrap(), + ) + .await; + assert_eq!(status, StatusCode::FORBIDDEN, "{body}"); + + let (status, body) = json_response( + &app, + Request::builder() + .uri("/graphs/knowledge/queries") + .body(Body::empty()) + .unwrap(), + ) + .await; + assert_eq!(status, StatusCode::OK, "{body}"); + assert!( + body["queries"] + .as_array() + .unwrap() + .iter() + .any(|q| q["name"] == "find_person"), + "{body}" + ); + + let (status, body) = json_response( + &app, + Request::builder() + .method(Method::POST) + .uri("/graphs/knowledge/queries/find_person") + .header("content-type", "application/json") + .body(Body::from(r#"{"params":{"name":"nobody"}}"#)) + .unwrap(), + ) + .await; + assert_eq!(status, StatusCode::OK, "{body}"); +} + +#[tokio::test] +async fn cluster_boot_wires_policy_bindings_into_cedar_slots() { + let temp = tempfile::tempdir().unwrap(); + drop(temp); + let policy_block = r#"policies: + graph_rules: + file: ./graph.policy.yaml + applies_to: [knowledge] + cluster_rules: + file: ./cluster.policy.yaml + applies_to: [cluster] +"#; + let temp = { + let temp = tempfile::tempdir().unwrap(); + fs::write( + temp.path().join("people.pg"), + "\nnode Person {\n name: String @key\n}\n", + ) + .unwrap(); + fs::write( + temp.path().join("people.gq"), + "\nquery find_person($name: String) {\n match { $p: Person { name: $name } }\n return { $p.name }\n}\n", + ) + .unwrap(); + fs::write( + temp.path().join("graph.policy.yaml"), + permit_all_policy_yaml(&["default"]), + ) + .unwrap(); + fs::write( + temp.path().join("cluster.policy.yaml"), + permit_all_policy_yaml(&["default"]).replace("protected_branches: [main]\n", "protected_branches: [main]\nkind: server\n"), + ) + .unwrap(); + fs::write( + temp.path().join("cluster.yaml"), + format!( + r#" +version: 1 +graphs: + knowledge: + schema: ./people.pg + queries: + find_person: + file: ./people.gq +{policy_block}"# + ), + ) + .unwrap(); + let import = omnigraph_cluster::import_config_dir(temp.path()).await; + assert!(import.ok, "{:?}", import.diagnostics); + let apply = omnigraph_cluster::apply_config_dir(temp.path()).await; + assert!(apply.ok && apply.converged, "{:?}", apply.diagnostics); + temp + }; + + let settings = cluster_settings(temp.path()).unwrap(); + let omnigraph_server::ServerConfigMode::Multi { + graphs, + server_policy_file, + .. + } = settings.mode + else { + panic!("cluster boot must select multi-graph routing"); + }; + let graph_policy = graphs[0].policy_file.as_ref().expect("graph-bound bundle"); + assert!( + graph_policy + .to_string_lossy() + .contains("__cluster/resources/policy/graph_rules/"), + "{graph_policy:?}" + ); + let server_policy = server_policy_file.expect("cluster-bound bundle"); + assert!( + server_policy + .to_string_lossy() + .contains("__cluster/resources/policy/cluster_rules/"), + "{server_policy:?}" + ); +} + +#[tokio::test] +async fn cluster_boot_refusals() { + // Mutual exclusion with --config / URI. + let temp = converged_cluster_dir("").await; + let dir = temp.path().to_path_buf(); + let err = omnigraph_server::load_server_settings( + Some(&dir.join("omnigraph.yaml")), + Some(&dir), + None, + None, + None, + true, + ) + .unwrap_err(); + assert!(err.to_string().contains("exclusive boot source"), "{err}"); + let err = omnigraph_server::load_server_settings( + None, + Some(&dir), + Some("file:///tmp/x.omni".to_string()), + None, + None, + true, + ) + .unwrap_err(); + assert!(err.to_string().contains("exclusive boot source"), "{err}"); + + // Tampered catalog blob refuses boot with the remedy. + let blob_dir = dir.join("__cluster/resources/query/knowledge/find_person"); + let blob = fs::read_dir(&blob_dir).unwrap().next().unwrap().unwrap().path(); + fs::write(&blob, "tampered").unwrap(); + let err = cluster_settings(&dir).unwrap_err(); + assert!( + err.to_string().contains("catalog_payload_digest_mismatch"), + "{err}" + ); + assert!(err.to_string().contains("cluster refresh"), "{err}"); + + // Missing state refuses with the import/apply remedy. + let empty = tempfile::tempdir().unwrap(); + let err = cluster_settings(empty.path()).unwrap_err(); + assert!(err.to_string().contains("cluster_state_missing"), "{err}"); +}