feat(cli): host mcp over streamable http

This commit is contained in:
Andrey Avtomonov 2026-05-14 18:50:08 +02:00
parent 7ffa99983f
commit 6bff3c3492
4 changed files with 403 additions and 1 deletions

View file

@ -45,6 +45,7 @@
"@ktx/connector-sqlserver": "workspace:*",
"@ktx/context": "workspace:*",
"@ktx/llm": "workspace:*",
"@modelcontextprotocol/sdk": "^1.29.0",
"commander": "14.0.3",
"ink": "^7.0.2",
"react": "^19.2.6",

View file

@ -1,8 +1,12 @@
import { request } from 'node:http';
import type { AddressInfo } from 'node:net';
import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js';
import { describe, expect, it } from 'vitest';
import {
buildMcpSecurityConfig,
isMcpRequestAuthorized,
normalizeHostHeader,
runKtxMcpHttpServer,
} from './mcp-http-server.js';
describe('normalizeHostHeader', () => {
@ -114,3 +118,157 @@ describe('isMcpRequestAuthorized', () => {
});
});
});
function postJson(port: number, path: string, body: unknown, headers: Record<string, string> = {}) {
return new Promise<{ status: number; headers: Record<string, string | string[] | undefined>; body: string }>(
(resolve, reject) => {
const payload = JSON.stringify(body);
const req = request(
{
host: '127.0.0.1',
port,
path,
method: 'POST',
headers: {
host: `127.0.0.1:${port}`,
accept: 'application/json, text/event-stream',
'content-type': 'application/json',
'content-length': Buffer.byteLength(payload),
...headers,
},
},
(res) => {
const chunks: Buffer[] = [];
res.on('data', (chunk: Buffer) => chunks.push(chunk));
res.on('end', () =>
resolve({
status: res.statusCode ?? 0,
headers: res.headers,
body: Buffer.concat(chunks).toString('utf8'),
}),
);
},
);
req.on('error', reject);
req.end(payload);
},
);
}
function get(port: number, path: string, headers: Record<string, string> = {}) {
return new Promise<{ status: number; headers: Record<string, string | string[] | undefined>; body: string }>(
(resolve, reject) => {
const req = request(
{
host: '127.0.0.1',
port,
path,
method: 'GET',
headers: { host: `127.0.0.1:${port}`, ...headers },
},
(res) => {
const chunks: Buffer[] = [];
res.on('data', (chunk: Buffer) => chunks.push(chunk));
res.on('end', () =>
resolve({
status: res.statusCode ?? 0,
headers: res.headers,
body: Buffer.concat(chunks).toString('utf8'),
}),
);
},
);
req.on('error', reject);
req.end();
},
);
}
function createTestMcpServer() {
return () => {
const server = new McpServer({ name: 'ktx-test', version: '0.0.0-test' });
server.registerTool('ping', { inputSchema: {} }, async () => ({
content: [{ type: 'text', text: 'pong' }],
}));
return server;
};
}
describe('runKtxMcpHttpServer', () => {
it('serves /health with project metadata', async () => {
const handle = await runKtxMcpHttpServer({
projectDir: '/tmp/ktx-project',
host: '127.0.0.1',
port: 0,
allowedHosts: [],
allowedOrigins: [],
createMcpServer: createTestMcpServer(),
});
try {
const port = (handle.server.address() as AddressInfo).port;
const response = await get(port, '/health');
expect(response.status).toBe(200);
expect(JSON.parse(response.body)).toEqual({
status: 'ok',
projectDir: '/tmp/ktx-project',
port,
});
} finally {
await handle.close();
}
});
it('allocates a stateful MCP session on initialize', async () => {
const handle = await runKtxMcpHttpServer({
projectDir: '/tmp/ktx-project',
host: '127.0.0.1',
port: 0,
allowedHosts: [],
allowedOrigins: [],
createMcpServer: createTestMcpServer(),
});
try {
const port = (handle.server.address() as AddressInfo).port;
const response = await postJson(port, '/mcp', {
jsonrpc: '2.0',
id: 1,
method: 'initialize',
params: {
protocolVersion: '2025-06-18',
capabilities: {},
clientInfo: { name: 'vitest', version: '0.0.0' },
},
});
expect(response.status).toBe(200);
expect(response.headers['mcp-session-id']).toBeTruthy();
} finally {
await handle.close();
}
});
it('rejects unknown session ids with 404', async () => {
const handle = await runKtxMcpHttpServer({
projectDir: '/tmp/ktx-project',
host: '127.0.0.1',
port: 0,
allowedHosts: [],
allowedOrigins: [],
createMcpServer: createTestMcpServer(),
});
try {
const port = (handle.server.address() as AddressInfo).port;
const response = await postJson(
port,
'/mcp',
{ jsonrpc: '2.0', id: 2, method: 'tools/list', params: {} },
{ 'mcp-session-id': 'missing-session' },
);
expect(response.status).toBe(404);
expect(response.body).toContain('Unknown MCP session');
} finally {
await handle.close();
}
});
});

View file

@ -1,4 +1,16 @@
import type { IncomingHttpHeaders } from 'node:http';
import { randomUUID } from 'node:crypto';
import { createServer, type IncomingHttpHeaders, type IncomingMessage, type Server, type ServerResponse } from 'node:http';
import { createDefaultKtxMcpServer, createLocalProjectMcpContextPorts } from '@ktx/context/mcp';
import { createLocalProjectMemoryCapture } from '@ktx/context/memory';
import { loadKtxProject, type KtxLocalProject } from '@ktx/context/project';
import type { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js';
import { StreamableHTTPServerTransport } from '@modelcontextprotocol/sdk/server/streamableHttp.js';
import { isInitializeRequest } from '@modelcontextprotocol/sdk/types.js';
import type { KtxCliIo } from './cli-runtime.js';
import { createKtxCliIngestQueryExecutor } from './ingest-query-executor.js';
import { createKtxCliScanConnector } from './local-scan-connectors.js';
import { createManagedPythonSemanticLayerComputePort } from './managed-python-command.js';
import { createManagedDaemonSqlAnalysisPort } from './managed-python-http.js';
const DEFAULT_ALLOWED_HOSTS = ['localhost', '127.0.0.1', '::1'] as const;
@ -98,3 +110,231 @@ export function isMcpRequestAuthorized(
}
return { ok: true };
}
export interface KtxMcpHttpServerHandle {
server: Server;
close(): Promise<void>;
}
export interface RunKtxMcpHttpServerOptions extends McpSecurityConfigInput {
projectDir: string;
cliVersion?: string;
io?: KtxCliIo;
createMcpServer?: () => McpServer;
loadProject?: typeof loadKtxProject;
}
function noopIo(): KtxCliIo {
return {
stdout: { write() {} },
stderr: { write() {} },
};
}
function writeJson(res: ServerResponse, status: number, body: object): void {
const payload = `${JSON.stringify(body)}\n`;
res.writeHead(status, {
'content-type': 'application/json',
'content-length': Buffer.byteLength(payload),
});
res.end(payload);
}
function writeText(res: ServerResponse, status: number, body: string): void {
res.writeHead(status, { 'content-type': 'text/plain; charset=utf-8' });
res.end(body);
}
function requestPath(req: IncomingMessage): string {
const url = new URL(req.url ?? '/', 'http://127.0.0.1');
return url.pathname;
}
async function readJsonBody(req: IncomingMessage): Promise<unknown> {
const chunks: Buffer[] = [];
for await (const chunk of req) {
chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk));
}
const raw = Buffer.concat(chunks).toString('utf8');
return raw.trim().length === 0 ? undefined : (JSON.parse(raw) as unknown);
}
async function defaultMcpServerFactory(input: {
project: KtxLocalProject;
projectDir: string;
cliVersion: string;
io?: KtxCliIo;
}): Promise<() => McpServer> {
const io = input.io ?? noopIo();
const queryExecutor = createKtxCliIngestQueryExecutor(input.project);
const semanticLayerCompute = await createManagedPythonSemanticLayerComputePort({
cliVersion: input.cliVersion,
installPolicy: 'auto',
io,
});
const sqlAnalysis = createManagedDaemonSqlAnalysisPort({
cliVersion: input.cliVersion,
projectDir: input.projectDir,
installPolicy: 'auto',
io,
});
const contextTools = createLocalProjectMcpContextPorts(input.project, {
semanticLayerCompute,
queryExecutor,
sqlAnalysis,
localScan: {
createConnector: async (connectionId) => createKtxCliScanConnector(input.project, connectionId),
},
localIngest: {
semanticLayerCompute,
queryExecutor,
},
});
let memoryCapture;
try {
memoryCapture = createLocalProjectMemoryCapture(input.project, { semanticLayerCompute, queryExecutor });
} catch (error) {
input.io?.stderr.write(`KTX MCP memory_capture disabled: ${error instanceof Error ? error.message : String(error)}\n`);
}
return () =>
createDefaultKtxMcpServer({
name: 'ktx',
version: input.cliVersion,
userContext: { userId: 'local' },
contextTools,
memoryCapture,
});
}
function listenerPort(server: Server, fallback: number): number {
const address = server.address();
return typeof address === 'object' && address ? address.port : fallback;
}
function transportAllowedHosts(config: McpSecurityConfig, server: Server): string[] {
const port = listenerPort(server, config.port);
const hosts = new Set<string>(config.allowedHosts);
for (const host of config.allowedHosts) {
hosts.add(`${host}:${port}`);
if (config.port !== 0 && config.port !== port) {
hosts.add(`${host}:${config.port}`);
}
}
return [...hosts];
}
export async function runKtxMcpHttpServer(options: RunKtxMcpHttpServerOptions): Promise<KtxMcpHttpServerHandle> {
const config = buildMcpSecurityConfig(options);
const project =
options.createMcpServer === undefined
? await (options.loadProject ?? loadKtxProject)({ projectDir: options.projectDir })
: undefined;
const createMcpServer =
options.createMcpServer ??
(await defaultMcpServerFactory({
project: project!,
projectDir: options.projectDir,
cliVersion: options.cliVersion ?? '0.0.0-private',
io: options.io,
}));
const sessions = new Map<string, StreamableHTTPServerTransport>();
async function newTransport(): Promise<StreamableHTTPServerTransport> {
let transport: StreamableHTTPServerTransport;
transport = new StreamableHTTPServerTransport({
sessionIdGenerator: () => randomUUID(),
onsessioninitialized: (sessionId) => {
sessions.set(sessionId, transport);
},
onsessionclosed: (sessionId) => {
sessions.delete(sessionId);
},
allowedHosts: transportAllowedHosts(config, server),
allowedOrigins: config.allowedOrigins,
enableDnsRebindingProtection: true,
});
transport.onclose = () => {
if (transport.sessionId) {
sessions.delete(transport.sessionId);
}
};
await createMcpServer().connect(transport);
return transport;
}
const server = createServer(async (req, res) => {
const path = requestPath(req);
const auth = isMcpRequestAuthorized({ path, headers: req.headers }, config);
if (!auth.ok) {
writeText(res, auth.status, auth.message);
return;
}
if (path === '/health' && req.method === 'GET') {
const port = listenerPort(server, config.port);
writeJson(res, 200, { status: 'ok', projectDir: options.projectDir, port });
return;
}
if (path !== '/mcp' || !['POST', 'GET', 'DELETE'].includes(req.method ?? '')) {
writeText(res, 404, 'Not found');
return;
}
const sessionId = req.headers['mcp-session-id'];
const normalizedSessionId = Array.isArray(sessionId) ? sessionId[0] : sessionId;
if (req.method === 'POST') {
let body: unknown;
try {
body = await readJsonBody(req);
} catch (error) {
writeText(res, 400, `Invalid JSON body: ${error instanceof Error ? error.message : String(error)}`);
return;
}
const existing = normalizedSessionId ? sessions.get(normalizedSessionId) : undefined;
if (existing) {
await existing.handleRequest(req, res, body);
return;
}
if (normalizedSessionId) {
writeText(res, 404, `Unknown MCP session: ${normalizedSessionId}`);
return;
}
if (!isInitializeRequest(body)) {
writeText(res, 400, 'MCP initialize request is required before session traffic.');
return;
}
await (await newTransport()).handleRequest(req, res, body);
return;
}
if (!normalizedSessionId || !sessions.has(normalizedSessionId)) {
writeText(res, 404, normalizedSessionId ? `Unknown MCP session: ${normalizedSessionId}` : 'Missing MCP session id.');
return;
}
await sessions.get(normalizedSessionId)!.handleRequest(req, res);
});
await new Promise<void>((resolve, reject) => {
server.once('error', reject);
server.listen(config.port, config.host, () => {
server.off('error', reject);
resolve();
});
});
return {
server,
async close() {
for (const transport of sessions.values()) {
await transport.close();
}
await new Promise<void>((resolve, reject) => {
server.close((error) => (error ? reject(error) : resolve()));
});
},
};
}

3
pnpm-lock.yaml generated
View file

@ -106,6 +106,9 @@ importers:
'@ktx/llm':
specifier: workspace:*
version: link:../llm
'@modelcontextprotocol/sdk':
specifier: ^1.29.0
version: 1.29.0(zod@4.4.3)
commander:
specifier: 14.0.3
version: 14.0.3