feat(server): in-server MCP endpoint over Streamable HTTP (rmcp), RFC-003

Adds POST /mcp: a stateless, JSON-only MCP (Model Context Protocol) endpoint
served by rmcp 1.7's StreamableHttpService, mounted inside per_graph_protected
so it inherits the bearer-auth and graph-handle middleware (single mode serves
/mcp; multi mode nests it to /graphs/{graph_id}/mcp).

rmcp's stateless mode (stateful_mode=false, json_response=true) provides
GET/DELETE 405, loopback Host validation, and MCP-Protocol-Version checking
(400 on unsupported, default 2025-03-26) without extra code. The handler
advertises the tools capability and answers the initialize handshake; the
tool catalog, resources, and stored-query projection land in the following
phases. Auth stays decoupled (RFC-003 5.8): the handler consumes the
already-resolved ResolvedActor and branches on nothing about how the token
was verified.

Tests (tests/server.rs): initialize handshake, GET 405, and bearer required
(401 without a token, 200 with). The docs/user/server.md MCP section is
deferred to the docs phase per the RFC-003 rollout, since the endpoint
exposes an empty tool list until the tools phase.

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Ragnor Comerford 2026-06-08 15:27:01 +02:00
parent 69fae71870
commit 9df9f394c0
No known key found for this signature in database
5 changed files with 321 additions and 5 deletions

122
Cargo.lock generated
View file

@ -25,7 +25,7 @@ checksum = "b169f7a6d4742236a0a00c541b845991d0ac43e546831af1249753ab4c3aa3a0"
dependencies = [
"cfg-if",
"cipher",
"cpufeatures",
"cpufeatures 0.2.17",
]
[[package]]
@ -1085,7 +1085,7 @@ dependencies = [
"cc",
"cfg-if",
"constant_time_eq",
"cpufeatures",
"cpufeatures 0.2.17",
]
[[package]]
@ -1278,6 +1278,17 @@ version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724"
[[package]]
name = "chacha20"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6f8d983286843e49675a4b7a2d174efe136dc93a18d69130dd18198a6c167601"
dependencies = [
"cfg-if",
"cpufeatures 0.3.0",
"rand_core 0.10.1",
]
[[package]]
name = "chrono"
version = "0.4.44"
@ -1487,6 +1498,15 @@ dependencies = [
"libc",
]
[[package]]
name = "cpufeatures"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b2a41393f66f16b0823bb79094d54ac5fbd34ab292ddafb9a0456ac9f87d201"
dependencies = [
"libc",
]
[[package]]
name = "crc32c"
version = "0.6.8"
@ -2750,6 +2770,7 @@ dependencies = [
"cfg-if",
"libc",
"r-efi 6.0.0",
"rand_core 0.10.1",
"wasip2",
"wasip3",
]
@ -3423,7 +3444,7 @@ version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cb26cec98cce3a3d96cbb7bced3c4b16e3d13f27ec56dbd62cbc8f39cfb9d653"
dependencies = [
"cpufeatures",
"cpufeatures 0.2.17",
]
[[package]]
@ -4657,6 +4678,7 @@ dependencies = [
"omnigraph-engine",
"omnigraph-policy",
"regex",
"rmcp",
"serde",
"serde_json",
"serde_yaml",
@ -4793,6 +4815,12 @@ version = "1.0.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a"
[[package]]
name = "pastey"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2ee67f1008b1ba2321834326597b8e186293b049a023cdef258527550b9935b4"
[[package]]
name = "path_abs"
version = "0.5.1"
@ -5309,6 +5337,17 @@ dependencies = [
"rand_core 0.9.5",
]
[[package]]
name = "rand"
version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2e8e8bcc7961af1fdac401278c6a831614941f6164ee3bf4ce61b7edb162207"
dependencies = [
"chacha20",
"getrandom 0.4.2",
"rand_core 0.10.1",
]
[[package]]
name = "rand_chacha"
version = "0.3.1"
@ -5347,6 +5386,12 @@ dependencies = [
"getrandom 0.3.4",
]
[[package]]
name = "rand_core"
version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "63b8176103e19a2643978565ca18b50549f6101881c443590420e4dc998a3c69"
[[package]]
name = "rand_distr"
version = "0.5.1"
@ -5579,6 +5624,35 @@ dependencies = [
"windows-sys 0.52.0",
]
[[package]]
name = "rmcp"
version = "1.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0810a9f717d9828f475fe1f629f4c305c8464b7f496c3a854b58d29e65f4058e"
dependencies = [
"async-trait",
"bytes",
"chrono",
"futures",
"http 1.4.0",
"http-body 1.0.1",
"http-body-util",
"pastey",
"pin-project-lite",
"rand 0.10.1",
"schemars 1.2.1",
"serde",
"serde_json",
"sse-stream",
"thiserror",
"tokio",
"tokio-stream",
"tokio-util",
"tower-service",
"tracing",
"uuid",
]
[[package]]
name = "roaring"
version = "0.11.3"
@ -5816,12 +5890,26 @@ version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2b42f36aa1cd011945615b92222f6bf73c599a102a300334cd7f8dbeec726cc"
dependencies = [
"chrono",
"dyn-clone",
"ref-cast",
"schemars_derive",
"serde",
"serde_json",
]
[[package]]
name = "schemars_derive"
version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7d115b50f4aaeea07e79c1912f645c7513d81715d0420f8bc77a18c6260b307f"
dependencies = [
"proc-macro2",
"quote",
"serde_derive_internals",
"syn 2.0.117",
]
[[package]]
name = "scoped-tls"
version = "1.0.1"
@ -5926,6 +6014,17 @@ dependencies = [
"syn 2.0.117",
]
[[package]]
name = "serde_derive_internals"
version = "0.29.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "18d26a20a969b9e3fdf2fc2d9f21eda6c40e2de84c9408bb5d3b05d499aae711"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.117",
]
[[package]]
name = "serde_json"
version = "1.0.149"
@ -6051,7 +6150,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba"
dependencies = [
"cfg-if",
"cpufeatures",
"cpufeatures 0.2.17",
"digest",
]
@ -6062,7 +6161,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a7507d819769d01a365ab707794a4084392c824f54a7a6a7862f8c3d0892b283"
dependencies = [
"cfg-if",
"cpufeatures",
"cpufeatures 0.2.17",
"digest",
]
@ -6241,6 +6340,19 @@ dependencies = [
"syn 2.0.117",
]
[[package]]
name = "sse-stream"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f3962b63f038885f15bce2c6e02c0e7925c072f1ac86bb60fd44c5c6b762fb72"
dependencies = [
"bytes",
"futures-util",
"http-body 1.0.1",
"http-body-util",
"pin-project-lite",
]
[[package]]
name = "stable_deref_trait"
version = "1.2.1"

View file

@ -43,6 +43,7 @@ regex = { workspace = true }
thiserror = { workspace = true }
aws-config = { version = "1", optional = true, default-features = false, features = ["rustls", "rt-tokio", "credentials-process", "sso"] }
aws-sdk-secretsmanager = { version = "1", optional = true, default-features = false, features = ["rustls", "rt-tokio"] }
rmcp = { version = "1.7.0", default-features = false, features = ["server", "transport-streamable-http-server"] }
[dev-dependencies]
tempfile = { workspace = true }

View file

@ -3,6 +3,7 @@ pub mod auth;
pub mod config;
pub mod graph_id;
pub mod identity;
pub mod mcp;
pub mod policy;
pub mod queries;
pub mod registry;
@ -1138,6 +1139,11 @@ pub fn build_app(state: AppState) -> Router {
.route("/branches/merge", post(server_branch_merge))
.route("/commits", get(server_commit_list))
.route("/commits/{commit_id}", get(server_commit_show))
// MCP (RFC-003): a stateless Streamable-HTTP JSON-RPC endpoint that
// shares this group's bearer-auth + graph-handle middleware. Single
// mode serves it flat at `/mcp`; multi mode nests it to
// `/graphs/{graph_id}/mcp` (per-graph isolation for free).
.route_service("/mcp", mcp::mcp_service(state.clone()))
.route_layer(middleware::from_fn_with_state(
state.clone(),
resolve_graph_handle,

View file

@ -0,0 +1,104 @@
//! In-server MCP (Model Context Protocol) surface — RFC-003.
//!
//! Projects omnigraph-server operations as MCP tools and resources over
//! Streamable HTTP (rmcp), Cedar-gated through the same `authorize` path the
//! REST routes use. Stateless POST-only: rmcp's `stateful_mode = false` gives
//! `GET`/`DELETE` → 405 and `MCP-Protocol-Version` validation (400 on
//! unsupported, default `2025-03-26` when absent) for free. Host/Origin
//! DNS-rebinding checks use rmcp's loopback `allowed_hosts` default until a
//! server-config knob to widen them for non-loopback deploys lands with the
//! OAuth fast-follow.
//!
//! Auth is decoupled (RFC-003 §5.8): the `require_bearer_auth` /
//! `resolve_graph_handle` middleware run before the MCP service and attach
//! `ResolvedActor` + `Arc<GraphHandle>` to the request; the handler reads them
//! back from `RequestContext.extensions` → `http::request::Parts.extensions`.
use std::sync::Arc;
use rmcp::{
ErrorData as McpError, RoleServer, ServerHandler,
model::{ListToolsResult, PaginatedRequestParams, ServerCapabilities, ServerInfo},
service::RequestContext,
transport::streamable_http_server::{
StreamableHttpServerConfig, StreamableHttpService, session::local::LocalSessionManager,
},
};
use crate::AppState;
/// Server-level guidance returned in the MCP `initialize` response.
const MCP_INSTRUCTIONS: &str = "OmniGraph is a versioned, branchable property graph. \
Reads run typed GQ queries; writes are branchable and policy-gated. The tools mirror the \
HTTP API and are authorized per-actor by Cedar policy a tool you cannot see is one you \
are not permitted to call.";
/// Shared MCP handler. Cheap to clone (holds the `Arc`-backed [`AppState`]); the
/// streamable-HTTP service constructs one per request in stateless mode.
#[derive(Clone)]
pub(crate) struct OmnigraphMcpHandler {
// Wired in Phase 3 (tools) / Phase 5 (resources): the handler resolves the
// per-request actor + graph handle from the request extensions and routes
// tool calls through the shared `do_*` / `run_query` / `run_mutate` paths.
#[allow(dead_code)]
state: AppState,
}
impl OmnigraphMcpHandler {
fn new(state: AppState) -> Self {
Self { state }
}
}
impl ServerHandler for OmnigraphMcpHandler {
fn get_info(&self) -> ServerInfo {
// `ServerInfo` (`InitializeResult`) is `#[non_exhaustive]`; build from
// `Default` and set the fields we own. We advertise `tools` and
// `resources` with neither `listChanged` nor `subscribe` — stateless,
// no server push.
let mut info = ServerInfo::default();
// Advertise only `tools` for now. The resources phase adds
// `list_resources`/`read_resource`; advertising a `resources`
// capability whose `resources/read` returns method-not-found would be a
// dishonest contract, so `.enable_resources()` lands with that phase.
info.capabilities = ServerCapabilities::builder().enable_tools().build();
info.server_info.name = "omnigraph-server".to_string();
info.server_info.version = env!("CARGO_PKG_VERSION").to_string();
info.instructions = Some(MCP_INSTRUCTIONS.to_string());
info
}
async fn list_tools(
&self,
_request: Option<PaginatedRequestParams>,
_context: RequestContext<RoleServer>,
) -> Result<ListToolsResult, McpError> {
// Phase 3 populates this with the Cedar-filtered built-in tools; Phase 4
// adds the dynamic stored-query tools.
Ok(ListToolsResult::default())
}
}
/// Build the stateless Streamable-HTTP MCP service mounted at `/mcp`.
///
/// Mounted inside the `per_graph_protected` route group so the bearer-auth and
/// graph-handle middleware run first; in multi-graph mode the same service is
/// reached at `/graphs/{graph_id}/mcp`.
pub(crate) fn mcp_service(
state: AppState,
) -> StreamableHttpService<OmnigraphMcpHandler, LocalSessionManager> {
let handler = OmnigraphMcpHandler::new(state);
// `StreamableHttpServerConfig` is `#[non_exhaustive]`: start from `Default`,
// then flip to stateless JSON. Keep rmcp's loopback `allowed_hosts` default
// (DNS-rebinding protection for local servers); a server-config knob to
// widen `allowed_hosts` / `allowed_origins` for non-loopback deployments
// lands with the OAuth fast-follow.
let config = StreamableHttpServerConfig::default()
.with_stateful_mode(false)
.with_json_response(true);
StreamableHttpService::new(
move || Ok(handler.clone()),
Arc::new(LocalSessionManager::default()),
config,
)
}

View file

@ -851,6 +851,99 @@ async fn json_response(app: &Router, request: Request<Body>) -> (StatusCode, Val
(status, value)
}
/// Build a stateless MCP JSON-RPC POST. rmcp's `handle_post` requires the
/// `Accept` header to list both JSON and SSE and the content type to be JSON;
/// a `Host` is needed so DNS-rebinding validation can run.
fn mcp_post(body: Value) -> Request<Body> {
Request::builder()
.method(Method::POST)
.uri("/mcp")
.header("host", "localhost")
.header("content-type", "application/json")
.header("accept", "application/json, text/event-stream")
.body(Body::from(serde_json::to_vec(&body).unwrap()))
.unwrap()
}
fn mcp_initialize_body() -> Value {
json!({
"jsonrpc": "2.0",
"id": 1,
"method": "initialize",
"params": {
"protocolVersion": "2025-06-18",
"capabilities": {},
"clientInfo": { "name": "smoke", "version": "0.0.0" }
}
})
}
#[tokio::test]
async fn mcp_initialize_advertises_tools_capability() {
let (_temp, app) = app_for_loaded_graph().await;
let (status, body) = json_response(&app, mcp_post(mcp_initialize_body())).await;
assert_eq!(status, StatusCode::OK, "initialize should 200");
assert_eq!(body["jsonrpc"], "2.0");
assert_eq!(body["id"], 1);
assert!(
body["result"]["capabilities"]["tools"].is_object(),
"advertises the tools capability: {body}"
);
// Resources are NOT advertised until the resources phase implements
// `list_resources`/`read_resource`; advertising a capability whose
// `resources/read` 404s would be a dishonest contract.
assert!(
body["result"]["capabilities"]["resources"].is_null(),
"does not advertise resources until implemented: {body}"
);
assert_eq!(body["result"]["serverInfo"]["name"], "omnigraph-server");
}
#[tokio::test]
async fn mcp_requires_bearer_when_auth_enabled() {
// The §5.8 auth-decoupling invariant: /mcp sits behind the same
// `require_bearer_auth` middleware as the REST routes, so a missing bearer
// is rejected at the HTTP boundary (401) BEFORE rmcp runs, and a valid
// bearer reaches the handler (200). `route_layer` + `route_service` is the
// exact Axum interaction that could silently regress.
let (_temp, app) = app_for_loaded_graph_with_auth("mcp-token").await;
let no_bearer = app
.clone()
.oneshot(mcp_post(mcp_initialize_body()))
.await
.unwrap();
assert_eq!(
no_bearer.status(),
StatusCode::UNAUTHORIZED,
"no bearer must 401 at the middleware, not reach rmcp"
);
let mut with_bearer = mcp_post(mcp_initialize_body());
with_bearer
.headers_mut()
.insert(AUTHORIZATION, "Bearer mcp-token".parse().unwrap());
let (status, body) = json_response(&app, with_bearer).await;
assert_eq!(status, StatusCode::OK, "valid bearer reaches the handler");
assert_eq!(body["result"]["serverInfo"]["name"], "omnigraph-server");
}
#[tokio::test]
async fn mcp_get_returns_405_not_404() {
// The MCP endpoint must route GET (the spec requires both POST + GET) and
// return 405 when SSE is not offered — rmcp's stateless mode gives this
// for free. A 404 would mean the endpoint isn't reachable.
let (_temp, app) = app_for_loaded_graph().await;
let request = Request::builder()
.method(Method::GET)
.uri("/mcp")
.header("host", "localhost")
.body(Body::empty())
.unwrap();
let response = app.clone().oneshot(request).await.unwrap();
assert_eq!(response.status(), StatusCode::METHOD_NOT_ALLOWED);
}
#[tokio::test]
async fn schema_apply_route_updates_graph_for_authorized_admin() {
let (temp, app) = app_for_graph_with_auth_tokens_and_policy(