Add POST /queries/{name} stored-query invocation handler

Invoke a curated server-side stored query by name: source + name come from
the per-graph queries: registry, the client sends only runtime inputs
(params, branch, snapshot). Gated by the invoke_query Cedar action at the
boundary; the handler delegates to the existing run_query/run_mutate, whose
inner Read/Change enforce still runs — so a stored mutation is double-gated
(invoke_query to reach the tool, change for the write).

- InvokeStoredQueryRequest + an untagged InvokeStoredQueryResponse
  { Read(ReadOutput), Change(ChangeOutput) } → one Json<_> return type and a
  oneOf 200 schema (a correct contract, not a wrong-but-simple one).
- Route lives in per_graph_protected → single-mode /queries/{name} and
  multi-mode /graphs/{id}/queries/{name} for free.
- Deny == unknown: an invoke_query denial and a missing query both return the
  same 404, so the catalog can't be probed by an unauthorized caller.
- OpenAPI regenerated; tests cover read, mutation double-gate (403 vs 200),
  bad-param 400, and the identical-404 deny path.

Completes the MR-969 V1 invocation slice (registry + /queries/{name} + invoke_query).
This commit is contained in:
Ragnor Comerford 2026-05-30 22:36:56 +02:00
parent 6983608f4f
commit d67b10fa6e
No known key found for this signature in database
5 changed files with 462 additions and 1 deletions

View file

@ -300,6 +300,36 @@ pub struct ChangeRequest {
pub branch: Option<String>,
}
/// Body for `POST /queries/{name}` — invokes the server-side stored query
/// named in the path. The query source and name come from the registry,
/// never the body; only the runtime inputs are supplied here.
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
pub struct InvokeStoredQueryRequest {
/// JSON object whose keys match the stored query's declared parameters.
#[serde(default)]
pub params: Option<Value>,
/// Branch to run against. Defaults to `main`; for a stored mutation the
/// write targets this branch.
#[serde(default)]
pub branch: Option<String>,
/// Snapshot id to read from (read queries only — rejected for a stored
/// mutation). Mutually exclusive with `branch`.
#[serde(default)]
pub snapshot: Option<String>,
}
/// Response for `POST /queries/{name}`: the read envelope for a stored
/// read, or the mutation envelope for a stored mutation. Serialized
/// **untagged**, so the wire shape is exactly [`ReadOutput`] or
/// [`ChangeOutput`] — classification follows the stored query, not a
/// wrapper field.
#[derive(Debug, Serialize, ToSchema)]
#[serde(untagged)]
pub enum InvokeStoredQueryResponse {
Read(ReadOutput),
Change(ChangeOutput),
}
#[derive(Debug, Clone, Default, Serialize, Deserialize, ToSchema)]
pub struct SchemaApplyRequest {
/// Project schema in `.pg` source form. The diff against the current

View file

@ -25,7 +25,8 @@ use api::{
BranchCreateOutput, BranchCreateRequest, BranchDeleteOutput, BranchListOutput,
BranchMergeOutput, BranchMergeRequest, ChangeOutput, ChangeRequest, CommitListOutput,
CommitListQuery, ErrorCode, ErrorOutput, ExportRequest, GraphInfo, GraphListResponse,
HealthOutput, IngestOutput, IngestRequest, QueryRequest, ReadOutput, ReadRequest,
HealthOutput, IngestOutput, IngestRequest, InvokeStoredQueryRequest,
InvokeStoredQueryResponse, QueryRequest, ReadOutput, ReadRequest,
SchemaApplyOutput, SchemaApplyRequest, SchemaOutput, SnapshotQuery, ingest_output,
schema_apply_output, snapshot_payload,
};
@ -97,6 +98,7 @@ fn hash_bearer_token(token: &str) -> BearerTokenHash {
server_export,
#[allow(deprecated)] server_change,
server_mutate,
server_invoke_query,
server_schema_apply,
server_schema_get,
server_ingest,
@ -1090,6 +1092,7 @@ pub fn build_app(state: AppState) -> Router {
server_change
}))
.route("/mutate", post(server_mutate))
.route("/queries/{name}", post(server_invoke_query))
.route("/schema", get(server_schema_get))
.route("/schema/apply", post(server_schema_apply))
.route(
@ -2153,6 +2156,119 @@ async fn server_mutate(
))
}
/// Path parameter for `POST /queries/{name}`.
#[derive(Deserialize)]
struct QueryNamePath {
name: String,
}
#[utoipa::path(
post,
path = "/queries/{name}",
tag = "queries",
operation_id = "invoke_query",
params(("name" = String, Path, description = "Stored query name (the registry key)")),
request_body = InvokeStoredQueryRequest,
responses(
(status = 200, description = "Read envelope (ReadOutput) or mutation envelope (ChangeOutput), serialized untagged", body = InvokeStoredQueryResponse),
(status = 400, description = "Bad request (param type error; snapshot on a stored mutation)", body = ErrorOutput),
(status = 401, description = "Unauthorized", body = ErrorOutput),
(status = 403, description = "Forbidden (the inner `change` gate for a stored mutation)", body = ErrorOutput),
(status = 404, description = "Unknown stored query, or `invoke_query` denied (indistinguishable)", body = ErrorOutput),
(status = 409, description = "Merge conflict", body = ErrorOutput),
(status = 429, description = "Per-actor admission cap exceeded; honor `Retry-After` header", body = ErrorOutput),
),
security(("bearer_token" = [])),
)]
/// Invoke a curated, server-side stored query by name.
///
/// The query source comes from the graph's `queries:` registry, not the
/// request body — callers send only runtime inputs (`params`, `branch`,
/// `snapshot`). Gated by the `invoke_query` Cedar action at the boundary;
/// a stored *mutation* additionally passes the engine's `change` gate
/// (double-gated). A denied actor and an unknown query both return the
/// same 404, so the catalog can't be probed.
async fn server_invoke_query(
State(state): State<AppState>,
Extension(handle): Extension<Arc<GraphHandle>>,
actor: Option<Extension<ResolvedActor>>,
Path(QueryNamePath { name }): Path<QueryNamePath>,
Json(req): Json<InvokeStoredQueryRequest>,
) -> std::result::Result<Json<InvokeStoredQueryResponse>, ApiError> {
// Deny is indistinguishable from a missing query: both 404 with this
// exact message, so an unauthorized caller can't probe the catalog.
const NOT_FOUND: &str = "stored query not found";
let actor_ref = actor.as_ref().map(|Extension(actor)| actor);
// Boundary gate (authentication already ran in `require_bearer_auth`).
authorize_request(
actor_ref,
handle.policy.as_deref(),
PolicyRequest {
action: PolicyAction::InvokeQuery,
branch: req.branch.clone().or_else(|| Some("main".to_string())),
target_branch: None,
},
)
.map_err(|_| ApiError::not_found(NOT_FOUND))?;
// Resolve against the per-graph registry (same 404 on a miss).
let stored = handle
.queries
.as_ref()
.and_then(|registry| registry.lookup(&name))
.ok_or_else(|| ApiError::not_found(NOT_FOUND))?;
// Detach what we need before `handle` moves into the runner — the
// registry borrow lives inside `handle`.
let source = Arc::clone(&stored.source);
let query_name = stored.name.clone();
let is_mutation = stored.is_mutation();
info!(
graph = %handle.uri,
actor = ?actor_ref.map(|a| a.actor_id.as_ref()),
query = %query_name,
kind = if is_mutation { "mutate" } else { "read" },
"stored query invoked"
);
if is_mutation {
if req.snapshot.is_some() {
return Err(ApiError::bad_request(
"stored mutation cannot target a snapshot",
));
}
let branch = req.branch.unwrap_or_else(|| "main".to_string());
let output = run_mutate(
state,
handle,
actor_ref,
&source,
Some(&query_name),
req.params.as_ref(),
branch,
)
.await?;
Ok(Json(InvokeStoredQueryResponse::Change(output)))
} else {
let (selected, target, result) = run_query(
handle,
actor_ref,
&source,
Some(&query_name),
req.params.as_ref(),
req.branch,
req.snapshot,
true,
)
.await?;
Ok(Json(InvokeStoredQueryResponse::Read(api::read_output(
selected, &target, result,
))))
}
}
#[utoipa::path(
get,
path = "/schema",

View file

@ -168,6 +168,7 @@ const EXPECTED_PATHS: &[&str] = &[
"/export",
"/change",
"/mutate",
"/queries/{name}",
"/schema",
"/schema/apply",
"/ingest",
@ -701,6 +702,7 @@ fn protected_endpoints_reference_bearer_token_security() {
("/read", "post"),
("/change", "post"),
("/schema/apply", "post"),
("/queries/{name}", "post"),
("/ingest", "post"),
("/export", "post"),
("/snapshot", "get"),

View file

@ -209,6 +209,177 @@ async fn server_refuses_boot_on_type_broken_stored_query() {
);
}
/// Build a single-mode app with a stored-query registry plus a bearer→actor
/// pairing and a policy, so invoke tests exercise the `invoke_query`
/// boundary gate and the inner read/change gates together.
async fn app_with_stored_queries(
specs: &[(&str, &str, bool)],
tokens: &[(&str, &str)],
policy: &str,
) -> (tempfile::TempDir, Router) {
let temp = init_loaded_graph().await;
let graph = graph_path(temp.path());
let policy_path = temp.path().join("policy.yaml");
fs::write(&policy_path, policy).unwrap();
let registry = stored_query_registry(specs);
let state = AppState::open_single_with_queries(
graph.to_string_lossy().to_string(),
tokens
.iter()
.map(|(actor, token)| ((*actor).to_string(), (*token).to_string()))
.collect(),
Some(&policy_path),
registry,
)
.await
.unwrap();
(temp, build_app(state))
}
/// - `act-invoke`: invoke_query + read (stored reads, not mutations)
/// - `act-full`: invoke_query + read + change (stored mutations)
/// - `act-noinvoke`: read only, no invoke_query (boundary-denied)
const INVOKE_POLICY_YAML: &str = r#"
version: 1
groups:
invokers: ["act-invoke"]
full: ["act-full"]
readers: ["act-noinvoke"]
protected_branches: [main]
rules:
- id: invokers-invoke-and-read
allow:
actors: { group: invokers }
actions: [invoke_query, read]
branch_scope: any
- id: full-invoke-read-change
allow:
actors: { group: full }
actions: [invoke_query, read, change]
branch_scope: any
- id: readers-read-only
allow:
actors: { group: readers }
actions: [read]
branch_scope: any
"#;
const FIND_PERSON_GQ: &str =
"query find_person($name: String) { match { $p: Person { name: $name } } return { $p.age } }";
fn invoke_request(name: &str, token: &str, body: Value) -> Request<Body> {
Request::builder()
.uri(format!("/queries/{name}"))
.method(Method::POST)
.header("content-type", "application/json")
.header("authorization", format!("Bearer {token}"))
.body(Body::from(serde_json::to_vec(&body).unwrap()))
.unwrap()
}
#[tokio::test(flavor = "multi_thread")]
async fn invoke_stored_read_returns_rows() {
let (_temp, app) = app_with_stored_queries(
&[("find_person", FIND_PERSON_GQ, false)],
&[("act-invoke", "t-invoke")],
INVOKE_POLICY_YAML,
)
.await;
let (status, body) = json_response(
&app,
invoke_request("find_person", "t-invoke", json!({ "params": { "name": "Alice" } })),
)
.await;
assert_eq!(status, StatusCode::OK, "body: {body}");
assert_eq!(body["query_name"], "find_person");
assert_eq!(body["row_count"], 1, "Alice is in the fixture; body: {body}");
assert!(body["rows"].is_array(), "read envelope shape; body: {body}");
}
#[tokio::test(flavor = "multi_thread")]
async fn invoke_stored_mutation_double_gates_on_change() {
let specs: &[(&str, &str, bool)] = &[(
"add_person",
"query add_person($name: String) { insert Person { name: $name } }",
false,
)];
let (_temp, app) = app_with_stored_queries(
specs,
&[("act-invoke", "t-invoke"), ("act-full", "t-full")],
INVOKE_POLICY_YAML,
)
.await;
// Has invoke_query but NOT change → the inner change gate denies (403).
let (status, body) = json_response(
&app,
invoke_request("add_person", "t-invoke", json!({ "params": { "name": "Eve" } })),
)
.await;
assert_eq!(
status,
StatusCode::FORBIDDEN,
"invoke_query without change must 403; body: {body}"
);
// Has invoke_query + change → applied.
let (status, body) = json_response(
&app,
invoke_request("add_person", "t-full", json!({ "params": { "name": "Eve" } })),
)
.await;
assert_eq!(status, StatusCode::OK, "body: {body}");
assert_eq!(body["affected_nodes"], 1, "body: {body}");
}
#[tokio::test(flavor = "multi_thread")]
async fn invoke_stored_query_bad_param_is_400() {
let (_temp, app) = app_with_stored_queries(
&[("find_person", FIND_PERSON_GQ, false)],
&[("act-invoke", "t-invoke")],
INVOKE_POLICY_YAML,
)
.await;
// `name` is declared String; pass a number.
let (status, body) = json_response(
&app,
invoke_request("find_person", "t-invoke", json!({ "params": { "name": 123 } })),
)
.await;
assert_eq!(status, StatusCode::BAD_REQUEST, "body: {body}");
assert!(
body["error"].as_str().unwrap_or_default().contains("name"),
"400 should name the offending param; body: {body}"
);
}
#[tokio::test(flavor = "multi_thread")]
async fn invoke_unknown_query_and_denied_actor_return_identical_404() {
let (_temp, app) = app_with_stored_queries(
&[("find_person", FIND_PERSON_GQ, false)],
&[("act-invoke", "t-invoke"), ("act-noinvoke", "t-noinvoke")],
INVOKE_POLICY_YAML,
)
.await;
// Authorized actor, unknown query name → 404.
let (unknown_status, unknown_body) =
json_response(&app, invoke_request("does_not_exist", "t-invoke", json!({}))).await;
// Denied actor (no invoke_query), real query name → 404.
let (denied_status, denied_body) = json_response(
&app,
invoke_request("find_person", "t-noinvoke", json!({ "params": { "name": "Alice" } })),
)
.await;
assert_eq!(unknown_status, StatusCode::NOT_FOUND);
assert_eq!(denied_status, StatusCode::NOT_FOUND);
assert_eq!(
unknown_body, denied_body,
"deny must be byte-identical to a missing query (no catalog probing)"
);
}
fn drifted_test_schema() -> String {
fs::read_to_string(fixture("test.pg"))
.unwrap()

View file

@ -829,6 +829,114 @@
]
}
},
"/queries/{name}": {
"post": {
"tags": [
"queries"
],
"summary": "Invoke a curated, server-side stored query by name.",
"description": "The query source comes from the graph's `queries:` registry, not the\nrequest body — callers send only runtime inputs (`params`, `branch`,\n`snapshot`). Gated by the `invoke_query` Cedar action at the boundary;\na stored *mutation* additionally passes the engine's `change` gate\n(double-gated). A denied actor and an unknown query both return the\nsame 404, so the catalog can't be probed.",
"operationId": "invoke_query",
"parameters": [
{
"name": "name",
"in": "path",
"description": "Stored query name (the registry key)",
"required": true,
"schema": {
"type": "string"
}
}
],
"requestBody": {
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/InvokeStoredQueryRequest"
}
}
},
"required": true
},
"responses": {
"200": {
"description": "Read envelope (ReadOutput) or mutation envelope (ChangeOutput), serialized untagged",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/InvokeStoredQueryResponse"
}
}
}
},
"400": {
"description": "Bad request (param type error; snapshot on a stored mutation)",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/ErrorOutput"
}
}
}
},
"401": {
"description": "Unauthorized",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/ErrorOutput"
}
}
}
},
"403": {
"description": "Forbidden (the inner `change` gate for a stored mutation)",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/ErrorOutput"
}
}
}
},
"404": {
"description": "Unknown stored query, or `invoke_query` denied (indistinguishable)",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/ErrorOutput"
}
}
}
},
"409": {
"description": "Merge conflict",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/ErrorOutput"
}
}
}
},
"429": {
"description": "Per-actor admission cap exceeded; honor `Retry-After` header",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/ErrorOutput"
}
}
}
}
},
"security": [
{
"bearer_token": []
}
]
}
},
"/query": {
"post": {
"tags": [
@ -1628,6 +1736,40 @@
}
}
},
"InvokeStoredQueryRequest": {
"type": "object",
"description": "Body for `POST /queries/{name}` — invokes the server-side stored query\nnamed in the path. The query source and name come from the registry,\nnever the body; only the runtime inputs are supplied here.",
"properties": {
"branch": {
"type": [
"string",
"null"
],
"description": "Branch to run against. Defaults to `main`; for a stored mutation the\nwrite targets this branch."
},
"params": {
"description": "JSON object whose keys match the stored query's declared parameters."
},
"snapshot": {
"type": [
"string",
"null"
],
"description": "Snapshot id to read from (read queries only — rejected for a stored\nmutation). Mutually exclusive with `branch`."
}
}
},
"InvokeStoredQueryResponse": {
"oneOf": [
{
"$ref": "#/components/schemas/ReadOutput"
},
{
"$ref": "#/components/schemas/ChangeOutput"
}
],
"description": "Response for `POST /queries/{name}`: the read envelope for a stored\nread, or the mutation envelope for a stored mutation. Serialized\n**untagged**, so the wire shape is exactly [`ReadOutput`] or\n[`ChangeOutput`] — classification follows the stored query, not a\nwrapper field."
},
"LoadMode": {
"type": "string",
"description": "Shadow enum for documenting [`LoadMode`] in the OpenAPI schema.",