feat(server): MCP built-in tool surface (RFC-003 §5.2)

Project 14 built-in operations as MCP tools over the /mcp endpoint: health,
snapshot, schema_get, branches_list, commits_list, commits_get, graphs_list,
query, mutate, ingest, branches_create, branches_delete, branches_merge,
schema_apply. Each tool reuses the exact do_* / run_query / run_mutate path
(and the exact Cedar action) its REST route enforces, so there is no new
business logic and no second authorization surface.

- list_tools is Cedar-filtered: a tool is listed only if the actor's policy
  permits its action (a policy-engine error propagates; a denial hides).
- call_tool enforces the same gate, masks a denied or unknown tool identically
  ("unknown tool: <name>", deny == missing), and classifies engine failures as
  isError tool results (4xx/409) vs JSON-RPC errors (5xx) per the 2025-11-25
  spec split.
- Tool annotations set readOnlyHint / destructiveHint / openWorldHint
  explicitly (rmcp defaults destructive + open_world to true).
- The actor and graph handle are read from the request extensions the
  bearer/handle middleware attached, threaded through rmcp's RequestContext
  (RFC-003 §5.8); a multi-graph tool call resolves the per-request graph for
  free.
- Bound MCP request bodies with tower-http RequestBodyLimitLayer at the ingest
  limit: rmcp reads the body directly, so axum's DefaultBodyLimit does not
  cover /mcp.

Tests (tests/server.rs, 11 MCP cases): tools/list contents + Cedar filtering
by policy, a snapshot read and a mutate write end to end through the extension
passthrough, masked deny, malformed-query isError, unknown-tool JSON-RPC error,
and the read/write annotation hints. docs/user/server.md deferred to the docs
phase per the RFC-003 rollout.

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Ragnor Comerford 2026-06-09 14:51:45 +02:00
parent 9df9f394c0
commit 66c37d289a
No known key found for this signature in database
5 changed files with 887 additions and 19 deletions

View file

@ -31,7 +31,7 @@ tokio = { workspace = true }
serde_yaml = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
tower-http = { workspace = true }
tower-http = { workspace = true, features = ["limit"] }
utoipa = { workspace = true }
futures = { workspace = true }
sha2 = { workspace = true }

View file

@ -610,6 +610,18 @@ fn hash_bearer_tokens(bearer_tokens: Vec<(String, String)>) -> Arc<[(BearerToken
}
impl ApiError {
/// HTTP status this error maps to. Lets the MCP layer classify operational
/// failures (500 → JSON-RPC error) from tool-execution errors (4xx/409 →
/// `isError` content the model can self-correct on).
pub(crate) fn status_code(&self) -> StatusCode {
self.status
}
/// The human-readable error message, for building MCP tool-error content.
pub(crate) fn message_str(&self) -> &str {
&self.message
}
pub fn unauthorized(message: impl Into<String>) -> Self {
Self {
status: StatusCode::UNAUTHORIZED,
@ -1143,7 +1155,7 @@ pub fn build_app(state: AppState) -> Router {
// shares this group's bearer-auth + graph-handle middleware. Single
// mode serves it flat at `/mcp`; multi mode nests it to
// `/graphs/{graph_id}/mcp` (per-graph isolation for free).
.route_service("/mcp", mcp::mcp_service(state.clone()))
.merge(mcp::mcp_router(state.clone()))
.route_layer(middleware::from_fn_with_state(
state.clone(),
resolve_graph_handle,

View file

@ -0,0 +1,523 @@
//! Built-in MCP tools — RFC-003 §5.2.
//!
//! Each tool reuses the EXACT `do_*` / `run_query` / `run_mutate` path (and the
//! exact Cedar action) its HTTP route uses, so there is no new business logic
//! and no second authorization surface. `list_tools` Cedar-filters the set;
//! `call_tool` enforces the same gate (deny ≡ unknown tool) then dispatches.
use std::sync::Arc;
use rmcp::ErrorData as McpError;
use rmcp::RoleServer;
use rmcp::model::{CallToolResult, Content, JsonObject, Tool, ToolAnnotations};
use rmcp::service::RequestContext;
use serde::Serialize;
use serde_json::{Value, json};
use crate::{
AppState, Authz, GraphHandle, PolicyAction, PolicyRequest, ResolvedActor, api, authorize,
do_branch_create, do_branch_delete, do_branch_merge, do_branches_list, do_commit_show,
do_commits_list, do_graphs_list, do_ingest, do_schema_apply, do_schema_get, do_snapshot,
run_mutate, run_query,
};
use omnigraph::loader::LoadMode;
/// Per-request context for a tool call. `state` comes from the shared handler;
/// `handle` (per-graph) and `actor` are read back out of the request extensions
/// that the `resolve_graph_handle` / `require_bearer_auth` middleware attached
/// before rmcp ran (RFC-003 §5.8 — the handler consumes a resolved actor and
/// branches on nothing about how it was verified).
pub(crate) struct ToolCx {
pub state: AppState,
pub handle: Option<Arc<GraphHandle>>,
pub actor: Option<ResolvedActor>,
}
impl ToolCx {
fn graph(&self) -> Result<&Arc<GraphHandle>, McpError> {
self.handle.as_ref().ok_or_else(|| {
McpError::internal_error("graph handle missing from MCP request context", None)
})
}
fn actor_ref(&self) -> Option<&ResolvedActor> {
self.actor.as_ref()
}
}
/// Pull the per-request actor + graph handle out of the `http::request::Parts`
/// rmcp threads into `RequestContext.extensions`.
pub(crate) fn resolve_cx(
state: &AppState,
ctx: &RequestContext<RoleServer>,
) -> Result<ToolCx, McpError> {
let parts = ctx
.extensions
.get::<axum::http::request::Parts>()
.ok_or_else(|| {
McpError::internal_error("request parts missing from MCP request context", None)
})?;
Ok(ToolCx {
state: state.clone(),
handle: parts.extensions.get::<Arc<GraphHandle>>().cloned(),
actor: parts.extensions.get::<ResolvedActor>().cloned(),
})
}
/// Which Cedar entity an action's gate binds to.
enum Gate {
/// Per-graph action gated against `handle.policy`.
Graph(PolicyAction),
/// Server-scoped action gated against `state.server_policy`.
Server(PolicyAction),
/// No Cedar gate (liveness/version).
None,
}
/// The built-in tool set. One variant per RFC-003 §5.2 tool; the dynamic
/// stored-query tools are a separate populator (Phase 4).
#[derive(Clone, Copy)]
pub(crate) enum Builtin {
Health,
Snapshot,
SchemaGet,
BranchesList,
CommitsList,
CommitsGet,
GraphsList,
Query,
Mutate,
Ingest,
BranchesCreate,
BranchesDelete,
BranchesMerge,
SchemaApply,
}
impl Builtin {
pub(crate) fn all() -> &'static [Builtin] {
use Builtin::*;
&[
Health,
Snapshot,
SchemaGet,
BranchesList,
CommitsList,
CommitsGet,
GraphsList,
Query,
Mutate,
Ingest,
BranchesCreate,
BranchesDelete,
BranchesMerge,
SchemaApply,
]
}
pub(crate) fn from_name(name: &str) -> Option<Builtin> {
Builtin::all().iter().copied().find(|b| b.name() == name)
}
pub(crate) fn name(self) -> &'static str {
match self {
Builtin::Health => "health",
Builtin::Snapshot => "snapshot",
Builtin::SchemaGet => "schema_get",
Builtin::BranchesList => "branches_list",
Builtin::CommitsList => "commits_list",
Builtin::CommitsGet => "commits_get",
Builtin::GraphsList => "graphs_list",
Builtin::Query => "query",
Builtin::Mutate => "mutate",
Builtin::Ingest => "ingest",
Builtin::BranchesCreate => "branches_create",
Builtin::BranchesDelete => "branches_delete",
Builtin::BranchesMerge => "branches_merge",
Builtin::SchemaApply => "schema_apply",
}
}
/// The MCP `Tool` descriptor (name, description, JSON-Schema input).
pub(crate) fn descriptor(self) -> Tool {
let (description, schema) = match self {
Builtin::Health => (
"Liveness + server version. No graph access.",
json!({"type": "object", "additionalProperties": false}),
),
Builtin::Snapshot => (
"Read a branch's current snapshot: manifest version plus per-table metadata. Read-only.",
json!({
"type": "object",
"properties": {"branch": {"type": "string", "description": "Branch to read (default `main`)."}},
"additionalProperties": false
}),
),
Builtin::SchemaGet => (
"Return the graph's `.pg` schema source. Read-only.",
json!({"type": "object", "additionalProperties": false}),
),
Builtin::BranchesList => (
"List all branch names (sorted). Read-only.",
json!({"type": "object", "additionalProperties": false}),
),
Builtin::CommitsList => (
"List commits, optionally filtered to one branch (most recent first). Read-only.",
json!({
"type": "object",
"properties": {"branch": {"type": "string", "description": "Filter to this branch; omit for all."}},
"additionalProperties": false
}),
),
Builtin::CommitsGet => (
"Get a single commit's metadata by id. Read-only.",
json!({
"type": "object",
"properties": {"commit_id": {"type": "string"}},
"required": ["commit_id"],
"additionalProperties": false
}),
),
Builtin::GraphsList => (
"List the graphs registered with this server (multi-graph mode only).",
json!({"type": "object", "additionalProperties": false}),
),
Builtin::Query => (
"Run an ad-hoc read-only GQ query. Mutations are rejected — use `mutate`.",
json!({
"type": "object",
"properties": {
"source": {"type": "string", "description": "GQ query source."},
"branch": {"type": "string", "description": "Branch to read (default `main`)."},
"snapshot": {"type": "string", "description": "Snapshot id to read (mutually exclusive with `branch`)."},
"params": {"type": "object", "description": "Named query parameters."}
},
"required": ["source"],
"additionalProperties": false
}),
),
Builtin::Mutate => (
"Run an ad-hoc GQ mutation (insert/update/delete) on a branch. Writes are branchable and policy-gated.",
json!({
"type": "object",
"properties": {
"source": {"type": "string", "description": "GQ mutation source."},
"branch": {"type": "string", "description": "Branch to write (default `main`)."},
"params": {"type": "object", "description": "Named query parameters."}
},
"required": ["source"],
"additionalProperties": false
}),
),
Builtin::Ingest => (
"Bulk-ingest NDJSON into a branch. `mode` controls existing rows: merge (upsert, default), append, or overwrite (replaces table contents). Creates the branch from `from` if absent.",
json!({
"type": "object",
"properties": {
"data": {"type": "string", "description": "NDJSON, one record per line."},
"mode": {"type": "string", "enum": ["merge", "append", "overwrite"], "description": "Default `merge`."},
"branch": {"type": "string", "description": "Branch to ingest into (default `main`)."},
"from": {"type": "string", "description": "Parent branch to fork from when `branch` does not exist (default `main`)."}
},
"required": ["data"],
"additionalProperties": false
}),
),
Builtin::BranchesCreate => (
"Create a branch, forking `name` off `from` (default `main`). Additive — shares parent data until written.",
json!({
"type": "object",
"properties": {
"name": {"type": "string", "description": "New branch name."},
"from": {"type": "string", "description": "Parent branch (default `main`)."}
},
"required": ["name"],
"additionalProperties": false
}),
),
Builtin::BranchesDelete => (
"Delete a branch pointer. Irreversible.",
json!({
"type": "object",
"properties": {"branch": {"type": "string", "description": "Branch to delete."}},
"required": ["branch"],
"additionalProperties": false
}),
),
Builtin::BranchesMerge => (
"Merge `source` into `target` (default `main`). Returns the outcome (already_up_to_date / fast_forward / merged) or a conflict; the target is unchanged on conflict.",
json!({
"type": "object",
"properties": {
"source": {"type": "string", "description": "Branch to merge from."},
"target": {"type": "string", "description": "Branch to merge into (default `main`)."}
},
"required": ["source"],
"additionalProperties": false
}),
),
Builtin::SchemaApply => (
"Apply a schema migration. Diffs `schema_source` against the current schema and applies the steps. Destructive: some steps drop data — set `allow_data_loss` to permit those.",
json!({
"type": "object",
"properties": {
"schema_source": {"type": "string", "description": "The target `.pg` schema source."},
"allow_data_loss": {"type": "boolean", "description": "Permit data-dropping steps (default false)."}
},
"required": ["schema_source"],
"additionalProperties": false
}),
),
};
Tool::new(self.name(), description, schema_object(schema)).with_annotations(self.annotations())
}
/// MCP behavior hints. rmcp defaults `destructive_hint` and `open_world_hint`
/// to `true`, so every field is set explicitly. The engine operates on its
/// own datasets, so `open_world` is always `false`.
fn annotations(self) -> ToolAnnotations {
let read = || ToolAnnotations::new().read_only(true).open_world(false);
let additive = || {
ToolAnnotations::new()
.read_only(false)
.destructive(false)
.open_world(false)
};
let destructive = || {
ToolAnnotations::new()
.read_only(false)
.destructive(true)
.open_world(false)
};
match self {
Builtin::Health
| Builtin::Snapshot
| Builtin::SchemaGet
| Builtin::BranchesList
| Builtin::CommitsList
| Builtin::CommitsGet
| Builtin::GraphsList
| Builtin::Query => read(),
Builtin::BranchesCreate => additive(),
Builtin::Mutate
| Builtin::Ingest
| Builtin::BranchesDelete
| Builtin::BranchesMerge
| Builtin::SchemaApply => destructive(),
}
}
/// The Cedar gate for list-time visibility. Uses `branch: None`; the actual
/// `do_*` / `run_*` call re-authorizes with the real branch, so for
/// branch-scoped policies the listed set is a best-effort approximation and
/// `call_tool` is the authoritative gate (RFC-003 §5.4 R7 caveat).
fn gate(self) -> Gate {
match self {
Builtin::Health => Gate::None,
Builtin::Snapshot
| Builtin::SchemaGet
| Builtin::BranchesList
| Builtin::CommitsList
| Builtin::CommitsGet
| Builtin::Query => Gate::Graph(PolicyAction::Read),
Builtin::Mutate | Builtin::Ingest => Gate::Graph(PolicyAction::Change),
Builtin::BranchesCreate => Gate::Graph(PolicyAction::BranchCreate),
Builtin::BranchesDelete => Gate::Graph(PolicyAction::BranchDelete),
Builtin::BranchesMerge => Gate::Graph(PolicyAction::BranchMerge),
Builtin::SchemaApply => Gate::Graph(PolicyAction::SchemaApply),
Builtin::GraphsList => Gate::Server(PolicyAction::GraphList),
}
}
async fn call(self, cx: &ToolCx, args: &JsonObject) -> Result<CallToolResult, McpError> {
match self {
Builtin::Health => ok_json(&json!({
"status": "ok",
"version": env!("CARGO_PKG_VERSION"),
})),
Builtin::Snapshot => {
let branch = opt_str(args, "branch").unwrap_or_else(|| "main".to_string());
to_tool(do_snapshot(cx.graph()?, cx.actor_ref(), branch).await)
}
Builtin::SchemaGet => to_tool(do_schema_get(cx.graph()?, cx.actor_ref()).await),
Builtin::BranchesList => to_tool(do_branches_list(cx.graph()?, cx.actor_ref()).await),
Builtin::CommitsList => to_tool(
do_commits_list(cx.graph()?, cx.actor_ref(), opt_str(args, "branch")).await,
),
Builtin::CommitsGet => {
let commit_id = req_str(args, "commit_id")?;
to_tool(do_commit_show(cx.graph()?, cx.actor_ref(), &commit_id).await)
}
Builtin::GraphsList => to_tool(do_graphs_list(&cx.state, cx.actor_ref()).await),
Builtin::Query => {
let source = req_str(args, "source")?;
let result = run_query(
Arc::clone(cx.graph()?),
cx.actor_ref(),
&source,
None,
args.get("params"),
opt_str(args, "branch"),
opt_str(args, "snapshot"),
true,
)
.await;
match result {
Ok((name, target, qr)) => ok_json(&api::read_output(name, &target, qr)),
Err(err) => Ok(api_error_to_tool(err)),
}
}
Builtin::Mutate => {
let source = req_str(args, "source")?;
let branch = opt_str(args, "branch").unwrap_or_else(|| "main".to_string());
to_tool(
run_mutate(
cx.state.clone(),
Arc::clone(cx.graph()?),
cx.actor_ref(),
&source,
None,
args.get("params"),
branch,
)
.await,
)
}
Builtin::Ingest => {
let data = req_str(args, "data")?;
let mode = match args.get("mode") {
Some(value) => serde_json::from_value::<LoadMode>(value.clone())
.map_err(|err| McpError::invalid_params(format!("invalid `mode`: {err}"), None))?,
None => LoadMode::Merge,
};
let branch = opt_str(args, "branch").unwrap_or_else(|| "main".to_string());
let from = opt_str(args, "from").unwrap_or_else(|| "main".to_string());
to_tool(
do_ingest(
&cx.state,
cx.graph()?,
cx.actor_ref(),
&data,
mode,
branch,
from,
)
.await,
)
}
Builtin::BranchesCreate => {
let name = req_str(args, "name")?;
let from = opt_str(args, "from").unwrap_or_else(|| "main".to_string());
to_tool(do_branch_create(&cx.state, cx.graph()?, cx.actor_ref(), name, from).await)
}
Builtin::BranchesDelete => {
let branch = req_str(args, "branch")?;
to_tool(do_branch_delete(&cx.state, cx.graph()?, cx.actor_ref(), branch).await)
}
Builtin::BranchesMerge => {
let source = req_str(args, "source")?;
let target = opt_str(args, "target").unwrap_or_else(|| "main".to_string());
to_tool(do_branch_merge(&cx.state, cx.graph()?, cx.actor_ref(), source, target).await)
}
Builtin::SchemaApply => {
let schema_source = req_str(args, "schema_source")?;
let allow_data_loss = args
.get("allow_data_loss")
.and_then(Value::as_bool)
.unwrap_or(false);
to_tool(
do_schema_apply(
&cx.state,
cx.graph()?,
cx.actor_ref(),
&schema_source,
allow_data_loss,
)
.await,
)
}
}
}
}
/// Is `tool` callable by this actor (argument-independent gate)? Used by both
/// `list_tools` (visibility) and `call_tool` (deny ≡ unknown masking). `Err` is
/// an operational failure (propagates as a JSON-RPC error); `Ok(false)` hides.
pub(crate) fn is_visible(tool: Builtin, cx: &ToolCx) -> Result<bool, McpError> {
let (policy, action) = match tool.gate() {
Gate::None => return Ok(true),
Gate::Graph(action) => (cx.handle.as_ref().and_then(|h| h.policy.as_deref()), action),
Gate::Server(action) => (cx.state.server_policy.as_deref(), action),
};
let request = PolicyRequest {
action,
branch: None,
target_branch: None,
};
match authorize(cx.actor_ref(), policy, request) {
Ok(Authz::Allowed) => Ok(true),
Ok(Authz::Denied(_)) => Ok(false),
Err(err) => Err(api_operational_error(err)),
}
}
/// Dispatch a `tools/call` to a built-in. The caller has already enforced the
/// visibility gate (deny ≡ unknown), so this only parses args and runs.
pub(crate) async fn dispatch(
tool: Builtin,
cx: &ToolCx,
args: &JsonObject,
) -> Result<CallToolResult, McpError> {
tool.call(cx, args).await
}
// ---- helpers ---------------------------------------------------------------
fn schema_object(value: Value) -> Arc<JsonObject> {
Arc::new(
value
.as_object()
.expect("tool input schema literal must be a JSON object")
.clone(),
)
}
fn opt_str(args: &JsonObject, key: &str) -> Option<String> {
args.get(key).and_then(Value::as_str).map(str::to_string)
}
fn req_str(args: &JsonObject, key: &str) -> Result<String, McpError> {
opt_str(args, key)
.ok_or_else(|| McpError::invalid_params(format!("missing required argument `{key}`"), None))
}
fn ok_json<T: Serialize>(value: &T) -> Result<CallToolResult, McpError> {
let text = serde_json::to_string_pretty(value)
.map_err(|err| McpError::internal_error(format!("serialize tool result: {err}"), None))?;
Ok(CallToolResult::success(vec![Content::text(text)]))
}
/// Map a `do_*` result into a tool result: `Ok` → JSON success; `Err` → either a
/// JSON-RPC error (operational 5xx) or an `isError` tool result (4xx/409, which
/// the model can self-correct on — the 2025-11-25 SEP-1303 split).
fn to_tool<T: Serialize>(result: Result<T, crate::ApiError>) -> Result<CallToolResult, McpError> {
match result {
Ok(value) => ok_json(&value),
Err(err) => Ok(api_error_to_tool(err)),
}
}
fn api_error_to_tool(err: crate::ApiError) -> CallToolResult {
CallToolResult::error(vec![Content::text(err.message_str().to_string())])
}
/// 5xx engine/policy failures are operational → surface as a JSON-RPC error so
/// the client (not the model) sees them; anything else that reaches here is
/// treated as internal too (callers route 4xx through `api_error_to_tool`).
fn api_operational_error(err: crate::ApiError) -> McpError {
use axum::http::StatusCode;
match err.status_code() {
StatusCode::UNAUTHORIZED => McpError::invalid_request(err.message_str().to_string(), None),
_ => McpError::internal_error(err.message_str().to_string(), None),
}
}

View file

@ -16,16 +16,25 @@
use std::sync::Arc;
use axum::Router;
use rmcp::{
ErrorData as McpError, RoleServer, ServerHandler,
model::{ListToolsResult, PaginatedRequestParams, ServerCapabilities, ServerInfo},
model::{
CallToolRequestParams, CallToolResult, ListToolsResult, PaginatedRequestParams,
ServerCapabilities, ServerInfo,
},
service::RequestContext,
transport::streamable_http_server::{
StreamableHttpServerConfig, StreamableHttpService, session::local::LocalSessionManager,
},
};
use crate::AppState;
use tower_http::limit::RequestBodyLimitLayer;
use crate::{AppState, INGEST_REQUEST_BODY_LIMIT_BYTES};
use builtins::Builtin;
mod builtins;
/// Server-level guidance returned in the MCP `initialize` response.
const MCP_INSTRUCTIONS: &str = "OmniGraph is a versioned, branchable property graph. \
@ -37,10 +46,10 @@ are not permitted to call.";
/// streamable-HTTP service constructs one per request in stateless mode.
#[derive(Clone)]
pub(crate) struct OmnigraphMcpHandler {
// Wired in Phase 3 (tools) / Phase 5 (resources): the handler resolves the
// per-request actor + graph handle from the request extensions and routes
// tool calls through the shared `do_*` / `run_query` / `run_mutate` paths.
#[allow(dead_code)]
// The handler resolves the per-request actor + graph handle from the
// request extensions (`resolve_cx`) and routes tool calls through the shared
// `do_*` / `run_query` / `run_mutate` paths. `state` supplies workload
// admission, the server-level policy, and graph routing.
state: AppState,
}
@ -71,22 +80,58 @@ impl ServerHandler for OmnigraphMcpHandler {
async fn list_tools(
&self,
_request: Option<PaginatedRequestParams>,
_context: RequestContext<RoleServer>,
context: RequestContext<RoleServer>,
) -> Result<ListToolsResult, McpError> {
// Phase 3 populates this with the Cedar-filtered built-in tools; Phase 4
// adds the dynamic stored-query tools.
Ok(ListToolsResult::default())
let cx = builtins::resolve_cx(&self.state, &context)?;
let mut tools = Vec::new();
for &tool in Builtin::all() {
// Emit only tools the actor's Cedar policy permits. An operational
// failure (policy-engine error) propagates; a denial just hides.
if builtins::is_visible(tool, &cx)? {
tools.push(tool.descriptor());
}
}
// Phase 4 appends the dynamic stored-query tools here.
let mut result = ListToolsResult::default();
result.tools = tools;
Ok(result)
}
async fn call_tool(
&self,
request: CallToolRequestParams,
context: RequestContext<RoleServer>,
) -> Result<CallToolResult, McpError> {
let cx = builtins::resolve_cx(&self.state, &context)?;
let Some(tool) = Builtin::from_name(&request.name) else {
// Unknown tool → JSON-RPC error (a dispatch failure, not a
// tool-execution error).
return Err(McpError::invalid_params(
format!("unknown tool: {}", request.name),
None,
));
};
// Enforce the visibility gate at call-time too, and mask a denial as
// "unknown tool" so the catalog isn't probeable without the grant (the
// same deny ≡ missing principle as `POST /queries/{name}`). The inner
// `do_*` / `run_*` re-authorizes against the real branch.
if !builtins::is_visible(tool, &cx)? {
return Err(McpError::invalid_params(
format!("unknown tool: {}", request.name),
None,
));
}
let args = request.arguments.unwrap_or_default();
builtins::dispatch(tool, &cx, &args).await
}
}
/// Build the stateless Streamable-HTTP MCP service mounted at `/mcp`.
/// Build the `/mcp` route: a stateless Streamable-HTTP MCP service, body-capped.
///
/// Mounted inside the `per_graph_protected` route group so the bearer-auth and
/// Merged into the `per_graph_protected` route group so the bearer-auth and
/// graph-handle middleware run first; in multi-graph mode the same service is
/// reached at `/graphs/{graph_id}/mcp`.
pub(crate) fn mcp_service(
state: AppState,
) -> StreamableHttpService<OmnigraphMcpHandler, LocalSessionManager> {
pub(crate) fn mcp_router(state: AppState) -> Router<AppState> {
let handler = OmnigraphMcpHandler::new(state);
// `StreamableHttpServerConfig` is `#[non_exhaustive]`: start from `Default`,
// then flip to stateless JSON. Keep rmcp's loopback `allowed_hosts` default
@ -96,9 +141,16 @@ pub(crate) fn mcp_service(
let config = StreamableHttpServerConfig::default()
.with_stateful_mode(false)
.with_json_response(true);
StreamableHttpService::new(
let service = StreamableHttpService::new(
move || Ok(handler.clone()),
Arc::new(LocalSessionManager::default()),
config,
)
);
// rmcp reads the request body directly (it doesn't go through axum's
// `Bytes`/`Json` extractor), so the router's `DefaultBodyLimit` does NOT
// bound `/mcp`. Cap it explicitly at the ingest limit (the largest tool
// payload) so an MCP `ingest`/`schema_apply` call can't stream unbounded.
Router::new()
.route_service("/mcp", service)
.layer(RequestBodyLimitLayer::new(INGEST_REQUEST_BODY_LIMIT_BYTES))
}

View file

@ -944,6 +944,287 @@ async fn mcp_get_returns_405_not_404() {
assert_eq!(response.status(), StatusCode::METHOD_NOT_ALLOWED);
}
#[tokio::test]
async fn mcp_tools_list_includes_builtins() {
let (_temp, app) = app_for_loaded_graph().await;
let (status, body) = json_response(
&app,
mcp_post(json!({ "jsonrpc": "2.0", "id": 2, "method": "tools/list" })),
)
.await;
assert_eq!(status, StatusCode::OK, "{body}");
let tools = body["result"]["tools"].as_array().expect("tools array");
let names: Vec<&str> = tools.iter().filter_map(|t| t["name"].as_str()).collect();
for expected in [
"health",
"snapshot",
"schema_get",
"branches_list",
"commits_list",
"query",
"mutate",
] {
assert!(
names.contains(&expected),
"tools/list missing `{expected}`: {names:?}"
);
}
}
#[tokio::test]
async fn mcp_tools_call_snapshot_reads_through_extension_passthrough() {
let (_temp, app) = app_for_loaded_graph().await;
let (status, body) = json_response(
&app,
mcp_post(json!({
"jsonrpc": "2.0",
"id": 3,
"method": "tools/call",
"params": { "name": "snapshot", "arguments": {} }
})),
)
.await;
assert_eq!(status, StatusCode::OK, "{body}");
// The handler resolved the GraphHandle from the request extensions and ran
// do_snapshot — this is the RFC-003 §5.8 actor/handle passthrough proof.
assert_ne!(
body["result"]["isError"],
json!(true),
"snapshot tool errored: {body}"
);
let text = body["result"]["content"][0]["text"]
.as_str()
.expect("text content block");
let snapshot: Value = serde_json::from_str(text).expect("snapshot json payload");
assert_eq!(snapshot["branch"], "main");
assert!(
snapshot["manifest_version"].is_number(),
"snapshot carries a manifest_version: {snapshot}"
);
}
#[tokio::test]
async fn mcp_tools_call_unknown_tool_is_jsonrpc_error() {
let (_temp, app) = app_for_loaded_graph().await;
let (status, body) = json_response(
&app,
mcp_post(json!({
"jsonrpc": "2.0",
"id": 4,
"method": "tools/call",
"params": { "name": "does_not_exist", "arguments": {} }
})),
)
.await;
assert_eq!(status, StatusCode::OK, "JSON-RPC errors ride a 200");
assert!(
body["error"].is_object(),
"unknown tool is a JSON-RPC error, not a result: {body}"
);
}
/// A read-only actor (`act-reader`) and an admin (`act-admin`), so `tools/list`
/// filtering by Cedar action is observable.
const MCP_FILTER_POLICY_YAML: &str = r#"
version: 1
groups:
readers: ["act-reader"]
admins: ["act-admin"]
protected_branches: [main]
rules:
- id: read-only
allow:
actors: { group: readers }
actions: [read]
branch_scope: any
- id: admin-data
allow:
actors: { group: admins }
actions: [read, change, export]
branch_scope: any
- id: admin-targets
allow:
actors: { group: admins }
actions: [schema_apply, branch_create, branch_delete, branch_merge]
target_branch_scope: any
"#;
fn mcp_post_auth(body: Value, token: &str) -> Request<Body> {
let mut request = mcp_post(body);
request
.headers_mut()
.insert(AUTHORIZATION, format!("Bearer {token}").parse().unwrap());
request
}
async fn mcp_tool_names(app: &Router, token: &str) -> Vec<String> {
let (status, body) = json_response(
app,
mcp_post_auth(
json!({ "jsonrpc": "2.0", "id": 1, "method": "tools/list" }),
token,
),
)
.await;
assert_eq!(status, StatusCode::OK, "{body}");
body["result"]["tools"]
.as_array()
.expect("tools array")
.iter()
.filter_map(|t| t["name"].as_str().map(String::from))
.collect()
}
#[tokio::test]
async fn mcp_tools_list_cedar_filters_by_policy() {
let (_temp, app) = app_with_stored_queries(
&[],
&[("act-reader", "reader-tok"), ("act-admin", "admin-tok")],
MCP_FILTER_POLICY_YAML,
)
.await;
let reader = mcp_tool_names(&app, "reader-tok").await;
assert!(reader.contains(&"snapshot".to_string()), "{reader:?}");
assert!(reader.contains(&"query".to_string()), "{reader:?}");
for hidden in ["mutate", "ingest", "branches_create", "branches_delete", "schema_apply"] {
assert!(
!reader.contains(&hidden.to_string()),
"read-only actor must NOT see `{hidden}`: {reader:?}"
);
}
let admin = mcp_tool_names(&app, "admin-tok").await;
for visible in ["mutate", "ingest", "branches_create", "schema_apply"] {
assert!(
admin.contains(&visible.to_string()),
"admin must see `{visible}`: {admin:?}"
);
}
}
#[tokio::test]
async fn mcp_tools_call_mutate_writes() {
let (_temp, app) = app_for_loaded_graph_with_auth("admin-token").await;
let (status, body) = json_response(
&app,
mcp_post_auth(
json!({
"jsonrpc": "2.0",
"id": 1,
"method": "tools/call",
"params": {
"name": "mutate",
"arguments": {
"source": "query ins($name: String, $age: I32) { insert Person { name: $name, age: $age } }",
"params": { "name": "Zelda", "age": 40 }
}
}
}),
"admin-token",
),
)
.await;
assert_eq!(status, StatusCode::OK, "{body}");
assert_ne!(body["result"]["isError"], json!(true), "mutate failed: {body}");
let text = body["result"]["content"][0]["text"]
.as_str()
.expect("change envelope text");
let change: Value = serde_json::from_str(text).expect("change json");
assert!(
change["affected_nodes"].as_u64().unwrap_or(0) >= 1,
"mutate wrote a node through MCP: {change}"
);
}
#[tokio::test]
async fn mcp_tools_call_denied_is_masked_as_unknown() {
let (_temp, app) =
app_with_stored_queries(&[], &[("act-reader", "reader-tok")], MCP_FILTER_POLICY_YAML).await;
let (status, denied) = json_response(
&app,
mcp_post_auth(
json!({
"jsonrpc": "2.0", "id": 1, "method": "tools/call",
"params": { "name": "mutate", "arguments": { "source": "x" } }
}),
"reader-tok",
),
)
.await;
let (_s, unknown) = json_response(
&app,
mcp_post_auth(
json!({
"jsonrpc": "2.0", "id": 1, "method": "tools/call",
"params": { "name": "does_not_exist", "arguments": {} }
}),
"reader-tok",
),
)
.await;
assert_eq!(status, StatusCode::OK);
// Deny ≡ unknown: a denied existing tool is reported with the SAME error
// code and the SAME `unknown tool: <name>` template as a truly-unknown tool
// (the echoed name is the one the caller already supplied), so an
// unauthorized caller cannot tell "denied" from "does not exist".
assert_eq!(denied["error"]["code"], json!(-32602));
assert_eq!(denied["error"]["code"], unknown["error"]["code"]);
assert_eq!(denied["error"]["message"], json!("unknown tool: mutate"));
assert_eq!(
unknown["error"]["message"],
json!("unknown tool: does_not_exist")
);
}
#[tokio::test]
async fn mcp_tools_call_business_error_is_iserror() {
let (_temp, app) = app_for_loaded_graph().await;
let (status, body) = json_response(
&app,
mcp_post(json!({
"jsonrpc": "2.0", "id": 1, "method": "tools/call",
"params": { "name": "query", "arguments": { "source": "this is not valid gq" } }
})),
)
.await;
assert_eq!(status, StatusCode::OK);
// A bad query is a tool-execution error the model can self-correct on, NOT a
// JSON-RPC protocol error (the 2025-11-25 SEP-1303 split).
assert_eq!(
body["result"]["isError"],
json!(true),
"malformed GQ → isError result: {body}"
);
assert!(body["error"].is_null(), "not a JSON-RPC error: {body}");
}
#[tokio::test]
async fn mcp_tool_annotations_match_read_write() {
let (_temp, app) = app_for_loaded_graph().await;
let (_s, body) = json_response(
&app,
mcp_post(json!({ "jsonrpc": "2.0", "id": 1, "method": "tools/list" })),
)
.await;
let tools = body["result"]["tools"].as_array().expect("tools array");
let find = |name: &str| {
tools
.iter()
.find(|t| t["name"] == json!(name))
.unwrap_or_else(|| panic!("tool `{name}` listed"))
.clone()
};
assert_eq!(find("snapshot")["annotations"]["readOnlyHint"], json!(true));
let schema_apply = find("schema_apply");
assert_eq!(schema_apply["annotations"]["readOnlyHint"], json!(false));
assert_eq!(
schema_apply["annotations"]["destructiveHint"],
json!(true),
"schema_apply is destructive: {schema_apply}"
);
}
#[tokio::test]
async fn schema_apply_route_updates_graph_for_authorized_admin() {
let (temp, app) = app_for_graph_with_auth_tokens_and_policy(