Load and type-check stored queries at server boot, refusing breakage

At startup the server now loads each graph's stored-query registry,
type-checks every query against that graph's live schema, and refuses to
boot if any query references a type/property the schema doesn't have
(same posture as bad policy YAML) — so schema drift surfaces at the
deploy boundary, not silently at invocation. Non-blocking warnings are
logged. The validated registry is attached to the GraphHandle (the two
production sites previously held `queries: None`).

Loading (parse + key==symbol identity) happens at settings-build time
where the config is in scope; the schema type-check happens after each
engine opens (single mode in `open_single_with_queries`, multi mode in
`open_single_graph`). `open_with_bearer_tokens_and_policy` delegates
with an empty registry so its 18 test callers are unchanged; the public
`new_*` constructors are unchanged (only the private build path threads
the registry).

- ServerConfigMode::Single / GraphStartupConfig carry the loaded registry
- boot tests: valid registry boots; type-broken query refuses boot + names it

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
Ragnor Comerford 2026-05-30 20:45:35 +02:00
parent 6a16b3c6ac
commit 9aa96cbb4b
No known key found for this signature in database
2 changed files with 213 additions and 10 deletions

View file

@ -12,6 +12,8 @@ pub use graph_id::GraphId;
pub use identity::{AuthSource, GraphKey, ResolvedActor, Scope, TenantId};
pub use registry::{GraphHandle, GraphRegistry, InsertError, RegistryLookup, RegistrySnapshot};
use crate::queries::{QueryRegistry, check};
use std::collections::{HashMap, HashSet};
use std::fs;
use std::io;
@ -160,6 +162,10 @@ pub enum ServerConfigMode {
uri: String,
/// Top-level `policy.file` (single-graph Cedar policy).
policy_file: Option<PathBuf>,
/// Top-level stored-query registry, loaded and identity-checked
/// at settings-build time; type-checked against the schema when
/// the engine opens.
queries: QueryRegistry,
},
/// Multi-graph invocation — `--config omnigraph.yaml` with a
/// non-empty `graphs:` map and no single-mode selector.
@ -186,6 +192,10 @@ pub struct GraphStartupConfig {
pub graph_id: String,
pub uri: String,
pub policy_file: Option<PathBuf>,
/// Per-graph stored-query registry, loaded and identity-checked at
/// settings-build time; type-checked against the schema when this
/// graph's engine opens.
pub queries: QueryRegistry,
}
/// Runtime routing for the server. Single mode = legacy
@ -286,7 +296,31 @@ impl AppState {
) -> Self {
let bearer_tokens = hash_bearer_tokens(bearer_tokens);
let per_graph_policy = policy_engine.map(Arc::new);
Self::build_single_mode(uri, db, bearer_tokens, per_graph_policy, Arc::new(workload))
Self::build_single_mode(uri, db, bearer_tokens, per_graph_policy, Arc::new(workload), None)
}
/// Like `new_single`, but attaches a pre-validated stored-query
/// registry. Private — the production single-mode boot path
/// (`open_single_with_queries`) is the only caller; every public
/// `new_*` constructor builds with no stored queries.
fn new_single_with_queries(
uri: String,
db: Omnigraph,
bearer_tokens: Vec<(String, String)>,
policy_engine: Option<PolicyEngine>,
workload: workload::WorkloadController,
queries: Option<Arc<QueryRegistry>>,
) -> Self {
let bearer_tokens = hash_bearer_tokens(bearer_tokens);
let per_graph_policy = policy_engine.map(Arc::new);
Self::build_single_mode(
uri,
db,
bearer_tokens,
per_graph_policy,
Arc::new(workload),
queries,
)
}
pub fn new(uri: String, db: Omnigraph) -> Self {
@ -378,6 +412,28 @@ impl AppState {
uri: impl Into<String>,
bearer_tokens: Vec<(String, String)>,
policy_file: Option<&PathBuf>,
) -> Result<Self> {
Self::open_single_with_queries(
uri,
bearer_tokens,
policy_file,
QueryRegistry::default(),
)
.await
}
/// Single-mode boot with a stored-query registry: open the engine,
/// **type-check the registry against the live schema and refuse to
/// start on a breakage** (same posture as bad policy YAML), log
/// non-blocking warnings, then attach the registry to the handle.
/// With an empty registry the check is a no-op and no registry is
/// attached — that is the path `open_with_bearer_tokens_and_policy`
/// (no stored queries) takes.
pub async fn open_single_with_queries(
uri: impl Into<String>,
bearer_tokens: Vec<(String, String)>,
policy_file: Option<&PathBuf>,
queries: QueryRegistry,
) -> Result<Self> {
// The "policy requires tokens" invariant is enforced once by
// `classify_server_runtime_state` in `serve()`, before either
@ -386,15 +442,29 @@ impl AppState {
// already been rejected — no second bail needed.
let uri = normalize_root_uri(&uri.into()).wrap_err("normalize graph URI")?;
let db = Omnigraph::open(&uri).await?;
let report = check(&queries, &db.catalog());
if report.has_breakages() {
bail!("{}", format_registry_breakages(&uri, &report));
}
log_registry_warnings(&uri, &report);
let policy_engine = match policy_file {
Some(path) => Some(PolicyEngine::load_graph(path, &uri)?),
None => None,
};
Ok(Self::new_with_bearer_tokens_and_policy(
let registry = if queries.is_empty() {
None
} else {
Some(Arc::new(queries))
};
Ok(Self::new_single_with_queries(
uri,
db,
bearer_tokens,
policy_engine,
workload::WorkloadController::from_env(),
registry,
))
}
@ -409,6 +479,7 @@ impl AppState {
bearer_tokens: Arc<[(BearerTokenHash, Arc<str>)]>,
policy_engine: Option<Arc<PolicyEngine>>,
workload: Arc<workload::WorkloadController>,
queries: Option<Arc<QueryRegistry>>,
) -> Self {
// Engine-layer policy gate (MR-722). With a per-graph policy
// installed, every `_as` writer on `Omnigraph` calls into the
@ -437,9 +508,7 @@ impl AppState {
uri,
engine: Arc::new(db),
policy: policy_engine,
// Stored-query registry is wired in by the boot path;
// single-mode construction defaults to "no stored queries".
queries: None,
queries,
});
Self {
routing: GraphRouting::Single { handle },
@ -754,6 +823,40 @@ pub fn init_tracing() {
let _ = tracing_subscriber::fmt().with_env_filter(filter).try_init();
}
/// Format every breakage in a registry check report into a multi-line
/// boot-abort message, naming each offending query.
fn format_registry_breakages(label: &str, report: &queries::CheckReport) -> String {
let joined = report
.breakages
.iter()
.map(|b| format!("query '{}': {}", b.query, b.message))
.collect::<Vec<_>>()
.join("\n ");
format!(
"graph '{label}': {} stored quer{} failed the schema check:\n {joined}",
report.breakages.len(),
if report.breakages.len() == 1 { "y" } else { "ies" }
)
}
/// Log each non-blocking advisory from a registry check report.
fn log_registry_warnings(label: &str, report: &queries::CheckReport) {
for warning in &report.warnings {
warn!(graph = label, query = %warning.query, "stored query: {}", warning.message);
}
}
/// Format every load error (parse / identity failure) into a multi-line
/// boot-abort message.
fn format_registry_load_errors(label: &str, errors: &[queries::LoadError]) -> String {
let joined = errors
.iter()
.map(|e| e.to_string())
.collect::<Vec<_>>()
.join("\n ");
format!("graph '{label}': stored-query registry failed to load:\n {joined}")
}
pub fn load_server_settings(
config_path: Option<&PathBuf>,
cli_uri: Option<String>,
@ -804,7 +907,16 @@ pub fn load_server_settings(
format!("normalize single-graph URI '{raw_uri}' from server settings")
})?;
let policy_file = config.resolve_policy_file();
ServerConfigMode::Single { uri, policy_file }
// Single mode uses the top-level `queries:` (mirrors top-level
// `policy.file`). Load + identity-check now (no engine needed);
// the schema type-check happens when the engine opens.
let queries = QueryRegistry::load(&config, config.query_entries())
.map_err(|errs| color_eyre::eyre::eyre!(format_registry_load_errors(&uri, &errs)))?;
ServerConfigMode::Single {
uri,
policy_file,
queries,
}
} else if has_explicit_config && has_graphs_map {
if config.resolve_policy_file().is_some() {
bail!(
@ -827,10 +939,15 @@ pub fn load_server_settings(
let uri = normalize_root_uri(&raw_uri).wrap_err_with(|| {
format!("normalize URI '{raw_uri}' for graph '{name}' in omnigraph.yaml")
})?;
// Per-graph `queries:`. Load + identity-check now; the schema
// type-check happens when this graph's engine opens.
let queries = QueryRegistry::load(&config, &target.queries)
.map_err(|errs| color_eyre::eyre::eyre!(format_registry_load_errors(name, &errs)))?;
graphs.push(GraphStartupConfig {
graph_id: name.clone(),
uri,
policy_file: config.resolve_target_policy_file(name),
queries,
});
}
let config_path = config_path
@ -1050,10 +1167,14 @@ pub async fn serve(config: ServerConfig) -> Result<()> {
let bind = config.bind.clone();
let state = match config.mode {
ServerConfigMode::Single { uri, policy_file } => {
ServerConfigMode::Single {
uri,
policy_file,
queries,
} => {
let uri_for_log = uri.clone();
info!(uri = %uri_for_log, bind = %bind, mode = "single", "serving omnigraph");
AppState::open_with_bearer_tokens_and_policy(uri, tokens, policy_file.as_ref()).await?
AppState::open_single_with_queries(uri, tokens, policy_file.as_ref(), queries).await?
}
ServerConfigMode::Multi {
graphs,
@ -1135,6 +1256,14 @@ async fn open_single_graph(cfg: GraphStartupConfig) -> Result<Arc<GraphHandle>>
.await
.map_err(|err| color_eyre::eyre::eyre!("open graph '{}' at {}: {err}", graph_id, uri))?;
// Type-check this graph's stored queries against the live schema;
// refuse to start on a breakage (same posture as bad policy YAML).
let report = check(&cfg.queries, &db.catalog());
if report.has_breakages() {
bail!("{}", format_registry_breakages(graph_id.as_str(), &report));
}
log_registry_warnings(graph_id.as_str(), &report);
let (policy_arc, db) = match &cfg.policy_file {
Some(path) => {
let policy = PolicyEngine::load_graph(path, graph_id.as_str())?;
@ -1145,13 +1274,17 @@ async fn open_single_graph(cfg: GraphStartupConfig) -> Result<Arc<GraphHandle>>
None => (None, db),
};
let queries = if cfg.queries.is_empty() {
None
} else {
Some(Arc::new(cfg.queries))
};
Ok(Arc::new(GraphHandle {
key: GraphKey::cluster(graph_id),
uri,
engine: Arc::new(db),
policy: policy_arc,
// Stored-query registry is loaded + checked by the boot path.
queries: None,
queries,
}))
}
@ -2854,6 +2987,7 @@ server:
.to_string_lossy()
.into_owned(),
policy_file: None,
queries: crate::queries::QueryRegistry::default(),
}],
config_path: temp.path().join("omnigraph.yaml"),
server_policy_file: Some(policy_path),
@ -2902,6 +3036,7 @@ server:
.to_string_lossy()
.into_owned(),
policy_file: None,
queries: crate::queries::QueryRegistry::default(),
},
bind: "127.0.0.1:0".to_string(),
allow_unauthenticated: false,

View file

@ -16,6 +16,7 @@ use omnigraph_server::api::{
BranchCreateRequest, BranchMergeRequest, ChangeRequest, ErrorOutput, ExportRequest,
IngestRequest, QueryRequest, ReadRequest, SchemaApplyRequest, SchemaOutput,
};
use omnigraph_server::queries::{QueryRegistry, RegistrySpec};
use omnigraph_server::{AppState, build_app};
use serde_json::{Value, json};
use serial_test::serial;
@ -141,6 +142,73 @@ fn graph_path(root: &Path) -> PathBuf {
root.join("server.omni")
}
fn stored_query_registry(specs: &[(&str, &str, bool)]) -> QueryRegistry {
QueryRegistry::from_specs(
specs
.iter()
.map(|(name, source, expose)| RegistrySpec {
name: name.to_string(),
source: source.to_string(),
expose: *expose,
tool_name: None,
})
.collect(),
)
.expect("specs parse and key==symbol")
}
#[tokio::test]
async fn server_boots_with_a_valid_stored_query_registry() {
// A stored query that type-checks against the fixture schema
// (`Person { name, age }`) must let the server boot.
let temp = init_loaded_graph().await;
let graph = graph_path(temp.path());
let registry = stored_query_registry(&[(
"find_person",
"query find_person($name: String) { match { $p: Person { name: $name } } return { $p.age } }",
false,
)]);
let state = AppState::open_single_with_queries(
graph.to_string_lossy().to_string(),
vec![],
None,
registry,
)
.await;
assert!(state.is_ok(), "valid registry should boot: {:?}", state.err());
}
#[tokio::test]
async fn server_refuses_boot_on_type_broken_stored_query() {
// A stored query referencing a type not in the schema (`Widget`)
// must abort boot, naming the offending query.
let temp = init_loaded_graph().await;
let graph = graph_path(temp.path());
let registry = stored_query_registry(&[(
"ghost",
"query ghost() { match { $w: Widget } return { $w.name } }",
false,
)]);
let result = AppState::open_single_with_queries(
graph.to_string_lossy().to_string(),
vec![],
None,
registry,
)
.await;
// `AppState` is not `Debug`, so match rather than `expect_err`.
let err = match result {
Ok(_) => panic!("type-broken stored query must refuse boot"),
Err(err) => err,
};
let msg = err.to_string();
assert!(msg.contains("ghost"), "error should name the broken query: {msg}");
assert!(
msg.contains("schema check"),
"error should mention the schema check: {msg}"
);
}
fn drifted_test_schema() -> String {
fs::read_to_string(fixture("test.pg"))
.unwrap()