From 6bff3c3492f4a6e4ab8dbf0400d25cd6209c23d5 Mon Sep 17 00:00:00 2001 From: Andrey Avtomonov <7889985+andreybavt@users.noreply.github.com> Date: Thu, 14 May 2026 18:50:08 +0200 Subject: [PATCH] feat(cli): host mcp over streamable http --- packages/cli/package.json | 1 + packages/cli/src/mcp-http-server.test.ts | 158 +++++++++++++++ packages/cli/src/mcp-http-server.ts | 242 ++++++++++++++++++++++- pnpm-lock.yaml | 3 + 4 files changed, 403 insertions(+), 1 deletion(-) diff --git a/packages/cli/package.json b/packages/cli/package.json index 65895f89..f7f0b214 100644 --- a/packages/cli/package.json +++ b/packages/cli/package.json @@ -45,6 +45,7 @@ "@ktx/connector-sqlserver": "workspace:*", "@ktx/context": "workspace:*", "@ktx/llm": "workspace:*", + "@modelcontextprotocol/sdk": "^1.29.0", "commander": "14.0.3", "ink": "^7.0.2", "react": "^19.2.6", diff --git a/packages/cli/src/mcp-http-server.test.ts b/packages/cli/src/mcp-http-server.test.ts index bc50494e..d34f0c0c 100644 --- a/packages/cli/src/mcp-http-server.test.ts +++ b/packages/cli/src/mcp-http-server.test.ts @@ -1,8 +1,12 @@ +import { request } from 'node:http'; +import type { AddressInfo } from 'node:net'; +import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js'; import { describe, expect, it } from 'vitest'; import { buildMcpSecurityConfig, isMcpRequestAuthorized, normalizeHostHeader, + runKtxMcpHttpServer, } from './mcp-http-server.js'; describe('normalizeHostHeader', () => { @@ -114,3 +118,157 @@ describe('isMcpRequestAuthorized', () => { }); }); }); + +function postJson(port: number, path: string, body: unknown, headers: Record = {}) { + return new Promise<{ status: number; headers: Record; body: string }>( + (resolve, reject) => { + const payload = JSON.stringify(body); + const req = request( + { + host: '127.0.0.1', + port, + path, + method: 'POST', + headers: { + host: `127.0.0.1:${port}`, + accept: 'application/json, text/event-stream', + 'content-type': 'application/json', + 'content-length': Buffer.byteLength(payload), + ...headers, + }, + }, + (res) => { + const chunks: Buffer[] = []; + res.on('data', (chunk: Buffer) => chunks.push(chunk)); + res.on('end', () => + resolve({ + status: res.statusCode ?? 0, + headers: res.headers, + body: Buffer.concat(chunks).toString('utf8'), + }), + ); + }, + ); + req.on('error', reject); + req.end(payload); + }, + ); +} + +function get(port: number, path: string, headers: Record = {}) { + return new Promise<{ status: number; headers: Record; body: string }>( + (resolve, reject) => { + const req = request( + { + host: '127.0.0.1', + port, + path, + method: 'GET', + headers: { host: `127.0.0.1:${port}`, ...headers }, + }, + (res) => { + const chunks: Buffer[] = []; + res.on('data', (chunk: Buffer) => chunks.push(chunk)); + res.on('end', () => + resolve({ + status: res.statusCode ?? 0, + headers: res.headers, + body: Buffer.concat(chunks).toString('utf8'), + }), + ); + }, + ); + req.on('error', reject); + req.end(); + }, + ); +} + +function createTestMcpServer() { + return () => { + const server = new McpServer({ name: 'ktx-test', version: '0.0.0-test' }); + server.registerTool('ping', { inputSchema: {} }, async () => ({ + content: [{ type: 'text', text: 'pong' }], + })); + return server; + }; +} + +describe('runKtxMcpHttpServer', () => { + it('serves /health with project metadata', async () => { + const handle = await runKtxMcpHttpServer({ + projectDir: '/tmp/ktx-project', + host: '127.0.0.1', + port: 0, + allowedHosts: [], + allowedOrigins: [], + createMcpServer: createTestMcpServer(), + }); + try { + const port = (handle.server.address() as AddressInfo).port; + const response = await get(port, '/health'); + expect(response.status).toBe(200); + expect(JSON.parse(response.body)).toEqual({ + status: 'ok', + projectDir: '/tmp/ktx-project', + port, + }); + } finally { + await handle.close(); + } + }); + + it('allocates a stateful MCP session on initialize', async () => { + const handle = await runKtxMcpHttpServer({ + projectDir: '/tmp/ktx-project', + host: '127.0.0.1', + port: 0, + allowedHosts: [], + allowedOrigins: [], + createMcpServer: createTestMcpServer(), + }); + try { + const port = (handle.server.address() as AddressInfo).port; + const response = await postJson(port, '/mcp', { + jsonrpc: '2.0', + id: 1, + method: 'initialize', + params: { + protocolVersion: '2025-06-18', + capabilities: {}, + clientInfo: { name: 'vitest', version: '0.0.0' }, + }, + }); + + expect(response.status).toBe(200); + expect(response.headers['mcp-session-id']).toBeTruthy(); + } finally { + await handle.close(); + } + }); + + it('rejects unknown session ids with 404', async () => { + const handle = await runKtxMcpHttpServer({ + projectDir: '/tmp/ktx-project', + host: '127.0.0.1', + port: 0, + allowedHosts: [], + allowedOrigins: [], + createMcpServer: createTestMcpServer(), + }); + try { + const port = (handle.server.address() as AddressInfo).port; + const response = await postJson( + port, + '/mcp', + { jsonrpc: '2.0', id: 2, method: 'tools/list', params: {} }, + { 'mcp-session-id': 'missing-session' }, + ); + + expect(response.status).toBe(404); + expect(response.body).toContain('Unknown MCP session'); + } finally { + await handle.close(); + } + }); +}); diff --git a/packages/cli/src/mcp-http-server.ts b/packages/cli/src/mcp-http-server.ts index 53b0d495..b040ca6e 100644 --- a/packages/cli/src/mcp-http-server.ts +++ b/packages/cli/src/mcp-http-server.ts @@ -1,4 +1,16 @@ -import type { IncomingHttpHeaders } from 'node:http'; +import { randomUUID } from 'node:crypto'; +import { createServer, type IncomingHttpHeaders, type IncomingMessage, type Server, type ServerResponse } from 'node:http'; +import { createDefaultKtxMcpServer, createLocalProjectMcpContextPorts } from '@ktx/context/mcp'; +import { createLocalProjectMemoryCapture } from '@ktx/context/memory'; +import { loadKtxProject, type KtxLocalProject } from '@ktx/context/project'; +import type { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js'; +import { StreamableHTTPServerTransport } from '@modelcontextprotocol/sdk/server/streamableHttp.js'; +import { isInitializeRequest } from '@modelcontextprotocol/sdk/types.js'; +import type { KtxCliIo } from './cli-runtime.js'; +import { createKtxCliIngestQueryExecutor } from './ingest-query-executor.js'; +import { createKtxCliScanConnector } from './local-scan-connectors.js'; +import { createManagedPythonSemanticLayerComputePort } from './managed-python-command.js'; +import { createManagedDaemonSqlAnalysisPort } from './managed-python-http.js'; const DEFAULT_ALLOWED_HOSTS = ['localhost', '127.0.0.1', '::1'] as const; @@ -98,3 +110,231 @@ export function isMcpRequestAuthorized( } return { ok: true }; } + +export interface KtxMcpHttpServerHandle { + server: Server; + close(): Promise; +} + +export interface RunKtxMcpHttpServerOptions extends McpSecurityConfigInput { + projectDir: string; + cliVersion?: string; + io?: KtxCliIo; + createMcpServer?: () => McpServer; + loadProject?: typeof loadKtxProject; +} + +function noopIo(): KtxCliIo { + return { + stdout: { write() {} }, + stderr: { write() {} }, + }; +} + +function writeJson(res: ServerResponse, status: number, body: object): void { + const payload = `${JSON.stringify(body)}\n`; + res.writeHead(status, { + 'content-type': 'application/json', + 'content-length': Buffer.byteLength(payload), + }); + res.end(payload); +} + +function writeText(res: ServerResponse, status: number, body: string): void { + res.writeHead(status, { 'content-type': 'text/plain; charset=utf-8' }); + res.end(body); +} + +function requestPath(req: IncomingMessage): string { + const url = new URL(req.url ?? '/', 'http://127.0.0.1'); + return url.pathname; +} + +async function readJsonBody(req: IncomingMessage): Promise { + const chunks: Buffer[] = []; + for await (const chunk of req) { + chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk)); + } + const raw = Buffer.concat(chunks).toString('utf8'); + return raw.trim().length === 0 ? undefined : (JSON.parse(raw) as unknown); +} + +async function defaultMcpServerFactory(input: { + project: KtxLocalProject; + projectDir: string; + cliVersion: string; + io?: KtxCliIo; +}): Promise<() => McpServer> { + const io = input.io ?? noopIo(); + const queryExecutor = createKtxCliIngestQueryExecutor(input.project); + const semanticLayerCompute = await createManagedPythonSemanticLayerComputePort({ + cliVersion: input.cliVersion, + installPolicy: 'auto', + io, + }); + const sqlAnalysis = createManagedDaemonSqlAnalysisPort({ + cliVersion: input.cliVersion, + projectDir: input.projectDir, + installPolicy: 'auto', + io, + }); + const contextTools = createLocalProjectMcpContextPorts(input.project, { + semanticLayerCompute, + queryExecutor, + sqlAnalysis, + localScan: { + createConnector: async (connectionId) => createKtxCliScanConnector(input.project, connectionId), + }, + localIngest: { + semanticLayerCompute, + queryExecutor, + }, + }); + + let memoryCapture; + try { + memoryCapture = createLocalProjectMemoryCapture(input.project, { semanticLayerCompute, queryExecutor }); + } catch (error) { + input.io?.stderr.write(`KTX MCP memory_capture disabled: ${error instanceof Error ? error.message : String(error)}\n`); + } + + return () => + createDefaultKtxMcpServer({ + name: 'ktx', + version: input.cliVersion, + userContext: { userId: 'local' }, + contextTools, + memoryCapture, + }); +} + +function listenerPort(server: Server, fallback: number): number { + const address = server.address(); + return typeof address === 'object' && address ? address.port : fallback; +} + +function transportAllowedHosts(config: McpSecurityConfig, server: Server): string[] { + const port = listenerPort(server, config.port); + const hosts = new Set(config.allowedHosts); + for (const host of config.allowedHosts) { + hosts.add(`${host}:${port}`); + if (config.port !== 0 && config.port !== port) { + hosts.add(`${host}:${config.port}`); + } + } + return [...hosts]; +} + +export async function runKtxMcpHttpServer(options: RunKtxMcpHttpServerOptions): Promise { + const config = buildMcpSecurityConfig(options); + const project = + options.createMcpServer === undefined + ? await (options.loadProject ?? loadKtxProject)({ projectDir: options.projectDir }) + : undefined; + const createMcpServer = + options.createMcpServer ?? + (await defaultMcpServerFactory({ + project: project!, + projectDir: options.projectDir, + cliVersion: options.cliVersion ?? '0.0.0-private', + io: options.io, + })); + const sessions = new Map(); + + async function newTransport(): Promise { + let transport: StreamableHTTPServerTransport; + transport = new StreamableHTTPServerTransport({ + sessionIdGenerator: () => randomUUID(), + onsessioninitialized: (sessionId) => { + sessions.set(sessionId, transport); + }, + onsessionclosed: (sessionId) => { + sessions.delete(sessionId); + }, + allowedHosts: transportAllowedHosts(config, server), + allowedOrigins: config.allowedOrigins, + enableDnsRebindingProtection: true, + }); + transport.onclose = () => { + if (transport.sessionId) { + sessions.delete(transport.sessionId); + } + }; + await createMcpServer().connect(transport); + return transport; + } + + const server = createServer(async (req, res) => { + const path = requestPath(req); + const auth = isMcpRequestAuthorized({ path, headers: req.headers }, config); + if (!auth.ok) { + writeText(res, auth.status, auth.message); + return; + } + + if (path === '/health' && req.method === 'GET') { + const port = listenerPort(server, config.port); + writeJson(res, 200, { status: 'ok', projectDir: options.projectDir, port }); + return; + } + + if (path !== '/mcp' || !['POST', 'GET', 'DELETE'].includes(req.method ?? '')) { + writeText(res, 404, 'Not found'); + return; + } + + const sessionId = req.headers['mcp-session-id']; + const normalizedSessionId = Array.isArray(sessionId) ? sessionId[0] : sessionId; + + if (req.method === 'POST') { + let body: unknown; + try { + body = await readJsonBody(req); + } catch (error) { + writeText(res, 400, `Invalid JSON body: ${error instanceof Error ? error.message : String(error)}`); + return; + } + const existing = normalizedSessionId ? sessions.get(normalizedSessionId) : undefined; + if (existing) { + await existing.handleRequest(req, res, body); + return; + } + if (normalizedSessionId) { + writeText(res, 404, `Unknown MCP session: ${normalizedSessionId}`); + return; + } + if (!isInitializeRequest(body)) { + writeText(res, 400, 'MCP initialize request is required before session traffic.'); + return; + } + await (await newTransport()).handleRequest(req, res, body); + return; + } + + if (!normalizedSessionId || !sessions.has(normalizedSessionId)) { + writeText(res, 404, normalizedSessionId ? `Unknown MCP session: ${normalizedSessionId}` : 'Missing MCP session id.'); + return; + } + await sessions.get(normalizedSessionId)!.handleRequest(req, res); + }); + + await new Promise((resolve, reject) => { + server.once('error', reject); + server.listen(config.port, config.host, () => { + server.off('error', reject); + resolve(); + }); + }); + + return { + server, + async close() { + for (const transport of sessions.values()) { + await transport.close(); + } + await new Promise((resolve, reject) => { + server.close((error) => (error ? reject(error) : resolve())); + }); + }, + }; +} diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index de92ff9e..5323b3a1 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -106,6 +106,9 @@ importers: '@ktx/llm': specifier: workspace:* version: link:../llm + '@modelcontextprotocol/sdk': + specifier: ^1.29.0 + version: 1.29.0(zod@4.4.3) commander: specifier: 14.0.3 version: 14.0.3