diff --git a/crates/omnigraph-server/src/lib.rs b/crates/omnigraph-server/src/lib.rs index 919f35d..3cc26d5 100644 --- a/crates/omnigraph-server/src/lib.rs +++ b/crates/omnigraph-server/src/lib.rs @@ -12,6 +12,8 @@ pub use graph_id::GraphId; pub use identity::{AuthSource, GraphKey, ResolvedActor, Scope, TenantId}; pub use registry::{GraphHandle, GraphRegistry, InsertError, RegistryLookup, RegistrySnapshot}; +use crate::queries::{QueryRegistry, check}; + use std::collections::{HashMap, HashSet}; use std::fs; use std::io; @@ -160,6 +162,10 @@ pub enum ServerConfigMode { uri: String, /// Top-level `policy.file` (single-graph Cedar policy). policy_file: Option, + /// Top-level stored-query registry, loaded and identity-checked + /// at settings-build time; type-checked against the schema when + /// the engine opens. + queries: QueryRegistry, }, /// Multi-graph invocation — `--config omnigraph.yaml` with a /// non-empty `graphs:` map and no single-mode selector. @@ -186,6 +192,10 @@ pub struct GraphStartupConfig { pub graph_id: String, pub uri: String, pub policy_file: Option, + /// Per-graph stored-query registry, loaded and identity-checked at + /// settings-build time; type-checked against the schema when this + /// graph's engine opens. + pub queries: QueryRegistry, } /// Runtime routing for the server. Single mode = legacy @@ -286,7 +296,31 @@ impl AppState { ) -> Self { let bearer_tokens = hash_bearer_tokens(bearer_tokens); let per_graph_policy = policy_engine.map(Arc::new); - Self::build_single_mode(uri, db, bearer_tokens, per_graph_policy, Arc::new(workload)) + Self::build_single_mode(uri, db, bearer_tokens, per_graph_policy, Arc::new(workload), None) + } + + /// Like `new_single`, but attaches a pre-validated stored-query + /// registry. Private — the production single-mode boot path + /// (`open_single_with_queries`) is the only caller; every public + /// `new_*` constructor builds with no stored queries. + fn new_single_with_queries( + uri: String, + db: Omnigraph, + bearer_tokens: Vec<(String, String)>, + policy_engine: Option, + workload: workload::WorkloadController, + queries: Option>, + ) -> Self { + let bearer_tokens = hash_bearer_tokens(bearer_tokens); + let per_graph_policy = policy_engine.map(Arc::new); + Self::build_single_mode( + uri, + db, + bearer_tokens, + per_graph_policy, + Arc::new(workload), + queries, + ) } pub fn new(uri: String, db: Omnigraph) -> Self { @@ -378,6 +412,28 @@ impl AppState { uri: impl Into, bearer_tokens: Vec<(String, String)>, policy_file: Option<&PathBuf>, + ) -> Result { + Self::open_single_with_queries( + uri, + bearer_tokens, + policy_file, + QueryRegistry::default(), + ) + .await + } + + /// Single-mode boot with a stored-query registry: open the engine, + /// **type-check the registry against the live schema and refuse to + /// start on a breakage** (same posture as bad policy YAML), log + /// non-blocking warnings, then attach the registry to the handle. + /// With an empty registry the check is a no-op and no registry is + /// attached — that is the path `open_with_bearer_tokens_and_policy` + /// (no stored queries) takes. + pub async fn open_single_with_queries( + uri: impl Into, + bearer_tokens: Vec<(String, String)>, + policy_file: Option<&PathBuf>, + queries: QueryRegistry, ) -> Result { // The "policy requires tokens" invariant is enforced once by // `classify_server_runtime_state` in `serve()`, before either @@ -386,15 +442,29 @@ impl AppState { // already been rejected — no second bail needed. let uri = normalize_root_uri(&uri.into()).wrap_err("normalize graph URI")?; let db = Omnigraph::open(&uri).await?; + + let report = check(&queries, &db.catalog()); + if report.has_breakages() { + bail!("{}", format_registry_breakages(&uri, &report)); + } + log_registry_warnings(&uri, &report); + let policy_engine = match policy_file { Some(path) => Some(PolicyEngine::load_graph(path, &uri)?), None => None, }; - Ok(Self::new_with_bearer_tokens_and_policy( + let registry = if queries.is_empty() { + None + } else { + Some(Arc::new(queries)) + }; + Ok(Self::new_single_with_queries( uri, db, bearer_tokens, policy_engine, + workload::WorkloadController::from_env(), + registry, )) } @@ -409,6 +479,7 @@ impl AppState { bearer_tokens: Arc<[(BearerTokenHash, Arc)]>, policy_engine: Option>, workload: Arc, + queries: Option>, ) -> Self { // Engine-layer policy gate (MR-722). With a per-graph policy // installed, every `_as` writer on `Omnigraph` calls into the @@ -437,9 +508,7 @@ impl AppState { uri, engine: Arc::new(db), policy: policy_engine, - // Stored-query registry is wired in by the boot path; - // single-mode construction defaults to "no stored queries". - queries: None, + queries, }); Self { routing: GraphRouting::Single { handle }, @@ -754,6 +823,40 @@ pub fn init_tracing() { let _ = tracing_subscriber::fmt().with_env_filter(filter).try_init(); } +/// Format every breakage in a registry check report into a multi-line +/// boot-abort message, naming each offending query. +fn format_registry_breakages(label: &str, report: &queries::CheckReport) -> String { + let joined = report + .breakages + .iter() + .map(|b| format!("query '{}': {}", b.query, b.message)) + .collect::>() + .join("\n "); + format!( + "graph '{label}': {} stored quer{} failed the schema check:\n {joined}", + report.breakages.len(), + if report.breakages.len() == 1 { "y" } else { "ies" } + ) +} + +/// Log each non-blocking advisory from a registry check report. +fn log_registry_warnings(label: &str, report: &queries::CheckReport) { + for warning in &report.warnings { + warn!(graph = label, query = %warning.query, "stored query: {}", warning.message); + } +} + +/// Format every load error (parse / identity failure) into a multi-line +/// boot-abort message. +fn format_registry_load_errors(label: &str, errors: &[queries::LoadError]) -> String { + let joined = errors + .iter() + .map(|e| e.to_string()) + .collect::>() + .join("\n "); + format!("graph '{label}': stored-query registry failed to load:\n {joined}") +} + pub fn load_server_settings( config_path: Option<&PathBuf>, cli_uri: Option, @@ -804,7 +907,16 @@ pub fn load_server_settings( format!("normalize single-graph URI '{raw_uri}' from server settings") })?; let policy_file = config.resolve_policy_file(); - ServerConfigMode::Single { uri, policy_file } + // Single mode uses the top-level `queries:` (mirrors top-level + // `policy.file`). Load + identity-check now (no engine needed); + // the schema type-check happens when the engine opens. + let queries = QueryRegistry::load(&config, config.query_entries()) + .map_err(|errs| color_eyre::eyre::eyre!(format_registry_load_errors(&uri, &errs)))?; + ServerConfigMode::Single { + uri, + policy_file, + queries, + } } else if has_explicit_config && has_graphs_map { if config.resolve_policy_file().is_some() { bail!( @@ -827,10 +939,15 @@ pub fn load_server_settings( let uri = normalize_root_uri(&raw_uri).wrap_err_with(|| { format!("normalize URI '{raw_uri}' for graph '{name}' in omnigraph.yaml") })?; + // Per-graph `queries:`. Load + identity-check now; the schema + // type-check happens when this graph's engine opens. + let queries = QueryRegistry::load(&config, &target.queries) + .map_err(|errs| color_eyre::eyre::eyre!(format_registry_load_errors(name, &errs)))?; graphs.push(GraphStartupConfig { graph_id: name.clone(), uri, policy_file: config.resolve_target_policy_file(name), + queries, }); } let config_path = config_path @@ -1050,10 +1167,14 @@ pub async fn serve(config: ServerConfig) -> Result<()> { let bind = config.bind.clone(); let state = match config.mode { - ServerConfigMode::Single { uri, policy_file } => { + ServerConfigMode::Single { + uri, + policy_file, + queries, + } => { let uri_for_log = uri.clone(); info!(uri = %uri_for_log, bind = %bind, mode = "single", "serving omnigraph"); - AppState::open_with_bearer_tokens_and_policy(uri, tokens, policy_file.as_ref()).await? + AppState::open_single_with_queries(uri, tokens, policy_file.as_ref(), queries).await? } ServerConfigMode::Multi { graphs, @@ -1135,6 +1256,14 @@ async fn open_single_graph(cfg: GraphStartupConfig) -> Result> .await .map_err(|err| color_eyre::eyre::eyre!("open graph '{}' at {}: {err}", graph_id, uri))?; + // Type-check this graph's stored queries against the live schema; + // refuse to start on a breakage (same posture as bad policy YAML). + let report = check(&cfg.queries, &db.catalog()); + if report.has_breakages() { + bail!("{}", format_registry_breakages(graph_id.as_str(), &report)); + } + log_registry_warnings(graph_id.as_str(), &report); + let (policy_arc, db) = match &cfg.policy_file { Some(path) => { let policy = PolicyEngine::load_graph(path, graph_id.as_str())?; @@ -1145,13 +1274,17 @@ async fn open_single_graph(cfg: GraphStartupConfig) -> Result> None => (None, db), }; + let queries = if cfg.queries.is_empty() { + None + } else { + Some(Arc::new(cfg.queries)) + }; Ok(Arc::new(GraphHandle { key: GraphKey::cluster(graph_id), uri, engine: Arc::new(db), policy: policy_arc, - // Stored-query registry is loaded + checked by the boot path. - queries: None, + queries, })) } @@ -2854,6 +2987,7 @@ server: .to_string_lossy() .into_owned(), policy_file: None, + queries: crate::queries::QueryRegistry::default(), }], config_path: temp.path().join("omnigraph.yaml"), server_policy_file: Some(policy_path), @@ -2902,6 +3036,7 @@ server: .to_string_lossy() .into_owned(), policy_file: None, + queries: crate::queries::QueryRegistry::default(), }, bind: "127.0.0.1:0".to_string(), allow_unauthenticated: false, diff --git a/crates/omnigraph-server/tests/server.rs b/crates/omnigraph-server/tests/server.rs index 97344c3..852fa5f 100644 --- a/crates/omnigraph-server/tests/server.rs +++ b/crates/omnigraph-server/tests/server.rs @@ -16,6 +16,7 @@ use omnigraph_server::api::{ BranchCreateRequest, BranchMergeRequest, ChangeRequest, ErrorOutput, ExportRequest, IngestRequest, QueryRequest, ReadRequest, SchemaApplyRequest, SchemaOutput, }; +use omnigraph_server::queries::{QueryRegistry, RegistrySpec}; use omnigraph_server::{AppState, build_app}; use serde_json::{Value, json}; use serial_test::serial; @@ -141,6 +142,73 @@ fn graph_path(root: &Path) -> PathBuf { root.join("server.omni") } +fn stored_query_registry(specs: &[(&str, &str, bool)]) -> QueryRegistry { + QueryRegistry::from_specs( + specs + .iter() + .map(|(name, source, expose)| RegistrySpec { + name: name.to_string(), + source: source.to_string(), + expose: *expose, + tool_name: None, + }) + .collect(), + ) + .expect("specs parse and key==symbol") +} + +#[tokio::test] +async fn server_boots_with_a_valid_stored_query_registry() { + // A stored query that type-checks against the fixture schema + // (`Person { name, age }`) must let the server boot. + let temp = init_loaded_graph().await; + let graph = graph_path(temp.path()); + let registry = stored_query_registry(&[( + "find_person", + "query find_person($name: String) { match { $p: Person { name: $name } } return { $p.age } }", + false, + )]); + let state = AppState::open_single_with_queries( + graph.to_string_lossy().to_string(), + vec![], + None, + registry, + ) + .await; + assert!(state.is_ok(), "valid registry should boot: {:?}", state.err()); +} + +#[tokio::test] +async fn server_refuses_boot_on_type_broken_stored_query() { + // A stored query referencing a type not in the schema (`Widget`) + // must abort boot, naming the offending query. + let temp = init_loaded_graph().await; + let graph = graph_path(temp.path()); + let registry = stored_query_registry(&[( + "ghost", + "query ghost() { match { $w: Widget } return { $w.name } }", + false, + )]); + let result = AppState::open_single_with_queries( + graph.to_string_lossy().to_string(), + vec![], + None, + registry, + ) + .await; + // `AppState` is not `Debug`, so match rather than `expect_err`. + let err = match result { + Ok(_) => panic!("type-broken stored query must refuse boot"), + Err(err) => err, + }; + let msg = err.to_string(); + assert!(msg.contains("ghost"), "error should name the broken query: {msg}"); + assert!( + msg.contains("schema check"), + "error should mention the schema check: {msg}" + ); +} + fn drifted_test_schema() -> String { fs::read_to_string(fixture("test.pg")) .unwrap()