feat(cli): add managed daemon HTTP helpers

This commit is contained in:
Andrey Avtomonov 2026-05-11 12:31:33 +02:00
parent d686990ed5
commit ce2d39b9a9
2 changed files with 365 additions and 0 deletions

View file

@ -0,0 +1,171 @@
import { describe, expect, it, vi } from 'vitest';
import {
createManagedDaemonHttpJsonRunner,
createManagedDaemonLookerTableIdentifierParser,
createManagedDaemonSqlAnalysisPort,
createManagedPythonDaemonBaseUrlResolver,
managedDaemonDatabaseIntrospectionOptions,
} from './managed-python-http.js';
function io() {
let stderr = '';
return {
io: {
stdout: { write: vi.fn() },
stderr: { write: (chunk: string) => (stderr += chunk) },
},
stderr: () => stderr,
};
}
describe('createManagedPythonDaemonBaseUrlResolver', () => {
it('ensures the core runtime, starts the daemon, reports the URL, and caches the result', async () => {
const testIo = io();
const ensureRuntime = vi.fn(async () => ({
layout: {} as never,
manifest: {} as never,
}));
const startDaemon = vi.fn(async () => ({
status: 'started' as const,
layout: {} as never,
state: { pid: 1234 } as never,
baseUrl: 'http://127.0.0.1:61234',
}));
const resolveBaseUrl = createManagedPythonDaemonBaseUrlResolver({
cliVersion: '0.2.0',
installPolicy: 'auto',
io: testIo.io,
ensureRuntime,
startDaemon,
});
await expect(resolveBaseUrl()).resolves.toBe('http://127.0.0.1:61234');
await expect(resolveBaseUrl()).resolves.toBe('http://127.0.0.1:61234');
expect(ensureRuntime).toHaveBeenCalledTimes(1);
expect(ensureRuntime).toHaveBeenCalledWith({
cliVersion: '0.2.0',
installPolicy: 'auto',
io: testIo.io,
feature: 'core',
});
expect(startDaemon).toHaveBeenCalledTimes(1);
expect(startDaemon).toHaveBeenCalledWith({
cliVersion: '0.2.0',
features: ['core'],
force: false,
});
expect(testIo.stderr()).toContain('Started KTX Python daemon: http://127.0.0.1:61234');
});
it('reports daemon reuse without reinstalling after the first resolved URL', async () => {
const testIo = io();
const ensureRuntime = vi.fn(async () => ({
layout: {} as never,
manifest: {} as never,
}));
const startDaemon = vi.fn(async () => ({
status: 'reused' as const,
layout: {} as never,
state: { pid: 1234 } as never,
baseUrl: 'http://127.0.0.1:61234',
}));
const resolveBaseUrl = createManagedPythonDaemonBaseUrlResolver({
cliVersion: '0.2.0',
installPolicy: 'never',
io: testIo.io,
ensureRuntime,
startDaemon,
});
await expect(resolveBaseUrl()).resolves.toBe('http://127.0.0.1:61234');
await expect(resolveBaseUrl()).resolves.toBe('http://127.0.0.1:61234');
expect(ensureRuntime).toHaveBeenCalledTimes(1);
expect(startDaemon).toHaveBeenCalledTimes(1);
expect(testIo.stderr()).toContain('Using existing KTX Python daemon: http://127.0.0.1:61234');
});
});
describe('createManagedDaemonHttpJsonRunner', () => {
it('resolves the managed base URL lazily for each HTTP JSON request', async () => {
const postJson = vi.fn(async () => ({ ok: true }));
const runner = createManagedDaemonHttpJsonRunner({
resolveBaseUrl: async () => 'http://127.0.0.1:61234',
postJson,
});
await expect(runner('/sql/parse-table-identifier', { items: [] })).resolves.toEqual({ ok: true });
expect(postJson).toHaveBeenCalledWith('http://127.0.0.1:61234', '/sql/parse-table-identifier', { items: [] });
});
});
describe('managed daemon ingest ports', () => {
it('creates a Looker table parser backed by the managed daemon runner', async () => {
const requestJson = vi.fn(async () => ({
results: {
'model.explore': {
ok: true,
catalog: 'warehouse',
schema: 'public',
name: 'orders',
canonical_table: 'public.orders',
},
},
}));
const parser = createManagedDaemonLookerTableIdentifierParser({ requestJson });
await expect(
parser.parse([{ key: 'model.explore', sql_table_name: 'public.orders', dialect: 'postgres' }]),
).resolves.toEqual({
'model.explore': {
ok: true,
catalog: 'warehouse',
schema: 'public',
name: 'orders',
canonical_table: 'public.orders',
},
});
expect(requestJson).toHaveBeenCalledWith('/sql/parse-table-identifier', {
items: [{ key: 'model.explore', sql_table_name: 'public.orders', dialect: 'postgres' }],
});
});
it('creates a SQL analysis port backed by the managed daemon runner', async () => {
const requestJson = vi.fn(async () => ({
fingerprint: 'select-orders',
normalized_sql: 'SELECT * FROM public.orders WHERE id = ?',
tables_touched: ['public.orders'],
literal_slots: [{ position: 1, type: 'number', example_value: '42' }],
}));
const sqlAnalysis = createManagedDaemonSqlAnalysisPort({ requestJson });
await expect(sqlAnalysis.analyzeForFingerprint('SELECT * FROM public.orders WHERE id = 42', 'postgres')).resolves
.toEqual({
fingerprint: 'select-orders',
normalizedSql: 'SELECT * FROM public.orders WHERE id = ?',
tablesTouched: ['public.orders'],
literalSlots: [{ position: 1, type: 'number', exampleValue: '42' }],
});
expect(requestJson).toHaveBeenCalledWith('/api/sql/analyze-for-fingerprint', {
sql: 'SELECT * FROM public.orders WHERE id = 42',
dialect: 'postgres',
});
});
it('returns live-database daemon request options backed by the managed runner', async () => {
const requestJson = vi.fn(async () => ({
connection_id: 'warehouse',
tables: [],
}));
const options = managedDaemonDatabaseIntrospectionOptions({ requestJson });
expect(options.requestJson).toBeDefined();
await expect(options.requestJson?.('/database/introspect', { connection_id: 'warehouse' })).resolves.toEqual({
connection_id: 'warehouse',
tables: [],
});
expect(requestJson).toHaveBeenCalledWith('/database/introspect', { connection_id: 'warehouse' });
});
});

View file

@ -0,0 +1,194 @@
import { request as httpRequest } from 'node:http';
import { request as httpsRequest } from 'node:https';
import { URL } from 'node:url';
import {
createDaemonLookerTableIdentifierParser,
type DaemonLiveDatabaseIntrospectionOptions,
type KtxDaemonDatabaseHttpJsonRunner,
type KtxDaemonTableIdentifierHttpJsonRunner,
type LookerTableIdentifierParser,
} from '@ktx/context/ingest';
import {
createHttpSqlAnalysisPort,
type KtxSqlAnalysisHttpJsonRunner,
type SqlAnalysisPort,
} from '@ktx/context/sql-analysis';
import type { KtxCliIo } from './cli-runtime.js';
import {
ensureManagedPythonCommandRuntime,
type KtxManagedPythonInstallPolicy,
type ManagedPythonCommandRuntime,
} from './managed-python-command.js';
import { startManagedPythonDaemon, type ManagedPythonDaemonStartResult } from './managed-python-daemon.js';
export type ManagedPythonHttpJsonRunner = (
path: string,
payload: Record<string, unknown>,
) => Promise<Record<string, unknown>>;
export type ManagedPythonHttpPostJson = (
baseUrl: string,
path: string,
payload: Record<string, unknown>,
) => Promise<Record<string, unknown>>;
export interface ManagedPythonCoreDaemonOptions {
cliVersion: string;
installPolicy: KtxManagedPythonInstallPolicy;
io: KtxCliIo;
ensureRuntime?: (options: {
cliVersion: string;
installPolicy: KtxManagedPythonInstallPolicy;
io: KtxCliIo;
feature: 'core';
}) => Promise<ManagedPythonCommandRuntime>;
startDaemon?: (options: {
cliVersion: string;
features: ['core'];
force: false;
}) => Promise<ManagedPythonDaemonStartResult>;
}
export type ManagedPythonDaemonHttpOptions =
| {
requestJson: ManagedPythonHttpJsonRunner;
}
| {
resolveBaseUrl: () => Promise<string>;
postJson?: ManagedPythonHttpPostJson;
}
| (ManagedPythonCoreDaemonOptions & {
postJson?: ManagedPythonHttpPostJson;
});
function normalizedBaseUrl(baseUrl: string): string {
return baseUrl.endsWith('/') ? baseUrl : `${baseUrl}/`;
}
function parseJsonObject(raw: string, path: string): Record<string, unknown> {
const parsed = JSON.parse(raw) as unknown;
if (!parsed || typeof parsed !== 'object' || Array.isArray(parsed)) {
throw new Error(`KTX managed daemon HTTP ${path} returned non-object JSON`);
}
return parsed as Record<string, unknown>;
}
export async function postManagedDaemonJson(
baseUrl: string,
path: string,
payload: Record<string, unknown>,
): Promise<Record<string, unknown>> {
return await new Promise((resolve, reject) => {
const target = new URL(path.replace(/^\//, ''), normalizedBaseUrl(baseUrl));
const body = JSON.stringify(payload);
const client = target.protocol === 'https:' ? httpsRequest : httpRequest;
const request = client(
target,
{
method: 'POST',
headers: {
accept: 'application/json',
'content-type': 'application/json',
'content-length': Buffer.byteLength(body),
},
},
(response) => {
const chunks: Buffer[] = [];
response.on('data', (chunk: Buffer) => chunks.push(chunk));
response.on('end', () => {
const text = Buffer.concat(chunks).toString('utf8');
const statusCode = response.statusCode ?? 0;
if (statusCode < 200 || statusCode >= 300) {
reject(new Error(`KTX managed daemon HTTP ${path} failed with ${statusCode}: ${text}`));
return;
}
try {
resolve(parseJsonObject(text, path));
} catch (error) {
reject(error);
}
});
},
);
request.on('error', reject);
request.end(body);
});
}
export function createManagedPythonDaemonBaseUrlResolver(
options: ManagedPythonCoreDaemonOptions,
): () => Promise<string> {
let cachedBaseUrl: string | undefined;
return async () => {
if (cachedBaseUrl) {
return cachedBaseUrl;
}
const ensureRuntime = options.ensureRuntime ?? ensureManagedPythonCommandRuntime;
const startDaemon = options.startDaemon ?? startManagedPythonDaemon;
await ensureRuntime({
cliVersion: options.cliVersion,
installPolicy: options.installPolicy,
io: options.io,
feature: 'core',
});
const daemon = await startDaemon({
cliVersion: options.cliVersion,
features: ['core'],
force: false,
});
const verb = daemon.status === 'started' ? 'Started' : 'Using existing';
options.io.stderr.write(`${verb} KTX Python daemon: ${daemon.baseUrl}\n`);
cachedBaseUrl = daemon.baseUrl;
return cachedBaseUrl;
};
}
function isRequestJsonOnly(options: ManagedPythonDaemonHttpOptions): options is { requestJson: ManagedPythonHttpJsonRunner } {
return 'requestJson' in options;
}
function isResolveBaseUrlOnly(
options: ManagedPythonDaemonHttpOptions,
): options is { resolveBaseUrl: () => Promise<string>; postJson?: ManagedPythonHttpPostJson } {
return 'resolveBaseUrl' in options;
}
export function createManagedDaemonHttpJsonRunner(
options: ManagedPythonDaemonHttpOptions,
): ManagedPythonHttpJsonRunner {
if (isRequestJsonOnly(options)) {
return options.requestJson;
}
const resolveBaseUrl = isResolveBaseUrlOnly(options)
? options.resolveBaseUrl
: createManagedPythonDaemonBaseUrlResolver(options);
const postJson = options.postJson ?? postManagedDaemonJson;
return async (path, payload) => postJson(await resolveBaseUrl(), path, payload);
}
export function createManagedDaemonLookerTableIdentifierParser(
options: ManagedPythonDaemonHttpOptions,
): LookerTableIdentifierParser {
return createDaemonLookerTableIdentifierParser({
baseUrl: 'http://127.0.0.1:0',
requestJson: createManagedDaemonHttpJsonRunner(options) as KtxDaemonTableIdentifierHttpJsonRunner,
});
}
export function createManagedDaemonSqlAnalysisPort(options: ManagedPythonDaemonHttpOptions): SqlAnalysisPort {
return createHttpSqlAnalysisPort({
baseUrl: 'http://127.0.0.1:0',
requestJson: createManagedDaemonHttpJsonRunner(options) as KtxSqlAnalysisHttpJsonRunner,
});
}
export function managedDaemonDatabaseIntrospectionOptions(
options: ManagedPythonDaemonHttpOptions,
): Pick<DaemonLiveDatabaseIntrospectionOptions, 'requestJson'> {
return {
requestJson: createManagedDaemonHttpJsonRunner(options) as KtxDaemonDatabaseHttpJsonRunner,
};
}