feat(cli): add reliable runtime stop --all (#30)

* feat(cli): add runtime stop all

* test(cli): avoid Metabase secret fixture path collision

---------

Co-authored-by: Andrey Avtomonov <7889985+andreybavt@users.noreply.github.com>
This commit is contained in:
Andrey Avtomonov 2026-05-12 13:00:08 +02:00 committed by GitHub
parent 68c7e27bad
commit 36c3f93ad7
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 734 additions and 17 deletions

View file

@ -1,7 +1,9 @@
import { spawn } from 'node:child_process';
import { mkdir, open, readFile, rm, writeFile } from 'node:fs/promises';
import { execFile, spawn } from 'node:child_process';
import { mkdir, open, readdir, readFile, rm, writeFile } from 'node:fs/promises';
import { createServer } from 'node:net';
import { join } from 'node:path';
import { setTimeout as delay } from 'node:timers/promises';
import { promisify } from 'node:util';
import { z } from 'zod';
import {
installManagedPythonRuntime,
@ -44,6 +46,35 @@ export interface ManagedPythonDaemonStopResult {
state?: ManagedPythonDaemonState;
}
export interface ManagedPythonDaemonProcessInfo {
pid: number;
command: string;
}
export type ManagedPythonDaemonStopAllSource = 'state' | 'process';
export interface ManagedPythonDaemonStopAllEntry {
pid: number;
source: ManagedPythonDaemonStopAllSource;
url?: string;
health?: 'healthy' | 'unreachable';
version?: string;
command?: string;
statePaths: string[];
}
export interface ManagedPythonDaemonStopAllFailure extends ManagedPythonDaemonStopAllEntry {
detail: string;
}
export interface ManagedPythonDaemonStopAllResult {
runtimeRoot: string;
stopped: ManagedPythonDaemonStopAllEntry[];
stale: ManagedPythonDaemonStopAllEntry[];
failed: ManagedPythonDaemonStopAllFailure[];
scanErrors: string[];
}
export interface ManagedPythonDaemonChild {
pid?: number;
unref(): void;
@ -68,6 +99,8 @@ export type ManagedPythonDaemonFetch = (
text(): Promise<string>;
}>;
export type ManagedPythonDaemonKillProcess = (pid: number, signal?: NodeJS.Signals) => void;
export interface ManagedPythonDaemonStartOptions extends ManagedPythonRuntimeLayoutOptions {
features: KtxRuntimeFeature[];
force?: boolean;
@ -76,7 +109,7 @@ export interface ManagedPythonDaemonStartOptions extends ManagedPythonRuntimeLay
fetch?: ManagedPythonDaemonFetch;
allocatePort?: () => Promise<number>;
processAlive?: (pid: number) => boolean;
killProcess?: (pid: number) => void;
killProcess?: ManagedPythonDaemonKillProcess;
now?: () => Date;
startupTimeoutMs?: number;
pollIntervalMs?: number;
@ -89,9 +122,20 @@ export interface ManagedPythonDaemonStatusOptions extends ManagedPythonRuntimeLa
export interface ManagedPythonDaemonStopOptions extends ManagedPythonRuntimeLayoutOptions {
processAlive?: (pid: number) => boolean;
killProcess?: (pid: number) => void;
killProcess?: ManagedPythonDaemonKillProcess;
}
export interface ManagedPythonDaemonStopAllOptions extends ManagedPythonRuntimeLayoutOptions {
listProcesses?: () => Promise<ManagedPythonDaemonProcessInfo[]>;
processAlive?: (pid: number) => boolean;
killProcess?: ManagedPythonDaemonKillProcess;
stopGraceMs?: number;
pollIntervalMs?: number;
healthProbeMs?: number;
}
const execFileAsync = promisify(execFile);
const daemonStateSchema = z.object({
schemaVersion: z.literal(1),
pid: z.number().int().positive(),
@ -126,9 +170,9 @@ function defaultProcessAlive(pid: number): boolean {
}
}
function defaultKillProcess(pid: number): void {
function defaultKillProcess(pid: number, signal: NodeJS.Signals = 'SIGTERM'): void {
try {
process.kill(pid, 'SIGTERM');
process.kill(pid, signal);
} catch (error) {
const code = (error as { code?: unknown }).code;
if (code !== 'ESRCH') {
@ -293,7 +337,7 @@ async function stopRecordedDaemon(input: {
layout: ManagedPythonRuntimeLayout;
state: ManagedPythonDaemonState;
processAlive: (pid: number) => boolean;
killProcess: (pid: number) => void;
killProcess: ManagedPythonDaemonKillProcess;
}): Promise<void> {
if (input.processAlive(input.state.pid)) {
input.killProcess(input.state.pid);
@ -301,6 +345,323 @@ async function stopRecordedDaemon(input: {
await removeState(input.layout);
}
function runtimeRootForStopAll(options: ManagedPythonRuntimeLayoutOptions): string {
return managedPythonRuntimeLayout(options).runtimeRoot;
}
async function removeStatePaths(paths: string[]): Promise<void> {
await Promise.all([...new Set(paths)].map((path) => rm(path, { force: true })));
}
interface ManagedPythonDaemonStopCandidate {
pid: number;
source: ManagedPythonDaemonStopAllSource;
host?: string;
port?: number;
version?: string;
command?: string;
statePaths: string[];
}
function candidateUrl(candidate: ManagedPythonDaemonStopCandidate): string | undefined {
if (!candidate.host || !candidate.port) {
return undefined;
}
return `http://${candidate.host}:${candidate.port}`;
}
function candidateEntry(candidate: ManagedPythonDaemonStopCandidate): ManagedPythonDaemonStopAllEntry {
return {
pid: candidate.pid,
source: candidate.source,
...(candidateUrl(candidate) ? { url: candidateUrl(candidate) } : {}),
...(candidate.version ? { version: candidate.version } : {}),
...(candidate.command ? { command: candidate.command } : {}),
statePaths: [...candidate.statePaths],
};
}
async function probeCandidateHealth(
candidate: ManagedPythonDaemonStopCandidate,
timeoutMs: number,
): Promise<'healthy' | 'unreachable' | undefined> {
const url = candidateUrl(candidate);
if (!url) {
return undefined;
}
const controller = new AbortController();
const timeout = setTimeout(() => {
controller.abort();
}, timeoutMs);
try {
const response = await fetch(`${url}/health`, { signal: controller.signal });
if (!response.ok) {
return 'unreachable';
}
const body = (await response.json()) as unknown;
if (!body || typeof body !== 'object' || Array.isArray(body)) {
return 'unreachable';
}
return (body as Record<string, unknown>).status === 'healthy' ? 'healthy' : 'unreachable';
} catch {
return 'unreachable';
} finally {
clearTimeout(timeout);
}
}
async function readStateCandidates(runtimeRoot: string): Promise<ManagedPythonDaemonStopCandidate[]> {
let entries;
try {
entries = await readdir(runtimeRoot, { withFileTypes: true });
} catch (error) {
const code = (error as { code?: unknown }).code;
if (code === 'ENOENT') {
return [];
}
throw error;
}
const candidates: ManagedPythonDaemonStopCandidate[] = [];
for (const entry of entries) {
if (!entry.isDirectory()) {
continue;
}
const statePath = join(runtimeRoot, entry.name, 'daemon.json');
let state: ManagedPythonDaemonState | undefined;
try {
state = await readState(statePath);
} catch {
continue;
}
if (!state) {
continue;
}
candidates.push({
pid: state.pid,
source: 'state',
host: state.host,
port: state.port,
version: state.version,
statePaths: [statePath],
});
}
return candidates;
}
function tokenizeCommand(command: string): string[] {
const tokens: string[] = [];
for (const match of command.matchAll(/"([^"]*)"|'([^']*)'|(\S+)/g)) {
tokens.push(match[1] ?? match[2] ?? match[3] ?? '');
}
return tokens;
}
function executableName(token: string): string {
return token.split(/[\\/]/).at(-1) ?? token;
}
function isKtxDaemonExecutable(token: string): boolean {
return executableName(token) === 'ktx-daemon' || executableName(token) === 'ktx-daemon.exe';
}
function normalizedExecutableName(token: string): string {
return executableName(token).replace(/\.exe$/i, '').toLowerCase();
}
function hasUvRunPrefix(tokens: string[], daemonIndex: number): boolean {
return normalizedExecutableName(tokens[0] ?? '') === 'uv' && tokens.slice(1, daemonIndex).includes('run');
}
function isPythonExecutable(token: string): boolean {
const name = normalizedExecutableName(token);
return name === 'python' || name === 'python3';
}
function hasPythonModulePrefix(tokens: string[], moduleFlagIndex: number): boolean {
if (moduleFlagIndex === 1 && isPythonExecutable(tokens[0] ?? '')) {
return true;
}
return (
normalizedExecutableName(tokens[0] ?? '') === 'uv' &&
tokens.slice(1, moduleFlagIndex).includes('run') &&
tokens.some((token, index) => index < moduleFlagIndex && isPythonExecutable(token))
);
}
function isKtxDaemonServeHttp(tokens: string[]): boolean {
for (let index = 0; index < tokens.length; index += 1) {
if (
isKtxDaemonExecutable(tokens[index] ?? '') &&
tokens[index + 1] === 'serve-http' &&
(index === 0 || hasUvRunPrefix(tokens, index))
) {
return true;
}
if (
tokens[index] === '-m' &&
tokens[index + 1] === 'ktx_daemon' &&
tokens[index + 2] === 'serve-http' &&
hasPythonModulePrefix(tokens, index)
) {
return true;
}
}
return false;
}
function parseCommandOption(tokens: string[], option: string): string | undefined {
for (let index = 0; index < tokens.length; index += 1) {
const token = tokens[index];
if (token === option) {
return tokens[index + 1];
}
if (token?.startsWith(`${option}=`)) {
return token.slice(option.length + 1);
}
}
return undefined;
}
function processCandidate(processInfo: ManagedPythonDaemonProcessInfo): ManagedPythonDaemonStopCandidate | undefined {
const tokens = tokenizeCommand(processInfo.command);
if (!isKtxDaemonServeHttp(tokens)) {
return undefined;
}
const host = parseCommandOption(tokens, '--host') ?? '127.0.0.1';
const rawPort = parseCommandOption(tokens, '--port');
const parsedPort = rawPort ? Number.parseInt(rawPort, 10) : 8765;
const port = Number.isInteger(parsedPort) && parsedPort >= 1 && parsedPort <= 65535 ? parsedPort : 8765;
return {
pid: processInfo.pid,
source: 'process',
host,
port,
command: processInfo.command,
statePaths: [],
};
}
function mergeCandidates(candidates: ManagedPythonDaemonStopCandidate[]): ManagedPythonDaemonStopCandidate[] {
const byPid = new Map<number, ManagedPythonDaemonStopCandidate>();
for (const candidate of candidates) {
const existing = byPid.get(candidate.pid);
if (!existing) {
byPid.set(candidate.pid, { ...candidate, statePaths: [...candidate.statePaths] });
continue;
}
existing.statePaths.push(...candidate.statePaths);
if (existing.source === 'process' && candidate.source === 'state') {
byPid.set(candidate.pid, {
...candidate,
statePaths: [...new Set([...existing.statePaths, ...candidate.statePaths])],
});
} else {
existing.statePaths = [...new Set(existing.statePaths)];
}
}
return [...byPid.values()].sort((left, right) => left.pid - right.pid);
}
function parsePosixProcessList(output: string): ManagedPythonDaemonProcessInfo[] {
const processes: ManagedPythonDaemonProcessInfo[] = [];
for (const line of output.split(/\r?\n/)) {
const match = line.match(/^\s*(\d+)\s+(.+)$/);
if (!match) {
continue;
}
processes.push({ pid: Number.parseInt(match[1], 10), command: match[2] });
}
return processes;
}
function parseWindowsProcessList(output: string): ManagedPythonDaemonProcessInfo[] {
if (!output.trim()) {
return [];
}
const parsed = JSON.parse(output) as unknown;
const records = Array.isArray(parsed) ? parsed : [parsed];
const processes: ManagedPythonDaemonProcessInfo[] = [];
for (const record of records) {
if (!record || typeof record !== 'object') {
continue;
}
const value = record as Record<string, unknown>;
const pid = value.ProcessId;
const command = value.CommandLine;
if (typeof pid === 'number' && typeof command === 'string' && command.length > 0) {
processes.push({ pid, command });
}
}
return processes;
}
async function defaultListProcesses(platform: NodeJS.Platform = process.platform): Promise<ManagedPythonDaemonProcessInfo[]> {
if (platform === 'win32') {
const command = [
'Get-CimInstance Win32_Process',
'| Where-Object { $_.CommandLine -ne $null }',
'| Select-Object ProcessId,CommandLine',
'| ConvertTo-Json -Compress',
].join(' ');
const { stdout } = await execFileAsync('powershell.exe', ['-NoProfile', '-Command', command], {
encoding: 'utf8',
maxBuffer: 10 * 1024 * 1024,
});
return parseWindowsProcessList(stdout);
}
const { stdout } = await execFileAsync('ps', ['-axo', 'pid=,command='], {
encoding: 'utf8',
maxBuffer: 10 * 1024 * 1024,
});
return parsePosixProcessList(stdout);
}
async function waitUntilStopped(input: {
pid: number;
processAlive: (pid: number) => boolean;
timeoutMs: number;
pollIntervalMs: number;
}): Promise<boolean> {
const deadline = Date.now() + input.timeoutMs;
do {
if (!input.processAlive(input.pid)) {
return true;
}
if (Date.now() >= deadline) {
break;
}
await delay(input.pollIntervalMs);
} while (Date.now() <= deadline);
return !input.processAlive(input.pid);
}
async function discoverStopAllCandidates(
options: ManagedPythonDaemonStopAllOptions,
): Promise<{
runtimeRoot: string;
candidates: ManagedPythonDaemonStopCandidate[];
scanErrors: string[];
}> {
const runtimeRoot = runtimeRootForStopAll(options);
const stateCandidates = await readStateCandidates(runtimeRoot);
const scanErrors: string[] = [];
let processCandidates: ManagedPythonDaemonStopCandidate[] = [];
try {
const processes = await (options.listProcesses ?? defaultListProcesses)();
processCandidates = processes.flatMap((processInfo) => {
const candidate = processCandidate(processInfo);
return candidate ? [candidate] : [];
});
} catch (error) {
scanErrors.push(error instanceof Error ? error.message : String(error));
}
return {
runtimeRoot,
candidates: mergeCandidates([...stateCandidates, ...processCandidates]),
scanErrors,
};
}
export async function startManagedPythonDaemon(
options: ManagedPythonDaemonStartOptions,
): Promise<ManagedPythonDaemonStartResult> {
@ -404,3 +765,63 @@ export async function stopManagedPythonDaemon(
});
return { status: 'stopped', layout, state };
}
export async function stopAllManagedPythonDaemons(
options: ManagedPythonDaemonStopAllOptions,
): Promise<ManagedPythonDaemonStopAllResult> {
const processAlive = options.processAlive ?? defaultProcessAlive;
const killProcess = options.killProcess ?? defaultKillProcess;
const stopGraceMs = options.stopGraceMs ?? 500;
const pollIntervalMs = options.pollIntervalMs ?? 50;
const healthProbeMs = options.healthProbeMs ?? 100;
const discovery = await discoverStopAllCandidates(options);
const stopped: ManagedPythonDaemonStopAllEntry[] = [];
const stale: ManagedPythonDaemonStopAllEntry[] = [];
const failed: ManagedPythonDaemonStopAllFailure[] = [];
for (const candidate of discovery.candidates) {
const health = await probeCandidateHealth(candidate, healthProbeMs);
const entry = { ...candidateEntry(candidate), ...(health ? { health } : {}) };
if (!processAlive(candidate.pid)) {
await removeStatePaths(candidate.statePaths);
stale.push(entry);
continue;
}
try {
killProcess(candidate.pid, 'SIGTERM');
if (
!(await waitUntilStopped({
pid: candidate.pid,
processAlive,
timeoutMs: stopGraceMs,
pollIntervalMs,
}))
) {
killProcess(candidate.pid, 'SIGKILL');
if (
!(await waitUntilStopped({
pid: candidate.pid,
processAlive,
timeoutMs: stopGraceMs,
pollIntervalMs,
}))
) {
failed.push({ ...entry, detail: 'Process still running after SIGKILL' });
continue;
}
}
await removeStatePaths(candidate.statePaths);
stopped.push(entry);
} catch (error) {
failed.push({ ...entry, detail: error instanceof Error ? error.message : String(error) });
}
}
return {
runtimeRoot: discovery.runtimeRoot,
stopped,
stale,
failed,
scanErrors: discovery.scanErrors,
};
}