From d67b10fa6e1af409f32fb5f59c9733a981c5251c Mon Sep 17 00:00:00 2001 From: Ragnor Comerford Date: Sat, 30 May 2026 22:36:56 +0200 Subject: [PATCH] Add POST /queries/{name} stored-query invocation handler MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Invoke a curated server-side stored query by name: source + name come from the per-graph queries: registry, the client sends only runtime inputs (params, branch, snapshot). Gated by the invoke_query Cedar action at the boundary; the handler delegates to the existing run_query/run_mutate, whose inner Read/Change enforce still runs — so a stored mutation is double-gated (invoke_query to reach the tool, change for the write). - InvokeStoredQueryRequest + an untagged InvokeStoredQueryResponse { Read(ReadOutput), Change(ChangeOutput) } → one Json<_> return type and a oneOf 200 schema (a correct contract, not a wrong-but-simple one). - Route lives in per_graph_protected → single-mode /queries/{name} and multi-mode /graphs/{id}/queries/{name} for free. - Deny == unknown: an invoke_query denial and a missing query both return the same 404, so the catalog can't be probed by an unauthorized caller. - OpenAPI regenerated; tests cover read, mutation double-gate (403 vs 200), bad-param 400, and the identical-404 deny path. Completes the MR-969 V1 invocation slice (registry + /queries/{name} + invoke_query). --- crates/omnigraph-server/src/api.rs | 30 ++++ crates/omnigraph-server/src/lib.rs | 118 +++++++++++++++- crates/omnigraph-server/tests/openapi.rs | 2 + crates/omnigraph-server/tests/server.rs | 171 +++++++++++++++++++++++ openapi.json | 142 +++++++++++++++++++ 5 files changed, 462 insertions(+), 1 deletion(-) diff --git a/crates/omnigraph-server/src/api.rs b/crates/omnigraph-server/src/api.rs index 2c818ae..0d769e5 100644 --- a/crates/omnigraph-server/src/api.rs +++ b/crates/omnigraph-server/src/api.rs @@ -300,6 +300,36 @@ pub struct ChangeRequest { pub branch: Option, } +/// Body for `POST /queries/{name}` — invokes the server-side stored query +/// named in the path. The query source and name come from the registry, +/// never the body; only the runtime inputs are supplied here. +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] +pub struct InvokeStoredQueryRequest { + /// JSON object whose keys match the stored query's declared parameters. + #[serde(default)] + pub params: Option, + /// Branch to run against. Defaults to `main`; for a stored mutation the + /// write targets this branch. + #[serde(default)] + pub branch: Option, + /// Snapshot id to read from (read queries only — rejected for a stored + /// mutation). Mutually exclusive with `branch`. + #[serde(default)] + pub snapshot: Option, +} + +/// Response for `POST /queries/{name}`: the read envelope for a stored +/// read, or the mutation envelope for a stored mutation. Serialized +/// **untagged**, so the wire shape is exactly [`ReadOutput`] or +/// [`ChangeOutput`] — classification follows the stored query, not a +/// wrapper field. +#[derive(Debug, Serialize, ToSchema)] +#[serde(untagged)] +pub enum InvokeStoredQueryResponse { + Read(ReadOutput), + Change(ChangeOutput), +} + #[derive(Debug, Clone, Default, Serialize, Deserialize, ToSchema)] pub struct SchemaApplyRequest { /// Project schema in `.pg` source form. The diff against the current diff --git a/crates/omnigraph-server/src/lib.rs b/crates/omnigraph-server/src/lib.rs index 313be04..10d2b2d 100644 --- a/crates/omnigraph-server/src/lib.rs +++ b/crates/omnigraph-server/src/lib.rs @@ -25,7 +25,8 @@ use api::{ BranchCreateOutput, BranchCreateRequest, BranchDeleteOutput, BranchListOutput, BranchMergeOutput, BranchMergeRequest, ChangeOutput, ChangeRequest, CommitListOutput, CommitListQuery, ErrorCode, ErrorOutput, ExportRequest, GraphInfo, GraphListResponse, - HealthOutput, IngestOutput, IngestRequest, QueryRequest, ReadOutput, ReadRequest, + HealthOutput, IngestOutput, IngestRequest, InvokeStoredQueryRequest, + InvokeStoredQueryResponse, QueryRequest, ReadOutput, ReadRequest, SchemaApplyOutput, SchemaApplyRequest, SchemaOutput, SnapshotQuery, ingest_output, schema_apply_output, snapshot_payload, }; @@ -97,6 +98,7 @@ fn hash_bearer_token(token: &str) -> BearerTokenHash { server_export, #[allow(deprecated)] server_change, server_mutate, + server_invoke_query, server_schema_apply, server_schema_get, server_ingest, @@ -1090,6 +1092,7 @@ pub fn build_app(state: AppState) -> Router { server_change })) .route("/mutate", post(server_mutate)) + .route("/queries/{name}", post(server_invoke_query)) .route("/schema", get(server_schema_get)) .route("/schema/apply", post(server_schema_apply)) .route( @@ -2153,6 +2156,119 @@ async fn server_mutate( )) } +/// Path parameter for `POST /queries/{name}`. +#[derive(Deserialize)] +struct QueryNamePath { + name: String, +} + +#[utoipa::path( + post, + path = "/queries/{name}", + tag = "queries", + operation_id = "invoke_query", + params(("name" = String, Path, description = "Stored query name (the registry key)")), + request_body = InvokeStoredQueryRequest, + responses( + (status = 200, description = "Read envelope (ReadOutput) or mutation envelope (ChangeOutput), serialized untagged", body = InvokeStoredQueryResponse), + (status = 400, description = "Bad request (param type error; snapshot on a stored mutation)", body = ErrorOutput), + (status = 401, description = "Unauthorized", body = ErrorOutput), + (status = 403, description = "Forbidden (the inner `change` gate for a stored mutation)", body = ErrorOutput), + (status = 404, description = "Unknown stored query, or `invoke_query` denied (indistinguishable)", body = ErrorOutput), + (status = 409, description = "Merge conflict", body = ErrorOutput), + (status = 429, description = "Per-actor admission cap exceeded; honor `Retry-After` header", body = ErrorOutput), + ), + security(("bearer_token" = [])), +)] +/// Invoke a curated, server-side stored query by name. +/// +/// The query source comes from the graph's `queries:` registry, not the +/// request body — callers send only runtime inputs (`params`, `branch`, +/// `snapshot`). Gated by the `invoke_query` Cedar action at the boundary; +/// a stored *mutation* additionally passes the engine's `change` gate +/// (double-gated). A denied actor and an unknown query both return the +/// same 404, so the catalog can't be probed. +async fn server_invoke_query( + State(state): State, + Extension(handle): Extension>, + actor: Option>, + Path(QueryNamePath { name }): Path, + Json(req): Json, +) -> std::result::Result, ApiError> { + // Deny is indistinguishable from a missing query: both 404 with this + // exact message, so an unauthorized caller can't probe the catalog. + const NOT_FOUND: &str = "stored query not found"; + let actor_ref = actor.as_ref().map(|Extension(actor)| actor); + + // Boundary gate (authentication already ran in `require_bearer_auth`). + authorize_request( + actor_ref, + handle.policy.as_deref(), + PolicyRequest { + action: PolicyAction::InvokeQuery, + branch: req.branch.clone().or_else(|| Some("main".to_string())), + target_branch: None, + }, + ) + .map_err(|_| ApiError::not_found(NOT_FOUND))?; + + // Resolve against the per-graph registry (same 404 on a miss). + let stored = handle + .queries + .as_ref() + .and_then(|registry| registry.lookup(&name)) + .ok_or_else(|| ApiError::not_found(NOT_FOUND))?; + + // Detach what we need before `handle` moves into the runner — the + // registry borrow lives inside `handle`. + let source = Arc::clone(&stored.source); + let query_name = stored.name.clone(); + let is_mutation = stored.is_mutation(); + + info!( + graph = %handle.uri, + actor = ?actor_ref.map(|a| a.actor_id.as_ref()), + query = %query_name, + kind = if is_mutation { "mutate" } else { "read" }, + "stored query invoked" + ); + + if is_mutation { + if req.snapshot.is_some() { + return Err(ApiError::bad_request( + "stored mutation cannot target a snapshot", + )); + } + let branch = req.branch.unwrap_or_else(|| "main".to_string()); + let output = run_mutate( + state, + handle, + actor_ref, + &source, + Some(&query_name), + req.params.as_ref(), + branch, + ) + .await?; + Ok(Json(InvokeStoredQueryResponse::Change(output))) + } else { + let (selected, target, result) = run_query( + handle, + actor_ref, + &source, + Some(&query_name), + req.params.as_ref(), + req.branch, + req.snapshot, + true, + ) + .await?; + Ok(Json(InvokeStoredQueryResponse::Read(api::read_output( + selected, &target, result, + )))) + } +} + #[utoipa::path( get, path = "/schema", diff --git a/crates/omnigraph-server/tests/openapi.rs b/crates/omnigraph-server/tests/openapi.rs index 0f447dd..15497d3 100644 --- a/crates/omnigraph-server/tests/openapi.rs +++ b/crates/omnigraph-server/tests/openapi.rs @@ -168,6 +168,7 @@ const EXPECTED_PATHS: &[&str] = &[ "/export", "/change", "/mutate", + "/queries/{name}", "/schema", "/schema/apply", "/ingest", @@ -701,6 +702,7 @@ fn protected_endpoints_reference_bearer_token_security() { ("/read", "post"), ("/change", "post"), ("/schema/apply", "post"), + ("/queries/{name}", "post"), ("/ingest", "post"), ("/export", "post"), ("/snapshot", "get"), diff --git a/crates/omnigraph-server/tests/server.rs b/crates/omnigraph-server/tests/server.rs index 852fa5f..ca5fa58 100644 --- a/crates/omnigraph-server/tests/server.rs +++ b/crates/omnigraph-server/tests/server.rs @@ -209,6 +209,177 @@ async fn server_refuses_boot_on_type_broken_stored_query() { ); } +/// Build a single-mode app with a stored-query registry plus a bearer→actor +/// pairing and a policy, so invoke tests exercise the `invoke_query` +/// boundary gate and the inner read/change gates together. +async fn app_with_stored_queries( + specs: &[(&str, &str, bool)], + tokens: &[(&str, &str)], + policy: &str, +) -> (tempfile::TempDir, Router) { + let temp = init_loaded_graph().await; + let graph = graph_path(temp.path()); + let policy_path = temp.path().join("policy.yaml"); + fs::write(&policy_path, policy).unwrap(); + let registry = stored_query_registry(specs); + let state = AppState::open_single_with_queries( + graph.to_string_lossy().to_string(), + tokens + .iter() + .map(|(actor, token)| ((*actor).to_string(), (*token).to_string())) + .collect(), + Some(&policy_path), + registry, + ) + .await + .unwrap(); + (temp, build_app(state)) +} + +/// - `act-invoke`: invoke_query + read (stored reads, not mutations) +/// - `act-full`: invoke_query + read + change (stored mutations) +/// - `act-noinvoke`: read only, no invoke_query (boundary-denied) +const INVOKE_POLICY_YAML: &str = r#" +version: 1 +groups: + invokers: ["act-invoke"] + full: ["act-full"] + readers: ["act-noinvoke"] +protected_branches: [main] +rules: + - id: invokers-invoke-and-read + allow: + actors: { group: invokers } + actions: [invoke_query, read] + branch_scope: any + - id: full-invoke-read-change + allow: + actors: { group: full } + actions: [invoke_query, read, change] + branch_scope: any + - id: readers-read-only + allow: + actors: { group: readers } + actions: [read] + branch_scope: any +"#; + +const FIND_PERSON_GQ: &str = + "query find_person($name: String) { match { $p: Person { name: $name } } return { $p.age } }"; + +fn invoke_request(name: &str, token: &str, body: Value) -> Request { + Request::builder() + .uri(format!("/queries/{name}")) + .method(Method::POST) + .header("content-type", "application/json") + .header("authorization", format!("Bearer {token}")) + .body(Body::from(serde_json::to_vec(&body).unwrap())) + .unwrap() +} + +#[tokio::test(flavor = "multi_thread")] +async fn invoke_stored_read_returns_rows() { + let (_temp, app) = app_with_stored_queries( + &[("find_person", FIND_PERSON_GQ, false)], + &[("act-invoke", "t-invoke")], + INVOKE_POLICY_YAML, + ) + .await; + let (status, body) = json_response( + &app, + invoke_request("find_person", "t-invoke", json!({ "params": { "name": "Alice" } })), + ) + .await; + assert_eq!(status, StatusCode::OK, "body: {body}"); + assert_eq!(body["query_name"], "find_person"); + assert_eq!(body["row_count"], 1, "Alice is in the fixture; body: {body}"); + assert!(body["rows"].is_array(), "read envelope shape; body: {body}"); +} + +#[tokio::test(flavor = "multi_thread")] +async fn invoke_stored_mutation_double_gates_on_change() { + let specs: &[(&str, &str, bool)] = &[( + "add_person", + "query add_person($name: String) { insert Person { name: $name } }", + false, + )]; + let (_temp, app) = app_with_stored_queries( + specs, + &[("act-invoke", "t-invoke"), ("act-full", "t-full")], + INVOKE_POLICY_YAML, + ) + .await; + + // Has invoke_query but NOT change → the inner change gate denies (403). + let (status, body) = json_response( + &app, + invoke_request("add_person", "t-invoke", json!({ "params": { "name": "Eve" } })), + ) + .await; + assert_eq!( + status, + StatusCode::FORBIDDEN, + "invoke_query without change must 403; body: {body}" + ); + + // Has invoke_query + change → applied. + let (status, body) = json_response( + &app, + invoke_request("add_person", "t-full", json!({ "params": { "name": "Eve" } })), + ) + .await; + assert_eq!(status, StatusCode::OK, "body: {body}"); + assert_eq!(body["affected_nodes"], 1, "body: {body}"); +} + +#[tokio::test(flavor = "multi_thread")] +async fn invoke_stored_query_bad_param_is_400() { + let (_temp, app) = app_with_stored_queries( + &[("find_person", FIND_PERSON_GQ, false)], + &[("act-invoke", "t-invoke")], + INVOKE_POLICY_YAML, + ) + .await; + // `name` is declared String; pass a number. + let (status, body) = json_response( + &app, + invoke_request("find_person", "t-invoke", json!({ "params": { "name": 123 } })), + ) + .await; + assert_eq!(status, StatusCode::BAD_REQUEST, "body: {body}"); + assert!( + body["error"].as_str().unwrap_or_default().contains("name"), + "400 should name the offending param; body: {body}" + ); +} + +#[tokio::test(flavor = "multi_thread")] +async fn invoke_unknown_query_and_denied_actor_return_identical_404() { + let (_temp, app) = app_with_stored_queries( + &[("find_person", FIND_PERSON_GQ, false)], + &[("act-invoke", "t-invoke"), ("act-noinvoke", "t-noinvoke")], + INVOKE_POLICY_YAML, + ) + .await; + + // Authorized actor, unknown query name → 404. + let (unknown_status, unknown_body) = + json_response(&app, invoke_request("does_not_exist", "t-invoke", json!({}))).await; + // Denied actor (no invoke_query), real query name → 404. + let (denied_status, denied_body) = json_response( + &app, + invoke_request("find_person", "t-noinvoke", json!({ "params": { "name": "Alice" } })), + ) + .await; + + assert_eq!(unknown_status, StatusCode::NOT_FOUND); + assert_eq!(denied_status, StatusCode::NOT_FOUND); + assert_eq!( + unknown_body, denied_body, + "deny must be byte-identical to a missing query (no catalog probing)" + ); +} + fn drifted_test_schema() -> String { fs::read_to_string(fixture("test.pg")) .unwrap() diff --git a/openapi.json b/openapi.json index d1fa337..f0108c3 100644 --- a/openapi.json +++ b/openapi.json @@ -829,6 +829,114 @@ ] } }, + "/queries/{name}": { + "post": { + "tags": [ + "queries" + ], + "summary": "Invoke a curated, server-side stored query by name.", + "description": "The query source comes from the graph's `queries:` registry, not the\nrequest body — callers send only runtime inputs (`params`, `branch`,\n`snapshot`). Gated by the `invoke_query` Cedar action at the boundary;\na stored *mutation* additionally passes the engine's `change` gate\n(double-gated). A denied actor and an unknown query both return the\nsame 404, so the catalog can't be probed.", + "operationId": "invoke_query", + "parameters": [ + { + "name": "name", + "in": "path", + "description": "Stored query name (the registry key)", + "required": true, + "schema": { + "type": "string" + } + } + ], + "requestBody": { + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/InvokeStoredQueryRequest" + } + } + }, + "required": true + }, + "responses": { + "200": { + "description": "Read envelope (ReadOutput) or mutation envelope (ChangeOutput), serialized untagged", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/InvokeStoredQueryResponse" + } + } + } + }, + "400": { + "description": "Bad request (param type error; snapshot on a stored mutation)", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorOutput" + } + } + } + }, + "401": { + "description": "Unauthorized", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorOutput" + } + } + } + }, + "403": { + "description": "Forbidden (the inner `change` gate for a stored mutation)", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorOutput" + } + } + } + }, + "404": { + "description": "Unknown stored query, or `invoke_query` denied (indistinguishable)", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorOutput" + } + } + } + }, + "409": { + "description": "Merge conflict", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorOutput" + } + } + } + }, + "429": { + "description": "Per-actor admission cap exceeded; honor `Retry-After` header", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorOutput" + } + } + } + } + }, + "security": [ + { + "bearer_token": [] + } + ] + } + }, "/query": { "post": { "tags": [ @@ -1628,6 +1736,40 @@ } } }, + "InvokeStoredQueryRequest": { + "type": "object", + "description": "Body for `POST /queries/{name}` — invokes the server-side stored query\nnamed in the path. The query source and name come from the registry,\nnever the body; only the runtime inputs are supplied here.", + "properties": { + "branch": { + "type": [ + "string", + "null" + ], + "description": "Branch to run against. Defaults to `main`; for a stored mutation the\nwrite targets this branch." + }, + "params": { + "description": "JSON object whose keys match the stored query's declared parameters." + }, + "snapshot": { + "type": [ + "string", + "null" + ], + "description": "Snapshot id to read from (read queries only — rejected for a stored\nmutation). Mutually exclusive with `branch`." + } + } + }, + "InvokeStoredQueryResponse": { + "oneOf": [ + { + "$ref": "#/components/schemas/ReadOutput" + }, + { + "$ref": "#/components/schemas/ChangeOutput" + } + ], + "description": "Response for `POST /queries/{name}`: the read envelope for a stored\nread, or the mutation envelope for a stored mutation. Serialized\n**untagged**, so the wire shape is exactly [`ReadOutput`] or\n[`ChangeOutput`] — classification follows the stored query, not a\nwrapper field." + }, "LoadMode": { "type": "string", "description": "Shadow enum for documenting [`LoadMode`] in the OpenAPI schema.",