From ce2d39b9a96c345cf0f5e3d199b510f97b852d0e Mon Sep 17 00:00:00 2001 From: Andrey Avtomonov Date: Mon, 11 May 2026 12:31:33 +0200 Subject: [PATCH] feat(cli): add managed daemon HTTP helpers --- packages/cli/src/managed-python-http.test.ts | 171 ++++++++++++++++ packages/cli/src/managed-python-http.ts | 194 +++++++++++++++++++ 2 files changed, 365 insertions(+) create mode 100644 packages/cli/src/managed-python-http.test.ts create mode 100644 packages/cli/src/managed-python-http.ts diff --git a/packages/cli/src/managed-python-http.test.ts b/packages/cli/src/managed-python-http.test.ts new file mode 100644 index 00000000..c0153c45 --- /dev/null +++ b/packages/cli/src/managed-python-http.test.ts @@ -0,0 +1,171 @@ +import { describe, expect, it, vi } from 'vitest'; +import { + createManagedDaemonHttpJsonRunner, + createManagedDaemonLookerTableIdentifierParser, + createManagedDaemonSqlAnalysisPort, + createManagedPythonDaemonBaseUrlResolver, + managedDaemonDatabaseIntrospectionOptions, +} from './managed-python-http.js'; + +function io() { + let stderr = ''; + return { + io: { + stdout: { write: vi.fn() }, + stderr: { write: (chunk: string) => (stderr += chunk) }, + }, + stderr: () => stderr, + }; +} + +describe('createManagedPythonDaemonBaseUrlResolver', () => { + it('ensures the core runtime, starts the daemon, reports the URL, and caches the result', async () => { + const testIo = io(); + const ensureRuntime = vi.fn(async () => ({ + layout: {} as never, + manifest: {} as never, + })); + const startDaemon = vi.fn(async () => ({ + status: 'started' as const, + layout: {} as never, + state: { pid: 1234 } as never, + baseUrl: 'http://127.0.0.1:61234', + })); + const resolveBaseUrl = createManagedPythonDaemonBaseUrlResolver({ + cliVersion: '0.2.0', + installPolicy: 'auto', + io: testIo.io, + ensureRuntime, + startDaemon, + }); + + await expect(resolveBaseUrl()).resolves.toBe('http://127.0.0.1:61234'); + await expect(resolveBaseUrl()).resolves.toBe('http://127.0.0.1:61234'); + + expect(ensureRuntime).toHaveBeenCalledTimes(1); + expect(ensureRuntime).toHaveBeenCalledWith({ + cliVersion: '0.2.0', + installPolicy: 'auto', + io: testIo.io, + feature: 'core', + }); + expect(startDaemon).toHaveBeenCalledTimes(1); + expect(startDaemon).toHaveBeenCalledWith({ + cliVersion: '0.2.0', + features: ['core'], + force: false, + }); + expect(testIo.stderr()).toContain('Started KTX Python daemon: http://127.0.0.1:61234'); + }); + + it('reports daemon reuse without reinstalling after the first resolved URL', async () => { + const testIo = io(); + const ensureRuntime = vi.fn(async () => ({ + layout: {} as never, + manifest: {} as never, + })); + const startDaemon = vi.fn(async () => ({ + status: 'reused' as const, + layout: {} as never, + state: { pid: 1234 } as never, + baseUrl: 'http://127.0.0.1:61234', + })); + const resolveBaseUrl = createManagedPythonDaemonBaseUrlResolver({ + cliVersion: '0.2.0', + installPolicy: 'never', + io: testIo.io, + ensureRuntime, + startDaemon, + }); + + await expect(resolveBaseUrl()).resolves.toBe('http://127.0.0.1:61234'); + await expect(resolveBaseUrl()).resolves.toBe('http://127.0.0.1:61234'); + + expect(ensureRuntime).toHaveBeenCalledTimes(1); + expect(startDaemon).toHaveBeenCalledTimes(1); + expect(testIo.stderr()).toContain('Using existing KTX Python daemon: http://127.0.0.1:61234'); + }); +}); + +describe('createManagedDaemonHttpJsonRunner', () => { + it('resolves the managed base URL lazily for each HTTP JSON request', async () => { + const postJson = vi.fn(async () => ({ ok: true })); + const runner = createManagedDaemonHttpJsonRunner({ + resolveBaseUrl: async () => 'http://127.0.0.1:61234', + postJson, + }); + + await expect(runner('/sql/parse-table-identifier', { items: [] })).resolves.toEqual({ ok: true }); + + expect(postJson).toHaveBeenCalledWith('http://127.0.0.1:61234', '/sql/parse-table-identifier', { items: [] }); + }); +}); + +describe('managed daemon ingest ports', () => { + it('creates a Looker table parser backed by the managed daemon runner', async () => { + const requestJson = vi.fn(async () => ({ + results: { + 'model.explore': { + ok: true, + catalog: 'warehouse', + schema: 'public', + name: 'orders', + canonical_table: 'public.orders', + }, + }, + })); + const parser = createManagedDaemonLookerTableIdentifierParser({ requestJson }); + + await expect( + parser.parse([{ key: 'model.explore', sql_table_name: 'public.orders', dialect: 'postgres' }]), + ).resolves.toEqual({ + 'model.explore': { + ok: true, + catalog: 'warehouse', + schema: 'public', + name: 'orders', + canonical_table: 'public.orders', + }, + }); + expect(requestJson).toHaveBeenCalledWith('/sql/parse-table-identifier', { + items: [{ key: 'model.explore', sql_table_name: 'public.orders', dialect: 'postgres' }], + }); + }); + + it('creates a SQL analysis port backed by the managed daemon runner', async () => { + const requestJson = vi.fn(async () => ({ + fingerprint: 'select-orders', + normalized_sql: 'SELECT * FROM public.orders WHERE id = ?', + tables_touched: ['public.orders'], + literal_slots: [{ position: 1, type: 'number', example_value: '42' }], + })); + const sqlAnalysis = createManagedDaemonSqlAnalysisPort({ requestJson }); + + await expect(sqlAnalysis.analyzeForFingerprint('SELECT * FROM public.orders WHERE id = 42', 'postgres')).resolves + .toEqual({ + fingerprint: 'select-orders', + normalizedSql: 'SELECT * FROM public.orders WHERE id = ?', + tablesTouched: ['public.orders'], + literalSlots: [{ position: 1, type: 'number', exampleValue: '42' }], + }); + expect(requestJson).toHaveBeenCalledWith('/api/sql/analyze-for-fingerprint', { + sql: 'SELECT * FROM public.orders WHERE id = 42', + dialect: 'postgres', + }); + }); + + it('returns live-database daemon request options backed by the managed runner', async () => { + const requestJson = vi.fn(async () => ({ + connection_id: 'warehouse', + tables: [], + })); + const options = managedDaemonDatabaseIntrospectionOptions({ requestJson }); + expect(options.requestJson).toBeDefined(); + + await expect(options.requestJson?.('/database/introspect', { connection_id: 'warehouse' })).resolves.toEqual({ + connection_id: 'warehouse', + tables: [], + }); + expect(requestJson).toHaveBeenCalledWith('/database/introspect', { connection_id: 'warehouse' }); + }); +}); diff --git a/packages/cli/src/managed-python-http.ts b/packages/cli/src/managed-python-http.ts new file mode 100644 index 00000000..1cd1f7d1 --- /dev/null +++ b/packages/cli/src/managed-python-http.ts @@ -0,0 +1,194 @@ +import { request as httpRequest } from 'node:http'; +import { request as httpsRequest } from 'node:https'; +import { URL } from 'node:url'; +import { + createDaemonLookerTableIdentifierParser, + type DaemonLiveDatabaseIntrospectionOptions, + type KtxDaemonDatabaseHttpJsonRunner, + type KtxDaemonTableIdentifierHttpJsonRunner, + type LookerTableIdentifierParser, +} from '@ktx/context/ingest'; +import { + createHttpSqlAnalysisPort, + type KtxSqlAnalysisHttpJsonRunner, + type SqlAnalysisPort, +} from '@ktx/context/sql-analysis'; +import type { KtxCliIo } from './cli-runtime.js'; +import { + ensureManagedPythonCommandRuntime, + type KtxManagedPythonInstallPolicy, + type ManagedPythonCommandRuntime, +} from './managed-python-command.js'; +import { startManagedPythonDaemon, type ManagedPythonDaemonStartResult } from './managed-python-daemon.js'; + +export type ManagedPythonHttpJsonRunner = ( + path: string, + payload: Record, +) => Promise>; + +export type ManagedPythonHttpPostJson = ( + baseUrl: string, + path: string, + payload: Record, +) => Promise>; + +export interface ManagedPythonCoreDaemonOptions { + cliVersion: string; + installPolicy: KtxManagedPythonInstallPolicy; + io: KtxCliIo; + ensureRuntime?: (options: { + cliVersion: string; + installPolicy: KtxManagedPythonInstallPolicy; + io: KtxCliIo; + feature: 'core'; + }) => Promise; + startDaemon?: (options: { + cliVersion: string; + features: ['core']; + force: false; + }) => Promise; +} + +export type ManagedPythonDaemonHttpOptions = + | { + requestJson: ManagedPythonHttpJsonRunner; + } + | { + resolveBaseUrl: () => Promise; + postJson?: ManagedPythonHttpPostJson; + } + | (ManagedPythonCoreDaemonOptions & { + postJson?: ManagedPythonHttpPostJson; + }); + +function normalizedBaseUrl(baseUrl: string): string { + return baseUrl.endsWith('/') ? baseUrl : `${baseUrl}/`; +} + +function parseJsonObject(raw: string, path: string): Record { + const parsed = JSON.parse(raw) as unknown; + if (!parsed || typeof parsed !== 'object' || Array.isArray(parsed)) { + throw new Error(`KTX managed daemon HTTP ${path} returned non-object JSON`); + } + return parsed as Record; +} + +export async function postManagedDaemonJson( + baseUrl: string, + path: string, + payload: Record, +): Promise> { + return await new Promise((resolve, reject) => { + const target = new URL(path.replace(/^\//, ''), normalizedBaseUrl(baseUrl)); + const body = JSON.stringify(payload); + const client = target.protocol === 'https:' ? httpsRequest : httpRequest; + const request = client( + target, + { + method: 'POST', + headers: { + accept: 'application/json', + 'content-type': 'application/json', + 'content-length': Buffer.byteLength(body), + }, + }, + (response) => { + const chunks: Buffer[] = []; + response.on('data', (chunk: Buffer) => chunks.push(chunk)); + response.on('end', () => { + const text = Buffer.concat(chunks).toString('utf8'); + const statusCode = response.statusCode ?? 0; + if (statusCode < 200 || statusCode >= 300) { + reject(new Error(`KTX managed daemon HTTP ${path} failed with ${statusCode}: ${text}`)); + return; + } + try { + resolve(parseJsonObject(text, path)); + } catch (error) { + reject(error); + } + }); + }, + ); + request.on('error', reject); + request.end(body); + }); +} + +export function createManagedPythonDaemonBaseUrlResolver( + options: ManagedPythonCoreDaemonOptions, +): () => Promise { + let cachedBaseUrl: string | undefined; + + return async () => { + if (cachedBaseUrl) { + return cachedBaseUrl; + } + + const ensureRuntime = options.ensureRuntime ?? ensureManagedPythonCommandRuntime; + const startDaemon = options.startDaemon ?? startManagedPythonDaemon; + await ensureRuntime({ + cliVersion: options.cliVersion, + installPolicy: options.installPolicy, + io: options.io, + feature: 'core', + }); + const daemon = await startDaemon({ + cliVersion: options.cliVersion, + features: ['core'], + force: false, + }); + const verb = daemon.status === 'started' ? 'Started' : 'Using existing'; + options.io.stderr.write(`${verb} KTX Python daemon: ${daemon.baseUrl}\n`); + cachedBaseUrl = daemon.baseUrl; + return cachedBaseUrl; + }; +} + +function isRequestJsonOnly(options: ManagedPythonDaemonHttpOptions): options is { requestJson: ManagedPythonHttpJsonRunner } { + return 'requestJson' in options; +} + +function isResolveBaseUrlOnly( + options: ManagedPythonDaemonHttpOptions, +): options is { resolveBaseUrl: () => Promise; postJson?: ManagedPythonHttpPostJson } { + return 'resolveBaseUrl' in options; +} + +export function createManagedDaemonHttpJsonRunner( + options: ManagedPythonDaemonHttpOptions, +): ManagedPythonHttpJsonRunner { + if (isRequestJsonOnly(options)) { + return options.requestJson; + } + const resolveBaseUrl = isResolveBaseUrlOnly(options) + ? options.resolveBaseUrl + : createManagedPythonDaemonBaseUrlResolver(options); + const postJson = options.postJson ?? postManagedDaemonJson; + + return async (path, payload) => postJson(await resolveBaseUrl(), path, payload); +} + +export function createManagedDaemonLookerTableIdentifierParser( + options: ManagedPythonDaemonHttpOptions, +): LookerTableIdentifierParser { + return createDaemonLookerTableIdentifierParser({ + baseUrl: 'http://127.0.0.1:0', + requestJson: createManagedDaemonHttpJsonRunner(options) as KtxDaemonTableIdentifierHttpJsonRunner, + }); +} + +export function createManagedDaemonSqlAnalysisPort(options: ManagedPythonDaemonHttpOptions): SqlAnalysisPort { + return createHttpSqlAnalysisPort({ + baseUrl: 'http://127.0.0.1:0', + requestJson: createManagedDaemonHttpJsonRunner(options) as KtxSqlAnalysisHttpJsonRunner, + }); +} + +export function managedDaemonDatabaseIntrospectionOptions( + options: ManagedPythonDaemonHttpOptions, +): Pick { + return { + requestJson: createManagedDaemonHttpJsonRunner(options) as KtxDaemonDatabaseHttpJsonRunner, + }; +}