diff --git a/packages/cli/src/commands/connection-metabase-setup.test.ts b/packages/cli/src/commands/connection-metabase-setup.test.ts index cd94565a..cf7308d7 100644 --- a/packages/cli/src/commands/connection-metabase-setup.test.ts +++ b/packages/cli/src/commands/connection-metabase-setup.test.ts @@ -138,7 +138,7 @@ function makeIo(options: { isTTY?: boolean; stdinIsTTY?: boolean } = {}) { describe('runKtxConnectionMetabaseSetup', () => { const fakeMetabaseCredential = 'mb_example'; const existingMetabaseCredential = 'mb_existing'; - const fakeAdminCredential = 'pw'; + const fakeAdminCredential = 'admin-secret-value-123'; let tempDir: string; let projectDir: string; diff --git a/packages/cli/src/commands/runtime-commands.ts b/packages/cli/src/commands/runtime-commands.ts index 8f478658..3ce7d9ba 100644 --- a/packages/cli/src/commands/runtime-commands.ts +++ b/packages/cli/src/commands/runtime-commands.ts @@ -53,10 +53,12 @@ export function registerRuntimeCommands(program: Command, context: KtxCliCommand runtime .command('stop') .description('Stop the KTX-managed Python HTTP daemon') - .action(async () => { + .option('--all', 'Stop all KTX daemon processes recorded or discoverable on this machine', false) + .action(async (options: { all?: boolean }) => { await runRuntimeArgs(context, { command: 'stop', cliVersion: context.packageInfo.version, + all: options.all === true, }); }); diff --git a/packages/cli/src/index.test.ts b/packages/cli/src/index.test.ts index 8bc2a3a6..4a45274b 100644 --- a/packages/cli/src/index.test.ts +++ b/packages/cli/src/index.test.ts @@ -143,6 +143,7 @@ describe('runKtxCli', () => { const installIo = makeIo(); const startIo = makeIo(); const stopIo = makeIo(); + const stopAllIo = makeIo(); const statusIo = makeIo(); const doctorIo = makeIo(); const pruneIo = makeIo(); @@ -156,6 +157,7 @@ describe('runKtxCli', () => { runKtxCli(['runtime', 'start', '--feature', 'local-embeddings', '--force'], startIo.io, { runtime }), ).resolves.toBe(0); await expect(runKtxCli(['runtime', 'stop'], stopIo.io, { runtime })).resolves.toBe(0); + await expect(runKtxCli(['runtime', 'stop', '--all'], stopAllIo.io, { runtime })).resolves.toBe(0); await expect(runKtxCli(['runtime', 'status', '--json'], statusIo.io, { runtime })).resolves.toBe(0); await expect(runKtxCli(['runtime', 'doctor'], doctorIo.io, { runtime })).resolves.toBe(0); await expect(runKtxCli(['runtime', 'prune', '--dry-run'], pruneIo.io, { runtime })).resolves.toBe(0); @@ -185,11 +187,21 @@ describe('runKtxCli', () => { { command: 'stop', cliVersion: '0.0.0-private', + all: false, }, stopIo.io, ); expect(runtime).toHaveBeenNthCalledWith( 4, + { + command: 'stop', + cliVersion: '0.0.0-private', + all: true, + }, + stopAllIo.io, + ); + expect(runtime).toHaveBeenNthCalledWith( + 5, { command: 'status', cliVersion: '0.0.0-private', @@ -198,7 +210,7 @@ describe('runKtxCli', () => { statusIo.io, ); expect(runtime).toHaveBeenNthCalledWith( - 5, + 6, { command: 'doctor', cliVersion: '0.0.0-private', @@ -207,7 +219,7 @@ describe('runKtxCli', () => { doctorIo.io, ); expect(runtime).toHaveBeenNthCalledWith( - 6, + 7, { command: 'prune', cliVersion: '0.0.0-private', @@ -218,6 +230,17 @@ describe('runKtxCli', () => { ); }); + it('documents runtime stop all in command help', async () => { + const testIo = makeIo(); + + await expect(runKtxCli(['runtime', 'stop', '--help'], testIo.io)).resolves.toBe(0); + + expect(testIo.stdout()).toContain('--all'); + expect(testIo.stdout()).toContain('Stop all KTX daemon processes recorded or discoverable'); + expect(testIo.stdout()).toContain('on this machine'); + expect(testIo.stderr()).toBe(''); + }); + it('routes sl query managed runtime install policies', async () => { const sl = vi.fn(async () => 0); diff --git a/packages/cli/src/index.ts b/packages/cli/src/index.ts index 96fbbeec..de906ece 100644 --- a/packages/cli/src/index.ts +++ b/packages/cli/src/index.ts @@ -47,13 +47,18 @@ export { runKtxRuntime, type KtxRuntimeArgs, type KtxRuntimeDeps } from './runti export { allocateDaemonPort, readManagedPythonDaemonStatus, + stopAllManagedPythonDaemons, startManagedPythonDaemon, stopManagedPythonDaemon, } from './managed-python-daemon.js'; export type { + ManagedPythonDaemonProcessInfo, ManagedPythonDaemonStartResult, ManagedPythonDaemonState, ManagedPythonDaemonStatus, + ManagedPythonDaemonStopAllEntry, + ManagedPythonDaemonStopAllFailure, + ManagedPythonDaemonStopAllResult, ManagedPythonDaemonStopResult, } from './managed-python-daemon.js'; export { diff --git a/packages/cli/src/managed-python-daemon.test.ts b/packages/cli/src/managed-python-daemon.test.ts index 4e7af22c..ffa69972 100644 --- a/packages/cli/src/managed-python-daemon.test.ts +++ b/packages/cli/src/managed-python-daemon.test.ts @@ -5,9 +5,11 @@ import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; import { readManagedPythonDaemonStatus, startManagedPythonDaemon, + stopAllManagedPythonDaemons, stopManagedPythonDaemon, type ManagedPythonDaemonChild, type ManagedPythonDaemonFetch, + type ManagedPythonDaemonProcessInfo, type ManagedPythonDaemonSpawn, type ManagedPythonDaemonState, } from './managed-python-daemon.js'; @@ -105,6 +107,24 @@ function runningState(root: string, overrides: Partial }; } +function daemonStatePath(root: string, version: string): string { + return join(root, 'runtime', version, 'daemon.json'); +} + +function runningStateForVersion( + root: string, + version: string, + overrides: Partial = {}, +): ManagedPythonDaemonState { + return { + ...runningState(root), + version, + stdoutLog: join(root, 'runtime', version, 'daemon.stdout.log'), + stderrLog: join(root, 'runtime', version, 'daemon.stderr.log'), + ...overrides, + }; +} + describe('managed Python daemon lifecycle', () => { let tempDir: string; @@ -271,4 +291,138 @@ describe('managed Python daemon lifecycle', () => { expect(killProcess).toHaveBeenCalledWith(4242); await expect(readFile(layout(tempDir).daemonStatePath, 'utf8')).rejects.toThrow(); }); + + it('stops all recorded daemon states across runtime versions and removes state files', async () => { + await mkdir(join(tempDir, 'runtime', '0.1.0'), { recursive: true }); + await mkdir(join(tempDir, 'runtime', '0.2.0'), { recursive: true }); + await writeFile( + daemonStatePath(tempDir, '0.1.0'), + `${JSON.stringify(runningStateForVersion(tempDir, '0.1.0', { pid: 1111, port: 61111 }), null, 2)}\n`, + ); + await writeFile( + daemonStatePath(tempDir, '0.2.0'), + `${JSON.stringify(runningStateForVersion(tempDir, '0.2.0', { pid: 2222, port: 62222 }), null, 2)}\n`, + ); + const alive = new Set([1111, 2222]); + const killProcess = vi.fn((pid: number) => { + alive.delete(pid); + }); + + const result = await stopAllManagedPythonDaemons({ + cliVersion: '0.2.0', + runtimeRoot: join(tempDir, 'runtime'), + listProcesses: vi.fn(async () => []), + processAlive: vi.fn((pid) => alive.has(pid)), + killProcess, + stopGraceMs: 0, + }); + + expect(result.failed).toHaveLength(0); + expect(result.stopped.map((entry) => entry.pid).sort()).toEqual([1111, 2222]); + expect(killProcess).toHaveBeenCalledWith(1111, 'SIGTERM'); + expect(killProcess).toHaveBeenCalledWith(2222, 'SIGTERM'); + await expect(readFile(daemonStatePath(tempDir, '0.1.0'), 'utf8')).rejects.toThrow(); + await expect(readFile(daemonStatePath(tempDir, '0.2.0'), 'utf8')).rejects.toThrow(); + }); + + it('removes stale state when the recorded daemon process is no longer alive', async () => { + await mkdir(layout(tempDir).versionDir, { recursive: true }); + await writeFile(layout(tempDir).daemonStatePath, `${JSON.stringify(runningState(tempDir), null, 2)}\n`); + + const result = await stopAllManagedPythonDaemons({ + cliVersion: '0.2.0', + runtimeRoot: join(tempDir, 'runtime'), + listProcesses: vi.fn(async () => []), + processAlive: vi.fn(() => false), + killProcess: vi.fn(), + stopGraceMs: 0, + }); + + expect(result.stopped).toHaveLength(0); + expect(result.stale.map((entry) => entry.pid)).toEqual([4242]); + await expect(readFile(layout(tempDir).daemonStatePath, 'utf8')).rejects.toThrow(); + }); + + it('deduplicates a daemon found by state and process scan, preferring state metadata', async () => { + await mkdir(layout(tempDir).versionDir, { recursive: true }); + await writeFile(layout(tempDir).daemonStatePath, `${JSON.stringify(runningState(tempDir), null, 2)}\n`); + const alive = new Set([4242]); + const killProcess = vi.fn((pid: number) => { + alive.delete(pid); + }); + + const result = await stopAllManagedPythonDaemons({ + cliVersion: '0.2.0', + runtimeRoot: join(tempDir, 'runtime'), + listProcesses: vi.fn(async (): Promise => [ + { pid: 4242, command: 'uv run ktx-daemon serve-http --host 127.0.0.1 --port 61234' }, + ]), + processAlive: vi.fn((pid) => alive.has(pid)), + killProcess, + stopGraceMs: 0, + }); + + expect(result.stopped).toHaveLength(1); + expect(result.stopped[0]).toMatchObject({ + pid: 4242, + source: 'state', + url: 'http://127.0.0.1:58731', + }); + expect(killProcess).toHaveBeenCalledTimes(1); + }); + + it('stops unrecorded ktx-daemon serve-http processes from process scan results', async () => { + const alive = new Set([3333, 5555]); + const killProcess = vi.fn((pid: number) => { + alive.delete(pid); + }); + + const result = await stopAllManagedPythonDaemons({ + cliVersion: '0.2.0', + runtimeRoot: join(tempDir, 'runtime'), + listProcesses: vi.fn(async (): Promise => [ + { pid: 3333, command: 'uv run ktx-daemon serve-http --host 127.0.0.1 --port 8765' }, + { pid: 4444, command: 'node server.js --port 8765' }, + { pid: 5555, command: 'grep ktx-daemon serve-http --port 8765' }, + ]), + processAlive: vi.fn((pid) => alive.has(pid)), + killProcess, + stopGraceMs: 0, + }); + + expect(result.failed).toHaveLength(0); + expect(result.stopped).toEqual([ + expect.objectContaining({ + pid: 3333, + source: 'process', + url: 'http://127.0.0.1:8765', + }), + ]); + expect(killProcess).toHaveBeenCalledWith(3333, 'SIGTERM'); + expect(killProcess).not.toHaveBeenCalledWith(4444, expect.anything()); + expect(killProcess).not.toHaveBeenCalledWith(5555, expect.anything()); + }); + + it('reports a failed stop when TERM and KILL leave a daemon running', async () => { + await mkdir(layout(tempDir).versionDir, { recursive: true }); + await writeFile(layout(tempDir).daemonStatePath, `${JSON.stringify(runningState(tempDir), null, 2)}\n`); + + const result = await stopAllManagedPythonDaemons({ + cliVersion: '0.2.0', + runtimeRoot: join(tempDir, 'runtime'), + listProcesses: vi.fn(async () => []), + processAlive: vi.fn(() => true), + killProcess: vi.fn(), + stopGraceMs: 0, + }); + + expect(result.stopped).toHaveLength(0); + expect(result.failed).toEqual([ + expect.objectContaining({ + pid: 4242, + detail: 'Process still running after SIGKILL', + }), + ]); + expect(await readFile(layout(tempDir).daemonStatePath, 'utf8')).toContain('"pid": 4242'); + }); }); diff --git a/packages/cli/src/managed-python-daemon.ts b/packages/cli/src/managed-python-daemon.ts index 2caf9182..b99de581 100644 --- a/packages/cli/src/managed-python-daemon.ts +++ b/packages/cli/src/managed-python-daemon.ts @@ -1,7 +1,9 @@ -import { spawn } from 'node:child_process'; -import { mkdir, open, readFile, rm, writeFile } from 'node:fs/promises'; +import { execFile, spawn } from 'node:child_process'; +import { mkdir, open, readdir, readFile, rm, writeFile } from 'node:fs/promises'; import { createServer } from 'node:net'; +import { join } from 'node:path'; import { setTimeout as delay } from 'node:timers/promises'; +import { promisify } from 'node:util'; import { z } from 'zod'; import { installManagedPythonRuntime, @@ -44,6 +46,35 @@ export interface ManagedPythonDaemonStopResult { state?: ManagedPythonDaemonState; } +export interface ManagedPythonDaemonProcessInfo { + pid: number; + command: string; +} + +export type ManagedPythonDaemonStopAllSource = 'state' | 'process'; + +export interface ManagedPythonDaemonStopAllEntry { + pid: number; + source: ManagedPythonDaemonStopAllSource; + url?: string; + health?: 'healthy' | 'unreachable'; + version?: string; + command?: string; + statePaths: string[]; +} + +export interface ManagedPythonDaemonStopAllFailure extends ManagedPythonDaemonStopAllEntry { + detail: string; +} + +export interface ManagedPythonDaemonStopAllResult { + runtimeRoot: string; + stopped: ManagedPythonDaemonStopAllEntry[]; + stale: ManagedPythonDaemonStopAllEntry[]; + failed: ManagedPythonDaemonStopAllFailure[]; + scanErrors: string[]; +} + export interface ManagedPythonDaemonChild { pid?: number; unref(): void; @@ -68,6 +99,8 @@ export type ManagedPythonDaemonFetch = ( text(): Promise; }>; +export type ManagedPythonDaemonKillProcess = (pid: number, signal?: NodeJS.Signals) => void; + export interface ManagedPythonDaemonStartOptions extends ManagedPythonRuntimeLayoutOptions { features: KtxRuntimeFeature[]; force?: boolean; @@ -76,7 +109,7 @@ export interface ManagedPythonDaemonStartOptions extends ManagedPythonRuntimeLay fetch?: ManagedPythonDaemonFetch; allocatePort?: () => Promise; processAlive?: (pid: number) => boolean; - killProcess?: (pid: number) => void; + killProcess?: ManagedPythonDaemonKillProcess; now?: () => Date; startupTimeoutMs?: number; pollIntervalMs?: number; @@ -89,9 +122,20 @@ export interface ManagedPythonDaemonStatusOptions extends ManagedPythonRuntimeLa export interface ManagedPythonDaemonStopOptions extends ManagedPythonRuntimeLayoutOptions { processAlive?: (pid: number) => boolean; - killProcess?: (pid: number) => void; + killProcess?: ManagedPythonDaemonKillProcess; } +export interface ManagedPythonDaemonStopAllOptions extends ManagedPythonRuntimeLayoutOptions { + listProcesses?: () => Promise; + processAlive?: (pid: number) => boolean; + killProcess?: ManagedPythonDaemonKillProcess; + stopGraceMs?: number; + pollIntervalMs?: number; + healthProbeMs?: number; +} + +const execFileAsync = promisify(execFile); + const daemonStateSchema = z.object({ schemaVersion: z.literal(1), pid: z.number().int().positive(), @@ -126,9 +170,9 @@ function defaultProcessAlive(pid: number): boolean { } } -function defaultKillProcess(pid: number): void { +function defaultKillProcess(pid: number, signal: NodeJS.Signals = 'SIGTERM'): void { try { - process.kill(pid, 'SIGTERM'); + process.kill(pid, signal); } catch (error) { const code = (error as { code?: unknown }).code; if (code !== 'ESRCH') { @@ -293,7 +337,7 @@ async function stopRecordedDaemon(input: { layout: ManagedPythonRuntimeLayout; state: ManagedPythonDaemonState; processAlive: (pid: number) => boolean; - killProcess: (pid: number) => void; + killProcess: ManagedPythonDaemonKillProcess; }): Promise { if (input.processAlive(input.state.pid)) { input.killProcess(input.state.pid); @@ -301,6 +345,323 @@ async function stopRecordedDaemon(input: { await removeState(input.layout); } +function runtimeRootForStopAll(options: ManagedPythonRuntimeLayoutOptions): string { + return managedPythonRuntimeLayout(options).runtimeRoot; +} + +async function removeStatePaths(paths: string[]): Promise { + await Promise.all([...new Set(paths)].map((path) => rm(path, { force: true }))); +} + +interface ManagedPythonDaemonStopCandidate { + pid: number; + source: ManagedPythonDaemonStopAllSource; + host?: string; + port?: number; + version?: string; + command?: string; + statePaths: string[]; +} + +function candidateUrl(candidate: ManagedPythonDaemonStopCandidate): string | undefined { + if (!candidate.host || !candidate.port) { + return undefined; + } + return `http://${candidate.host}:${candidate.port}`; +} + +function candidateEntry(candidate: ManagedPythonDaemonStopCandidate): ManagedPythonDaemonStopAllEntry { + return { + pid: candidate.pid, + source: candidate.source, + ...(candidateUrl(candidate) ? { url: candidateUrl(candidate) } : {}), + ...(candidate.version ? { version: candidate.version } : {}), + ...(candidate.command ? { command: candidate.command } : {}), + statePaths: [...candidate.statePaths], + }; +} + +async function probeCandidateHealth( + candidate: ManagedPythonDaemonStopCandidate, + timeoutMs: number, +): Promise<'healthy' | 'unreachable' | undefined> { + const url = candidateUrl(candidate); + if (!url) { + return undefined; + } + const controller = new AbortController(); + const timeout = setTimeout(() => { + controller.abort(); + }, timeoutMs); + try { + const response = await fetch(`${url}/health`, { signal: controller.signal }); + if (!response.ok) { + return 'unreachable'; + } + const body = (await response.json()) as unknown; + if (!body || typeof body !== 'object' || Array.isArray(body)) { + return 'unreachable'; + } + return (body as Record).status === 'healthy' ? 'healthy' : 'unreachable'; + } catch { + return 'unreachable'; + } finally { + clearTimeout(timeout); + } +} + +async function readStateCandidates(runtimeRoot: string): Promise { + let entries; + try { + entries = await readdir(runtimeRoot, { withFileTypes: true }); + } catch (error) { + const code = (error as { code?: unknown }).code; + if (code === 'ENOENT') { + return []; + } + throw error; + } + const candidates: ManagedPythonDaemonStopCandidate[] = []; + for (const entry of entries) { + if (!entry.isDirectory()) { + continue; + } + const statePath = join(runtimeRoot, entry.name, 'daemon.json'); + let state: ManagedPythonDaemonState | undefined; + try { + state = await readState(statePath); + } catch { + continue; + } + if (!state) { + continue; + } + candidates.push({ + pid: state.pid, + source: 'state', + host: state.host, + port: state.port, + version: state.version, + statePaths: [statePath], + }); + } + return candidates; +} + +function tokenizeCommand(command: string): string[] { + const tokens: string[] = []; + for (const match of command.matchAll(/"([^"]*)"|'([^']*)'|(\S+)/g)) { + tokens.push(match[1] ?? match[2] ?? match[3] ?? ''); + } + return tokens; +} + +function executableName(token: string): string { + return token.split(/[\\/]/).at(-1) ?? token; +} + +function isKtxDaemonExecutable(token: string): boolean { + return executableName(token) === 'ktx-daemon' || executableName(token) === 'ktx-daemon.exe'; +} + +function normalizedExecutableName(token: string): string { + return executableName(token).replace(/\.exe$/i, '').toLowerCase(); +} + +function hasUvRunPrefix(tokens: string[], daemonIndex: number): boolean { + return normalizedExecutableName(tokens[0] ?? '') === 'uv' && tokens.slice(1, daemonIndex).includes('run'); +} + +function isPythonExecutable(token: string): boolean { + const name = normalizedExecutableName(token); + return name === 'python' || name === 'python3'; +} + +function hasPythonModulePrefix(tokens: string[], moduleFlagIndex: number): boolean { + if (moduleFlagIndex === 1 && isPythonExecutable(tokens[0] ?? '')) { + return true; + } + return ( + normalizedExecutableName(tokens[0] ?? '') === 'uv' && + tokens.slice(1, moduleFlagIndex).includes('run') && + tokens.some((token, index) => index < moduleFlagIndex && isPythonExecutable(token)) + ); +} + +function isKtxDaemonServeHttp(tokens: string[]): boolean { + for (let index = 0; index < tokens.length; index += 1) { + if ( + isKtxDaemonExecutable(tokens[index] ?? '') && + tokens[index + 1] === 'serve-http' && + (index === 0 || hasUvRunPrefix(tokens, index)) + ) { + return true; + } + if ( + tokens[index] === '-m' && + tokens[index + 1] === 'ktx_daemon' && + tokens[index + 2] === 'serve-http' && + hasPythonModulePrefix(tokens, index) + ) { + return true; + } + } + return false; +} + +function parseCommandOption(tokens: string[], option: string): string | undefined { + for (let index = 0; index < tokens.length; index += 1) { + const token = tokens[index]; + if (token === option) { + return tokens[index + 1]; + } + if (token?.startsWith(`${option}=`)) { + return token.slice(option.length + 1); + } + } + return undefined; +} + +function processCandidate(processInfo: ManagedPythonDaemonProcessInfo): ManagedPythonDaemonStopCandidate | undefined { + const tokens = tokenizeCommand(processInfo.command); + if (!isKtxDaemonServeHttp(tokens)) { + return undefined; + } + const host = parseCommandOption(tokens, '--host') ?? '127.0.0.1'; + const rawPort = parseCommandOption(tokens, '--port'); + const parsedPort = rawPort ? Number.parseInt(rawPort, 10) : 8765; + const port = Number.isInteger(parsedPort) && parsedPort >= 1 && parsedPort <= 65535 ? parsedPort : 8765; + return { + pid: processInfo.pid, + source: 'process', + host, + port, + command: processInfo.command, + statePaths: [], + }; +} + +function mergeCandidates(candidates: ManagedPythonDaemonStopCandidate[]): ManagedPythonDaemonStopCandidate[] { + const byPid = new Map(); + for (const candidate of candidates) { + const existing = byPid.get(candidate.pid); + if (!existing) { + byPid.set(candidate.pid, { ...candidate, statePaths: [...candidate.statePaths] }); + continue; + } + existing.statePaths.push(...candidate.statePaths); + if (existing.source === 'process' && candidate.source === 'state') { + byPid.set(candidate.pid, { + ...candidate, + statePaths: [...new Set([...existing.statePaths, ...candidate.statePaths])], + }); + } else { + existing.statePaths = [...new Set(existing.statePaths)]; + } + } + return [...byPid.values()].sort((left, right) => left.pid - right.pid); +} + +function parsePosixProcessList(output: string): ManagedPythonDaemonProcessInfo[] { + const processes: ManagedPythonDaemonProcessInfo[] = []; + for (const line of output.split(/\r?\n/)) { + const match = line.match(/^\s*(\d+)\s+(.+)$/); + if (!match) { + continue; + } + processes.push({ pid: Number.parseInt(match[1], 10), command: match[2] }); + } + return processes; +} + +function parseWindowsProcessList(output: string): ManagedPythonDaemonProcessInfo[] { + if (!output.trim()) { + return []; + } + const parsed = JSON.parse(output) as unknown; + const records = Array.isArray(parsed) ? parsed : [parsed]; + const processes: ManagedPythonDaemonProcessInfo[] = []; + for (const record of records) { + if (!record || typeof record !== 'object') { + continue; + } + const value = record as Record; + const pid = value.ProcessId; + const command = value.CommandLine; + if (typeof pid === 'number' && typeof command === 'string' && command.length > 0) { + processes.push({ pid, command }); + } + } + return processes; +} + +async function defaultListProcesses(platform: NodeJS.Platform = process.platform): Promise { + if (platform === 'win32') { + const command = [ + 'Get-CimInstance Win32_Process', + '| Where-Object { $_.CommandLine -ne $null }', + '| Select-Object ProcessId,CommandLine', + '| ConvertTo-Json -Compress', + ].join(' '); + const { stdout } = await execFileAsync('powershell.exe', ['-NoProfile', '-Command', command], { + encoding: 'utf8', + maxBuffer: 10 * 1024 * 1024, + }); + return parseWindowsProcessList(stdout); + } + const { stdout } = await execFileAsync('ps', ['-axo', 'pid=,command='], { + encoding: 'utf8', + maxBuffer: 10 * 1024 * 1024, + }); + return parsePosixProcessList(stdout); +} + +async function waitUntilStopped(input: { + pid: number; + processAlive: (pid: number) => boolean; + timeoutMs: number; + pollIntervalMs: number; +}): Promise { + const deadline = Date.now() + input.timeoutMs; + do { + if (!input.processAlive(input.pid)) { + return true; + } + if (Date.now() >= deadline) { + break; + } + await delay(input.pollIntervalMs); + } while (Date.now() <= deadline); + return !input.processAlive(input.pid); +} + +async function discoverStopAllCandidates( + options: ManagedPythonDaemonStopAllOptions, +): Promise<{ + runtimeRoot: string; + candidates: ManagedPythonDaemonStopCandidate[]; + scanErrors: string[]; +}> { + const runtimeRoot = runtimeRootForStopAll(options); + const stateCandidates = await readStateCandidates(runtimeRoot); + const scanErrors: string[] = []; + let processCandidates: ManagedPythonDaemonStopCandidate[] = []; + try { + const processes = await (options.listProcesses ?? defaultListProcesses)(); + processCandidates = processes.flatMap((processInfo) => { + const candidate = processCandidate(processInfo); + return candidate ? [candidate] : []; + }); + } catch (error) { + scanErrors.push(error instanceof Error ? error.message : String(error)); + } + return { + runtimeRoot, + candidates: mergeCandidates([...stateCandidates, ...processCandidates]), + scanErrors, + }; +} + export async function startManagedPythonDaemon( options: ManagedPythonDaemonStartOptions, ): Promise { @@ -404,3 +765,63 @@ export async function stopManagedPythonDaemon( }); return { status: 'stopped', layout, state }; } + +export async function stopAllManagedPythonDaemons( + options: ManagedPythonDaemonStopAllOptions, +): Promise { + const processAlive = options.processAlive ?? defaultProcessAlive; + const killProcess = options.killProcess ?? defaultKillProcess; + const stopGraceMs = options.stopGraceMs ?? 500; + const pollIntervalMs = options.pollIntervalMs ?? 50; + const healthProbeMs = options.healthProbeMs ?? 100; + const discovery = await discoverStopAllCandidates(options); + const stopped: ManagedPythonDaemonStopAllEntry[] = []; + const stale: ManagedPythonDaemonStopAllEntry[] = []; + const failed: ManagedPythonDaemonStopAllFailure[] = []; + + for (const candidate of discovery.candidates) { + const health = await probeCandidateHealth(candidate, healthProbeMs); + const entry = { ...candidateEntry(candidate), ...(health ? { health } : {}) }; + if (!processAlive(candidate.pid)) { + await removeStatePaths(candidate.statePaths); + stale.push(entry); + continue; + } + try { + killProcess(candidate.pid, 'SIGTERM'); + if ( + !(await waitUntilStopped({ + pid: candidate.pid, + processAlive, + timeoutMs: stopGraceMs, + pollIntervalMs, + })) + ) { + killProcess(candidate.pid, 'SIGKILL'); + if ( + !(await waitUntilStopped({ + pid: candidate.pid, + processAlive, + timeoutMs: stopGraceMs, + pollIntervalMs, + })) + ) { + failed.push({ ...entry, detail: 'Process still running after SIGKILL' }); + continue; + } + } + await removeStatePaths(candidate.statePaths); + stopped.push(entry); + } catch (error) { + failed.push({ ...entry, detail: error instanceof Error ? error.message : String(error) }); + } + } + + return { + runtimeRoot: discovery.runtimeRoot, + stopped, + stale, + failed, + scanErrors: discovery.scanErrors, + }; +} diff --git a/packages/cli/src/runtime.test.ts b/packages/cli/src/runtime.test.ts index e367d339..46f708b2 100644 --- a/packages/cli/src/runtime.test.ts +++ b/packages/cli/src/runtime.test.ts @@ -1,5 +1,6 @@ import { describe, expect, it, vi } from 'vitest'; import type { + ManagedPythonDaemonStopAllResult, ManagedPythonDaemonStartResult, ManagedPythonDaemonStopResult, } from './managed-python-daemon.js'; @@ -199,13 +200,63 @@ describe('runKtxRuntime', () => { })), }; - await expect(runKtxRuntime({ command: 'stop', cliVersion: '0.2.0' }, io.io, deps)).resolves.toBe(0); + await expect(runKtxRuntime({ command: 'stop', cliVersion: '0.2.0', all: false }, io.io, deps)).resolves.toBe(0); expect(deps.stopDaemon).toHaveBeenCalledWith({ cliVersion: '0.2.0' }); expect(io.stdout()).toContain('Stopped KTX Python daemon'); expect(io.stdout()).toContain('pid: 4242'); }); + it('stops all discovered Python daemons and reports the summary', async () => { + const io = makeIo(); + const deps: KtxRuntimeDeps = { + stopAllDaemons: vi.fn(async (): Promise => ({ + runtimeRoot: '/runtime', + stopped: [ + { pid: 4242, source: 'state', url: 'http://127.0.0.1:61234', statePaths: ['/runtime/0.2.0/daemon.json'] }, + { pid: 5252, source: 'process', url: 'http://127.0.0.1:8765', statePaths: [] }, + ], + stale: [], + failed: [], + scanErrors: [], + })), + }; + + await expect(runKtxRuntime({ command: 'stop', cliVersion: '0.2.0', all: true }, io.io, deps)).resolves.toBe(0); + + expect(deps.stopAllDaemons).toHaveBeenCalledWith({ cliVersion: '0.2.0' }); + expect(io.stdout()).toContain('Stopped 2 KTX Python daemons'); + expect(io.stdout()).toContain('pid: 4242 source: state url: http://127.0.0.1:61234'); + expect(io.stdout()).toContain('pid: 5252 source: process url: http://127.0.0.1:8765'); + }); + + it('returns failure when stop all cannot stop every daemon', async () => { + const io = makeIo(); + const deps: KtxRuntimeDeps = { + stopAllDaemons: vi.fn(async (): Promise => ({ + runtimeRoot: '/runtime', + stopped: [], + stale: [], + failed: [ + { + pid: 4242, + source: 'state', + url: 'http://127.0.0.1:61234', + statePaths: ['/runtime/0.2.0/daemon.json'], + detail: 'Process still running after SIGKILL', + }, + ], + scanErrors: ['ps failed'], + })), + }; + + await expect(runKtxRuntime({ command: 'stop', cliVersion: '0.2.0', all: true }, io.io, deps)).resolves.toBe(1); + + expect(io.stderr()).toContain('Stopped 0 KTX Python daemons; failed 1'); + expect(io.stderr()).toContain('pid: 4242 source: state url: http://127.0.0.1:61234'); + expect(io.stderr()).toContain('process scan: ps failed'); + }); + it('prints runtime status as JSON', async () => { const io = makeIo(); const deps: KtxRuntimeDeps = { diff --git a/packages/cli/src/runtime.ts b/packages/cli/src/runtime.ts index fe2b5f74..e88f2b31 100644 --- a/packages/cli/src/runtime.ts +++ b/packages/cli/src/runtime.ts @@ -1,7 +1,9 @@ import type { KtxCliIo } from './cli-runtime.js'; import { + stopAllManagedPythonDaemons, startManagedPythonDaemon, stopManagedPythonDaemon, + type ManagedPythonDaemonStopAllResult, type ManagedPythonDaemonStartResult, type ManagedPythonDaemonStopResult, } from './managed-python-daemon.js'; @@ -22,7 +24,7 @@ import { export type KtxRuntimeArgs = | { command: 'install'; cliVersion: string; feature: KtxRuntimeFeature; force: boolean } | { command: 'start'; cliVersion: string; feature: KtxRuntimeFeature; force: boolean } - | { command: 'stop'; cliVersion: string } + | { command: 'stop'; cliVersion: string; all: boolean } | { command: 'status'; cliVersion: string; json: boolean } | { command: 'doctor'; cliVersion: string; json: boolean } | { command: 'prune'; cliVersion: string; dryRun: boolean; yes: boolean }; @@ -35,6 +37,7 @@ export interface KtxRuntimeDeps { force?: boolean; }) => Promise; stopDaemon?: (options: { cliVersion: string }) => Promise; + stopAllDaemons?: (options: { cliVersion: string }) => Promise; readStatus?: (options: ManagedPythonRuntimeLayoutOptions) => Promise; doctorRuntime?: (options: ManagedPythonRuntimeLayoutOptions) => Promise; pruneRuntime?: (options: { @@ -81,6 +84,58 @@ function writeDaemonStop(io: KtxCliIo, result: ManagedPythonDaemonStopResult): v io.stdout.write(`state: ${result.layout.daemonStatePath}\n`); } +function writeStopAllEntry(io: KtxCliIo, entry: { pid: number; source: string; url?: string; health?: string; detail?: string }): void { + io.stdout.write( + `pid: ${entry.pid} source: ${entry.source}${entry.url ? ` url: ${entry.url}` : ''}${ + entry.health ? ` health: ${entry.health}` : '' + }${ + entry.detail ? ` detail: ${entry.detail}` : '' + }\n`, + ); +} + +function writeDaemonStopAll(io: KtxCliIo, result: ManagedPythonDaemonStopAllResult): number { + const failed = result.failed.length + result.scanErrors.length; + if ( + result.stopped.length === 0 && + result.stale.length === 0 && + result.failed.length === 0 && + result.scanErrors.length === 0 + ) { + io.stdout.write('No KTX Python daemons found\n'); + return 0; + } + if (failed === 0) { + io.stdout.write(`Stopped ${result.stopped.length} KTX Python daemons\n`); + if (result.stale.length > 0) { + io.stdout.write(`Cleaned ${result.stale.length} stale daemon states\n`); + } + for (const entry of result.stopped) { + writeStopAllEntry(io, entry); + } + for (const entry of result.stale) { + writeStopAllEntry(io, entry); + } + return 0; + } + io.stderr.write( + `Stopped ${result.stopped.length} KTX Python daemons; failed ${result.failed.length}${ + result.stale.length > 0 ? `; cleaned stale ${result.stale.length}` : '' + }\n`, + ); + for (const entry of result.failed) { + io.stderr.write( + `pid: ${entry.pid} source: ${entry.source}${entry.url ? ` url: ${entry.url}` : ''}${ + entry.health ? ` health: ${entry.health}` : '' + } detail: ${entry.detail}\n`, + ); + } + for (const error of result.scanErrors) { + io.stderr.write(`process scan: ${error}\n`); + } + return 1; +} + function writeStatus(io: KtxCliIo, status: ManagedPythonRuntimeStatus): void { io.stdout.write('KTX Python runtime\n'); io.stdout.write(`status: ${status.kind}\n`); @@ -142,10 +197,16 @@ export async function runKtxRuntime( return 0; } if (args.command === 'stop') { - const stopDaemon = deps.stopDaemon ?? stopManagedPythonDaemon; - const result = await stopDaemon({ cliVersion: args.cliVersion }); - writeDaemonStop(io, result); - return 0; + if (args.all) { + const stopAllDaemons = deps.stopAllDaemons ?? stopAllManagedPythonDaemons; + const result = await stopAllDaemons({ cliVersion: args.cliVersion }); + return writeDaemonStopAll(io, result); + } else { + const stopDaemon = deps.stopDaemon ?? stopManagedPythonDaemon; + const result = await stopDaemon({ cliVersion: args.cliVersion }); + writeDaemonStop(io, result); + return 0; + } } if (args.command === 'status') { const readStatus = deps.readStatus ?? readManagedPythonRuntimeStatus;