fix(config): enforce graph-scoped policies and query validation

This commit is contained in:
Ragnor Comerford 2026-06-01 17:06:41 +02:00
parent fb442adb14
commit 845e32324c
No known key found for this signature in database
12 changed files with 682 additions and 168 deletions

View file

@ -24,7 +24,7 @@ use omnigraph_server::api::{
SnapshotTableOutput, commit_output, ingest_output, read_output, schema_apply_output,
snapshot_payload,
};
use omnigraph_server::queries::{QueryRegistry, check};
use omnigraph_server::queries::{QueryRegistry, check, format_check_breakages};
use omnigraph_server::{
AliasCommand, OmnigraphConfig, PolicyAction, PolicyDecision, PolicyEngine, PolicyRequest,
PolicyTestConfig, ReadOutputFormat, load_config,
@ -778,25 +778,80 @@ fn load_cli_config(config_path: Option<&PathBuf>) -> Result<OmnigraphConfig> {
Ok(config)
}
fn resolve_policy_engine(config: &OmnigraphConfig) -> Result<PolicyEngine> {
let policy_file = config
.resolve_policy_file()
.ok_or_else(|| color_eyre::eyre::eyre!("policy.file must be set in omnigraph.yaml"))?;
PolicyEngine::load_graph(&policy_file, &policy_graph_id(config))
#[derive(Debug, Clone)]
struct ResolvedCliGraph {
uri: String,
selected: Option<String>,
policy_file: Option<PathBuf>,
is_remote: bool,
}
/// Open a local-URI graph and, when `policy.file` is configured in
/// `omnigraph.yaml`, install the resolved `PolicyEngine` on the engine
/// handle so every direct-engine write goes through
/// `Omnigraph::enforce(...)` (MR-722). Without a configured policy this
/// is identical to a bare `Omnigraph::open`.
///
/// Returns owned `Omnigraph`; chained on top of `Omnigraph::open(...)`'s
/// existing future to keep call sites narrow.
async fn open_local_db_with_policy(uri: &str, config: &OmnigraphConfig) -> Result<Omnigraph> {
let db = Omnigraph::open(uri).await?;
if config.resolve_policy_file().is_some() {
let engine = Arc::new(resolve_policy_engine(config)?);
impl ResolvedCliGraph {
fn selected(&self) -> Option<&str> {
self.selected.as_deref()
}
}
struct ResolvedPolicyContext {
policy_file: PathBuf,
graph_id: String,
}
fn resolve_policy_context(config: &OmnigraphConfig) -> Result<ResolvedPolicyContext> {
let selected = config.cli_graph_name().map(str::to_string);
config.resolve_graph_selection(selected.as_deref())?;
let policy_file = config
.resolve_policy_file_for(selected.as_deref())
.ok_or_else(|| {
color_eyre::eyre::eyre!(
"policy.file or graphs.<name>.policy.file must be set in omnigraph.yaml"
)
})?;
let graph_id = if let Some(name) = &config.project.name {
name.clone()
} else if let Some(selected) = selected.as_deref() {
config
.resolve_target_uri(None, Some(selected), None)
.unwrap_or_else(|_| selected.to_string())
} else {
policy_graph_id_from_uri(config, None)
};
Ok(ResolvedPolicyContext {
policy_file,
graph_id,
})
}
fn resolve_policy_engine(context: &ResolvedPolicyContext) -> Result<PolicyEngine> {
PolicyEngine::load_graph(&context.policy_file, &context.graph_id)
}
fn resolve_policy_engine_for_graph(
config: &OmnigraphConfig,
graph: &ResolvedCliGraph,
) -> Result<PolicyEngine> {
let policy_file = graph.policy_file.as_ref().ok_or_else(|| {
color_eyre::eyre::eyre!(
"policy.file or graphs.<name>.policy.file must be set in omnigraph.yaml"
)
})?;
PolicyEngine::load_graph(
policy_file,
&policy_graph_id_from_uri(config, Some(&graph.uri)),
)
}
/// Open a local graph and install the policy resolved for the same graph
/// identity that produced the URI. A named graph uses
/// `graphs.<name>.policy.file`; an explicit positional URI is anonymous and
/// uses the legacy top-level `policy.file`.
async fn open_local_db_with_policy(
graph: &ResolvedCliGraph,
config: &OmnigraphConfig,
) -> Result<Omnigraph> {
let db = Omnigraph::open(&graph.uri).await?;
if graph.policy_file.is_some() {
let engine = Arc::new(resolve_policy_engine_for_graph(config, graph)?);
Ok(db.with_policy(engine as Arc<dyn omnigraph_policy::PolicyChecker>))
} else {
Ok(db)
@ -813,18 +868,17 @@ fn resolve_cli_actor<'a>(cli_as: Option<&'a str>, config: &'a OmnigraphConfig) -
cli_as.or(config.cli.actor.as_deref())
}
fn resolve_policy_tests_path(config: &OmnigraphConfig) -> Result<PathBuf> {
config.resolve_policy_tests_file().ok_or_else(|| {
color_eyre::eyre::eyre!(
"policy.tests.yaml requires policy.file to be set in omnigraph.yaml"
)
})
fn resolve_policy_tests_path(context: &ResolvedPolicyContext) -> PathBuf {
context.policy_file.with_file_name("policy.tests.yaml")
}
fn policy_graph_id(config: &OmnigraphConfig) -> String {
fn policy_graph_id_from_uri(config: &OmnigraphConfig, uri: Option<&str>) -> String {
if let Some(name) = &config.project.name {
return name.clone();
}
if let Some(uri) = uri {
return uri.to_string();
}
config
.resolve_target_uri(None, None, config.server_graph_name())
.or_else(|_| config.resolve_target_uri(None, None, config.cli_graph_name()))
@ -912,6 +966,44 @@ fn resolve_uri(
config.resolve_target_uri(cli_uri, cli_target, config.cli_graph_name())
}
fn resolve_cli_graph(
config: &OmnigraphConfig,
cli_uri: Option<String>,
cli_target: Option<&str>,
) -> Result<ResolvedCliGraph> {
let selected = if cli_uri.is_some() {
None
} else {
cli_target
.map(str::to_string)
.or_else(|| config.cli_graph_name().map(str::to_string))
};
config.resolve_graph_selection(selected.as_deref())?;
let uri = resolve_uri(config, cli_uri, cli_target)?;
Ok(ResolvedCliGraph {
is_remote: is_remote_uri(&uri),
policy_file: config.resolve_policy_file_for(selected.as_deref()),
selected,
uri,
})
}
fn resolve_local_graph(
config: &OmnigraphConfig,
cli_uri: Option<String>,
cli_target: Option<&str>,
operation: &str,
) -> Result<ResolvedCliGraph> {
let graph = resolve_cli_graph(config, cli_uri, cli_target)?;
if graph.is_remote {
bail!(
"{} is only supported against local graph URIs in this milestone",
operation
);
}
Ok(graph)
}
/// Parse a Go-style compact duration: `7d`, `24h`, `30m`, `90s`, or a plain
/// integer as seconds. Used by the `cleanup --older-than` flag.
fn parse_duration_arg(s: &str) -> Result<std::time::Duration> {
@ -950,14 +1042,7 @@ fn resolve_local_uri(
cli_target: Option<&str>,
operation: &str,
) -> Result<String> {
let uri = resolve_uri(config, cli_uri, cli_target)?;
if is_remote_uri(&uri) {
bail!(
"{} is only supported against local graph URIs in this milestone",
operation
);
}
Ok(uri)
Ok(resolve_local_graph(config, cli_uri, cli_target, operation)?.uri)
}
fn resolve_branch(
@ -1691,20 +1776,8 @@ fn resolve_selected_graph(
cli_target: Option<&str>,
operation: &str,
) -> Result<(String, Option<String>)> {
let selected = if cli_uri.is_some() {
None
} else {
cli_target
.map(str::to_string)
.or_else(|| config.cli_graph_name().map(str::to_string))
};
// Validate the selection through the single gate (membership + coherence),
// so a positional URI stays anonymous and a named graph is rejected when a
// top-level block would be silently ignored — matching server boot. `list`
// already routes through the same gate; this keeps `validate` in step.
config.resolve_graph_selection(selected.as_deref())?;
let uri = resolve_local_uri(config, cli_uri, cli_target, operation)?;
Ok((uri, selected))
let graph = resolve_local_graph(config, cli_uri, cli_target, operation)?;
Ok((graph.uri, graph.selected))
}
/// Load the stored-query registry for an already-resolved graph selection
@ -1725,6 +1798,20 @@ fn load_registry_or_report(
})
}
fn validate_registry_for_catalog(
registry: &QueryRegistry,
catalog: &omnigraph_compiler::catalog::Catalog,
label: &str,
) -> omnigraph::error::Result<()> {
let report = check(registry, catalog);
if report.has_breakages() {
return Err(omnigraph::error::OmniError::manifest(
format_check_breakages(label, &report),
));
}
Ok(())
}
async fn execute_queries_validate(
uri: Option<String>,
target: Option<String>,
@ -1899,7 +1986,7 @@ async fn execute_read_remote(
}
async fn execute_change(
uri: &str,
graph: &ResolvedCliGraph,
query_source: &str,
query_name: Option<&str>,
branch: &str,
@ -1909,7 +1996,7 @@ async fn execute_change(
) -> Result<ChangeOutput> {
let (selected_name, query_params) = select_named_query(query_source, query_name)?;
let params = query_params_from_json(&query_params, params_json)?;
let db = open_local_db_with_policy(uri, config).await?;
let db = open_local_db_with_policy(graph, config).await?;
let actor = resolve_cli_actor(cli_as_actor, config);
let result = db
.mutate_as(branch, query_source, &selected_name, &params, actor)
@ -2137,9 +2224,10 @@ async fn main() -> Result<()> {
json,
} => {
let config = load_cli_config(config.as_ref())?;
let uri = resolve_local_uri(&config, uri, target.as_deref(), "load")?;
let graph = resolve_local_graph(&config, uri, target.as_deref(), "load")?;
let uri = graph.uri.clone();
let branch = resolve_branch(&config, branch, None, "main");
let db = open_local_db_with_policy(&uri, &config).await?;
let db = open_local_db_with_policy(&graph, &config).await?;
let actor = resolve_cli_actor(cli.as_actor.as_deref(), &config);
let result = db
.load_file_as(&branch, &data.to_string_lossy(), mode.into(), actor)
@ -2180,10 +2268,11 @@ async fn main() -> Result<()> {
let config = load_cli_config(config.as_ref())?;
let bearer_token =
resolve_remote_bearer_token(&config, uri.as_deref(), target.as_deref())?;
let uri = resolve_uri(&config, uri, target.as_deref())?;
let graph = resolve_cli_graph(&config, uri, target.as_deref())?;
let uri = graph.uri.clone();
let branch = resolve_branch(&config, branch, None, "main");
let from = resolve_branch(&config, from, None, "main");
let payload = if is_remote_uri(&uri) {
let payload = if graph.is_remote {
let data = fs::read_to_string(&data)?;
remote_json::<IngestOutput>(
&http_client,
@ -2199,7 +2288,7 @@ async fn main() -> Result<()> {
)
.await?
} else {
let db = open_local_db_with_policy(&uri, &config).await?;
let db = open_local_db_with_policy(&graph, &config).await?;
let actor = resolve_cli_actor(cli.as_actor.as_deref(), &config);
let result = db
.ingest_file_as(
@ -2230,9 +2319,10 @@ async fn main() -> Result<()> {
let config = load_cli_config(config.as_ref())?;
let bearer_token =
resolve_remote_bearer_token(&config, uri.as_deref(), target.as_deref())?;
let uri = resolve_uri(&config, uri, target.as_deref())?;
let graph = resolve_cli_graph(&config, uri, target.as_deref())?;
let uri = graph.uri.clone();
let from = resolve_branch(&config, from, None, "main");
let payload = if is_remote_uri(&uri) {
let payload = if graph.is_remote {
remote_json::<BranchCreateOutput>(
&http_client,
Method::POST,
@ -2245,7 +2335,7 @@ async fn main() -> Result<()> {
)
.await?
} else {
let db = open_local_db_with_policy(&uri, &config).await?;
let db = open_local_db_with_policy(&graph, &config).await?;
let actor = resolve_cli_actor(cli.as_actor.as_deref(), &config);
db.branch_create_from_as(ReadTarget::branch(&from), &name, actor)
.await?;
@ -2271,8 +2361,9 @@ async fn main() -> Result<()> {
let config = load_cli_config(config.as_ref())?;
let bearer_token =
resolve_remote_bearer_token(&config, uri.as_deref(), target.as_deref())?;
let uri = resolve_uri(&config, uri, target.as_deref())?;
let payload = if is_remote_uri(&uri) {
let graph = resolve_cli_graph(&config, uri, target.as_deref())?;
let uri = graph.uri.clone();
let payload = if graph.is_remote {
remote_json::<BranchListOutput>(
&http_client,
Method::GET,
@ -2305,8 +2396,9 @@ async fn main() -> Result<()> {
let config = load_cli_config(config.as_ref())?;
let bearer_token =
resolve_remote_bearer_token(&config, uri.as_deref(), target.as_deref())?;
let uri = resolve_uri(&config, uri, target.as_deref())?;
let payload = if is_remote_uri(&uri) {
let graph = resolve_cli_graph(&config, uri, target.as_deref())?;
let uri = graph.uri.clone();
let payload = if graph.is_remote {
remote_json::<BranchDeleteOutput>(
&http_client,
Method::DELETE,
@ -2316,7 +2408,7 @@ async fn main() -> Result<()> {
)
.await?
} else {
let db = open_local_db_with_policy(&uri, &config).await?;
let db = open_local_db_with_policy(&graph, &config).await?;
let actor = resolve_cli_actor(cli.as_actor.as_deref(), &config);
db.branch_delete_as(&name, actor).await?;
BranchDeleteOutput {
@ -2342,9 +2434,10 @@ async fn main() -> Result<()> {
let config = load_cli_config(config.as_ref())?;
let bearer_token =
resolve_remote_bearer_token(&config, uri.as_deref(), target.as_deref())?;
let uri = resolve_uri(&config, uri, target.as_deref())?;
let graph = resolve_cli_graph(&config, uri, target.as_deref())?;
let uri = graph.uri.clone();
let into = resolve_branch(&config, into, None, "main");
let payload = if is_remote_uri(&uri) {
let payload = if graph.is_remote {
remote_json::<BranchMergeOutput>(
&http_client,
Method::POST,
@ -2357,7 +2450,7 @@ async fn main() -> Result<()> {
)
.await?
} else {
let db = open_local_db_with_policy(&uri, &config).await?;
let db = open_local_db_with_policy(&graph, &config).await?;
let actor = resolve_cli_actor(cli.as_actor.as_deref(), &config);
let outcome = db.branch_merge_as(&source, &into, actor).await?;
BranchMergeOutput {
@ -2492,9 +2585,10 @@ async fn main() -> Result<()> {
let config = load_cli_config(config.as_ref())?;
let bearer_token =
resolve_remote_bearer_token(&config, uri.as_deref(), target.as_deref())?;
let uri = resolve_uri(&config, uri, target.as_deref())?;
let graph = resolve_cli_graph(&config, uri, target.as_deref())?;
let uri = graph.uri.clone();
let schema_source = fs::read_to_string(&schema)?;
let output = if is_remote_uri(&uri) {
let output = if graph.is_remote {
// MR-694 PR B: SchemaApplyRequest gained an
// allow_data_loss field so Hard-mode drops are no
// longer CLI-only. The previous bail is gone; the
@ -2512,13 +2606,22 @@ async fn main() -> Result<()> {
)
.await?
} else {
let db = open_local_db_with_policy(&uri, &config).await?;
let db = open_local_db_with_policy(&graph, &config).await?;
let actor = resolve_cli_actor(cli.as_actor.as_deref(), &config);
let registry = load_registry_or_report(&config, graph.selected())?;
let registry = (!registry.is_empty()).then_some(registry);
let label = graph.selected().unwrap_or(&uri).to_string();
let result = db
.apply_schema_as(
.apply_schema_as_with_catalog_check(
&schema_source,
omnigraph::db::SchemaApplyOptions { allow_data_loss },
actor,
|catalog| {
if let Some(registry) = registry.as_ref() {
validate_registry_for_catalog(registry, catalog, &label)?;
}
Ok(())
},
)
.await?;
schema_apply_output(&uri, result)
@ -2697,7 +2800,8 @@ async fn main() -> Result<()> {
.as_deref()
.or_else(|| alias_config.and_then(|alias| alias.graph.as_deref()));
let bearer_token = resolve_remote_bearer_token(&config, uri.as_deref(), target_name)?;
let uri = resolve_uri(&config, uri, target_name)?;
let graph = resolve_cli_graph(&config, uri, target_name)?;
let uri = graph.uri.clone();
let query_source = resolve_query_source(
&config,
query.as_ref(),
@ -2719,7 +2823,7 @@ async fn main() -> Result<()> {
alias_config.and_then(|alias| alias.branch.clone()),
)?;
let query_name = name.or_else(|| alias_config.and_then(|alias| alias.name.clone()));
let output = if is_remote_uri(&uri) {
let output = if graph.is_remote {
execute_read_remote(
&http_client,
&uri,
@ -2782,7 +2886,8 @@ async fn main() -> Result<()> {
.as_deref()
.or_else(|| alias_config.and_then(|alias| alias.graph.as_deref()));
let bearer_token = resolve_remote_bearer_token(&config, uri.as_deref(), target_name)?;
let uri = resolve_uri(&config, uri, target_name)?;
let graph = resolve_cli_graph(&config, uri, target_name)?;
let uri = graph.uri.clone();
let query_source = resolve_query_source(
&config,
query.as_ref(),
@ -2804,7 +2909,7 @@ async fn main() -> Result<()> {
"main",
);
let query_name = name.or_else(|| alias_config.and_then(|alias| alias.name.clone()));
let output = if is_remote_uri(&uri) {
let output = if graph.is_remote {
execute_change_remote(
&http_client,
&uri,
@ -2817,7 +2922,7 @@ async fn main() -> Result<()> {
.await?
} else {
execute_change(
&uri,
&graph,
&query_source,
query_name.as_deref(),
&branch,
@ -2836,20 +2941,19 @@ async fn main() -> Result<()> {
Command::Policy { command } => match command {
PolicyCommand::Validate { config } => {
let config = load_cli_config(config.as_ref())?;
let engine = resolve_policy_engine(&config)?;
let policy_file = config
.resolve_policy_file()
.expect("policy file should exist after resolve_policy_engine");
let context = resolve_policy_context(&config)?;
let engine = resolve_policy_engine(&context)?;
println!(
"policy valid: {} [{} actors]",
policy_file.display(),
context.policy_file.display(),
engine.known_actor_count()
);
}
PolicyCommand::Test { config } => {
let config = load_cli_config(config.as_ref())?;
let engine = resolve_policy_engine(&config)?;
let tests_path = resolve_policy_tests_path(&config)?;
let context = resolve_policy_context(&config)?;
let engine = resolve_policy_engine(&context)?;
let tests_path = resolve_policy_tests_path(&context);
let tests = PolicyTestConfig::load(&tests_path)?;
engine.run_tests(&tests)?;
println!("policy tests passed: {} cases", tests.cases.len());
@ -2862,7 +2966,8 @@ async fn main() -> Result<()> {
target_branch,
} => {
let config = load_cli_config(config.as_ref())?;
let engine = resolve_policy_engine(&config)?;
let context = resolve_policy_context(&config)?;
let engine = resolve_policy_engine(&context)?;
let request = PolicyRequest {
action,
branch,

View file

@ -74,14 +74,14 @@ project:
graphs:
local:
uri: {}
policy:
file: ./policy.yaml
cli:
graph: local
branch: main
query:
roots:
- .
policy:
file: ./policy.yaml
",
yaml_string(&graph.path().to_string_lossy())
)
@ -1000,8 +1000,8 @@ query vector_search($q: String) {
#[test]
fn local_cli_policy_tooling_is_end_to_end() {
// Sanity check for the read-only policy CLI surfaces. These don't
// mutate the graph — they just parse and evaluate the policy file —
// so they don't depend on PR #4's engine-side enforcement.
// mutate the graph; they parse and evaluate the effective policy for
// the `cli.graph` selection, including per-graph policy files.
let graph = SystemGraph::loaded();
let config = graph.write_config("omnigraph-policy.yaml", &local_policy_config(&graph));
graph.write_config("policy.yaml", POLICY_E2E_YAML);
@ -1039,10 +1039,10 @@ fn local_cli_policy_tooling_is_end_to_end() {
#[test]
fn local_cli_change_enforces_engine_layer_policy() {
// Asserts MR-722 PR #4: when `policy.file` is configured in
// `omnigraph.yaml`, the CLI loads PolicyEngine into Omnigraph and
// every direct-engine write hits `enforce(action, scope, actor)` —
// identical to what the HTTP server gets, regardless of transport.
// Asserts MR-722 PR #4: when the selected graph has a configured
// policy file, the CLI loads PolicyEngine into Omnigraph and every
// direct-engine write hits `enforce(action, scope, actor)` — identical
// to what the HTTP server gets, regardless of transport.
//
// Three cases, each discriminating:
//
@ -1135,6 +1135,32 @@ fn local_cli_change_enforces_engine_layer_policy() {
assert_eq!(verify["rows"][0]["p.name"], "RagnorOnMain");
}
#[test]
fn local_cli_positional_uri_does_not_inherit_default_graph_policy() {
let graph = SystemGraph::loaded();
let config = graph.write_config("omnigraph-policy.yaml", &local_policy_config(&graph));
graph.write_config("policy.yaml", POLICY_E2E_YAML);
let mutation_file = insert_person_query(&graph, "system-local-policy-positional.gq");
let allowed = parse_stdout_json(&output_success(
cli()
.arg("--as")
.arg("act-bruno")
.arg("change")
.arg("--config")
.arg(&config)
.arg("--uri")
.arg(graph.path())
.arg("--query")
.arg(&mutation_file)
.arg("--params")
.arg(r#"{"name":"PositionalUriBruno","age":4}"#)
.arg("--json"),
));
assert_eq!(allowed["affected_nodes"], 1);
assert_eq!(allowed["actor_id"], "act-bruno");
}
// ─── MR-722 PR A: CLI×writer matrix ───────────────────────────────────────
//
// The change writer is covered above by `local_cli_change_enforces_engine_layer_policy`.
@ -1293,6 +1319,62 @@ fn local_cli_schema_apply_enforces_engine_layer_policy() {
assert_eq!(allowed["applied"], true);
}
#[test]
fn local_cli_schema_apply_rejects_stored_query_breakage_before_publish() {
let graph = SystemGraph::loaded();
graph.write_query(
"stored-find-person.gq",
"query find_person($name: String) { match { $p: Person { name: $name } } return { $p.age } }",
);
let config = graph.write_config(
"omnigraph-stored-query-schema.yaml",
&format!(
"\
graphs:
local:
uri: {}
queries:
find_person:
file: ./stored-find-person.gq
cli:
graph: local
branch: main
query:
roots:
- .
policy: {{}}
",
yaml_string(&graph.path().to_string_lossy())
),
);
let renamed_schema = std::fs::read_to_string(fixture("test.pg"))
.unwrap()
.replace("age: I32?", "years: I32? @rename_from(\"age\")");
let schema_path = graph.write_file("stored-query-breaks.pg", &renamed_schema);
let rejected = output_failure(
cli()
.arg("schema")
.arg("apply")
.arg("--config")
.arg(&config)
.arg("--schema")
.arg(&schema_path)
.arg("--json"),
);
let stderr = String::from_utf8_lossy(&rejected.stderr);
assert!(
stderr.contains("find_person") && stderr.contains("schema check"),
"schema apply should reject the stored-query breakage before publish; stderr: {stderr}"
);
let schema = stdout_string(&output_success(
cli().arg("schema").arg("show").arg("--config").arg(&config),
));
assert!(schema.contains("age: I32?"));
assert!(!schema.contains("years: I32?"));
}
#[test]
fn local_cli_branch_create_enforces_engine_layer_policy() {
let graph = SystemGraph::loaded();
@ -1448,6 +1530,8 @@ project:
graphs:
local:
uri: {}
policy:
file: ./policy.yaml
cli:
graph: local
branch: main
@ -1455,8 +1539,6 @@ cli:
query:
roots:
- .
policy:
file: ./policy.yaml
",
yaml_string(&graph.path().to_string_lossy()),
actor,

View file

@ -60,10 +60,10 @@ project:
graphs:
local:
uri: {}
policy:
file: ./policy.yaml
server:
graph: local
policy:
file: ./policy.yaml
",
yaml_string(&graph.path().to_string_lossy())
)

View file

@ -306,7 +306,7 @@ pub struct ChangeRequest {
/// Body for `POST /queries/{name}` — invokes the server-side stored query
/// named in the path. The query source and name come from the registry,
/// never the body; only the runtime inputs are supplied here.
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
#[derive(Debug, Clone, Default, Serialize, Deserialize, ToSchema)]
pub struct InvokeStoredQueryRequest {
/// JSON object whose keys match the stored query's declared parameters.
#[serde(default)]

View file

@ -12,7 +12,7 @@ pub use graph_id::GraphId;
pub use identity::{AuthSource, GraphKey, ResolvedActor, Scope, TenantId};
pub use registry::{GraphHandle, GraphRegistry, InsertError, RegistryLookup, RegistrySnapshot};
use crate::queries::{QueryRegistry, check};
use crate::queries::{QueryRegistry, check, format_check_breakages};
use std::collections::{HashMap, HashSet};
use std::fs;
@ -820,22 +820,6 @@ pub fn init_tracing() {
let _ = tracing_subscriber::fmt().with_env_filter(filter).try_init();
}
/// Format every breakage in a registry check report into a multi-line
/// boot-abort message, naming each offending query.
fn format_registry_breakages(label: &str, report: &queries::CheckReport) -> String {
let joined = report
.breakages
.iter()
.map(|b| format!("query '{}': {}", b.query, b.message))
.collect::<Vec<_>>()
.join("\n ");
format!(
"graph '{label}': {} stored quer{} failed the schema check:\n {joined}",
report.breakages.len(),
if report.breakages.len() == 1 { "y" } else { "ies" }
)
}
/// Log each non-blocking advisory from a registry check report.
fn log_registry_warnings(label: &str, report: &queries::CheckReport) {
for warning in &report.warnings {
@ -843,6 +827,19 @@ fn log_registry_warnings(label: &str, report: &queries::CheckReport) {
}
}
fn validate_registry_against_catalog(
registry: &QueryRegistry,
catalog: &Catalog,
label: &str,
) -> omnigraph::error::Result<()> {
let report = check(registry, catalog);
if report.has_breakages() {
return Err(OmniError::manifest(format_check_breakages(label, &report)));
}
log_registry_warnings(label, &report);
Ok(())
}
/// Validate a loaded stored-query registry against the live schema and
/// resolve it to an attachable handle. Refuses boot on any breakage
/// (same posture as bad policy YAML), logs the non-blocking warnings,
@ -855,11 +852,8 @@ fn validate_and_attach(
catalog: &Catalog,
label: &str,
) -> Result<Option<Arc<QueryRegistry>>> {
let report = check(&queries, catalog);
if report.has_breakages() {
bail!("{}", format_registry_breakages(label, &report));
}
log_registry_warnings(label, &report);
validate_registry_against_catalog(&queries, catalog, label)
.map_err(|err| color_eyre::eyre::eyre!(err.to_string()))?;
Ok(if queries.is_empty() {
None
} else {
@ -2214,13 +2208,26 @@ struct QueryNamePath {
name: String,
}
fn parse_optional_invoke_body(
body: Bytes,
) -> std::result::Result<InvokeStoredQueryRequest, ApiError> {
if body.is_empty() {
return Ok(InvokeStoredQueryRequest::default());
}
serde_json::from_slice::<Option<InvokeStoredQueryRequest>>(&body)
.map(|request| request.unwrap_or_default())
.map_err(|err| {
ApiError::bad_request(format!("invalid stored-query invocation body: {err}"))
})
}
#[utoipa::path(
post,
path = "/queries/{name}",
tag = "queries",
operation_id = "invoke_query",
params(("name" = String, Path, description = "Stored query name (the registry key)")),
request_body = InvokeStoredQueryRequest,
request_body = Option<InvokeStoredQueryRequest>,
responses(
(status = 200, description = "Read envelope (ReadOutput) or mutation envelope (ChangeOutput), serialized untagged", body = InvokeStoredQueryResponse),
(status = 400, description = "Bad request (param type error; snapshot on a stored mutation)", body = ErrorOutput),
@ -2249,8 +2256,9 @@ async fn server_invoke_query(
Extension(handle): Extension<Arc<GraphHandle>>,
actor: Option<Extension<ResolvedActor>>,
Path(QueryNamePath { name }): Path<QueryNamePath>,
Json(req): Json<InvokeStoredQueryRequest>,
body: Bytes,
) -> std::result::Result<Json<InvokeStoredQueryResponse>, ApiError> {
let req = parse_optional_invoke_body(body)?;
// A caller without `invoke_query` can't tell a denial from a missing
// query: both 404 with this exact message, so the catalog can't be
// probed without the grant. (A caller that holds invoke_query may still
@ -2469,18 +2477,26 @@ async fn server_schema_apply(
.map_err(ApiError::from_workload_reject)?;
let result = {
let db = &handle.engine;
let registry = handle.queries.as_deref();
let label = handle.key.graph_id.as_str().to_string();
// Engine-layer policy enforcement (MR-722): pass the resolved
// actor through so apply_schema_as can call enforce() with the
// authoritative identity. With a policy installed in AppState,
// engine-side enforcement re-checks the same decision the
// HTTP-layer authorize_request just made above. PR #3 collapses
// the redundancy.
db.apply_schema_as(
db.apply_schema_as_with_catalog_check(
&request.schema_source,
omnigraph::db::SchemaApplyOptions {
allow_data_loss: request.allow_data_loss,
},
actor_id,
|catalog| {
if let Some(registry) = registry {
validate_registry_against_catalog(registry, catalog, &label)?;
}
Ok(())
},
)
.await
.map_err(ApiError::from_omni)?

View file

@ -315,6 +315,26 @@ pub fn check(registry: &QueryRegistry, catalog: &Catalog) -> CheckReport {
report
}
/// Format every breakage in a registry check report into a multi-line
/// operator-facing message, naming each offending query.
pub fn format_check_breakages(label: &str, report: &CheckReport) -> String {
let joined = report
.breakages
.iter()
.map(|b| format!("query '{}': {}", b.query, b.message))
.collect::<Vec<_>>()
.join("\n ");
format!(
"graph '{label}': {} stored quer{} failed the schema check:\n {joined}",
report.breakages.len(),
if report.breakages.len() == 1 {
"y"
} else {
"ies"
}
)
}
#[cfg(test)]
mod tests {
use super::*;

View file

@ -917,6 +917,34 @@ fn post_endpoints_have_request_body() {
}
}
#[test]
fn invoke_stored_query_request_body_is_optional() {
let doc = openapi_json();
let request_body = &doc["paths"]["/queries/{name}"]["post"]["requestBody"];
assert!(
request_body.is_object(),
"POST /queries/{{name}} should document its optional request body"
);
assert_eq!(
request_body["required"].as_bool().unwrap_or(false),
false,
"stored-query invocation body should be optional"
);
let schema = &request_body["content"]["application/json"]["schema"];
let ref_path = schema["$ref"]
.as_str()
.or_else(|| {
schema["oneOf"]
.as_array()
.and_then(|schemas| schemas.iter().find_map(|schema| schema["$ref"].as_str()))
})
.unwrap();
assert!(
ref_path.contains("InvokeStoredQueryRequest"),
"POST /queries/{{name}} requestBody should reference InvokeStoredQueryRequest, got {ref_path}"
);
}
// ---------------------------------------------------------------------------
// Serialization round-trip test
// ---------------------------------------------------------------------------

View file

@ -8,7 +8,7 @@ use axum::body::{Body, to_bytes};
use axum::http::header::AUTHORIZATION;
use axum::http::{Method, Request, StatusCode};
use lance::index::DatasetIndexExt;
use omnigraph::db::{Omnigraph, ReadTarget, SchemaApplyOptions};
use omnigraph::db::{Omnigraph, ReadTarget};
use omnigraph::error::OmniError;
use omnigraph::loader::{LoadMode, load_jsonl};
use omnigraph_policy::{PolicyChecker, PolicyEngine};
@ -280,6 +280,28 @@ rules:
branch_scope: any
"#;
const STORED_QUERY_SCHEMA_APPLY_POLICY_YAML: &str = r#"
version: 1
groups:
admins: [act-ragnor]
protected_branches: [main]
rules:
- id: admins-can-invoke
allow:
actors: { group: admins }
actions: [invoke_query]
- id: admins-can-read
allow:
actors: { group: admins }
actions: [read]
branch_scope: any
- id: admins-can-schema-apply
allow:
actors: { group: admins }
actions: [schema_apply]
target_branch_scope: protected
"#;
const FIND_PERSON_GQ: &str =
"query find_person($name: String) { match { $p: Person { name: $name } } return { $p.age } }";
@ -293,6 +315,22 @@ fn invoke_request(name: &str, token: &str, body: Value) -> Request<Body> {
.unwrap()
}
fn invoke_request_bytes(
name: &str,
token: &str,
body: impl Into<Body>,
content_type: Option<&str>,
) -> Request<Body> {
let mut builder = Request::builder()
.uri(format!("/queries/{name}"))
.method(Method::POST)
.header("authorization", format!("Bearer {token}"));
if let Some(content_type) = content_type {
builder = builder.header("content-type", content_type);
}
builder.body(body.into()).unwrap()
}
#[tokio::test(flavor = "multi_thread")]
async fn invoke_stored_read_returns_rows() {
let (_temp, app) = app_with_stored_queries(
@ -312,6 +350,68 @@ async fn invoke_stored_read_returns_rows() {
assert!(body["rows"].is_array(), "read envelope shape; body: {body}");
}
#[tokio::test(flavor = "multi_thread")]
async fn invoke_stored_read_accepts_absent_or_empty_body() {
let no_param_query = "query list_people() { match { $p: Person } return { $p.name } }";
let (_temp, app) = app_with_stored_queries(
&[("list_people", no_param_query, false)],
&[("act-invoke", "t-invoke")],
INVOKE_POLICY_YAML,
)
.await;
let (status, body) = json_response(
&app,
invoke_request_bytes("list_people", "t-invoke", Body::empty(), None),
)
.await;
assert_eq!(status, StatusCode::OK, "body: {body}");
assert_eq!(body["query_name"], "list_people");
let (status, body) = json_response(
&app,
invoke_request_bytes(
"list_people",
"t-invoke",
Body::empty(),
Some("application/json"),
),
)
.await;
assert_eq!(status, StatusCode::OK, "body: {body}");
let (status, body) = json_response(
&app,
invoke_request_bytes(
"list_people",
"t-invoke",
Body::from("{}"),
Some("application/json"),
),
)
.await;
assert_eq!(status, StatusCode::OK, "body: {body}");
let (status, body) = json_response(
&app,
invoke_request_bytes(
"list_people",
"t-invoke",
Body::from("{"),
Some("application/json"),
),
)
.await;
assert_eq!(status, StatusCode::BAD_REQUEST, "body: {body}");
assert!(
body["error"]
.as_str()
.unwrap_or_default()
.contains("invalid stored-query invocation body"),
"malformed JSON should be rejected as bad request; body: {body}"
);
}
#[tokio::test(flavor = "multi_thread")]
async fn invoke_stored_mutation_double_gates_on_change() {
let specs: &[(&str, &str, bool)] = &[(
@ -787,6 +887,83 @@ async fn schema_apply_route_updates_graph_for_authorized_admin() {
);
}
#[tokio::test(flavor = "multi_thread")]
async fn schema_apply_route_rejects_stored_query_breakage_before_publish() {
let (temp, app) = app_with_stored_queries(
&[("find_person", FIND_PERSON_GQ, true)],
&[("act-ragnor", "admin-token")],
STORED_QUERY_SCHEMA_APPLY_POLICY_YAML,
)
.await;
let request = Request::builder()
.method(Method::POST)
.uri("/schema/apply")
.header("content-type", "application/json")
.header("authorization", "Bearer admin-token")
.body(Body::from(
serde_json::to_vec(&SchemaApplyRequest {
schema_source: renamed_age_schema(),
..Default::default()
})
.unwrap(),
))
.unwrap();
let (status, payload) = json_response(&app, request).await;
assert_eq!(status, StatusCode::BAD_REQUEST, "body: {payload}");
let message = payload["error"].as_str().unwrap_or_default();
assert!(
message.contains("find_person") && message.contains("schema check"),
"registry breakage should name the stored query; body: {payload}"
);
let reopened = Omnigraph::open(graph_path(temp.path()).to_str().unwrap())
.await
.unwrap();
let person = &reopened.catalog().node_types["Person"];
assert!(person.properties.contains_key("age"));
assert!(!person.properties.contains_key("years"));
let (invoke_status, invoke_body) = json_response(
&app,
invoke_request(
"find_person",
"admin-token",
json!({ "params": { "name": "Alice" } }),
),
)
.await;
assert_eq!(invoke_status, StatusCode::OK, "body: {invoke_body}");
assert_eq!(invoke_body["row_count"], 1);
}
#[tokio::test(flavor = "multi_thread")]
async fn schema_apply_route_noop_keeps_valid_stored_query_registry() {
let (_temp, app) = app_with_stored_queries(
&[("find_person", FIND_PERSON_GQ, true)],
&[("act-ragnor", "admin-token")],
STORED_QUERY_SCHEMA_APPLY_POLICY_YAML,
)
.await;
let request = Request::builder()
.method(Method::POST)
.uri("/schema/apply")
.header("content-type", "application/json")
.header("authorization", "Bearer admin-token")
.body(Body::from(
serde_json::to_vec(&SchemaApplyRequest {
schema_source: fs::read_to_string(fixture("test.pg")).unwrap(),
..Default::default()
})
.unwrap(),
))
.unwrap();
let (status, payload) = json_response(&app, request).await;
assert_eq!(status, StatusCode::OK, "body: {payload}");
assert_eq!(payload["applied"], false);
}
#[tokio::test]
async fn schema_apply_route_requires_schema_apply_policy_permission() {
let (_temp, app) = app_for_graph_with_auth_tokens_and_policy(

View file

@ -67,6 +67,12 @@ pub struct SchemaApplyResult {
pub steps: Vec<SchemaMigrationStep>,
}
#[derive(Debug, Clone)]
pub struct SchemaApplyPreview {
pub plan: SchemaMigrationPlan,
pub catalog: Catalog,
}
/// Top-level handle to an Omnigraph database.
///
/// An Omnigraph is a Lance-native graph database with git-style branching.
@ -493,6 +499,14 @@ impl Omnigraph {
schema_apply::plan_schema(self, desired_schema_source, options).await
}
pub async fn preview_schema_apply_with_options(
&self,
desired_schema_source: &str,
options: SchemaApplyOptions,
) -> Result<SchemaApplyPreview> {
schema_apply::preview_schema_apply(self, desired_schema_source, options).await
}
pub async fn apply_schema(&self, desired_schema_source: &str) -> Result<SchemaApplyResult> {
self.apply_schema_as(desired_schema_source, SchemaApplyOptions::default(), None)
.await
@ -523,7 +537,28 @@ impl Omnigraph {
options: SchemaApplyOptions,
actor: Option<&str>,
) -> Result<SchemaApplyResult> {
schema_apply::apply_schema(self, desired_schema_source, options, actor).await
self.apply_schema_as_with_catalog_check(desired_schema_source, options, actor, |_| Ok(()))
.await
}
pub async fn apply_schema_as_with_catalog_check<F>(
&self,
desired_schema_source: &str,
options: SchemaApplyOptions,
actor: Option<&str>,
validate_catalog: F,
) -> Result<SchemaApplyResult>
where
F: FnOnce(&Catalog) -> Result<()>,
{
schema_apply::apply_schema(
self,
desired_schema_source,
options,
actor,
validate_catalog,
)
.await
}
pub(crate) async fn ensure_schema_apply_idle(&self, operation: &str) -> Result<()> {

View file

@ -48,50 +48,17 @@ pub(super) async fn plan_schema(
Ok(plan)
}
pub(super) async fn apply_schema(
db: &Omnigraph,
desired_schema_source: &str,
options: SchemaApplyOptions,
actor: Option<&str>,
) -> Result<SchemaApplyResult> {
// Engine-layer policy gate (MR-722 chassis core).
//
// Fires BEFORE acquiring the schema-apply lock or doing any other
// work. When no PolicyChecker is installed this is a no-op and
// the apply path behaves exactly as it did before MR-722. When
// a PolicyChecker IS installed and the actor is None, this is a
// hard error — see Omnigraph::enforce's docstring for the
// forget-the-actor-footgun reasoning.
//
// Scope is TargetBranch("main") to match the HTTP-layer convention
// for SchemaApply: branch=None, target_branch=Some("main"). Cedar
// policies in the wild use `target_branch_scope: protected` to
// gate schema applies, so the engine-layer call has to set the
// target_branch shape that activates that predicate. Wrong scope
// here = silent policy mismatch with HTTP. See
// `omnigraph_policy::ResourceScope::to_branch_pair` for the mapping.
db.enforce(
omnigraph_policy::PolicyAction::SchemaApply,
&omnigraph_policy::ResourceScope::TargetBranch("main".to_string()),
actor,
)?;
acquire_schema_apply_lock(db).await?;
let result = apply_schema_with_lock(db, desired_schema_source, options).await;
let release_result = release_schema_apply_lock(db).await;
match (result, release_result) {
(Ok(result), Ok(())) => Ok(result),
(Ok(_), Err(err)) => Err(err),
(Err(err), Ok(())) => Err(err),
(Err(err), Err(_)) => Err(err),
}
struct PlannedSchemaApply {
plan: SchemaMigrationPlan,
desired_ir: SchemaIR,
desired_catalog: Catalog,
}
pub(super) async fn apply_schema_with_lock(
async fn plan_schema_for_apply(
db: &Omnigraph,
desired_schema_source: &str,
options: SchemaApplyOptions,
) -> Result<SchemaApplyResult> {
) -> Result<PlannedSchemaApply> {
db.ensure_schema_state_valid().await?;
let branches = db.coordinator.read().await.all_branches().await?;
// Skip `main` and internal system branches. The schema-apply lock branch
@ -123,6 +90,87 @@ pub(super) async fn apply_schema_with_lock(
.unwrap_or_else(|| "unsupported schema migration plan".to_string());
return Err(OmniError::manifest(message));
}
let mut desired_catalog = build_catalog_from_ir(&desired_ir)?;
fixup_blob_schemas(&mut desired_catalog);
Ok(PlannedSchemaApply {
plan,
desired_ir,
desired_catalog,
})
}
pub(super) async fn preview_schema_apply(
db: &Omnigraph,
desired_schema_source: &str,
options: SchemaApplyOptions,
) -> Result<SchemaApplyPreview> {
let planned = plan_schema_for_apply(db, desired_schema_source, options).await?;
Ok(SchemaApplyPreview {
plan: planned.plan,
catalog: planned.desired_catalog,
})
}
pub(super) async fn apply_schema<F>(
db: &Omnigraph,
desired_schema_source: &str,
options: SchemaApplyOptions,
actor: Option<&str>,
validate_catalog: F,
) -> Result<SchemaApplyResult>
where
F: FnOnce(&Catalog) -> Result<()>,
{
// Engine-layer policy gate (MR-722 chassis core).
//
// Fires BEFORE acquiring the schema-apply lock or doing any other
// work. When no PolicyChecker is installed this is a no-op and
// the apply path behaves exactly as it did before MR-722. When
// a PolicyChecker IS installed and the actor is None, this is a
// hard error — see Omnigraph::enforce's docstring for the
// forget-the-actor-footgun reasoning.
//
// Scope is TargetBranch("main") to match the HTTP-layer convention
// for SchemaApply: branch=None, target_branch=Some("main"). Cedar
// policies in the wild use `target_branch_scope: protected` to
// gate schema applies, so the engine-layer call has to set the
// target_branch shape that activates that predicate. Wrong scope
// here = silent policy mismatch with HTTP. See
// `omnigraph_policy::ResourceScope::to_branch_pair` for the mapping.
db.enforce(
omnigraph_policy::PolicyAction::SchemaApply,
&omnigraph_policy::ResourceScope::TargetBranch("main".to_string()),
actor,
)?;
acquire_schema_apply_lock(db).await?;
let result = apply_schema_with_lock(db, desired_schema_source, options, validate_catalog).await;
let release_result = release_schema_apply_lock(db).await;
match (result, release_result) {
(Ok(result), Ok(())) => Ok(result),
(Ok(_), Err(err)) => Err(err),
(Err(err), Ok(())) => Err(err),
(Err(err), Err(_)) => Err(err),
}
}
pub(super) async fn apply_schema_with_lock<F>(
db: &Omnigraph,
desired_schema_source: &str,
options: SchemaApplyOptions,
validate_catalog: F,
) -> Result<SchemaApplyResult>
where
F: FnOnce(&Catalog) -> Result<()>,
{
let planned = plan_schema_for_apply(db, desired_schema_source, options).await?;
validate_catalog(&planned.desired_catalog)?;
let PlannedSchemaApply {
plan,
desired_ir,
desired_catalog,
} = planned;
if plan.steps.is_empty() {
return Ok(SchemaApplyResult {
supported: true,
@ -132,9 +180,6 @@ pub(super) async fn apply_schema_with_lock(
});
}
let mut desired_catalog = build_catalog_from_ir(&desired_ir)?;
fixup_blob_schemas(&mut desired_catalog);
let snapshot = db.snapshot().await;
let base_manifest_version = snapshot.version();
let mut added_tables = BTreeSet::new();

View file

@ -68,7 +68,7 @@ List the graph's **`mcp.expose`** stored queries as a typed tool catalog — eno
### Stored-query invocation (`POST /queries/{name}`)
Invoke a curated, server-side stored query by **name** — the source comes from the graph's `queries:` registry, so the client never sends `.gq`. Body (all fields optional): `{ "params": { … }, "branch": "main", "snapshot": null }`, where `params` keys match the query's declared parameters. The response is the **read envelope** (`ReadOutput`) for a stored read or the **mutation envelope** (`ChangeOutput`) for a stored mutation — serialized untagged, so the wire shape is identical to `/query` / `/mutate`.
Invoke a curated, server-side stored query by **name** — the source comes from the graph's `queries:` registry, so the client never sends `.gq`. The request body itself is optional; omit it for no-param queries, or send `{ "params": { … }, "branch": "main", "snapshot": null }`, where every field is optional and `params` keys match the query's declared parameters. The response is the **read envelope** (`ReadOutput`) for a stored read or the **mutation envelope** (`ChangeOutput`) for a stored mutation — serialized untagged, so the wire shape is identical to `/query` / `/mutate`.
- **Gate:** `invoke_query` (per-graph, graph-scoped) at the boundary. A stored *mutation* is **double-gated** — it also passes the engine's `change` gate, so an actor with `invoke_query` but not `change` gets `403`.
- **Deny == unknown, for callers without `invoke_query`:** for a caller lacking the grant, an `invoke_query` denial and an unknown query name return the **same `404`** (identical body), so the catalog can't be probed. A caller that *holds* `invoke_query` may still get the inner gate's `403` for an existing query it can't `read`/`change` (the double-gate, above) — so existence is visible to grant-holders by design.

View file

@ -899,11 +899,17 @@
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/InvokeStoredQueryRequest"
"oneOf": [
{
"type": "null"
},
{
"$ref": "#/components/schemas/InvokeStoredQueryRequest"
}
]
}
}
},
"required": true
}
},
"responses": {
"200": {