style: apply rustfmt across the workspace

Addresses a review finding: cargo fmt --check was failing on the V0/L1/L2 crates (and some pre-existing engine drift). cargo fmt --all --check is now clean. No behavior change.
This commit is contained in:
Ragnor Comerford 2026-06-03 15:48:47 +02:00
parent b5690d5d8e
commit f454de9906
No known key found for this signature in database
11 changed files with 440 additions and 172 deletions

View file

@ -1,11 +1,11 @@
use omnigraph::db::{GraphCommit, MergeOutcome, ReadTarget, SchemaApplyResult, Snapshot};
use omnigraph::error::{MergeConflict, MergeConflictKind};
use omnigraph::loader::{IngestResult, LoadMode};
use omnigraph_queries::StoredQuery;
use omnigraph_compiler::SchemaMigrationStep;
use omnigraph_compiler::query::ast::Param;
use omnigraph_compiler::result::QueryResult;
use omnigraph_compiler::types::{PropType, ScalarType};
use omnigraph_queries::StoredQuery;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use utoipa::{IntoParams, ToSchema};
@ -266,7 +266,9 @@ pub struct QueryRequest {
/// with `name` when more than one is declared. Mutations
/// (`insert`/`update`/`delete`) get 400 — use `POST /mutate` (or its
/// deprecated alias `POST /change`) instead.
#[schema(example = "query get_person($name: String) {\n match {\n $p: Person { name: $name }\n }\n return { $p.name, $p.age }\n}")]
#[schema(
example = "query get_person($name: String) {\n match {\n $p: Person { name: $name }\n }\n return { $p.name, $p.age }\n}"
)]
pub query: String,
/// Name of the query to run when `query` declares multiple. Optional when
/// only one query is declared.
@ -741,7 +743,11 @@ mod tests {
entry.params.iter().map(|p| (p.name.as_str(), p)).collect();
assert_eq!(by["s"].kind, ParamKind::String);
assert_eq!(by["i"].kind, ParamKind::Int);
assert_eq!(by["big"].kind, ParamKind::BigInt, "I64 → bigint (string on the wire)");
assert_eq!(
by["big"].kind,
ParamKind::BigInt,
"I64 → bigint (string on the wire)"
);
assert_eq!(by["u"].kind, ParamKind::BigInt, "U64 → bigint");
assert_eq!(by["f"].kind, ParamKind::Float);
assert_eq!(by["b"].kind, ParamKind::Bool);
@ -751,7 +757,11 @@ mod tests {
assert!(!by["s"].nullable);
assert!(by["opt"].nullable, "String? → nullable");
assert_eq!(by["list"].kind, ParamKind::List);
assert_eq!(by["list"].item_kind, Some(ParamKind::Int), "[I32] → list of int");
assert_eq!(
by["list"].item_kind,
Some(ParamKind::Int),
"[I32] → list of int"
);
assert_eq!(by["vec"].kind, ParamKind::Vector);
assert_eq!(by["vec"].vector_dim, Some(4));
}

View file

@ -10,13 +10,6 @@ use color_eyre::eyre::{Result, bail};
use omnigraph::db::{Omnigraph, ReadTarget, SnapshotId};
use omnigraph::loader::LoadMode;
use omnigraph::storage::normalize_root_uri;
use omnigraph_compiler::query::parser::parse_query;
use omnigraph_compiler::schema::parser::parse_schema;
use omnigraph_compiler::{
JsonParamMode, ParamMap, QueryLintOutput, QueryLintQueryKind, QueryLintSchemaSource,
QueryLintSeverity, QueryLintStatus, SchemaMigrationPlan, SchemaMigrationStep, build_catalog,
json_params_to_param_map, lint_query_file,
};
use omnigraph_api_types::{
BranchCreateOutput, BranchCreateRequest, BranchDeleteOutput, BranchListOutput,
BranchMergeOutput, BranchMergeRequest, ChangeOutput, CommitListOutput, CommitOutput,
@ -25,13 +18,20 @@ use omnigraph_api_types::{
SnapshotTableOutput, commit_output, ingest_output, read_output, schema_apply_output,
snapshot_payload,
};
use omnigraph_queries::{QueryRegistry, check, format_check_breakages};
use omnigraph_compiler::query::parser::parse_query;
use omnigraph_compiler::schema::parser::parse_schema;
use omnigraph_compiler::{
JsonParamMode, ParamMap, QueryLintOutput, QueryLintQueryKind, QueryLintSchemaSource,
QueryLintSeverity, QueryLintStatus, SchemaMigrationPlan, SchemaMigrationStep, build_catalog,
json_params_to_param_map, lint_query_file,
};
use omnigraph_config::{
AliasCommand, OmnigraphConfig, ReadOutputFormat, graph_resource_id_for_selection, load_config,
};
use omnigraph_policy::{
PolicyAction, PolicyDecision, PolicyEngine, PolicyRequest, PolicyTestConfig,
};
use omnigraph_queries::{QueryRegistry, check, format_check_breakages};
use reqwest::Method;
use reqwest::header::AUTHORIZATION;
use serde::Serialize;
@ -803,13 +803,11 @@ struct ResolvedPolicyContext {
fn resolve_policy_context(config: &OmnigraphConfig) -> Result<ResolvedPolicyContext> {
let selected = config.resolve_policy_tooling_graph_selection()?;
let policy_file = config
.resolve_policy_file_for(selected)
.ok_or_else(|| {
color_eyre::eyre::eyre!(
"policy.file or graphs.<name>.policy.file must be set in omnigraph.yaml"
)
})?;
let policy_file = config.resolve_policy_file_for(selected).ok_or_else(|| {
color_eyre::eyre::eyre!(
"policy.file or graphs.<name>.policy.file must be set in omnigraph.yaml"
)
})?;
let graph_id = match selected {
Some(name) => graph_resource_id_for_selection(Some(name), ""),
None => graph_resource_id_for_selection(None, "default"),
@ -2168,16 +2166,14 @@ fn rewrite_deprecated_argv(args: Vec<OsString>) -> Vec<OsString> {
}
if let Some(sub) = args.get(1).and_then(|s| s.to_str()) {
match sub {
"read" => eprintln!(
"warning: `omnigraph read` is deprecated; use `omnigraph query` instead"
),
"read" => {
eprintln!("warning: `omnigraph read` is deprecated; use `omnigraph query` instead")
}
"change" => eprintln!(
"warning: `omnigraph change` is deprecated; use `omnigraph mutate` instead"
),
"check" => {
eprintln!(
"warning: `omnigraph check` is deprecated; use `omnigraph lint` instead"
);
eprintln!("warning: `omnigraph check` is deprecated; use `omnigraph lint` instead");
// Rewrite the top-level subcommand to `lint`; pass through the rest.
let mut out = Vec::with_capacity(args.len());
out.push(args[0].clone());
@ -3159,8 +3155,8 @@ mod tests {
use super::{
DEFAULT_BEARER_TOKEN_ENV, apply_bearer_token, bearer_token_from_env_file,
legacy_change_request_body, load_cli_config, load_env_file_into_process,
normalize_bearer_token, parse_env_assignment, resolve_policy_context,
resolve_cli_graph, resolve_remote_bearer_token,
normalize_bearer_token, parse_env_assignment, resolve_cli_graph, resolve_policy_context,
resolve_remote_bearer_token,
};
use omnigraph_config::load_config;
use reqwest::header::AUTHORIZATION;
@ -3422,7 +3418,8 @@ graphs:
}
#[test]
fn graph_identity_resolve_policy_context_named_cli_graph_uses_graph_key_not_project_name_or_uri() {
fn graph_identity_resolve_policy_context_named_cli_graph_uses_graph_key_not_project_name_or_uri()
{
let temp = tempdir().unwrap();
let config_path = temp.path().join("omnigraph.yaml");
fs::write(

View file

@ -798,8 +798,7 @@ fn deprecated_read_and_change_subcommands_emit_warnings() {
let output = cli().arg("read").output().unwrap();
let stderr = String::from_utf8(output.stderr).unwrap();
assert!(
stderr.contains("`omnigraph read` is deprecated")
&& stderr.contains("`omnigraph query`"),
stderr.contains("`omnigraph read` is deprecated") && stderr.contains("`omnigraph query`"),
"expected `omnigraph read` deprecation warning; got: {stderr}"
);
@ -2394,9 +2393,19 @@ fn queries_validate_exits_zero_on_clean_registry() {
);
let config = graph.write_config(
"omnigraph.yaml",
&queries_test_config(&graph.path().to_string_lossy(), "find_person", "find_person.gq"),
&queries_test_config(
&graph.path().to_string_lossy(),
"find_person",
"find_person.gq",
),
);
let output = output_success(
cli()
.arg("queries")
.arg("validate")
.arg("--config")
.arg(&config),
);
let output = output_success(cli().arg("queries").arg("validate").arg("--config").arg(&config));
let stdout = stdout_string(&output);
assert!(stdout.contains("OK"), "stdout:\n{stdout}");
}
@ -2405,12 +2414,21 @@ fn queries_validate_exits_zero_on_clean_registry() {
fn queries_validate_exits_nonzero_on_type_broken_query() {
let graph = SystemGraph::loaded();
// `Widget` is not in the fixture schema.
graph.write_query("ghost.gq", "query ghost() { match { $w: Widget } return { $w.name } }");
graph.write_query(
"ghost.gq",
"query ghost() { match { $w: Widget } return { $w.name } }",
);
let config = graph.write_config(
"omnigraph.yaml",
&queries_test_config(&graph.path().to_string_lossy(), "ghost", "ghost.gq"),
);
let output = output_failure(cli().arg("queries").arg("validate").arg("--config").arg(&config));
let output = output_failure(
cli()
.arg("queries")
.arg("validate")
.arg("--config")
.arg(&config),
);
let stdout = stdout_string(&output);
assert!(
stdout.contains("ghost"),
@ -2444,7 +2462,13 @@ fn queries_list_prints_registered_query() {
graph.path().to_string_lossy().replace('\'', "''")
),
);
let output = output_success(cli().arg("queries").arg("list").arg("--config").arg(&config));
let output = output_success(
cli()
.arg("queries")
.arg("list")
.arg("--config")
.arg(&config),
);
let stdout = stdout_string(&output);
assert!(stdout.contains("find_person"), "stdout:\n{stdout}");
assert!(
@ -2480,7 +2504,13 @@ fn queries_list_requires_graph_selection_for_per_graph_only_registries() {
),
);
let output = output_failure(cli().arg("queries").arg("list").arg("--config").arg(&config));
let output = output_failure(
cli()
.arg("queries")
.arg("list")
.arg("--config")
.arg(&config),
);
let stderr = String::from_utf8_lossy(&output.stderr);
assert!(
stderr.contains("local") && stderr.contains("--target local"),
@ -2505,7 +2535,13 @@ fn queries_list_without_graph_selection_lists_top_level_registry() {
),
);
let output = output_success(cli().arg("queries").arg("list").arg("--config").arg(&config));
let output = output_success(
cli()
.arg("queries")
.arg("list")
.arg("--config")
.arg(&config),
);
let stdout = stdout_string(&output);
assert!(stdout.contains("top_find"), "stdout:\n{stdout}");
}
@ -2524,7 +2560,11 @@ fn queries_list_unknown_target_errors() {
);
let config = graph.write_config(
"omnigraph.yaml",
&queries_test_config(&graph.path().to_string_lossy(), "find_person", "find_person.gq"),
&queries_test_config(
&graph.path().to_string_lossy(),
"find_person",
"find_person.gq",
),
);
let output = output_failure(
cli()
@ -2566,7 +2606,7 @@ fn queries_commands_reject_named_graph_with_populated_top_level_block() {
" file: ./find_person.gq\n",
"cli:\n",
" graph: local\n",
"queries:\n", // populated top-level block: the coherence violation
"queries:\n", // populated top-level block: the coherence violation
" legacy:\n",
" file: ./legacy.gq\n",
"policy: {{}}\n",
@ -2592,8 +2632,14 @@ fn queries_validate_exits_nonzero_on_duplicate_tool_name() {
// collision — `queries validate` must fail (offline, before the engine
// opens) and name both queries plus the contested tool.
let graph = SystemGraph::loaded();
graph.write_query("a.gq", "query a() { match { $p: Person } return { $p.name } }");
graph.write_query("b.gq", "query b() { match { $p: Person } return { $p.name } }");
graph.write_query(
"a.gq",
"query a() { match { $p: Person } return { $p.name } }",
);
graph.write_query(
"b.gq",
"query b() { match { $p: Person } return { $p.name } }",
);
let config = graph.write_config(
"omnigraph.yaml",
&format!(
@ -2615,7 +2661,13 @@ fn queries_validate_exits_nonzero_on_duplicate_tool_name() {
graph.path().to_string_lossy().replace('\'', "''")
),
);
let output = output_failure(cli().arg("queries").arg("validate").arg("--config").arg(&config));
let output = output_failure(
cli()
.arg("queries")
.arg("validate")
.arg("--config")
.arg(&config),
);
let stderr = String::from_utf8_lossy(&output.stderr);
assert!(
stderr.contains("dup") && stderr.contains("'a'") && stderr.contains("'b'"),
@ -2635,7 +2687,10 @@ fn queries_validate_positional_uri_ignores_default_graph() {
);
// `Widget` is not in the fixture schema — the default graph's per-graph
// query would break validate if it were (wrongly) selected.
graph.write_query("broken.gq", "query broken() { match { $w: Widget } return { $w.name } }");
graph.write_query(
"broken.gq",
"query broken() { match { $w: Widget } return { $w.name } }",
);
let config = graph.write_config(
"omnigraph.yaml",
concat!(

View file

@ -262,7 +262,10 @@ query insert_person($name: String, $age: I32) {
}))
.send()
.unwrap();
assert_eq!(http_query_mutation.status(), reqwest::StatusCode::BAD_REQUEST);
assert_eq!(
http_query_mutation.status(),
reqwest::StatusCode::BAD_REQUEST
);
// `run publish` / `run list` removed. Direct-to-target writes
// already landed via the change call above; the commit graph is now

View file

@ -450,10 +450,7 @@ impl OmnigraphConfig {
/// The per-graph stored-query registry entries for a named target
/// (multi-graph mode). Returns `None` if the target is unknown.
pub fn target_query_entries(
&self,
target_name: &str,
) -> Option<&BTreeMap<String, QueryEntry>> {
pub fn target_query_entries(&self, target_name: &str) -> Option<&BTreeMap<String, QueryEntry>> {
self.graphs.get(target_name).map(|target| &target.queries)
}
@ -957,11 +954,17 @@ policy: {}
let config = load_config_in(temp.path(), None).unwrap();
// A known graph passes through unchanged.
assert_eq!(config.resolve_graph_selection(Some("local")).unwrap(), Some("local"));
assert_eq!(
config.resolve_graph_selection(Some("local")).unwrap(),
Some("local")
);
// An anonymous selection stays anonymous (→ top-level registry downstream).
assert_eq!(config.resolve_graph_selection(None).unwrap(), None);
// An unknown name errors, naming the graph (matching resolve_target_uri).
let err = config.resolve_graph_selection(Some("ghost")).unwrap_err().to_string();
let err = config
.resolve_graph_selection(Some("ghost"))
.unwrap_err()
.to_string();
assert!(
err.contains("ghost") && err.contains("not found"),
"unknown graph must error naming it: {err}"
@ -1023,7 +1026,10 @@ policy: {}
let temp = tempdir().unwrap();
fs::write(temp.path().join("omnigraph.yaml"), "policy: {}\n").unwrap();
let config = load_config_in(temp.path(), None).unwrap();
assert_eq!(config.resolve_policy_tooling_graph_selection().unwrap(), None);
assert_eq!(
config.resolve_policy_tooling_graph_selection().unwrap(),
None
);
let temp = tempdir().unwrap();
fs::write(
@ -1184,12 +1190,7 @@ queries:
let config = load_config_in(temp.path(), None).unwrap();
// Additive: no `queries:` anywhere → empty registries everywhere.
assert!(config.query_entries().is_empty());
assert!(
config
.target_query_entries("local")
.unwrap()
.is_empty()
);
assert!(config.target_query_entries("local").unwrap().is_empty());
}
#[test]
@ -1294,7 +1295,12 @@ cli:
fn storage_bare_resolves_embedded() {
let config = load_yaml("version: 1\ngraphs:\n local:\n storage: ./demo.omni\n");
match config.resolve_graph(None, Some("local")).unwrap() {
GraphLocator::Embedded { uri, selected, graph_id, .. } => {
GraphLocator::Embedded {
uri,
selected,
graph_id,
..
} => {
assert!(uri.ends_with("demo.omni"), "uri: {uri}");
assert_eq!(selected.as_deref(), Some("local"));
assert_eq!(graph_id, "local");
@ -1310,7 +1316,12 @@ cli:
region: eu-west-1\n endpoint: https://minio.local\n",
);
match config.resolve_graph(None, Some("prod")).unwrap() {
GraphLocator::Embedded { uri, region, endpoint, .. } => {
GraphLocator::Embedded {
uri,
region,
endpoint,
..
} => {
assert_eq!(uri, "s3://b/prod.omni");
assert_eq!(region.as_deref(), Some("eu-west-1"));
assert_eq!(endpoint.as_deref(), Some("https://minio.local"));
@ -1326,7 +1337,12 @@ cli:
graphs:\n reports:\n server: prod\n",
);
match config.resolve_graph(None, Some("reports")).unwrap() {
GraphLocator::Remote { endpoint, server, graph_id, .. } => {
GraphLocator::Remote {
endpoint,
server,
graph_id,
..
} => {
assert_eq!(endpoint, "https://og.internal:8080");
assert_eq!(server, "prod");
assert_eq!(graph_id, "reports", "graph_id defaults to the entry key");
@ -1337,11 +1353,15 @@ cli:
#[test]
fn qualified_server_slash_graph_resolves_without_an_alias() {
let config = load_yaml(
"version: 1\nservers:\n prod:\n endpoint: https://og.internal:8080\n",
);
let config =
load_yaml("version: 1\nservers:\n prod:\n endpoint: https://og.internal:8080\n");
match config.resolve_graph(None, Some("prod/production")).unwrap() {
GraphLocator::Remote { endpoint, server, graph_id, .. } => {
GraphLocator::Remote {
endpoint,
server,
graph_id,
..
} => {
assert_eq!(endpoint, "https://og.internal:8080");
assert_eq!(server, "prod");
assert_eq!(graph_id, "production");
@ -1356,13 +1376,19 @@ cli:
"version: 1\nservers:\n p:\n endpoint: https://h\n\
graphs:\n bad:\n storage: s3://b/x\n server: p\n",
);
assert!(err.contains("exactly one of `storage` or `server`"), "{err}");
assert!(
err.contains("exactly one of `storage` or `server`"),
"{err}"
);
}
#[test]
fn graph_with_neither_storage_nor_server_is_rejected() {
let err = load_yaml_err("version: 1\ngraphs:\n bad:\n branch: main\n");
assert!(err.contains("set `storage:`") && err.contains("bad"), "{err}");
assert!(
err.contains("set `storage:`") && err.contains("bad"),
"{err}"
);
}
#[test]
@ -1386,7 +1412,10 @@ cli:
#[test]
fn unknown_graph_name_errors() {
let config = load_yaml("version: 1\ngraphs:\n local:\n storage: ./demo.omni\n");
let err = config.resolve_graph(None, Some("ghost")).unwrap_err().to_string();
let err = config
.resolve_graph(None, Some("ghost"))
.unwrap_err()
.to_string();
assert!(err.contains("ghost") && err.contains("not found"), "{err}");
}
@ -1396,7 +1425,10 @@ cli:
let err = load_yaml_err(
"version: 1\ngraphs:\n g:\n storage:\n uri: s3://b/x\n profile: default\n",
);
assert!(err.to_lowercase().contains("profile") || err.contains("unknown field"), "{err}");
assert!(
err.to_lowercase().contains("profile") || err.contains("unknown field"),
"{err}"
);
}
#[test]
@ -1404,7 +1436,9 @@ cli:
// No `version:` (legacy); a remote uri with the /graphs/{gid} hack splits.
let config = load_yaml("graphs:\n prod:\n uri: https://host:8080/graphs/production\n");
match config.resolve_graph(None, Some("prod")).unwrap() {
GraphLocator::Remote { endpoint, graph_id, .. } => {
GraphLocator::Remote {
endpoint, graph_id, ..
} => {
assert_eq!(endpoint, "https://host:8080");
assert_eq!(graph_id, "production");
}

View file

@ -108,30 +108,28 @@ impl QueryRegistry {
for spec in specs {
match parse_query(&spec.source) {
Ok(file) => {
match file.queries.into_iter().find(|q| q.name == spec.name) {
Some(decl) => {
by_name.insert(
spec.name.clone(),
StoredQuery {
name: spec.name,
source: Arc::from(spec.source),
decl,
expose: spec.expose,
tool_name: spec.tool_name,
},
);
}
None => errors.push(LoadError {
query: Some(spec.name.clone()),
message: format!(
"no `query {}` declaration found in its `.gq` file \
(the registry key must match the query symbol)",
spec.name
),
}),
Ok(file) => match file.queries.into_iter().find(|q| q.name == spec.name) {
Some(decl) => {
by_name.insert(
spec.name.clone(),
StoredQuery {
name: spec.name,
source: Arc::from(spec.source),
decl,
expose: spec.expose,
tool_name: spec.tool_name,
},
);
}
}
None => errors.push(LoadError {
query: Some(spec.name.clone()),
message: format!(
"no `query {}` declaration found in its `.gq` file \
(the registry key must match the query symbol)",
spec.name
),
}),
},
Err(err) => errors.push(LoadError {
query: Some(spec.name),
message: format!("parse error: {err}"),
@ -409,7 +407,10 @@ mod tests {
let q = reg.lookup("b").unwrap();
assert_eq!(q.name, "b");
assert_eq!(q.decl.params[0].name, "y");
assert!(reg.lookup("a").is_none(), "only the selected symbol is registered");
assert!(
reg.lookup("a").is_none(),
"only the selected symbol is registered"
);
}
#[test]
@ -418,8 +419,18 @@ mod tests {
// the catalog key space — refused at load, naming both queries and
// the contested tool.
let errors = QueryRegistry::from_specs(vec![
spec_tool("a", "query a() { match { $u: User } return { $u.name } }", true, "dup"),
spec_tool("b", "query b() { match { $u: User } return { $u.name } }", true, "dup"),
spec_tool(
"a",
"query a() { match { $u: User } return { $u.name } }",
true,
"dup",
),
spec_tool(
"b",
"query b() { match { $u: User } return { $u.name } }",
true,
"dup",
),
])
.unwrap_err();
assert_eq!(errors.len(), 1);
@ -434,8 +445,18 @@ mod tests {
// Unexposed queries have no MCP tool, so a shared effective tool
// name is inert — must not error (pins the exposed-only scope).
let reg = QueryRegistry::from_specs(vec![
spec_tool("a", "query a() { match { $u: User } return { $u.name } }", false, "dup"),
spec_tool("b", "query b() { match { $u: User } return { $u.name } }", false, "dup"),
spec_tool(
"a",
"query a() { match { $u: User } return { $u.name } }",
false,
"dup",
),
spec_tool(
"b",
"query b() { match { $u: User } return { $u.name } }",
false,
"dup",
),
])
.unwrap();
assert_eq!(reg.len(), 2);
@ -453,8 +474,16 @@ mod tests {
#[test]
fn errors_collect_rather_than_fail_fast() {
let errors = QueryRegistry::from_specs(vec![
spec("good", "query good() { match { $u: User } return { $u.name } }", false),
spec("mismatch", "query other() { match { $u: User } return { $u.name } }", false),
spec(
"good",
"query good() { match { $u: User } return { $u.name } }",
false,
),
spec(
"mismatch",
"query other() { match { $u: User } return { $u.name } }",
false,
),
spec("broken", "query broken(", false),
])
.unwrap_err();
@ -536,8 +565,16 @@ embedding: Vector(4)
#[test]
fn check_collects_every_breakage_not_fail_fast() {
let reg = QueryRegistry::from_specs(vec![
spec("a", "query a() { match { $w: Widget } return { $w.x } }", false),
spec("b", "query b() { match { $g: Gadget } return { $g.y } }", false),
spec(
"a",
"query a() { match { $w: Widget } return { $w.x } }",
false,
),
spec(
"b",
"query b() { match { $g: Gadget } return { $g.y } }",
false,
),
spec(
"ok",
"query ok() { match { $u: User } return { $u.name } }",
@ -546,7 +583,12 @@ embedding: Vector(4)
])
.unwrap();
let report = check(&reg, &test_catalog());
assert_eq!(report.breakages.len(), 2, "both bad queries reported: {:?}", report);
assert_eq!(
report.breakages.len(),
2,
"both bad queries reported: {:?}",
report
);
}
#[test]
@ -590,7 +632,11 @@ embedding: Vector(4)
)])
.unwrap();
let report = check(&reg, &test_catalog());
assert!(report.is_clean(), "no breakage or warning expected: {:?}", report);
assert!(
report.is_clean(),
"no breakage or warning expected: {:?}",
report
);
}
// --- load() error collection (file I/O + parse in one pass) ---
@ -615,14 +661,24 @@ embedding: Vector(4)
let config = load_config(Some(&temp.path().join("omnigraph.yaml"))).unwrap();
let errors = QueryRegistry::load(&config, config.query_entries()).unwrap_err();
let joined = errors.iter().map(|e| e.to_string()).collect::<Vec<_>>().join("\n");
let joined = errors
.iter()
.map(|e| e.to_string())
.collect::<Vec<_>>()
.join("\n");
// Both the missing file AND the parse error surface in one pass —
// the I/O failure must not mask the parse failure.
assert!(joined.contains("missing"), "I/O error must surface: {joined}");
assert!(
joined.contains("missing"),
"I/O error must surface: {joined}"
);
assert!(
joined.contains("broken") && joined.contains("parse error"),
"the parse error in a readable file must surface in the same pass: {joined}"
);
assert!(!joined.contains("'good'"), "the valid entry is not an error: {joined}");
assert!(
!joined.contains("'good'"),
"the valid entry is not an error: {joined}"
);
}
}

View file

@ -25,10 +25,10 @@ use api::{
BranchCreateOutput, BranchCreateRequest, BranchDeleteOutput, BranchListOutput,
BranchMergeOutput, BranchMergeRequest, ChangeOutput, ChangeRequest, CommitListOutput,
CommitListQuery, ErrorCode, ErrorOutput, ExportRequest, GraphInfo, GraphListResponse,
HealthOutput, IngestOutput, IngestRequest, InvokeStoredQueryRequest,
InvokeStoredQueryResponse, QueriesCatalogOutput, QueryRequest, ReadOutput, ReadRequest,
SchemaApplyOutput, SchemaApplyRequest, SchemaOutput, SnapshotQuery, ingest_output,
schema_apply_output, snapshot_payload,
HealthOutput, IngestOutput, IngestRequest, InvokeStoredQueryRequest, InvokeStoredQueryResponse,
QueriesCatalogOutput, QueryRequest, ReadOutput, ReadRequest, SchemaApplyOutput,
SchemaApplyRequest, SchemaOutput, SnapshotQuery, ingest_output, schema_apply_output,
snapshot_payload,
};
pub use auth::{AWS_SECRET_ENV, EnvOrFileTokenSource, TokenSource, resolve_token_source};
use axum::body::{Body, Bytes};
@ -304,7 +304,14 @@ impl AppState {
) -> Self {
let bearer_tokens = hash_bearer_tokens(bearer_tokens);
let per_graph_policy = policy_engine.map(Arc::new);
Self::build_single_mode(uri, db, bearer_tokens, per_graph_policy, Arc::new(workload), None)
Self::build_single_mode(
uri,
db,
bearer_tokens,
per_graph_policy,
Arc::new(workload),
None,
)
}
/// Like `new_single`, but attaches a pre-validated stored-query
@ -421,13 +428,8 @@ impl AppState {
bearer_tokens: Vec<(String, String)>,
policy_file: Option<&PathBuf>,
) -> Result<Self> {
Self::open_single_with_queries(
uri,
bearer_tokens,
policy_file,
QueryRegistry::default(),
)
.await
Self::open_single_with_queries(uri, bearer_tokens, policy_file, QueryRegistry::default())
.await
}
/// Single-mode boot with a stored-query registry: open the engine,
@ -993,8 +995,11 @@ pub fn load_server_settings(
// `query_entries_for` so server and CLI resolve identically.
// Load + identity-check now; the schema type-check happens
// when this graph's engine opens.
let queries = QueryRegistry::load(&config, config.query_entries_for(Some(name.as_str())))
.map_err(|errs| color_eyre::eyre::eyre!(format_registry_load_errors(name, &errs)))?;
let queries =
QueryRegistry::load(&config, config.query_entries_for(Some(name.as_str())))
.map_err(|errs| {
color_eyre::eyre::eyre!(format_registry_load_errors(name, &errs))
})?;
graphs.push(GraphStartupConfig {
graph_id: name.clone(),
uri,
@ -1112,15 +1117,21 @@ pub fn build_app(state: AppState) -> Router {
// flagged and their responses include RFC 9745 Deprecation +
// RFC 8288 Link headers. Suppress the call-site warning for the
// route registration itself.
.route("/read", post({
#[allow(deprecated)]
server_read
}))
.route(
"/read",
post({
#[allow(deprecated)]
server_read
}),
)
.route("/query", post(server_query))
.route("/change", post({
#[allow(deprecated)]
server_change
}))
.route(
"/change",
post({
#[allow(deprecated)]
server_change
}),
)
.route("/mutate", post(server_mutate))
.route("/queries", get(server_list_queries))
.route("/queries/{name}", post(server_invoke_query))
@ -1870,7 +1881,9 @@ fn deprecation_headers(successor_link: &'static str) -> [(HeaderName, HeaderValu
),
security(("bearer_token" = [])),
)]
#[deprecated(note = "use POST /query instead; /read is kept indefinitely for byte-stable back-compat")]
#[deprecated(
note = "use POST /query instead; /read is kept indefinitely for byte-stable back-compat"
)]
/// **Deprecated** — use [`POST /query`](#tag/queries/operation/query) instead.
///
/// Execute a GQ read query. Behavior is unchanged from prior releases; the
@ -2041,10 +2054,8 @@ async fn run_mutate(
// estimated bytes per actor. Cedar runs FIRST so denied requests
// don't consume admission slots. Estimate uses the request body
// size as a coarse proxy; engine memory pressure can run higher.
let est_bytes = query.len() as u64
+ params_json
.map(|p| p.to_string().len() as u64)
.unwrap_or(0);
let est_bytes =
query.len() as u64 + params_json.map(|p| p.to_string().len() as u64).unwrap_or(0);
let _admission = state
.workload
.try_admit(&actor_arc, est_bytes)
@ -2118,8 +2129,8 @@ async fn run_query(
target_branch: None,
},
)?;
let query_decl =
select_named_query_decl(query, name).map_err(|err| ApiError::bad_request(err.to_string()))?;
let query_decl = select_named_query_decl(query, name)
.map_err(|err| ApiError::bad_request(err.to_string()))?;
if reject_mutations && !query_decl.mutations.is_empty() {
return Err(ApiError::bad_request(format!(
"query '{}' contains mutations (insert/update/delete); use POST /mutate for write queries",
@ -3092,11 +3103,18 @@ mod tests {
/// as 404 without also masking a 401/500. Pins each outcome.
#[test]
fn authorize_splits_decision_from_operational_error() {
use super::{Authz, PolicyAction, PolicyCompiler, PolicyConfig, PolicyRequest, ResolvedActor, authorize};
use super::{
Authz, PolicyAction, PolicyCompiler, PolicyConfig, PolicyRequest, ResolvedActor,
authorize,
};
use std::sync::Arc;
fn req(action: PolicyAction) -> PolicyRequest {
PolicyRequest { action, branch: None, target_branch: None }
PolicyRequest {
action,
branch: None,
target_branch: None,
}
}
let actor = ResolvedActor::cluster_static(Arc::from("act-alice"));
@ -3136,7 +3154,11 @@ mod tests {
authorize(
Some(&actor),
Some(&engine),
PolicyRequest { action: PolicyAction::Read, branch: Some("main".to_string()), target_branch: None },
PolicyRequest {
action: PolicyAction::Read,
branch: Some("main".to_string()),
target_branch: None
},
)
.unwrap(),
Authz::Allowed
@ -3145,11 +3167,17 @@ mod tests {
match authorize(
Some(&actor),
Some(&engine),
PolicyRequest { action: PolicyAction::Change, branch: Some("main".to_string()), target_branch: None },
PolicyRequest {
action: PolicyAction::Change,
branch: Some("main".to_string()),
target_branch: None,
},
)
.unwrap()
{
Authz::Denied(message) => assert!(!message.is_empty(), "a deny carries its decision message"),
Authz::Denied(message) => {
assert!(!message.is_empty(), "a deny carries its decision message")
}
Authz::Allowed => panic!("change must be denied: only read is allowed"),
}
// Policy installed but no actor → operational failure (`Err`), NOT a
@ -3188,8 +3216,7 @@ mod tests {
};
// Empty registry → nothing attached, no error.
let empty =
super::validate_and_attach(QueryRegistry::default(), &catalog, "g").unwrap();
let empty = super::validate_and_attach(QueryRegistry::default(), &catalog, "g").unwrap();
assert!(empty.is_none());
// A query that type-checks → attached.
@ -3198,7 +3225,11 @@ mod tests {
"query find_user() { match { $u: User } return { $u.name } }",
)])
.unwrap();
assert!(super::validate_and_attach(ok, &catalog, "g").unwrap().is_some());
assert!(
super::validate_and_attach(ok, &catalog, "g")
.unwrap()
.is_some()
);
// A query referencing a type the schema lacks → boot refusal that
// names both the graph label and the offending query.
@ -3211,7 +3242,10 @@ mod tests {
let msg = err.to_string();
assert!(msg.contains("graph-x"), "labels the graph: {msg}");
assert!(msg.contains("ghost"), "names the query: {msg}");
assert!(msg.contains("schema check"), "mentions the schema check: {msg}");
assert!(
msg.contains("schema check"),
"mentions the schema check: {msg}"
);
}
#[test]

View file

@ -175,7 +175,11 @@ async fn server_boots_with_a_valid_stored_query_registry() {
registry,
)
.await;
assert!(state.is_ok(), "valid registry should boot: {:?}", state.err());
assert!(
state.is_ok(),
"valid registry should boot: {:?}",
state.err()
);
}
#[tokio::test]
@ -202,7 +206,10 @@ async fn server_refuses_boot_on_type_broken_stored_query() {
Err(err) => err,
};
let msg = err.to_string();
assert!(msg.contains("ghost"), "error should name the broken query: {msg}");
assert!(
msg.contains("ghost"),
"error should name the broken query: {msg}"
);
assert!(
msg.contains("schema check"),
"error should mention the schema check: {msg}"
@ -341,12 +348,19 @@ async fn invoke_stored_read_returns_rows() {
.await;
let (status, body) = json_response(
&app,
invoke_request("find_person", "t-invoke", json!({ "params": { "name": "Alice" } })),
invoke_request(
"find_person",
"t-invoke",
json!({ "params": { "name": "Alice" } }),
),
)
.await;
assert_eq!(status, StatusCode::OK, "body: {body}");
assert_eq!(body["query_name"], "find_person");
assert_eq!(body["row_count"], 1, "Alice is in the fixture; body: {body}");
assert_eq!(
body["row_count"], 1,
"Alice is in the fixture; body: {body}"
);
assert!(body["rows"].is_array(), "read envelope shape; body: {body}");
}
@ -429,7 +443,11 @@ async fn invoke_stored_mutation_double_gates_on_change() {
// Has invoke_query but NOT change → the inner change gate denies (403).
let (status, body) = json_response(
&app,
invoke_request("add_person", "t-invoke", json!({ "params": { "name": "Eve" } })),
invoke_request(
"add_person",
"t-invoke",
json!({ "params": { "name": "Eve" } }),
),
)
.await;
assert_eq!(
@ -441,7 +459,11 @@ async fn invoke_stored_mutation_double_gates_on_change() {
// Has invoke_query + change → applied.
let (status, body) = json_response(
&app,
invoke_request("add_person", "t-full", json!({ "params": { "name": "Eve" } })),
invoke_request(
"add_person",
"t-full",
json!({ "params": { "name": "Eve" } }),
),
)
.await;
assert_eq!(status, StatusCode::OK, "body: {body}");
@ -459,7 +481,11 @@ async fn invoke_stored_query_bad_param_is_400() {
// `name` is declared String; pass a number.
let (status, body) = json_response(
&app,
invoke_request("find_person", "t-invoke", json!({ "params": { "name": 123 } })),
invoke_request(
"find_person",
"t-invoke",
json!({ "params": { "name": 123 } }),
),
)
.await;
assert_eq!(status, StatusCode::BAD_REQUEST, "body: {body}");
@ -479,12 +505,19 @@ async fn invoke_unknown_query_and_denied_actor_return_identical_404() {
.await;
// Authorized actor, unknown query name → 404.
let (unknown_status, unknown_body) =
json_response(&app, invoke_request("does_not_exist", "t-invoke", json!({}))).await;
let (unknown_status, unknown_body) = json_response(
&app,
invoke_request("does_not_exist", "t-invoke", json!({})),
)
.await;
// Denied actor (no invoke_query), real query name → 404.
let (denied_status, denied_body) = json_response(
&app,
invoke_request("find_person", "t-noinvoke", json!({ "params": { "name": "Alice" } })),
invoke_request(
"find_person",
"t-noinvoke",
json!({ "params": { "name": "Alice" } }),
),
)
.await;
@ -511,17 +544,28 @@ async fn invoke_query_holder_without_read_sees_403_not_404() {
.await;
let (exists_status, _) = json_response(
&app,
invoke_request("find_person", "t-invokeonly", json!({ "params": { "name": "Alice" } })),
invoke_request(
"find_person",
"t-invokeonly",
json!({ "params": { "name": "Alice" } }),
),
)
.await;
let (absent_status, _) = json_response(
&app,
invoke_request("does_not_exist", "t-invokeonly", json!({})),
)
.await;
let (absent_status, _) =
json_response(&app, invoke_request("does_not_exist", "t-invokeonly", json!({}))).await;
assert_eq!(
exists_status,
StatusCode::FORBIDDEN,
"an existing read query the holder can't read → inner-gate 403"
);
assert_eq!(absent_status, StatusCode::NOT_FOUND, "unknown query still 404s");
assert_eq!(
absent_status,
StatusCode::NOT_FOUND,
"unknown query still 404s"
);
}
fn get_request(uri: &str, token: &str) -> Request<Body> {
@ -543,7 +587,11 @@ async fn list_queries_returns_only_exposed_with_typed_params() {
"query add_person($name: String) { insert Person { name: $name } }",
true,
),
("hidden", "query hidden() { match { $p: Person } return { $p.name } }", false),
(
"hidden",
"query hidden() { match { $p: Person } return { $p.name } }",
false,
),
],
&[("act-invoke", "t-invoke")],
INVOKE_POLICY_YAML,
@ -553,12 +601,18 @@ async fn list_queries_returns_only_exposed_with_typed_params() {
assert_eq!(status, StatusCode::OK, "body: {body}");
let entries = body["queries"].as_array().unwrap();
let names: Vec<&str> = entries.iter().map(|q| q["name"].as_str().unwrap()).collect();
let names: Vec<&str> = entries
.iter()
.map(|q| q["name"].as_str().unwrap())
.collect();
assert!(
names.contains(&"find_person") && names.contains(&"add_person"),
"exposed queries listed: {names:?}"
);
assert!(!names.contains(&"hidden"), "non-exposed query hidden from the catalog: {names:?}");
assert!(
!names.contains(&"hidden"),
"non-exposed query hidden from the catalog: {names:?}"
);
let fp = entries.iter().find(|q| q["name"] == "find_person").unwrap();
assert_eq!(fp["mutation"], false);
@ -5728,9 +5782,14 @@ graphs:
"policy:\n file: ./top.yaml\ngraphs:\n prod:\n uri: /tmp/prod.omni\n",
)
.unwrap();
let err =
load_server_settings(Some(&config_path), None, Some("prod".to_string()), None, true)
.unwrap_err();
let err = load_server_settings(
Some(&config_path),
None,
Some("prod".to_string()),
None,
true,
)
.unwrap_err();
let msg = err.to_string();
assert!(
msg.contains("prod") && msg.contains("policy.file") && msg.contains("graphs.prod"),
@ -5755,9 +5814,14 @@ graphs:
queries:\n pq:\n file: ./prod.gq\n",
)
.unwrap();
let settings =
load_server_settings(Some(&config_path), None, Some("prod".to_string()), None, true)
.unwrap();
let settings = load_server_settings(
Some(&config_path),
None,
Some("prod".to_string()),
None,
true,
)
.unwrap();
match settings.mode {
ServerConfigMode::Single {
graph_id,

View file

@ -1113,7 +1113,11 @@ impl Omnigraph {
let dataset_uri = self.table_store.dataset_uri(&table_path);
let outcome = match crate::failpoints::maybe_fail("branch_delete.before_table_cleanup")
{
Ok(()) => self.table_store.force_delete_branch(&dataset_uri, branch).await,
Ok(()) => {
self.table_store
.force_delete_branch(&dataset_uri, branch)
.await
}
Err(injected) => Err(injected),
};
if let Err(err) = outcome {

View file

@ -411,7 +411,11 @@ pub async fn reconcile_orphaned_branches(db: &Omnigraph) -> Result<BranchReconci
};
for branch in orphan_branches(listed, &keep) {
let outcome = match crate::failpoints::maybe_fail("cleanup.reconcile_fork") {
Ok(()) => db.table_store.force_delete_branch(&full_path, &branch).await,
Ok(()) => {
db.table_store
.force_delete_branch(&full_path, &branch)
.await
}
Err(injected) => Err(injected),
};
match outcome {
@ -438,7 +442,9 @@ pub async fn reconcile_orphaned_branches(db: &Omnigraph) -> Result<BranchReconci
error = %err,
"commit-graph orphan reconcile failed; will retry next cleanup",
);
stats.failures.push(("_graph_commits".to_string(), err.to_string()));
stats
.failures
.push(("_graph_commits".to_string(), err.to_string()));
}
Ok(stats)
@ -466,7 +472,9 @@ async fn reconcile_commit_graph_orphans(
error = %err,
"reclaiming orphaned commit-graph branch failed; will retry next cleanup",
);
stats.failures.push(("_graph_commits".to_string(), err.to_string()));
stats
.failures
.push(("_graph_commits".to_string(), err.to_string()));
}
}
}

View file

@ -329,7 +329,10 @@ async fn compact_files_still_fails_on_blob_columns() {
]));
RecordBatch::try_new(
schema,
vec![Arc::new(StringArray::from(ids)) as _, Arc::new(content) as _],
vec![
Arc::new(StringArray::from(ids)) as _,
Arc::new(content) as _,
],
)
.unwrap()
}