feat(cli): route local adapters through managed daemon

This commit is contained in:
Andrey Avtomonov 2026-05-11 12:32:03 +02:00
parent ce2d39b9a9
commit c4bfac4506

View file

@ -20,6 +20,12 @@ import {
} from '@ktx/context/ingest';
import type { KtxLocalProject } from '@ktx/context/project';
import { createHttpSqlAnalysisPort } from '@ktx/context/sql-analysis';
import {
createManagedDaemonLookerTableIdentifierParser,
createManagedDaemonSqlAnalysisPort,
managedDaemonDatabaseIntrospectionOptions,
type ManagedPythonCoreDaemonOptions,
} from './managed-python-http.js';
function hasSnowflakeDriver(connection: unknown): boolean {
return (
@ -29,13 +35,55 @@ function hasSnowflakeDriver(connection: unknown): boolean {
);
}
function ktxCliDaemonDatabaseIntrospectionOptions(
options: KtxCliLocalIngestAdaptersOptions,
): DefaultLocalIngestAdaptersOptions['databaseIntrospection'] {
if (options.databaseIntrospectionUrl || options.databaseIntrospection?.requestJson || !options.managedDaemon) {
return options.databaseIntrospection;
}
return {
...(options.databaseIntrospection ?? {}),
...managedDaemonDatabaseIntrospectionOptions(options.managedDaemon),
};
}
function ktxCliLookerOptions(
options: KtxCliLocalIngestAdaptersOptions,
): DefaultLocalIngestAdaptersOptions['looker'] {
const looker = options.looker;
if (looker?.parser || looker?.daemonBaseUrl || process.env.KTX_DAEMON_URL || !options.managedDaemon) {
return looker;
}
return {
...(looker ?? {}),
parser: createManagedDaemonLookerTableIdentifierParser(options.managedDaemon),
};
}
function ktxCliHistoricSqlAnalysis(options: KtxCliLocalIngestAdaptersOptions) {
if (options.sqlAnalysisUrl) {
return createHttpSqlAnalysisPort({ baseUrl: options.sqlAnalysisUrl });
}
if (process.env.KTX_SQL_ANALYSIS_URL) {
return createHttpSqlAnalysisPort({ baseUrl: process.env.KTX_SQL_ANALYSIS_URL });
}
if (process.env.KTX_DAEMON_URL) {
return createHttpSqlAnalysisPort({ baseUrl: process.env.KTX_DAEMON_URL });
}
if (options.managedDaemon) {
return createManagedDaemonSqlAnalysisPort(options.managedDaemon);
}
return createHttpSqlAnalysisPort({ baseUrl: 'http://127.0.0.1:8765' });
}
function createKtxCliLiveDatabaseIntrospection(
project: KtxLocalProject,
options: DefaultLocalIngestAdaptersOptions = {},
options: KtxCliLocalIngestAdaptersOptions = {},
): LiveDatabaseIntrospectionPort {
const databaseIntrospection = ktxCliDaemonDatabaseIntrospectionOptions(options);
const daemon = createDaemonLiveDatabaseIntrospection({
connections: project.config.connections,
...options.databaseIntrospection,
...databaseIntrospection,
...(options.databaseIntrospectionUrl ? { baseUrl: options.databaseIntrospectionUrl } : {}),
});
const sqlite = createSqliteLiveDatabaseIntrospection({
@ -95,9 +143,10 @@ function createKtxCliLiveDatabaseIntrospection(
};
}
interface KtxCliLocalIngestAdaptersOptions extends DefaultLocalIngestAdaptersOptions {
export interface KtxCliLocalIngestAdaptersOptions extends DefaultLocalIngestAdaptersOptions {
historicSqlConnectionId?: string;
sqlAnalysisUrl?: string;
managedDaemon?: ManagedPythonCoreDaemonOptions;
}
function isEnabledPostgresHistoricSqlConnection(connection: KtxPostgresConnectionConfig | undefined): boolean {
@ -145,13 +194,7 @@ function historicSqlOptionsForLocalRun(project: KtxLocalProject, options: KtxCli
return undefined;
}
return {
sqlAnalysis: createHttpSqlAnalysisPort({
baseUrl:
options.sqlAnalysisUrl ??
process.env.KTX_SQL_ANALYSIS_URL ??
process.env.KTX_DAEMON_URL ??
'http://127.0.0.1:8765',
}),
sqlAnalysis: ktxCliHistoricSqlAnalysis(options),
postgresQueryClient: createEphemeralPostgresHistoricSqlClient(project, connectionId),
postgresBaselineRootDir: join(project.projectDir, '.ktx/cache/historic-sql'),
};
@ -164,6 +207,8 @@ export function createKtxCliLocalIngestAdapters(
const historicSql = historicSqlOptionsForLocalRun(project, options);
const base = createDefaultLocalIngestAdapters(project, {
...options,
databaseIntrospection: ktxCliDaemonDatabaseIntrospectionOptions(options),
looker: ktxCliLookerOptions(options),
...(historicSql ? { historicSql } : {}),
});
const liveDatabase = new LiveDatabaseSourceAdapter({