From db09df4d725d4fe339cb2462ef496dc072528cd0 Mon Sep 17 00:00:00 2001 From: Andrey Avtomonov <7889985+andreybavt@users.noreply.github.com> Date: Thu, 14 May 2026 18:50:54 +0200 Subject: [PATCH] feat(cli): manage mcp daemon lifecycle --- packages/cli/src/managed-mcp-daemon.test.ts | 130 +++++++++++ packages/cli/src/managed-mcp-daemon.ts | 238 ++++++++++++++++++++ 2 files changed, 368 insertions(+) create mode 100644 packages/cli/src/managed-mcp-daemon.test.ts create mode 100644 packages/cli/src/managed-mcp-daemon.ts diff --git a/packages/cli/src/managed-mcp-daemon.test.ts b/packages/cli/src/managed-mcp-daemon.test.ts new file mode 100644 index 00000000..d54f33e3 --- /dev/null +++ b/packages/cli/src/managed-mcp-daemon.test.ts @@ -0,0 +1,130 @@ +import { mkdir, mkdtemp, readFile, rm, writeFile } from 'node:fs/promises'; +import { tmpdir } from 'node:os'; +import { join } from 'node:path'; +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; +import { + mcpDaemonLayout, + readKtxMcpDaemonStatus, + startKtxMcpDaemon, + stopKtxMcpDaemon, + type KtxMcpDaemonChild, + type KtxMcpDaemonState, +} from './managed-mcp-daemon.js'; + +function child(pid = 4242): KtxMcpDaemonChild { + return { pid, unref: vi.fn() }; +} + +function state(projectDir: string, overrides: Partial = {}): KtxMcpDaemonState { + return { + schemaVersion: 1, + pid: 4242, + host: '127.0.0.1', + port: 7878, + tokenAuth: false, + projectDir, + startedAt: '2026-05-14T00:00:00.000Z', + logPath: join(projectDir, '.ktx/logs/mcp.log'), + ...overrides, + }; +} + +describe('managed MCP daemon lifecycle', () => { + let tempDir: string; + let projectDir: string; + + beforeEach(async () => { + tempDir = await mkdtemp(join(tmpdir(), 'ktx-mcp-daemon-')); + projectDir = join(tempDir, 'project'); + await mkdir(projectDir, { recursive: true }); + }); + + afterEach(async () => { + await rm(tempDir, { recursive: true, force: true }); + }); + + it('uses the spec state and log paths', () => { + expect(mcpDaemonLayout(projectDir)).toEqual({ + statePath: join(projectDir, '.ktx/mcp.json'), + logPath: join(projectDir, '.ktx/logs/mcp.log'), + }); + }); + + it('starts a detached child and writes state without the token value', async () => { + const spawnDaemon = vi.fn(() => child(5555)); + await startKtxMcpDaemon({ + projectDir, + cliVersion: '0.0.0-test', + host: '0.0.0.0', + port: 7879, + token: 'secret-token', + allowedHosts: ['mcp.example.test'], + allowedOrigins: ['https://mcp.example.test'], + binPath: '/repo/packages/cli/dist/bin.js', + spawnDaemon, + processAlive: vi.fn(() => false), + portAvailable: vi.fn(async () => true), + now: () => new Date('2026-05-14T00:00:00.000Z'), + }); + + expect(spawnDaemon).toHaveBeenCalledWith( + process.execPath, + [ + '/repo/packages/cli/dist/bin.js', + '--project-dir', + projectDir, + 'mcp', + 'serve-internal', + '--host', + '0.0.0.0', + '--port', + '7879', + '--allowed-host', + 'mcp.example.test', + '--allowed-origin', + 'https://mcp.example.test', + ], + expect.objectContaining({ + detached: true, + env: expect.objectContaining({ KTX_MCP_TOKEN: 'secret-token' }), + }), + ); + expect(JSON.stringify(JSON.parse(await readFile(join(projectDir, '.ktx/mcp.json'), 'utf8')))).not.toContain( + 'secret-token', + ); + }); + + it('reports running when the process is alive and health passes', async () => { + await mkdir(join(projectDir, '.ktx'), { recursive: true }); + await writeFile(join(projectDir, '.ktx/mcp.json'), `${JSON.stringify(state(projectDir), null, 2)}\n`); + + const status = await readKtxMcpDaemonStatus({ + projectDir, + processAlive: vi.fn(() => true), + fetchHealth: vi.fn(async () => ({ ok: true, body: { status: 'ok', projectDir, port: 7878 } })), + }); + + expect(status.kind).toBe('running'); + expect(status.url).toBe('http://127.0.0.1:7878/mcp'); + }); + + it('stops a recorded daemon and removes state', async () => { + await mkdir(join(projectDir, '.ktx'), { recursive: true }); + await writeFile(join(projectDir, '.ktx/mcp.json'), `${JSON.stringify(state(projectDir), null, 2)}\n`); + const alive = new Set([4242]); + const killProcess = vi.fn((pid: number) => alive.delete(pid)); + + await expect( + stopKtxMcpDaemon({ + projectDir, + processAlive: vi.fn((pid) => alive.has(pid)), + killProcess, + stopGraceMs: 1, + pollIntervalMs: 1, + }), + ).resolves.toEqual({ status: 'stopped' }); + + expect(killProcess).toHaveBeenCalledWith(4242, 'SIGTERM'); + await expect(readFile(join(projectDir, '.ktx/mcp.json'), 'utf8')).rejects.toThrow(); + }); +}); diff --git a/packages/cli/src/managed-mcp-daemon.ts b/packages/cli/src/managed-mcp-daemon.ts new file mode 100644 index 00000000..96394f69 --- /dev/null +++ b/packages/cli/src/managed-mcp-daemon.ts @@ -0,0 +1,238 @@ +import { spawn } from 'node:child_process'; +import { mkdir, open, readFile, rm, writeFile } from 'node:fs/promises'; +import { createServer } from 'node:net'; +import { dirname, join } from 'node:path'; +import { setTimeout as delay } from 'node:timers/promises'; +import { z } from 'zod'; + +export interface KtxMcpDaemonState { + schemaVersion: 1; + pid: number; + host: string; + port: number; + tokenAuth: boolean; + projectDir: string; + startedAt: string; + logPath: string; +} + +export interface KtxMcpDaemonChild { + pid?: number; + unref(): void; +} + +export type KtxMcpDaemonStatus = + | { kind: 'stopped'; detail: string } + | { kind: 'running'; detail: string; state: KtxMcpDaemonState; url: string } + | { kind: 'stale'; detail: string; state?: KtxMcpDaemonState }; + +const stateSchema = z.object({ + schemaVersion: z.literal(1), + pid: z.number().int().positive(), + host: z.string().min(1), + port: z.number().int().min(1).max(65535), + tokenAuth: z.boolean(), + projectDir: z.string().min(1), + startedAt: z.string().min(1), + logPath: z.string().min(1), +}); + +export function mcpDaemonLayout(projectDir: string): { statePath: string; logPath: string } { + return { + statePath: join(projectDir, '.ktx/mcp.json'), + logPath: join(projectDir, '.ktx/logs/mcp.log'), + }; +} + +function defaultProcessAlive(pid: number): boolean { + try { + process.kill(pid, 0); + return true; + } catch { + return false; + } +} + +function defaultKillProcess(pid: number, signal: NodeJS.Signals): void { + try { + process.kill(pid, signal); + } catch (error) { + if ((error as { code?: unknown }).code !== 'ESRCH') { + throw error; + } + } +} + +async function readState(projectDir: string): Promise { + try { + return stateSchema.parse(JSON.parse(await readFile(mcpDaemonLayout(projectDir).statePath, 'utf8')) as unknown); + } catch (error) { + if ((error as { code?: unknown }).code === 'ENOENT') { + return undefined; + } + throw error; + } +} + +async function writeState(projectDir: string, state: KtxMcpDaemonState): Promise { + const { statePath } = mcpDaemonLayout(projectDir); + await mkdir(dirname(statePath), { recursive: true }); + await writeFile(statePath, `${JSON.stringify(state, null, 2)}\n`, 'utf8'); +} + +async function defaultPortAvailable(host: string, port: number): Promise { + return await new Promise((resolve) => { + const server = createServer(); + server.once('error', () => resolve(false)); + server.listen(port, host, () => server.close(() => resolve(true))); + }); +} + +function defaultSpawnDaemon( + command: string, + args: string[], + options: { detached: boolean; stdio: ['ignore', number, number]; env: NodeJS.ProcessEnv }, +): KtxMcpDaemonChild { + return spawn(command, args, options); +} + +async function defaultFetchHealth(state: KtxMcpDaemonState): Promise<{ ok: boolean; body: unknown; detail?: string }> { + try { + const response = await fetch(`http://${state.host}:${state.port}/health`, { + headers: { host: `${state.host}:${state.port}` }, + }); + const body = await response.json(); + return { ok: response.ok, body, detail: response.ok ? undefined : `HTTP ${response.status}` }; + } catch (error) { + return { ok: false, body: null, detail: error instanceof Error ? error.message : String(error) }; + } +} + +export async function startKtxMcpDaemon(options: { + projectDir: string; + cliVersion: string; + host: string; + port: number; + token?: string; + allowedHosts: string[]; + allowedOrigins: string[]; + binPath: string; + processAlive?: (pid: number) => boolean; + portAvailable?: (host: string, port: number) => Promise; + spawnDaemon?: typeof defaultSpawnDaemon; + now?: () => Date; +}): Promise<{ status: 'started'; state: KtxMcpDaemonState; url: string }> { + const existing = await readState(options.projectDir).catch(() => undefined); + const processAlive = options.processAlive ?? defaultProcessAlive; + if (existing && processAlive(existing.pid)) { + throw new Error(`KTX MCP daemon is already recorded at http://${existing.host}:${existing.port}/mcp`); + } + const portAvailable = options.portAvailable ?? defaultPortAvailable; + if (!(await portAvailable(options.host, options.port))) { + throw new Error(`Port ${options.port} is already in use. Choose another port with --port .`); + } + + const { logPath } = mcpDaemonLayout(options.projectDir); + await mkdir(dirname(logPath), { recursive: true }); + const log = await open(logPath, 'a'); + try { + const args = [ + options.binPath, + '--project-dir', + options.projectDir, + 'mcp', + 'serve-internal', + '--host', + options.host, + '--port', + String(options.port), + ...options.allowedHosts.flatMap((host) => ['--allowed-host', host]), + ...options.allowedOrigins.flatMap((origin) => ['--allowed-origin', origin]), + ]; + const child = (options.spawnDaemon ?? defaultSpawnDaemon)(process.execPath, args, { + detached: true, + stdio: ['ignore', log.fd, log.fd], + env: { + ...process.env, + KTX_CLI_VERSION: options.cliVersion, + ...(options.token ? { KTX_MCP_TOKEN: options.token } : {}), + }, + }); + if (!child.pid) { + throw new Error('Failed to start KTX MCP daemon: child process pid was not available.'); + } + child.unref(); + const state: KtxMcpDaemonState = { + schemaVersion: 1, + pid: child.pid, + host: options.host, + port: options.port, + tokenAuth: Boolean(options.token), + projectDir: options.projectDir, + startedAt: (options.now ?? (() => new Date()))().toISOString(), + logPath, + }; + await writeState(options.projectDir, state); + return { status: 'started', state, url: `http://${state.host}:${state.port}/mcp` }; + } finally { + await log.close(); + } +} + +export async function readKtxMcpDaemonStatus(options: { + projectDir: string; + processAlive?: (pid: number) => boolean; + fetchHealth?: (state: KtxMcpDaemonState) => Promise<{ ok: boolean; body: unknown; detail?: string }>; +}): Promise { + let state: KtxMcpDaemonState | undefined; + try { + state = await readState(options.projectDir); + } catch (error) { + return { kind: 'stale', detail: `MCP daemon state is invalid: ${error instanceof Error ? error.message : String(error)}` }; + } + if (!state) { + return { kind: 'stopped', detail: `No MCP daemon state at ${mcpDaemonLayout(options.projectDir).statePath}` }; + } + const processAlive = options.processAlive ?? defaultProcessAlive; + if (!processAlive(state.pid)) { + return { kind: 'stale', detail: `MCP daemon process ${state.pid} is not running`, state }; + } + const health = await (options.fetchHealth ?? defaultFetchHealth)(state); + if (!health.ok) { + return { kind: 'stale', detail: health.detail ?? 'MCP daemon health check failed', state }; + } + return { + kind: 'running', + detail: `KTX MCP daemon running at http://${state.host}:${state.port}/mcp`, + state, + url: `http://${state.host}:${state.port}/mcp`, + }; +} + +export async function stopKtxMcpDaemon(options: { + projectDir: string; + processAlive?: (pid: number) => boolean; + killProcess?: (pid: number, signal: NodeJS.Signals) => void; + stopGraceMs?: number; + pollIntervalMs?: number; +}): Promise<{ status: 'stopped' | 'already-stopped' }> { + const state = await readState(options.projectDir); + const { statePath } = mcpDaemonLayout(options.projectDir); + if (!state) { + return { status: 'already-stopped' }; + } + const processAlive = options.processAlive ?? defaultProcessAlive; + const killProcess = options.killProcess ?? defaultKillProcess; + if (processAlive(state.pid)) { + killProcess(state.pid, 'SIGTERM'); + const deadline = Date.now() + (options.stopGraceMs ?? 10_000); + while (Date.now() <= deadline && processAlive(state.pid)) { + await delay(options.pollIntervalMs ?? 100); + } + if (processAlive(state.pid)) { + killProcess(state.pid, 'SIGKILL'); + } + } + await rm(statePath, { force: true }); + return { status: 'stopped' }; +}