From a6cecd401d7fb5f1eaafcf43e636b2f3488bc783 Mon Sep 17 00:00:00 2001 From: Andrey Avtomonov Date: Mon, 11 May 2026 10:34:51 +0200 Subject: [PATCH] feat: manage python daemon lifecycle --- .../cli/src/managed-python-daemon.test.ts | 239 +++++++++++ packages/cli/src/managed-python-daemon.ts | 397 ++++++++++++++++++ 2 files changed, 636 insertions(+) create mode 100644 packages/cli/src/managed-python-daemon.test.ts create mode 100644 packages/cli/src/managed-python-daemon.ts diff --git a/packages/cli/src/managed-python-daemon.test.ts b/packages/cli/src/managed-python-daemon.test.ts new file mode 100644 index 00000000..24df2a78 --- /dev/null +++ b/packages/cli/src/managed-python-daemon.test.ts @@ -0,0 +1,239 @@ +import { mkdir, mkdtemp, readFile, rm, writeFile } from 'node:fs/promises'; +import { tmpdir } from 'node:os'; +import { join } from 'node:path'; +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; +import { + readManagedPythonDaemonStatus, + startManagedPythonDaemon, + stopManagedPythonDaemon, + type ManagedPythonDaemonChild, + type ManagedPythonDaemonFetch, + type ManagedPythonDaemonSpawn, + type ManagedPythonDaemonState, +} from './managed-python-daemon.js'; +import type { + InstalledKtxRuntimeManifest, + ManagedPythonRuntimeInstallResult, + ManagedPythonRuntimeLayout, +} from './managed-python-runtime.js'; + +function layout(root: string): ManagedPythonRuntimeLayout { + return { + cliVersion: '0.2.0', + runtimeRoot: join(root, 'runtime'), + versionDir: join(root, 'runtime', '0.2.0'), + venvDir: join(root, 'runtime', '0.2.0', '.venv'), + manifestPath: join(root, 'runtime', '0.2.0', 'manifest.json'), + installLogPath: join(root, 'runtime', '0.2.0', 'install.log'), + assetDir: join(root, 'assets', 'python'), + assetManifestPath: join(root, 'assets', 'python', 'manifest.json'), + pythonPath: join(root, 'runtime', '0.2.0', '.venv', 'bin', 'python'), + daemonPath: join(root, 'runtime', '0.2.0', '.venv', 'bin', 'ktx-daemon'), + daemonStatePath: join(root, 'runtime', '0.2.0', 'daemon.json'), + daemonStdoutPath: join(root, 'runtime', '0.2.0', 'daemon.stdout.log'), + daemonStderrPath: join(root, 'runtime', '0.2.0', 'daemon.stderr.log'), + }; +} + +function manifest(root: string, features: Array<'core' | 'local-embeddings'> = ['core']): InstalledKtxRuntimeManifest { + const runtimeLayout = layout(root); + return { + schemaVersion: 1, + cliVersion: '0.2.0', + installedAt: '2026-05-11T00:00:00.000Z', + asset: { + schemaVersion: 1, + distributionName: 'kaelio-ktx', + normalizedName: 'kaelio_ktx', + version: '0.2.0', + wheel: { + file: 'kaelio_ktx-0.2.0-py3-none-any.whl', + sha256: 'a'.repeat(64), + bytes: 123, + }, + }, + features, + python: { + executable: runtimeLayout.pythonPath, + daemonExecutable: runtimeLayout.daemonPath, + }, + installLog: runtimeLayout.installLogPath, + }; +} + +function installResult(root: string, features: Array<'core' | 'local-embeddings'> = ['core']): ManagedPythonRuntimeInstallResult { + return { + status: 'ready', + layout: layout(root), + asset: { + manifest: manifest(root, features).asset, + wheelPath: join(root, 'assets', 'python', 'kaelio_ktx-0.2.0-py3-none-any.whl'), + }, + manifest: manifest(root, features), + }; +} + +function makeFetch(version = '0.2.0'): ManagedPythonDaemonFetch { + return vi.fn(async () => ({ + ok: true, + status: 200, + json: async () => ({ status: 'healthy', version }), + text: async () => '', + })); +} + +function makeSpawn(pid = 4242): ManagedPythonDaemonSpawn { + return vi.fn((_command, _args, _options): ManagedPythonDaemonChild => ({ + pid, + unref: vi.fn(), + })); +} + +function runningState(root: string, overrides: Partial = {}): ManagedPythonDaemonState { + const runtimeLayout = layout(root); + return { + schemaVersion: 1, + pid: 4242, + host: '127.0.0.1', + port: 58731, + version: '0.2.0', + features: ['core'], + startedAt: '2026-05-11T00:00:00.000Z', + stdoutLog: runtimeLayout.daemonStdoutPath, + stderrLog: runtimeLayout.daemonStderrPath, + ...overrides, + }; +} + +describe('managed Python daemon lifecycle', () => { + let tempDir: string; + + beforeEach(async () => { + tempDir = await mkdtemp(join(tmpdir(), 'ktx-managed-daemon-')); + }); + + afterEach(async () => { + await rm(tempDir, { recursive: true, force: true }); + }); + + it('reports stopped when no daemon state exists', async () => { + const status = await readManagedPythonDaemonStatus({ + cliVersion: '0.2.0', + runtimeRoot: join(tempDir, 'runtime'), + processAlive: vi.fn(() => false), + fetch: makeFetch(), + }); + + expect(status.kind).toBe('stopped'); + expect(status.detail).toContain('No daemon state'); + }); + + it('starts ktx-daemon serve-http, waits for health, and writes state', async () => { + const spawnDaemon = makeSpawn(5555); + const installRuntime = vi.fn(async () => installResult(tempDir)); + + const result = await startManagedPythonDaemon({ + cliVersion: '0.2.0', + runtimeRoot: join(tempDir, 'runtime'), + features: ['core'], + installRuntime, + spawnDaemon, + fetch: makeFetch(), + allocatePort: vi.fn(async () => 61234), + now: () => new Date('2026-05-11T00:00:00.000Z'), + pollIntervalMs: 1, + }); + + expect(result.status).toBe('started'); + expect(result.baseUrl).toBe('http://127.0.0.1:61234'); + expect(installRuntime).toHaveBeenCalledWith({ + cliVersion: '0.2.0', + runtimeRoot: join(tempDir, 'runtime'), + features: ['core'], + force: false, + }); + expect(spawnDaemon).toHaveBeenCalledWith( + layout(tempDir).daemonPath, + ['serve-http', '--host', '127.0.0.1', '--port', '61234'], + expect.objectContaining({ + detached: true, + env: expect.objectContaining({ KTX_DAEMON_VERSION: '0.2.0' }), + }), + ); + expect(JSON.parse(await readFile(layout(tempDir).daemonStatePath, 'utf8'))).toMatchObject({ + pid: 5555, + port: 61234, + version: '0.2.0', + features: ['core'], + stdoutLog: layout(tempDir).daemonStdoutPath, + stderrLog: layout(tempDir).daemonStderrPath, + }); + }); + + it('reuses a healthy daemon with the requested feature set', async () => { + await mkdir(layout(tempDir).versionDir, { recursive: true }); + await writeFile(layout(tempDir).daemonStatePath, `${JSON.stringify(runningState(tempDir), null, 2)}\n`); + const spawnDaemon = makeSpawn(9999); + + const result = await startManagedPythonDaemon({ + cliVersion: '0.2.0', + runtimeRoot: join(tempDir, 'runtime'), + features: ['core'], + installRuntime: vi.fn(async () => installResult(tempDir)), + spawnDaemon, + fetch: makeFetch(), + processAlive: vi.fn(() => true), + pollIntervalMs: 1, + }); + + expect(result.status).toBe('reused'); + expect(result.baseUrl).toBe('http://127.0.0.1:58731'); + expect(spawnDaemon).not.toHaveBeenCalled(); + }); + + it('starts a fresh daemon when the previous state is stale', async () => { + await mkdir(layout(tempDir).versionDir, { recursive: true }); + await writeFile( + layout(tempDir).daemonStatePath, + `${JSON.stringify(runningState(tempDir, { version: '0.1.0' }), null, 2)}\n`, + ); + + const result = await startManagedPythonDaemon({ + cliVersion: '0.2.0', + runtimeRoot: join(tempDir, 'runtime'), + features: ['core'], + installRuntime: vi.fn(async () => installResult(tempDir)), + spawnDaemon: makeSpawn(6666), + fetch: makeFetch(), + processAlive: vi.fn(() => true), + killProcess: vi.fn(), + allocatePort: vi.fn(async () => 61235), + now: () => new Date('2026-05-11T00:00:00.000Z'), + pollIntervalMs: 1, + }); + + expect(result.status).toBe('started'); + expect(JSON.parse(await readFile(layout(tempDir).daemonStatePath, 'utf8'))).toMatchObject({ + pid: 6666, + port: 61235, + version: '0.2.0', + }); + }); + + it('stops a recorded daemon and removes the state file', async () => { + await mkdir(layout(tempDir).versionDir, { recursive: true }); + await writeFile(layout(tempDir).daemonStatePath, `${JSON.stringify(runningState(tempDir), null, 2)}\n`); + const killProcess = vi.fn(); + + const result = await stopManagedPythonDaemon({ + cliVersion: '0.2.0', + runtimeRoot: join(tempDir, 'runtime'), + processAlive: vi.fn(() => true), + killProcess, + }); + + expect(result.status).toBe('stopped'); + expect(killProcess).toHaveBeenCalledWith(4242); + await expect(readFile(layout(tempDir).daemonStatePath, 'utf8')).rejects.toThrow(); + }); +}); diff --git a/packages/cli/src/managed-python-daemon.ts b/packages/cli/src/managed-python-daemon.ts new file mode 100644 index 00000000..159b293b --- /dev/null +++ b/packages/cli/src/managed-python-daemon.ts @@ -0,0 +1,397 @@ +import { spawn } from 'node:child_process'; +import { mkdir, open, readFile, rm, writeFile } from 'node:fs/promises'; +import { createServer } from 'node:net'; +import { setTimeout as delay } from 'node:timers/promises'; +import { z } from 'zod'; +import { + installManagedPythonRuntime, + managedPythonRuntimeLayout, + runtimeFeatureSchema, + type KtxRuntimeFeature, + type ManagedPythonRuntimeInstallOptions, + type ManagedPythonRuntimeInstallResult, + type ManagedPythonRuntimeLayout, + type ManagedPythonRuntimeLayoutOptions, +} from './managed-python-runtime.js'; + +export interface ManagedPythonDaemonState { + schemaVersion: 1; + pid: number; + host: '127.0.0.1'; + port: number; + version: string; + features: KtxRuntimeFeature[]; + startedAt: string; + stdoutLog: string; + stderrLog: string; +} + +export type ManagedPythonDaemonStatus = + | { kind: 'stopped'; detail: string; layout: ManagedPythonRuntimeLayout } + | { kind: 'running'; detail: string; layout: ManagedPythonRuntimeLayout; state: ManagedPythonDaemonState; baseUrl: string } + | { kind: 'stale'; detail: string; layout: ManagedPythonRuntimeLayout; state?: ManagedPythonDaemonState }; + +export interface ManagedPythonDaemonStartResult { + status: 'started' | 'reused'; + layout: ManagedPythonRuntimeLayout; + state: ManagedPythonDaemonState; + baseUrl: string; +} + +export interface ManagedPythonDaemonStopResult { + status: 'stopped' | 'already-stopped'; + layout: ManagedPythonRuntimeLayout; + state?: ManagedPythonDaemonState; +} + +export interface ManagedPythonDaemonChild { + pid?: number; + unref(): void; +} + +export type ManagedPythonDaemonSpawn = ( + command: string, + args: string[], + options: { + detached: boolean; + stdio: ['ignore', number, number]; + env: NodeJS.ProcessEnv; + }, +) => ManagedPythonDaemonChild; + +export type ManagedPythonDaemonFetch = ( + url: string, +) => Promise<{ + ok: boolean; + status: number; + json(): Promise; + text(): Promise; +}>; + +export interface ManagedPythonDaemonStartOptions extends ManagedPythonRuntimeLayoutOptions { + features: KtxRuntimeFeature[]; + force?: boolean; + installRuntime?: (options: ManagedPythonRuntimeInstallOptions) => Promise; + spawnDaemon?: ManagedPythonDaemonSpawn; + fetch?: ManagedPythonDaemonFetch; + allocatePort?: () => Promise; + processAlive?: (pid: number) => boolean; + killProcess?: (pid: number) => void; + now?: () => Date; + startupTimeoutMs?: number; + pollIntervalMs?: number; +} + +export interface ManagedPythonDaemonStatusOptions extends ManagedPythonRuntimeLayoutOptions { + fetch?: ManagedPythonDaemonFetch; + processAlive?: (pid: number) => boolean; +} + +export interface ManagedPythonDaemonStopOptions extends ManagedPythonRuntimeLayoutOptions { + processAlive?: (pid: number) => boolean; + killProcess?: (pid: number) => void; +} + +const daemonStateSchema = z.object({ + schemaVersion: z.literal(1), + pid: z.number().int().positive(), + host: z.literal('127.0.0.1'), + port: z.number().int().min(1).max(65535), + version: z.string().min(1), + features: z.array(runtimeFeatureSchema).min(1), + startedAt: z.string().min(1), + stdoutLog: z.string().min(1), + stderrLog: z.string().min(1), +}); + +function normalizeFeatures(features: KtxRuntimeFeature[]): KtxRuntimeFeature[] { + const requested = new Set(['core', ...features]); + return runtimeFeatureSchema.options.filter((feature) => requested.has(feature)); +} + +function hasFeatures(state: ManagedPythonDaemonState, features: KtxRuntimeFeature[]): boolean { + return normalizeFeatures(features).every((feature) => state.features.includes(feature)); +} + +function defaultFetch(url: string): ReturnType { + return fetch(url) as ReturnType; +} + +function defaultProcessAlive(pid: number): boolean { + try { + process.kill(pid, 0); + return true; + } catch { + return false; + } +} + +function defaultKillProcess(pid: number): void { + try { + process.kill(pid, 'SIGTERM'); + } catch (error) { + const code = (error as { code?: unknown }).code; + if (code !== 'ESRCH') { + throw error; + } + } +} + +function defaultSpawnDaemon( + command: string, + args: string[], + options: Parameters[2], +): ManagedPythonDaemonChild { + return spawn(command, args, options); +} + +function baseUrl(state: Pick): string { + return `http://${state.host}:${state.port}`; +} + +async function readState(path: string): Promise { + try { + return daemonStateSchema.parse(JSON.parse(await readFile(path, 'utf8')) as unknown); + } catch (error) { + const code = (error as { code?: unknown }).code; + if (code === 'ENOENT') { + return undefined; + } + throw error; + } +} + +async function writeState(path: string, state: ManagedPythonDaemonState): Promise { + await writeFile(path, `${JSON.stringify(state, null, 2)}\n`); +} + +async function healthOk(input: { + state: ManagedPythonDaemonState; + cliVersion: string; + fetch: ManagedPythonDaemonFetch; +}): Promise<{ ok: true } | { ok: false; detail: string }> { + try { + const response = await input.fetch(`${baseUrl(input.state)}/health`); + if (!response.ok) { + return { ok: false, detail: `Health check returned HTTP ${response.status}: ${await response.text()}` }; + } + const body = (await response.json()) as unknown; + if (!body || typeof body !== 'object' || Array.isArray(body)) { + return { ok: false, detail: 'Health check returned non-object JSON' }; + } + const record = body as Record; + if (record.status !== 'healthy') { + return { ok: false, detail: `Health check returned status ${String(record.status)}` }; + } + if (record.version !== input.cliVersion) { + return { + ok: false, + detail: `Daemon version ${String(record.version)} does not match CLI ${input.cliVersion}`, + }; + } + return { ok: true }; + } catch (error) { + return { ok: false, detail: error instanceof Error ? error.message : String(error) }; + } +} + +export async function readManagedPythonDaemonStatus( + options: ManagedPythonDaemonStatusOptions, +): Promise { + const layout = managedPythonRuntimeLayout(options); + let state: ManagedPythonDaemonState | undefined; + try { + state = await readState(layout.daemonStatePath); + } catch (error) { + return { + kind: 'stale', + detail: `Daemon state is invalid: ${error instanceof Error ? error.message : String(error)}`, + layout, + }; + } + if (!state) { + return { kind: 'stopped', detail: `No daemon state at ${layout.daemonStatePath}`, layout }; + } + if (state.version !== options.cliVersion) { + return { + kind: 'stale', + detail: `Daemon is for CLI ${state.version}, current CLI is ${options.cliVersion}`, + layout, + state, + }; + } + const processAlive = options.processAlive ?? defaultProcessAlive; + if (!processAlive(state.pid)) { + return { kind: 'stale', detail: `Daemon process ${state.pid} is not running`, layout, state }; + } + const health = await healthOk({ + state, + cliVersion: options.cliVersion, + fetch: options.fetch ?? defaultFetch, + }); + if (!health.ok) { + return { kind: 'stale', detail: health.detail, layout, state }; + } + return { kind: 'running', detail: `Daemon running at ${baseUrl(state)}`, layout, state, baseUrl: baseUrl(state) }; +} + +export async function allocateDaemonPort(): Promise { + return await new Promise((resolve, reject) => { + const server = createServer(); + server.on('error', reject); + server.listen(0, '127.0.0.1', () => { + const address = server.address(); + server.close(() => { + if (address && typeof address === 'object') { + resolve(address.port); + return; + } + reject(new Error('Failed to allocate a daemon port')); + }); + }); + }); +} + +async function waitForHealth(input: { + state: ManagedPythonDaemonState; + cliVersion: string; + fetch: ManagedPythonDaemonFetch; + timeoutMs: number; + pollIntervalMs: number; +}): Promise { + const deadline = Date.now() + input.timeoutMs; + let lastDetail = 'daemon did not answer health checks'; + while (Date.now() <= deadline) { + const health = await healthOk({ + state: input.state, + cliVersion: input.cliVersion, + fetch: input.fetch, + }); + if (health.ok) { + return; + } + lastDetail = health.detail; + await delay(input.pollIntervalMs); + } + throw new Error(`KTX Python daemon failed to start: ${lastDetail}. stderr: ${input.state.stderrLog}`); +} + +async function removeState(layout: ManagedPythonRuntimeLayout): Promise { + await rm(layout.daemonStatePath, { force: true }); +} + +async function stopRecordedDaemon(input: { + layout: ManagedPythonRuntimeLayout; + state: ManagedPythonDaemonState; + processAlive: (pid: number) => boolean; + killProcess: (pid: number) => void; +}): Promise { + if (input.processAlive(input.state.pid)) { + input.killProcess(input.state.pid); + } + await removeState(input.layout); +} + +export async function startManagedPythonDaemon( + options: ManagedPythonDaemonStartOptions, +): Promise { + const features = normalizeFeatures(options.features); + const installRuntime = options.installRuntime ?? installManagedPythonRuntime; + const layoutOverrides = { + ...(options.runtimeRoot !== undefined ? { runtimeRoot: options.runtimeRoot } : {}), + ...(options.assetDir !== undefined ? { assetDir: options.assetDir } : {}), + ...(options.platform !== undefined ? { platform: options.platform } : {}), + ...(options.env !== undefined ? { env: options.env } : {}), + ...(options.homeDir !== undefined ? { homeDir: options.homeDir } : {}), + }; + const layout = managedPythonRuntimeLayout({ cliVersion: options.cliVersion, ...layoutOverrides }); + const processAlive = options.processAlive ?? defaultProcessAlive; + const killProcess = options.killProcess ?? defaultKillProcess; + const fetchImpl = options.fetch ?? defaultFetch; + + const status = await readManagedPythonDaemonStatus({ + cliVersion: options.cliVersion, + ...layoutOverrides, + fetch: fetchImpl, + processAlive, + }); + if (options.force !== true && status.kind === 'running' && hasFeatures(status.state, features)) { + return { status: 'reused', layout, state: status.state, baseUrl: status.baseUrl }; + } + if (status.state) { + await stopRecordedDaemon({ layout, state: status.state, processAlive, killProcess }); + } else { + await removeState(layout); + } + + const installed = await installRuntime({ + cliVersion: options.cliVersion, + ...layoutOverrides, + features, + force: false, + }); + + await mkdir(layout.versionDir, { recursive: true }); + const stdout = await open(layout.daemonStdoutPath, 'a'); + const stderr = await open(layout.daemonStderrPath, 'a'); + try { + const port = await (options.allocatePort ?? allocateDaemonPort)(); + const spawnDaemon = options.spawnDaemon ?? defaultSpawnDaemon; + const child = spawnDaemon( + installed.manifest.python.daemonExecutable, + ['serve-http', '--host', '127.0.0.1', '--port', String(port)], + { + detached: true, + stdio: ['ignore', stdout.fd, stderr.fd], + env: { + ...process.env, + KTX_DAEMON_VERSION: options.cliVersion, + }, + }, + ); + child.unref(); + if (!child.pid) { + throw new Error(`KTX Python daemon did not report a pid. stderr: ${layout.daemonStderrPath}`); + } + const state: ManagedPythonDaemonState = { + schemaVersion: 1, + pid: child.pid, + host: '127.0.0.1', + port, + version: options.cliVersion, + features: installed.manifest.features, + startedAt: (options.now ?? (() => new Date()))().toISOString(), + stdoutLog: layout.daemonStdoutPath, + stderrLog: layout.daemonStderrPath, + }; + await waitForHealth({ + state, + cliVersion: options.cliVersion, + fetch: fetchImpl, + timeoutMs: options.startupTimeoutMs ?? 10_000, + pollIntervalMs: options.pollIntervalMs ?? 100, + }); + await writeState(layout.daemonStatePath, state); + return { status: 'started', layout, state, baseUrl: baseUrl(state) }; + } finally { + await stdout.close(); + await stderr.close(); + } +} + +export async function stopManagedPythonDaemon( + options: ManagedPythonDaemonStopOptions, +): Promise { + const layout = managedPythonRuntimeLayout(options); + const state = await readState(layout.daemonStatePath); + if (!state) { + return { status: 'already-stopped', layout }; + } + await stopRecordedDaemon({ + layout, + state, + processAlive: options.processAlive ?? defaultProcessAlive, + killProcess: options.killProcess ?? defaultKillProcess, + }); + return { status: 'stopped', layout, state }; +}