From 66c37d289a95479a8f74f9155bd2a7884bca98e1 Mon Sep 17 00:00:00 2001 From: Ragnor Comerford Date: Tue, 9 Jun 2026 14:51:45 +0200 Subject: [PATCH] =?UTF-8?q?feat(server):=20MCP=20built-in=20tool=20surface?= =?UTF-8?q?=20(RFC-003=20=C2=A75.2)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Project 14 built-in operations as MCP tools over the /mcp endpoint: health, snapshot, schema_get, branches_list, commits_list, commits_get, graphs_list, query, mutate, ingest, branches_create, branches_delete, branches_merge, schema_apply. Each tool reuses the exact do_* / run_query / run_mutate path (and the exact Cedar action) its REST route enforces, so there is no new business logic and no second authorization surface. - list_tools is Cedar-filtered: a tool is listed only if the actor's policy permits its action (a policy-engine error propagates; a denial hides). - call_tool enforces the same gate, masks a denied or unknown tool identically ("unknown tool: ", deny == missing), and classifies engine failures as isError tool results (4xx/409) vs JSON-RPC errors (5xx) per the 2025-11-25 spec split. - Tool annotations set readOnlyHint / destructiveHint / openWorldHint explicitly (rmcp defaults destructive + open_world to true). - The actor and graph handle are read from the request extensions the bearer/handle middleware attached, threaded through rmcp's RequestContext (RFC-003 §5.8); a multi-graph tool call resolves the per-request graph for free. - Bound MCP request bodies with tower-http RequestBodyLimitLayer at the ingest limit: rmcp reads the body directly, so axum's DefaultBodyLimit does not cover /mcp. Tests (tests/server.rs, 11 MCP cases): tools/list contents + Cedar filtering by policy, a snapshot read and a mutate write end to end through the extension passthrough, masked deny, malformed-query isError, unknown-tool JSON-RPC error, and the read/write annotation hints. docs/user/server.md deferred to the docs phase per the RFC-003 rollout. Co-Authored-By: Claude --- crates/omnigraph-server/Cargo.toml | 2 +- crates/omnigraph-server/src/lib.rs | 14 +- crates/omnigraph-server/src/mcp/builtins.rs | 523 ++++++++++++++++++++ crates/omnigraph-server/src/mcp/mod.rs | 86 +++- crates/omnigraph-server/tests/server.rs | 281 +++++++++++ 5 files changed, 887 insertions(+), 19 deletions(-) create mode 100644 crates/omnigraph-server/src/mcp/builtins.rs diff --git a/crates/omnigraph-server/Cargo.toml b/crates/omnigraph-server/Cargo.toml index 6c0f421..e92461e 100644 --- a/crates/omnigraph-server/Cargo.toml +++ b/crates/omnigraph-server/Cargo.toml @@ -31,7 +31,7 @@ tokio = { workspace = true } serde_yaml = { workspace = true } tracing = { workspace = true } tracing-subscriber = { workspace = true } -tower-http = { workspace = true } +tower-http = { workspace = true, features = ["limit"] } utoipa = { workspace = true } futures = { workspace = true } sha2 = { workspace = true } diff --git a/crates/omnigraph-server/src/lib.rs b/crates/omnigraph-server/src/lib.rs index 338b0cf..4e8329d 100644 --- a/crates/omnigraph-server/src/lib.rs +++ b/crates/omnigraph-server/src/lib.rs @@ -610,6 +610,18 @@ fn hash_bearer_tokens(bearer_tokens: Vec<(String, String)>) -> Arc<[(BearerToken } impl ApiError { + /// HTTP status this error maps to. Lets the MCP layer classify operational + /// failures (500 → JSON-RPC error) from tool-execution errors (4xx/409 → + /// `isError` content the model can self-correct on). + pub(crate) fn status_code(&self) -> StatusCode { + self.status + } + + /// The human-readable error message, for building MCP tool-error content. + pub(crate) fn message_str(&self) -> &str { + &self.message + } + pub fn unauthorized(message: impl Into) -> Self { Self { status: StatusCode::UNAUTHORIZED, @@ -1143,7 +1155,7 @@ pub fn build_app(state: AppState) -> Router { // shares this group's bearer-auth + graph-handle middleware. Single // mode serves it flat at `/mcp`; multi mode nests it to // `/graphs/{graph_id}/mcp` (per-graph isolation for free). - .route_service("/mcp", mcp::mcp_service(state.clone())) + .merge(mcp::mcp_router(state.clone())) .route_layer(middleware::from_fn_with_state( state.clone(), resolve_graph_handle, diff --git a/crates/omnigraph-server/src/mcp/builtins.rs b/crates/omnigraph-server/src/mcp/builtins.rs new file mode 100644 index 0000000..22f9354 --- /dev/null +++ b/crates/omnigraph-server/src/mcp/builtins.rs @@ -0,0 +1,523 @@ +//! Built-in MCP tools — RFC-003 §5.2. +//! +//! Each tool reuses the EXACT `do_*` / `run_query` / `run_mutate` path (and the +//! exact Cedar action) its HTTP route uses, so there is no new business logic +//! and no second authorization surface. `list_tools` Cedar-filters the set; +//! `call_tool` enforces the same gate (deny ≡ unknown tool) then dispatches. + +use std::sync::Arc; + +use rmcp::ErrorData as McpError; +use rmcp::RoleServer; +use rmcp::model::{CallToolResult, Content, JsonObject, Tool, ToolAnnotations}; +use rmcp::service::RequestContext; +use serde::Serialize; +use serde_json::{Value, json}; + +use crate::{ + AppState, Authz, GraphHandle, PolicyAction, PolicyRequest, ResolvedActor, api, authorize, + do_branch_create, do_branch_delete, do_branch_merge, do_branches_list, do_commit_show, + do_commits_list, do_graphs_list, do_ingest, do_schema_apply, do_schema_get, do_snapshot, + run_mutate, run_query, +}; +use omnigraph::loader::LoadMode; + +/// Per-request context for a tool call. `state` comes from the shared handler; +/// `handle` (per-graph) and `actor` are read back out of the request extensions +/// that the `resolve_graph_handle` / `require_bearer_auth` middleware attached +/// before rmcp ran (RFC-003 §5.8 — the handler consumes a resolved actor and +/// branches on nothing about how it was verified). +pub(crate) struct ToolCx { + pub state: AppState, + pub handle: Option>, + pub actor: Option, +} + +impl ToolCx { + fn graph(&self) -> Result<&Arc, McpError> { + self.handle.as_ref().ok_or_else(|| { + McpError::internal_error("graph handle missing from MCP request context", None) + }) + } + + fn actor_ref(&self) -> Option<&ResolvedActor> { + self.actor.as_ref() + } +} + +/// Pull the per-request actor + graph handle out of the `http::request::Parts` +/// rmcp threads into `RequestContext.extensions`. +pub(crate) fn resolve_cx( + state: &AppState, + ctx: &RequestContext, +) -> Result { + let parts = ctx + .extensions + .get::() + .ok_or_else(|| { + McpError::internal_error("request parts missing from MCP request context", None) + })?; + Ok(ToolCx { + state: state.clone(), + handle: parts.extensions.get::>().cloned(), + actor: parts.extensions.get::().cloned(), + }) +} + +/// Which Cedar entity an action's gate binds to. +enum Gate { + /// Per-graph action gated against `handle.policy`. + Graph(PolicyAction), + /// Server-scoped action gated against `state.server_policy`. + Server(PolicyAction), + /// No Cedar gate (liveness/version). + None, +} + +/// The built-in tool set. One variant per RFC-003 §5.2 tool; the dynamic +/// stored-query tools are a separate populator (Phase 4). +#[derive(Clone, Copy)] +pub(crate) enum Builtin { + Health, + Snapshot, + SchemaGet, + BranchesList, + CommitsList, + CommitsGet, + GraphsList, + Query, + Mutate, + Ingest, + BranchesCreate, + BranchesDelete, + BranchesMerge, + SchemaApply, +} + +impl Builtin { + pub(crate) fn all() -> &'static [Builtin] { + use Builtin::*; + &[ + Health, + Snapshot, + SchemaGet, + BranchesList, + CommitsList, + CommitsGet, + GraphsList, + Query, + Mutate, + Ingest, + BranchesCreate, + BranchesDelete, + BranchesMerge, + SchemaApply, + ] + } + + pub(crate) fn from_name(name: &str) -> Option { + Builtin::all().iter().copied().find(|b| b.name() == name) + } + + pub(crate) fn name(self) -> &'static str { + match self { + Builtin::Health => "health", + Builtin::Snapshot => "snapshot", + Builtin::SchemaGet => "schema_get", + Builtin::BranchesList => "branches_list", + Builtin::CommitsList => "commits_list", + Builtin::CommitsGet => "commits_get", + Builtin::GraphsList => "graphs_list", + Builtin::Query => "query", + Builtin::Mutate => "mutate", + Builtin::Ingest => "ingest", + Builtin::BranchesCreate => "branches_create", + Builtin::BranchesDelete => "branches_delete", + Builtin::BranchesMerge => "branches_merge", + Builtin::SchemaApply => "schema_apply", + } + } + + /// The MCP `Tool` descriptor (name, description, JSON-Schema input). + pub(crate) fn descriptor(self) -> Tool { + let (description, schema) = match self { + Builtin::Health => ( + "Liveness + server version. No graph access.", + json!({"type": "object", "additionalProperties": false}), + ), + Builtin::Snapshot => ( + "Read a branch's current snapshot: manifest version plus per-table metadata. Read-only.", + json!({ + "type": "object", + "properties": {"branch": {"type": "string", "description": "Branch to read (default `main`)."}}, + "additionalProperties": false + }), + ), + Builtin::SchemaGet => ( + "Return the graph's `.pg` schema source. Read-only.", + json!({"type": "object", "additionalProperties": false}), + ), + Builtin::BranchesList => ( + "List all branch names (sorted). Read-only.", + json!({"type": "object", "additionalProperties": false}), + ), + Builtin::CommitsList => ( + "List commits, optionally filtered to one branch (most recent first). Read-only.", + json!({ + "type": "object", + "properties": {"branch": {"type": "string", "description": "Filter to this branch; omit for all."}}, + "additionalProperties": false + }), + ), + Builtin::CommitsGet => ( + "Get a single commit's metadata by id. Read-only.", + json!({ + "type": "object", + "properties": {"commit_id": {"type": "string"}}, + "required": ["commit_id"], + "additionalProperties": false + }), + ), + Builtin::GraphsList => ( + "List the graphs registered with this server (multi-graph mode only).", + json!({"type": "object", "additionalProperties": false}), + ), + Builtin::Query => ( + "Run an ad-hoc read-only GQ query. Mutations are rejected — use `mutate`.", + json!({ + "type": "object", + "properties": { + "source": {"type": "string", "description": "GQ query source."}, + "branch": {"type": "string", "description": "Branch to read (default `main`)."}, + "snapshot": {"type": "string", "description": "Snapshot id to read (mutually exclusive with `branch`)."}, + "params": {"type": "object", "description": "Named query parameters."} + }, + "required": ["source"], + "additionalProperties": false + }), + ), + Builtin::Mutate => ( + "Run an ad-hoc GQ mutation (insert/update/delete) on a branch. Writes are branchable and policy-gated.", + json!({ + "type": "object", + "properties": { + "source": {"type": "string", "description": "GQ mutation source."}, + "branch": {"type": "string", "description": "Branch to write (default `main`)."}, + "params": {"type": "object", "description": "Named query parameters."} + }, + "required": ["source"], + "additionalProperties": false + }), + ), + Builtin::Ingest => ( + "Bulk-ingest NDJSON into a branch. `mode` controls existing rows: merge (upsert, default), append, or overwrite (replaces table contents). Creates the branch from `from` if absent.", + json!({ + "type": "object", + "properties": { + "data": {"type": "string", "description": "NDJSON, one record per line."}, + "mode": {"type": "string", "enum": ["merge", "append", "overwrite"], "description": "Default `merge`."}, + "branch": {"type": "string", "description": "Branch to ingest into (default `main`)."}, + "from": {"type": "string", "description": "Parent branch to fork from when `branch` does not exist (default `main`)."} + }, + "required": ["data"], + "additionalProperties": false + }), + ), + Builtin::BranchesCreate => ( + "Create a branch, forking `name` off `from` (default `main`). Additive — shares parent data until written.", + json!({ + "type": "object", + "properties": { + "name": {"type": "string", "description": "New branch name."}, + "from": {"type": "string", "description": "Parent branch (default `main`)."} + }, + "required": ["name"], + "additionalProperties": false + }), + ), + Builtin::BranchesDelete => ( + "Delete a branch pointer. Irreversible.", + json!({ + "type": "object", + "properties": {"branch": {"type": "string", "description": "Branch to delete."}}, + "required": ["branch"], + "additionalProperties": false + }), + ), + Builtin::BranchesMerge => ( + "Merge `source` into `target` (default `main`). Returns the outcome (already_up_to_date / fast_forward / merged) or a conflict; the target is unchanged on conflict.", + json!({ + "type": "object", + "properties": { + "source": {"type": "string", "description": "Branch to merge from."}, + "target": {"type": "string", "description": "Branch to merge into (default `main`)."} + }, + "required": ["source"], + "additionalProperties": false + }), + ), + Builtin::SchemaApply => ( + "Apply a schema migration. Diffs `schema_source` against the current schema and applies the steps. Destructive: some steps drop data — set `allow_data_loss` to permit those.", + json!({ + "type": "object", + "properties": { + "schema_source": {"type": "string", "description": "The target `.pg` schema source."}, + "allow_data_loss": {"type": "boolean", "description": "Permit data-dropping steps (default false)."} + }, + "required": ["schema_source"], + "additionalProperties": false + }), + ), + }; + Tool::new(self.name(), description, schema_object(schema)).with_annotations(self.annotations()) + } + + /// MCP behavior hints. rmcp defaults `destructive_hint` and `open_world_hint` + /// to `true`, so every field is set explicitly. The engine operates on its + /// own datasets, so `open_world` is always `false`. + fn annotations(self) -> ToolAnnotations { + let read = || ToolAnnotations::new().read_only(true).open_world(false); + let additive = || { + ToolAnnotations::new() + .read_only(false) + .destructive(false) + .open_world(false) + }; + let destructive = || { + ToolAnnotations::new() + .read_only(false) + .destructive(true) + .open_world(false) + }; + match self { + Builtin::Health + | Builtin::Snapshot + | Builtin::SchemaGet + | Builtin::BranchesList + | Builtin::CommitsList + | Builtin::CommitsGet + | Builtin::GraphsList + | Builtin::Query => read(), + Builtin::BranchesCreate => additive(), + Builtin::Mutate + | Builtin::Ingest + | Builtin::BranchesDelete + | Builtin::BranchesMerge + | Builtin::SchemaApply => destructive(), + } + } + + /// The Cedar gate for list-time visibility. Uses `branch: None`; the actual + /// `do_*` / `run_*` call re-authorizes with the real branch, so for + /// branch-scoped policies the listed set is a best-effort approximation and + /// `call_tool` is the authoritative gate (RFC-003 §5.4 R7 caveat). + fn gate(self) -> Gate { + match self { + Builtin::Health => Gate::None, + Builtin::Snapshot + | Builtin::SchemaGet + | Builtin::BranchesList + | Builtin::CommitsList + | Builtin::CommitsGet + | Builtin::Query => Gate::Graph(PolicyAction::Read), + Builtin::Mutate | Builtin::Ingest => Gate::Graph(PolicyAction::Change), + Builtin::BranchesCreate => Gate::Graph(PolicyAction::BranchCreate), + Builtin::BranchesDelete => Gate::Graph(PolicyAction::BranchDelete), + Builtin::BranchesMerge => Gate::Graph(PolicyAction::BranchMerge), + Builtin::SchemaApply => Gate::Graph(PolicyAction::SchemaApply), + Builtin::GraphsList => Gate::Server(PolicyAction::GraphList), + } + } + + async fn call(self, cx: &ToolCx, args: &JsonObject) -> Result { + match self { + Builtin::Health => ok_json(&json!({ + "status": "ok", + "version": env!("CARGO_PKG_VERSION"), + })), + Builtin::Snapshot => { + let branch = opt_str(args, "branch").unwrap_or_else(|| "main".to_string()); + to_tool(do_snapshot(cx.graph()?, cx.actor_ref(), branch).await) + } + Builtin::SchemaGet => to_tool(do_schema_get(cx.graph()?, cx.actor_ref()).await), + Builtin::BranchesList => to_tool(do_branches_list(cx.graph()?, cx.actor_ref()).await), + Builtin::CommitsList => to_tool( + do_commits_list(cx.graph()?, cx.actor_ref(), opt_str(args, "branch")).await, + ), + Builtin::CommitsGet => { + let commit_id = req_str(args, "commit_id")?; + to_tool(do_commit_show(cx.graph()?, cx.actor_ref(), &commit_id).await) + } + Builtin::GraphsList => to_tool(do_graphs_list(&cx.state, cx.actor_ref()).await), + Builtin::Query => { + let source = req_str(args, "source")?; + let result = run_query( + Arc::clone(cx.graph()?), + cx.actor_ref(), + &source, + None, + args.get("params"), + opt_str(args, "branch"), + opt_str(args, "snapshot"), + true, + ) + .await; + match result { + Ok((name, target, qr)) => ok_json(&api::read_output(name, &target, qr)), + Err(err) => Ok(api_error_to_tool(err)), + } + } + Builtin::Mutate => { + let source = req_str(args, "source")?; + let branch = opt_str(args, "branch").unwrap_or_else(|| "main".to_string()); + to_tool( + run_mutate( + cx.state.clone(), + Arc::clone(cx.graph()?), + cx.actor_ref(), + &source, + None, + args.get("params"), + branch, + ) + .await, + ) + } + Builtin::Ingest => { + let data = req_str(args, "data")?; + let mode = match args.get("mode") { + Some(value) => serde_json::from_value::(value.clone()) + .map_err(|err| McpError::invalid_params(format!("invalid `mode`: {err}"), None))?, + None => LoadMode::Merge, + }; + let branch = opt_str(args, "branch").unwrap_or_else(|| "main".to_string()); + let from = opt_str(args, "from").unwrap_or_else(|| "main".to_string()); + to_tool( + do_ingest( + &cx.state, + cx.graph()?, + cx.actor_ref(), + &data, + mode, + branch, + from, + ) + .await, + ) + } + Builtin::BranchesCreate => { + let name = req_str(args, "name")?; + let from = opt_str(args, "from").unwrap_or_else(|| "main".to_string()); + to_tool(do_branch_create(&cx.state, cx.graph()?, cx.actor_ref(), name, from).await) + } + Builtin::BranchesDelete => { + let branch = req_str(args, "branch")?; + to_tool(do_branch_delete(&cx.state, cx.graph()?, cx.actor_ref(), branch).await) + } + Builtin::BranchesMerge => { + let source = req_str(args, "source")?; + let target = opt_str(args, "target").unwrap_or_else(|| "main".to_string()); + to_tool(do_branch_merge(&cx.state, cx.graph()?, cx.actor_ref(), source, target).await) + } + Builtin::SchemaApply => { + let schema_source = req_str(args, "schema_source")?; + let allow_data_loss = args + .get("allow_data_loss") + .and_then(Value::as_bool) + .unwrap_or(false); + to_tool( + do_schema_apply( + &cx.state, + cx.graph()?, + cx.actor_ref(), + &schema_source, + allow_data_loss, + ) + .await, + ) + } + } + } +} + +/// Is `tool` callable by this actor (argument-independent gate)? Used by both +/// `list_tools` (visibility) and `call_tool` (deny ≡ unknown masking). `Err` is +/// an operational failure (propagates as a JSON-RPC error); `Ok(false)` hides. +pub(crate) fn is_visible(tool: Builtin, cx: &ToolCx) -> Result { + let (policy, action) = match tool.gate() { + Gate::None => return Ok(true), + Gate::Graph(action) => (cx.handle.as_ref().and_then(|h| h.policy.as_deref()), action), + Gate::Server(action) => (cx.state.server_policy.as_deref(), action), + }; + let request = PolicyRequest { + action, + branch: None, + target_branch: None, + }; + match authorize(cx.actor_ref(), policy, request) { + Ok(Authz::Allowed) => Ok(true), + Ok(Authz::Denied(_)) => Ok(false), + Err(err) => Err(api_operational_error(err)), + } +} + +/// Dispatch a `tools/call` to a built-in. The caller has already enforced the +/// visibility gate (deny ≡ unknown), so this only parses args and runs. +pub(crate) async fn dispatch( + tool: Builtin, + cx: &ToolCx, + args: &JsonObject, +) -> Result { + tool.call(cx, args).await +} + +// ---- helpers --------------------------------------------------------------- + +fn schema_object(value: Value) -> Arc { + Arc::new( + value + .as_object() + .expect("tool input schema literal must be a JSON object") + .clone(), + ) +} + +fn opt_str(args: &JsonObject, key: &str) -> Option { + args.get(key).and_then(Value::as_str).map(str::to_string) +} + +fn req_str(args: &JsonObject, key: &str) -> Result { + opt_str(args, key) + .ok_or_else(|| McpError::invalid_params(format!("missing required argument `{key}`"), None)) +} + +fn ok_json(value: &T) -> Result { + let text = serde_json::to_string_pretty(value) + .map_err(|err| McpError::internal_error(format!("serialize tool result: {err}"), None))?; + Ok(CallToolResult::success(vec![Content::text(text)])) +} + +/// Map a `do_*` result into a tool result: `Ok` → JSON success; `Err` → either a +/// JSON-RPC error (operational 5xx) or an `isError` tool result (4xx/409, which +/// the model can self-correct on — the 2025-11-25 SEP-1303 split). +fn to_tool(result: Result) -> Result { + match result { + Ok(value) => ok_json(&value), + Err(err) => Ok(api_error_to_tool(err)), + } +} + +fn api_error_to_tool(err: crate::ApiError) -> CallToolResult { + CallToolResult::error(vec![Content::text(err.message_str().to_string())]) +} + +/// 5xx engine/policy failures are operational → surface as a JSON-RPC error so +/// the client (not the model) sees them; anything else that reaches here is +/// treated as internal too (callers route 4xx through `api_error_to_tool`). +fn api_operational_error(err: crate::ApiError) -> McpError { + use axum::http::StatusCode; + match err.status_code() { + StatusCode::UNAUTHORIZED => McpError::invalid_request(err.message_str().to_string(), None), + _ => McpError::internal_error(err.message_str().to_string(), None), + } +} diff --git a/crates/omnigraph-server/src/mcp/mod.rs b/crates/omnigraph-server/src/mcp/mod.rs index 279d5e0..93f409e 100644 --- a/crates/omnigraph-server/src/mcp/mod.rs +++ b/crates/omnigraph-server/src/mcp/mod.rs @@ -16,16 +16,25 @@ use std::sync::Arc; +use axum::Router; use rmcp::{ ErrorData as McpError, RoleServer, ServerHandler, - model::{ListToolsResult, PaginatedRequestParams, ServerCapabilities, ServerInfo}, + model::{ + CallToolRequestParams, CallToolResult, ListToolsResult, PaginatedRequestParams, + ServerCapabilities, ServerInfo, + }, service::RequestContext, transport::streamable_http_server::{ StreamableHttpServerConfig, StreamableHttpService, session::local::LocalSessionManager, }, }; -use crate::AppState; +use tower_http::limit::RequestBodyLimitLayer; + +use crate::{AppState, INGEST_REQUEST_BODY_LIMIT_BYTES}; +use builtins::Builtin; + +mod builtins; /// Server-level guidance returned in the MCP `initialize` response. const MCP_INSTRUCTIONS: &str = "OmniGraph is a versioned, branchable property graph. \ @@ -37,10 +46,10 @@ are not permitted to call."; /// streamable-HTTP service constructs one per request in stateless mode. #[derive(Clone)] pub(crate) struct OmnigraphMcpHandler { - // Wired in Phase 3 (tools) / Phase 5 (resources): the handler resolves the - // per-request actor + graph handle from the request extensions and routes - // tool calls through the shared `do_*` / `run_query` / `run_mutate` paths. - #[allow(dead_code)] + // The handler resolves the per-request actor + graph handle from the + // request extensions (`resolve_cx`) and routes tool calls through the shared + // `do_*` / `run_query` / `run_mutate` paths. `state` supplies workload + // admission, the server-level policy, and graph routing. state: AppState, } @@ -71,22 +80,58 @@ impl ServerHandler for OmnigraphMcpHandler { async fn list_tools( &self, _request: Option, - _context: RequestContext, + context: RequestContext, ) -> Result { - // Phase 3 populates this with the Cedar-filtered built-in tools; Phase 4 - // adds the dynamic stored-query tools. - Ok(ListToolsResult::default()) + let cx = builtins::resolve_cx(&self.state, &context)?; + let mut tools = Vec::new(); + for &tool in Builtin::all() { + // Emit only tools the actor's Cedar policy permits. An operational + // failure (policy-engine error) propagates; a denial just hides. + if builtins::is_visible(tool, &cx)? { + tools.push(tool.descriptor()); + } + } + // Phase 4 appends the dynamic stored-query tools here. + let mut result = ListToolsResult::default(); + result.tools = tools; + Ok(result) + } + + async fn call_tool( + &self, + request: CallToolRequestParams, + context: RequestContext, + ) -> Result { + let cx = builtins::resolve_cx(&self.state, &context)?; + let Some(tool) = Builtin::from_name(&request.name) else { + // Unknown tool → JSON-RPC error (a dispatch failure, not a + // tool-execution error). + return Err(McpError::invalid_params( + format!("unknown tool: {}", request.name), + None, + )); + }; + // Enforce the visibility gate at call-time too, and mask a denial as + // "unknown tool" so the catalog isn't probeable without the grant (the + // same deny ≡ missing principle as `POST /queries/{name}`). The inner + // `do_*` / `run_*` re-authorizes against the real branch. + if !builtins::is_visible(tool, &cx)? { + return Err(McpError::invalid_params( + format!("unknown tool: {}", request.name), + None, + )); + } + let args = request.arguments.unwrap_or_default(); + builtins::dispatch(tool, &cx, &args).await } } -/// Build the stateless Streamable-HTTP MCP service mounted at `/mcp`. +/// Build the `/mcp` route: a stateless Streamable-HTTP MCP service, body-capped. /// -/// Mounted inside the `per_graph_protected` route group so the bearer-auth and +/// Merged into the `per_graph_protected` route group so the bearer-auth and /// graph-handle middleware run first; in multi-graph mode the same service is /// reached at `/graphs/{graph_id}/mcp`. -pub(crate) fn mcp_service( - state: AppState, -) -> StreamableHttpService { +pub(crate) fn mcp_router(state: AppState) -> Router { let handler = OmnigraphMcpHandler::new(state); // `StreamableHttpServerConfig` is `#[non_exhaustive]`: start from `Default`, // then flip to stateless JSON. Keep rmcp's loopback `allowed_hosts` default @@ -96,9 +141,16 @@ pub(crate) fn mcp_service( let config = StreamableHttpServerConfig::default() .with_stateful_mode(false) .with_json_response(true); - StreamableHttpService::new( + let service = StreamableHttpService::new( move || Ok(handler.clone()), Arc::new(LocalSessionManager::default()), config, - ) + ); + // rmcp reads the request body directly (it doesn't go through axum's + // `Bytes`/`Json` extractor), so the router's `DefaultBodyLimit` does NOT + // bound `/mcp`. Cap it explicitly at the ingest limit (the largest tool + // payload) so an MCP `ingest`/`schema_apply` call can't stream unbounded. + Router::new() + .route_service("/mcp", service) + .layer(RequestBodyLimitLayer::new(INGEST_REQUEST_BODY_LIMIT_BYTES)) } diff --git a/crates/omnigraph-server/tests/server.rs b/crates/omnigraph-server/tests/server.rs index 030650a..1331d9f 100644 --- a/crates/omnigraph-server/tests/server.rs +++ b/crates/omnigraph-server/tests/server.rs @@ -944,6 +944,287 @@ async fn mcp_get_returns_405_not_404() { assert_eq!(response.status(), StatusCode::METHOD_NOT_ALLOWED); } +#[tokio::test] +async fn mcp_tools_list_includes_builtins() { + let (_temp, app) = app_for_loaded_graph().await; + let (status, body) = json_response( + &app, + mcp_post(json!({ "jsonrpc": "2.0", "id": 2, "method": "tools/list" })), + ) + .await; + assert_eq!(status, StatusCode::OK, "{body}"); + let tools = body["result"]["tools"].as_array().expect("tools array"); + let names: Vec<&str> = tools.iter().filter_map(|t| t["name"].as_str()).collect(); + for expected in [ + "health", + "snapshot", + "schema_get", + "branches_list", + "commits_list", + "query", + "mutate", + ] { + assert!( + names.contains(&expected), + "tools/list missing `{expected}`: {names:?}" + ); + } +} + +#[tokio::test] +async fn mcp_tools_call_snapshot_reads_through_extension_passthrough() { + let (_temp, app) = app_for_loaded_graph().await; + let (status, body) = json_response( + &app, + mcp_post(json!({ + "jsonrpc": "2.0", + "id": 3, + "method": "tools/call", + "params": { "name": "snapshot", "arguments": {} } + })), + ) + .await; + assert_eq!(status, StatusCode::OK, "{body}"); + // The handler resolved the GraphHandle from the request extensions and ran + // do_snapshot — this is the RFC-003 §5.8 actor/handle passthrough proof. + assert_ne!( + body["result"]["isError"], + json!(true), + "snapshot tool errored: {body}" + ); + let text = body["result"]["content"][0]["text"] + .as_str() + .expect("text content block"); + let snapshot: Value = serde_json::from_str(text).expect("snapshot json payload"); + assert_eq!(snapshot["branch"], "main"); + assert!( + snapshot["manifest_version"].is_number(), + "snapshot carries a manifest_version: {snapshot}" + ); +} + +#[tokio::test] +async fn mcp_tools_call_unknown_tool_is_jsonrpc_error() { + let (_temp, app) = app_for_loaded_graph().await; + let (status, body) = json_response( + &app, + mcp_post(json!({ + "jsonrpc": "2.0", + "id": 4, + "method": "tools/call", + "params": { "name": "does_not_exist", "arguments": {} } + })), + ) + .await; + assert_eq!(status, StatusCode::OK, "JSON-RPC errors ride a 200"); + assert!( + body["error"].is_object(), + "unknown tool is a JSON-RPC error, not a result: {body}" + ); +} + +/// A read-only actor (`act-reader`) and an admin (`act-admin`), so `tools/list` +/// filtering by Cedar action is observable. +const MCP_FILTER_POLICY_YAML: &str = r#" +version: 1 +groups: + readers: ["act-reader"] + admins: ["act-admin"] +protected_branches: [main] +rules: + - id: read-only + allow: + actors: { group: readers } + actions: [read] + branch_scope: any + - id: admin-data + allow: + actors: { group: admins } + actions: [read, change, export] + branch_scope: any + - id: admin-targets + allow: + actors: { group: admins } + actions: [schema_apply, branch_create, branch_delete, branch_merge] + target_branch_scope: any +"#; + +fn mcp_post_auth(body: Value, token: &str) -> Request { + let mut request = mcp_post(body); + request + .headers_mut() + .insert(AUTHORIZATION, format!("Bearer {token}").parse().unwrap()); + request +} + +async fn mcp_tool_names(app: &Router, token: &str) -> Vec { + let (status, body) = json_response( + app, + mcp_post_auth( + json!({ "jsonrpc": "2.0", "id": 1, "method": "tools/list" }), + token, + ), + ) + .await; + assert_eq!(status, StatusCode::OK, "{body}"); + body["result"]["tools"] + .as_array() + .expect("tools array") + .iter() + .filter_map(|t| t["name"].as_str().map(String::from)) + .collect() +} + +#[tokio::test] +async fn mcp_tools_list_cedar_filters_by_policy() { + let (_temp, app) = app_with_stored_queries( + &[], + &[("act-reader", "reader-tok"), ("act-admin", "admin-tok")], + MCP_FILTER_POLICY_YAML, + ) + .await; + + let reader = mcp_tool_names(&app, "reader-tok").await; + assert!(reader.contains(&"snapshot".to_string()), "{reader:?}"); + assert!(reader.contains(&"query".to_string()), "{reader:?}"); + for hidden in ["mutate", "ingest", "branches_create", "branches_delete", "schema_apply"] { + assert!( + !reader.contains(&hidden.to_string()), + "read-only actor must NOT see `{hidden}`: {reader:?}" + ); + } + + let admin = mcp_tool_names(&app, "admin-tok").await; + for visible in ["mutate", "ingest", "branches_create", "schema_apply"] { + assert!( + admin.contains(&visible.to_string()), + "admin must see `{visible}`: {admin:?}" + ); + } +} + +#[tokio::test] +async fn mcp_tools_call_mutate_writes() { + let (_temp, app) = app_for_loaded_graph_with_auth("admin-token").await; + let (status, body) = json_response( + &app, + mcp_post_auth( + json!({ + "jsonrpc": "2.0", + "id": 1, + "method": "tools/call", + "params": { + "name": "mutate", + "arguments": { + "source": "query ins($name: String, $age: I32) { insert Person { name: $name, age: $age } }", + "params": { "name": "Zelda", "age": 40 } + } + } + }), + "admin-token", + ), + ) + .await; + assert_eq!(status, StatusCode::OK, "{body}"); + assert_ne!(body["result"]["isError"], json!(true), "mutate failed: {body}"); + let text = body["result"]["content"][0]["text"] + .as_str() + .expect("change envelope text"); + let change: Value = serde_json::from_str(text).expect("change json"); + assert!( + change["affected_nodes"].as_u64().unwrap_or(0) >= 1, + "mutate wrote a node through MCP: {change}" + ); +} + +#[tokio::test] +async fn mcp_tools_call_denied_is_masked_as_unknown() { + let (_temp, app) = + app_with_stored_queries(&[], &[("act-reader", "reader-tok")], MCP_FILTER_POLICY_YAML).await; + let (status, denied) = json_response( + &app, + mcp_post_auth( + json!({ + "jsonrpc": "2.0", "id": 1, "method": "tools/call", + "params": { "name": "mutate", "arguments": { "source": "x" } } + }), + "reader-tok", + ), + ) + .await; + let (_s, unknown) = json_response( + &app, + mcp_post_auth( + json!({ + "jsonrpc": "2.0", "id": 1, "method": "tools/call", + "params": { "name": "does_not_exist", "arguments": {} } + }), + "reader-tok", + ), + ) + .await; + assert_eq!(status, StatusCode::OK); + // Deny ≡ unknown: a denied existing tool is reported with the SAME error + // code and the SAME `unknown tool: ` template as a truly-unknown tool + // (the echoed name is the one the caller already supplied), so an + // unauthorized caller cannot tell "denied" from "does not exist". + assert_eq!(denied["error"]["code"], json!(-32602)); + assert_eq!(denied["error"]["code"], unknown["error"]["code"]); + assert_eq!(denied["error"]["message"], json!("unknown tool: mutate")); + assert_eq!( + unknown["error"]["message"], + json!("unknown tool: does_not_exist") + ); +} + +#[tokio::test] +async fn mcp_tools_call_business_error_is_iserror() { + let (_temp, app) = app_for_loaded_graph().await; + let (status, body) = json_response( + &app, + mcp_post(json!({ + "jsonrpc": "2.0", "id": 1, "method": "tools/call", + "params": { "name": "query", "arguments": { "source": "this is not valid gq" } } + })), + ) + .await; + assert_eq!(status, StatusCode::OK); + // A bad query is a tool-execution error the model can self-correct on, NOT a + // JSON-RPC protocol error (the 2025-11-25 SEP-1303 split). + assert_eq!( + body["result"]["isError"], + json!(true), + "malformed GQ → isError result: {body}" + ); + assert!(body["error"].is_null(), "not a JSON-RPC error: {body}"); +} + +#[tokio::test] +async fn mcp_tool_annotations_match_read_write() { + let (_temp, app) = app_for_loaded_graph().await; + let (_s, body) = json_response( + &app, + mcp_post(json!({ "jsonrpc": "2.0", "id": 1, "method": "tools/list" })), + ) + .await; + let tools = body["result"]["tools"].as_array().expect("tools array"); + let find = |name: &str| { + tools + .iter() + .find(|t| t["name"] == json!(name)) + .unwrap_or_else(|| panic!("tool `{name}` listed")) + .clone() + }; + assert_eq!(find("snapshot")["annotations"]["readOnlyHint"], json!(true)); + let schema_apply = find("schema_apply"); + assert_eq!(schema_apply["annotations"]["readOnlyHint"], json!(false)); + assert_eq!( + schema_apply["annotations"]["destructiveHint"], + json!(true), + "schema_apply is destructive: {schema_apply}" + ); +} + #[tokio::test] async fn schema_apply_route_updates_graph_for_authorized_admin() { let (temp, app) = app_for_graph_with_auth_tokens_and_policy(