feat(mcp): MCP server surface — Streamable-HTTP transport + tool/resource projection (RFC-003)

Add the `omnigraph-mcp` crate (stateless Streamable-HTTP transport, `McpBackend`
seam, fail-closed Host/Origin policy) and the server backend projecting built-in
operations and the per-graph stored-query registry as MCP tools + resources over
`POST /graphs/{id}/mcp`. Every tool delegates to the same engine/handler
functions the REST routes use and is gated by the same Cedar `authorize` path;
reads/writes carry structured output.

Includes three correctness fixes from review + live testing:

- tools/list is a faithful relaxation of the per-call gate: a built-in whose
  authorization depends on a caller-chosen branch is shown iff the actor could
  invoke it on some branch, via PolicyEngine::permits_on_any_branch (capability
  probe through the same Cedar authorizer). A fabricated-`main` probe wrongly
  hid graph_mutate under the canonical "protect main, write unprotected" policy.
- The stored-query surface honors mode + `expose` on call as well as on list:
  resolve_stored_tool is the single membership test, so the meta pair
  (stored_query_list/stored_query_run) is callable only in `meta` mode and
  stored_query_run resolves exposed-only. An `expose:false` query is unreachable
  by name on the agent surface (it stays HTTP/service-callable).
- The loopback Host allow-list is the full set [127.0.0.1, ::1, localhost]
  (matches rmcp's default), so an IPv6 loopback `Host: [::1]` is accepted
  regardless of which stack the server bound.

The protocol-version contract is documented (initialize negotiates the version
in its body, so the MCP-Protocol-Version header is validated on non-init
requests only) and pinned by a test.

Tests: omnigraph-mcp/tests/standalone.rs, omnigraph-server/tests/mcp.rs,
omnigraph-policy permits_on_any_branch unit test, omnigraph-api-types schema
projection. Full workspace gate green.
This commit is contained in:
Ragnor Comerford 2026-06-17 14:00:52 +02:00
parent c43b81d318
commit bcd0d9c867
No known key found for this signature in database
20 changed files with 2968 additions and 43 deletions

View file

@ -0,0 +1,73 @@
//! MCP (Model Context Protocol) server surface for Omnigraph, served over
//! **stateless Streamable HTTP**.
//!
//! This crate owns the `rmcp` dependency and the transport wiring. It defines a
//! single seam — the [`McpBackend`] trait — that the server crate implements.
//! The crate **never names an omnigraph type**: the backend reads its own types
//! (resolved actor, graph handle, …) out of `parts.extensions`, so the
//! dependency edge is `server → mcp` (never the reverse — a `mcp → server` edge
//! would cycle the binary at `server-bin → omnigraph-mcp → server-lib`).
//!
//! The transport is **stateless JSON over a single `/mcp` POST**: no SSE stream,
//! no `Mcp-Session-Id`, every request independent. See [`transport`].
use async_trait::async_trait;
mod service;
pub mod transport;
pub use transport::{McpHostPolicy, OriginPolicy, mcp_router};
// rmcp model types re-exported so the server speaks rmcp via `omnigraph_mcp::…`
// and carries no direct rmcp dependency.
pub use rmcp::ErrorData as McpError; // JSON-RPC error: invalid_params=-32602, internal_error=-32603
pub use rmcp::model::{
CallToolResult, Content, Extensions, Implementation, RawResource, ReadResourceResult, Resource,
ResourceContents, ServerCapabilities, ServerInfo, Tool, ToolAnnotations,
};
/// A JSON object — the shape of tool arguments and JSON Schema documents.
/// Identical to `rmcp::model::JsonObject` (`serde_json::Map<String, Value>`).
pub type JsonObject = serde_json::Map<String, serde_json::Value>;
/// The seam the server fills. One implementor (`OmnigraphMcpBackend`); the boxed
/// future from `#[async_trait]` is negligible at MCP QPS.
///
/// **The list seam is non-paginated by contract.** `list_tools`/`list_resources`
/// return the *full* set, so the service always emits `nextCursor: null`. The
/// catalog is bounded by construction (a fixed set of built-ins; large
/// stored-query catalogs collapse to a discovery + execute meta-tool pair rather
/// than leaning on `tools/list` paging). The `Vec<T>` return type *is* that
/// contract; a future paging need is a signature change, not a doc promise.
///
/// Each method receives the request's [`http::request::Parts`]; the backend reads
/// its own injected extensions (`parts.extensions.get::<T>()`) — the decoupling
/// mechanism that keeps this crate free of omnigraph types and auth-method
/// agnostic.
#[async_trait]
pub trait McpBackend: Clone + Send + Sync + 'static {
/// Server identity + advertised capabilities (`initialize` response).
fn server_info(&self) -> ServerInfo;
/// The full, Cedar-filtered tool set for this request's actor + graph.
async fn list_tools(&self, parts: &http::request::Parts) -> Result<Vec<Tool>, McpError>;
/// Dispatch a tool call. The authoritative authorization gate.
async fn call_tool(
&self,
parts: &http::request::Parts,
name: &str,
args: JsonObject,
) -> Result<CallToolResult, McpError>;
/// The full, Cedar-filtered resource set for this request's actor + graph.
async fn list_resources(&self, parts: &http::request::Parts)
-> Result<Vec<Resource>, McpError>;
/// Read one resource by URI.
async fn read_resource(
&self,
parts: &http::request::Parts,
uri: &str,
) -> Result<ReadResourceResult, McpError>;
}

View file

@ -0,0 +1,80 @@
//! `McpService<B>` — the rmcp `ServerHandler` adapter. Pulls the request's
//! `http::request::Parts` out of the context once and delegates each method to
//! the [`McpBackend`]. Maps the backend's non-paginated `Vec<T>` returns to
//! rmcp's `List*Result` with `next_cursor: None`.
use rmcp::ServerHandler;
use rmcp::ErrorData as McpError;
use rmcp::model::{
CallToolRequestParams, CallToolResult, ListResourcesResult, ListToolsResult,
PaginatedRequestParams, ReadResourceRequestParams, ReadResourceResult, ServerInfo,
};
use rmcp::service::{RequestContext, RoleServer};
use crate::McpBackend;
#[derive(Clone)]
pub(crate) struct McpService<B: McpBackend> {
backend: B,
}
impl<B: McpBackend> McpService<B> {
pub(crate) fn new(backend: B) -> Self {
Self { backend }
}
/// The HTTP `Parts` injected by `StreamableHttpService` into the request
/// context extensions (`tower.rs` does `request.into_parts()` then
/// `req.request.extensions_mut().insert(part)`). Absent only on an internal
/// wiring error, not a client-reachable path.
fn parts(ctx: &RequestContext<RoleServer>) -> Result<&http::request::Parts, McpError> {
ctx.extensions
.get::<http::request::Parts>()
.ok_or_else(|| McpError::internal_error("request parts missing from MCP context", None))
}
}
impl<B: McpBackend> ServerHandler for McpService<B> {
fn get_info(&self) -> ServerInfo {
self.backend.server_info()
}
async fn list_tools(
&self,
_request: Option<PaginatedRequestParams>,
context: RequestContext<RoleServer>,
) -> Result<ListToolsResult, McpError> {
let parts = Self::parts(&context)?;
let tools = self.backend.list_tools(parts).await?;
Ok(ListToolsResult::with_all_items(tools))
}
async fn call_tool(
&self,
request: CallToolRequestParams,
context: RequestContext<RoleServer>,
) -> Result<CallToolResult, McpError> {
let parts = Self::parts(&context)?;
let args = request.arguments.unwrap_or_default();
self.backend.call_tool(parts, &request.name, args).await
}
async fn list_resources(
&self,
_request: Option<PaginatedRequestParams>,
context: RequestContext<RoleServer>,
) -> Result<ListResourcesResult, McpError> {
let parts = Self::parts(&context)?;
let resources = self.backend.list_resources(parts).await?;
Ok(ListResourcesResult::with_all_items(resources))
}
async fn read_resource(
&self,
request: ReadResourceRequestParams,
context: RequestContext<RoleServer>,
) -> Result<ReadResourceResult, McpError> {
let parts = Self::parts(&context)?;
self.backend.read_resource(parts, &request.uri).await
}
}

View file

@ -0,0 +1,158 @@
//! Stateless Streamable-HTTP transport for the MCP surface.
//!
//! One endpoint: `POST /mcp` returns a single `application/json` object; no SSE,
//! no session id (`NeverSessionManager` + `stateful_mode = false`). rmcp gives,
//! for free in stateless mode: `GET`/`DELETE → 405 + Allow: POST`, a disallowed
//! `Host → 403`, and an unsupported `MCP-Protocol-Version → 400` on
//! **non-`initialize`** requests. `initialize` is exempt by design — it
//! negotiates the version in its JSON-RPC body (`protocolVersion`), not the HTTP
//! header, so a bogus header there is ignored (absent ⇒ rmcp's default version).
//!
//! The one thing rmcp does **not** give is fail-closed Origin: it validates
//! `Origin` only when `allowed_origins` is non-empty (an empty list is
//! *fail-open*). [`origin_guard`] closes that — a present, disallowed `Origin`
//! is `403` regardless — and [`McpHostPolicy`] has no "absent ⇒ skip" state, so a
//! remote deployment cannot accidentally run fail-open.
use std::net::SocketAddr;
use std::sync::Arc;
use axum::extract::{Request, State};
use axum::middleware::{Next, from_fn_with_state};
use axum::response::{IntoResponse, Response};
use rmcp::transport::streamable_http_server::{
StreamableHttpServerConfig, StreamableHttpService, session::never::NeverSessionManager,
};
use crate::McpBackend;
use crate::service::McpService;
/// Browser-`Origin` posture as a **total** choice — there is no `None ⇒ skip`
/// state to leak into a fail-open default. Every deployment lands in exactly one
/// arm, chosen once by [`McpHostPolicy::from_bind`].
#[derive(Debug, Clone)]
pub enum OriginPolicy {
/// Browser clients from these origins; any OTHER present `Origin` → `403`.
Allow(Vec<String>),
/// No browser clients expected; ANY present `Origin` → `403`. Non-browser
/// MCP clients (the launch tier) send no `Origin` and pass. The remote
/// default.
DenyBrowsers,
/// Explicit opt-out (loopback dev / trusted network) — never the remote
/// default.
Unchecked,
}
/// Host + Origin posture, derived together from the deployment. The struct has
/// no skip-by-absence state, and [`from_bind`](Self::from_bind) is the only
/// constructor, so a fail-open policy is unrepresentable.
#[derive(Debug, Clone)]
pub struct McpHostPolicy {
/// `None` ⇒ accept any `Host` (DNS-rebinding defense relaxed for a known
/// public bind; bearer is the real control there).
pub allowed_hosts: Option<Vec<String>>,
/// Total — no `Option`.
pub origin: OriginPolicy,
}
impl McpHostPolicy {
/// The only constructor. Host and Origin posture are derived together from
/// the bind + config, **fail-closed**: a remote bind with no configured
/// origins is `DenyBrowsers` (a present `Origin` is rejected), NOT "skip".
pub fn from_bind(bind: &SocketAddr, public_hosts: &[String], browser_origins: &[String]) -> Self {
let loopback = bind.ip().is_loopback();
Self {
allowed_hosts: if loopback {
// A loopback bind accepts every loopback Host form, not just the
// stack it bound: the Host header is independent of the socket
// (in-process tests, reverse proxies, dual-stack `localhost`
// resolution), so a `127.0.0.1`-bound server must still accept a
// `[::1]` Host and vice-versa. This mirrors rmcp's own default
// loopback set; deriving the list from `bind.ip()` alone dropped
// the sibling-stack literal and 403'd legitimate loopback clients.
Some(vec!["127.0.0.1".into(), "::1".into(), "localhost".into()])
} else if public_hosts.is_empty() {
None
} else {
Some(public_hosts.to_vec())
},
origin: if !browser_origins.is_empty() {
OriginPolicy::Allow(browser_origins.to_vec())
} else if loopback {
OriginPolicy::Unchecked
} else {
OriginPolicy::DenyBrowsers
},
}
}
}
/// Fail-closed Origin enforcement, run BEFORE rmcp so it is independent of
/// rmcp's empty-`allowed_origins` fail-open semantics. A *present* `Origin` that
/// the policy disallows → `403`; an *absent* `Origin` always passes (non-browser
/// MCP clients send none); `Unchecked` is a no-op.
async fn origin_guard(State(origin): State<OriginPolicy>, request: Request, next: Next) -> Response {
let header = request
.headers()
.get(http::header::ORIGIN)
.and_then(|v| v.to_str().ok());
let allowed = match header {
None => true,
Some(o) => match &origin {
OriginPolicy::Unchecked => true,
OriginPolicy::Allow(list) => list.iter().any(|a| a == o),
OriginPolicy::DenyBrowsers => false,
},
};
if allowed {
next.run(request).await
} else {
(http::StatusCode::FORBIDDEN, "Forbidden: Origin not allowed").into_response()
}
}
/// Build the `/mcp` router for a backend. The returned router carries its own
/// Origin guard and body-limit layer; merge (not `.route`) it into the
/// per-graph group so the body limit does not leak onto sibling routes.
///
/// Generic over the router state `S`: the `/mcp` route is a `route_service`
/// with no state-bearing extractors, so it composes with any caller's state
/// type (e.g. the server merges it into a `Router<AppState>` before
/// `.with_state`). A standalone caller pins `S = ()` via the return-type
/// annotation.
pub fn mcp_router<B, S>(backend: B, body_limit: usize, hosts: McpHostPolicy) -> axum::Router<S>
where
B: McpBackend,
S: Clone + Send + Sync + 'static,
{
// `StreamableHttpServerConfig` is `#[non_exhaustive]`; its Default is
// stateful_mode=true, json_response=false, allowed_hosts=loopback. Build
// from Default and flip via the with_* setters for a remote stateless JSON
// server.
let mut config = StreamableHttpServerConfig::default()
.with_stateful_mode(false)
.with_json_response(true);
config = match &hosts.allowed_hosts {
Some(list) => config.with_allowed_hosts(list.clone()),
None => config.disable_allowed_hosts(),
};
// `Allow` also configures rmcp as defense-in-depth; `DenyBrowsers` cannot be
// expressed to rmcp (empty list ⇒ rmcp skips), so `origin_guard` is the
// fail-closed authority.
if let OriginPolicy::Allow(origins) = &hosts.origin {
config = config.with_allowed_origins(origins.clone());
}
// service_factory returns Result<S, io::Error>; NeverSessionManager pairs
// with stateless mode (rejects every session op).
let svc = StreamableHttpService::new(
move || Ok(McpService::new(backend.clone())),
Arc::new(NeverSessionManager::default()),
config,
);
axum::Router::<S>::new()
.route_service("/mcp", svc)
.layer(from_fn_with_state(hosts.origin, origin_guard))
.layer(tower_http::limit::RequestBodyLimitLayer::new(body_limit))
}