feat: wire historic sql readers for bigquery and snowflake

This commit is contained in:
Andrey Avtomonov 2026-05-11 19:27:05 +02:00
parent 5d72c1f240
commit 98a475d6c5
2 changed files with 293 additions and 16 deletions

View file

@ -0,0 +1,141 @@
import { mkdtemp, rm, writeFile } from 'node:fs/promises';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import { loadKtxProject } from '@ktx/context/project';
import { afterEach, beforeEach, describe, expect, it } from 'vitest';
import { createKtxCliLocalIngestAdapters } from './local-adapters.js';
/**
 * Creates an in-memory stand-in for the SQL analysis port used by these tests.
 *
 * - `analyzeForFingerprint(sql)` resolves to a fixed fingerprint (`'fp'`), echoes the
 *   input back as `normalizedSql`, and reports no touched tables or literal slots.
 * - `analyzeBatch()` resolves to a new, empty `Map`.
 */
function sqlAnalysisStub() {
  const analyzeForFingerprint = async (sql: string) => ({
    fingerprint: 'fp',
    normalizedSql: sql,
    tablesTouched: [],
    literalSlots: [],
  });

  const analyzeBatch = async () => new Map();

  return { analyzeForFingerprint, analyzeBatch };
}
/**
 * Writes `body` to `ktx.yaml` inside `projectDir`, encoded as UTF-8.
 *
 * @param projectDir - Directory that should receive the project file.
 * @param body - Full text of the project file.
 */
async function writeProject(projectDir: string, body: string): Promise<void> {
  const projectFilePath = join(projectDir, 'ktx.yaml');
  await writeFile(projectFilePath, body, { encoding: 'utf-8' });
}
// Checks that the CLI adapter factory exposes the historic-sql adapter, with both of its
// skills, for a Postgres, a BigQuery, and a Snowflake connection.
describe('CLI local ingest adapters', () => {
  let tempDir: string;

  // Every test gets its own scratch project directory, removed again afterwards.
  beforeEach(async () => {
    tempDir = await mkdtemp(join(tmpdir(), 'ktx-cli-local-adapters-'));
  });

  afterEach(async () => {
    await rm(tempDir, { recursive: true, force: true });
  });

  const expectedSkillNames = ['historic_sql_table_digest', 'historic_sql_patterns'];

  /**
   * Writes the given ktx.yaml lines into the scratch project, loads the project, builds
   * the local ingest adapters for `connectionId`, and returns the skill names of the
   * adapter whose source is `historic-sql` (undefined when no such adapter exists).
   */
  async function historicSqlSkillNames(connectionId: string, projectYamlLines: string[]) {
    await writeProject(tempDir, projectYamlLines.join('\n'));
    const project = await loadKtxProject({ projectDir: tempDir });
    const adapters = createKtxCliLocalIngestAdapters(project, {
      historicSqlConnectionId: connectionId,
      sqlAnalysis: sqlAnalysisStub(),
    });
    const historicSqlAdapter = adapters.find((adapter) => adapter.source === 'historic-sql');
    return historicSqlAdapter?.skillNames;
  }

  it('registers Postgres historic SQL from the requested connection', async () => {
    const skillNames = await historicSqlSkillNames('warehouse', [
      'project: warehouse',
      'connections:',
      ' warehouse:',
      ' driver: postgres',
      ' url: env:WAREHOUSE_DATABASE_URL',
      ' readonly: true',
      ' historicSql:',
      ' enabled: true',
      ' dialect: postgres',
      'ingest:',
      ' adapters:',
      ' - historic-sql',
      '',
    ]);
    expect(skillNames).toEqual(expectedSkillNames);
  });

  it('registers BigQuery historic SQL from the requested connection', async () => {
    const skillNames = await historicSqlSkillNames('bq', [
      'project: warehouse',
      'connections:',
      ' bq:',
      ' driver: bigquery',
      ' readonly: true',
      ' dataset_id: analytics',
      ' location: us',
      ' credentials_json: \'{"project_id":"demo-project"}\'',
      ' historicSql:',
      ' enabled: true',
      ' dialect: bigquery',
      'ingest:',
      ' adapters:',
      ' - historic-sql',
      '',
    ]);
    expect(skillNames).toEqual(expectedSkillNames);
  });

  it('registers Snowflake historic SQL from the requested connection', async () => {
    const skillNames = await historicSqlSkillNames('sf', [
      'project: warehouse',
      'connections:',
      ' sf:',
      ' driver: snowflake',
      ' readonly: true',
      ' account: acct',
      ' warehouse: wh',
      ' database: ANALYTICS',
      ' schema_name: PUBLIC',
      ' username: reader',
      ' password: env:SNOWFLAKE_PASSWORD',
      ' historicSql:',
      ' enabled: true',
      ' dialect: snowflake',
      'ingest:',
      ' adapters:',
      ' - historic-sql',
      '',
    ]);
    expect(skillNames).toEqual(expectedSkillNames);
  });
});

View file

@ -1,5 +1,10 @@
import { join } from 'node:path';
import { createBigQueryLiveDatabaseIntrospection, isKtxBigQueryConnectionConfig } from '@ktx/connector-bigquery';
import {
createBigQueryLiveDatabaseIntrospection,
isKtxBigQueryConnectionConfig,
KtxBigQueryScanConnector,
type KtxBigQueryConnectionConfig,
} from '@ktx/connector-bigquery';
import { createClickHouseLiveDatabaseIntrospection, isKtxClickHouseConnectionConfig } from '@ktx/connector-clickhouse';
import { createMysqlLiveDatabaseIntrospection, isKtxMysqlConnectionConfig } from '@ktx/connector-mysql';
import {
@ -11,15 +16,19 @@ import {
import { createSqliteLiveDatabaseIntrospection, isKtxSqliteConnectionConfig } from '@ktx/connector-sqlite';
import { createSqlServerLiveDatabaseIntrospection, isKtxSqlServerConnectionConfig } from '@ktx/connector-sqlserver';
import {
BigQueryHistoricSqlQueryHistoryReader,
createDaemonLiveDatabaseIntrospection,
createDefaultLocalIngestAdapters,
type DefaultLocalIngestAdaptersOptions,
type HistoricSqlReader,
type LiveDatabaseIntrospectionPort,
LiveDatabaseSourceAdapter,
PostgresPgssReader,
SnowflakeHistoricSqlQueryHistoryReader,
type SourceAdapter,
} from '@ktx/context/ingest';
import type { KtxLocalProject } from '@ktx/context/project';
import { createHttpSqlAnalysisPort } from '@ktx/context/sql-analysis';
import { createHttpSqlAnalysisPort, type SqlAnalysisPort } from '@ktx/context/sql-analysis';
import {
createManagedDaemonLookerTableIdentifierParser,
createManagedDaemonSqlAnalysisPort,
@ -35,6 +44,8 @@ function hasSnowflakeDriver(connection: unknown): boolean {
);
}
/**
 * Module shape of `@ktx/connector-snowflake`, referenced through a type-only
 * `typeof import(...)` so that code can accept an already-imported module object.
 */
type SnowflakeConnectorModule = typeof import('@ktx/connector-snowflake');
function ktxCliDaemonDatabaseIntrospectionOptions(
options: KtxCliLocalIngestAdaptersOptions,
): DefaultLocalIngestAdaptersOptions['databaseIntrospection'] {
@ -61,6 +72,9 @@ function ktxCliLookerOptions(
}
function ktxCliHistoricSqlAnalysis(options: KtxCliLocalIngestAdaptersOptions) {
if (options.sqlAnalysis) {
return options.sqlAnalysis;
}
if (options.sqlAnalysisUrl) {
return createHttpSqlAnalysisPort({ baseUrl: options.sqlAnalysisUrl });
}
@ -145,21 +159,32 @@ function createKtxCliLiveDatabaseIntrospection(
/**
 * Options accepted by the CLI's local ingest adapter factory, on top of the
 * defaults declared by `DefaultLocalIngestAdaptersOptions`.
 */
export interface KtxCliLocalIngestAdaptersOptions extends DefaultLocalIngestAdaptersOptions {
/** Key into the project's `connections` map selecting the connection to read historic SQL from; no historic SQL options are built when it is unset. */
historicSqlConnectionId?: string;
/** Ready-made SQL analysis port; when set it is used as-is and takes precedence over `sqlAnalysisUrl`. */
sqlAnalysis?: SqlAnalysisPort;
/** Base URL used to create an HTTP SQL analysis port when `sqlAnalysis` is not provided. */
sqlAnalysisUrl?: string;
/** Managed Python core daemon options. NOTE(review): the consumer of this field is not visible in this view — confirm how it is used. */
managedDaemon?: ManagedPythonCoreDaemonOptions;
}
function isEnabledPostgresHistoricSqlConnection(connection: KtxPostgresConnectionConfig | undefined): boolean {
if (!connection || !isKtxPostgresConnectionConfig(connection)) {
return false;
/**
 * Extracts the `historicSql` settings object from an arbitrary connection value.
 *
 * Returns the `historicSql` property when `connection` is a truthy object that has
 * the property and its value is a non-null, non-array object; returns `null` in
 * every other case. Nothing about the contents of the record is validated here.
 */
function historicSqlRecord(connection: unknown): Record<string, unknown> | null {
if (
connection &&
typeof connection === 'object' &&
'historicSql' in connection &&
typeof (connection as { historicSql?: unknown }).historicSql === 'object' &&
(connection as { historicSql?: unknown }).historicSql !== null &&
!Array.isArray((connection as { historicSql?: unknown }).historicSql)
) {
return (connection as { historicSql: Record<string, unknown> }).historicSql;
}
const historicSql =
typeof connection.historicSql === 'object' &&
connection.historicSql !== null &&
!Array.isArray(connection.historicSql)
? (connection.historicSql as Record<string, unknown>)
: null;
return historicSql?.enabled === true && historicSql.dialect === 'postgres';
return null;
}
/**
 * Determines which historic-SQL dialect, if any, is switched on for a connection.
 *
 * @param connection - Raw connection value taken from the project config.
 * @returns `'postgres'`, `'bigquery'`, or `'snowflake'` when the connection's
 *   `historicSql.enabled` is exactly `true` and its `dialect`, compared
 *   case-insensitively, is one of those three; otherwise `null`.
 */
function enabledHistoricSqlDialect(connection: unknown): 'postgres' | 'bigquery' | 'snowflake' | null {
  const settings = historicSqlRecord(connection);
  if (!settings || settings.enabled !== true) {
    return null;
  }

  // A missing dialect becomes the empty string and falls through to the default case.
  switch (String(settings.dialect ?? '').toLowerCase()) {
    case 'postgres':
      return 'postgres';
    case 'bigquery':
      return 'bigquery';
    case 'snowflake':
      return 'snowflake';
    default:
      return null;
  }
}
function createEphemeralPostgresHistoricSqlClient(project: KtxLocalProject, connectionId: string) {
@ -184,20 +209,131 @@ function createEphemeralPostgresHistoricSqlClient(project: KtxLocalProject, conn
};
}
/**
 * Builds a query client that runs historic-SQL reads against a BigQuery connection.
 *
 * Each `executeQuery` call creates its own `KtxBigQueryScanConnector`, runs the SQL via
 * `executeReadOnly`, and calls `cleanup()` on the connector afterwards whether or not
 * the query succeeded.
 *
 * @param project - Loaded project whose `config.connections` contains the connection.
 * @param connectionId - Key of the connection to query.
 * @returns An object with `executeQuery(query)` resolving to `{ headers, rows, totalRows }`.
 * @throws Error when the referenced connection is not a BigQuery connection config.
 */
function createEphemeralBigQueryHistoricSqlClient(project: KtxLocalProject, connectionId: string) {
  const bigQueryConnection = project.config.connections[connectionId] as KtxBigQueryConnectionConfig | undefined;
  if (!isKtxBigQueryConnectionConfig(bigQueryConnection)) {
    const actualDriver = String(bigQueryConnection?.driver ?? 'unknown');
    throw new Error(`Historic SQL local ingest requires a BigQuery connection, got ${actualDriver}`);
  }

  const executeQuery = async (query: string) => {
    const connector = new KtxBigQueryScanConnector({
      connectionId,
      connection: bigQueryConnection,
    });
    try {
      const { headers, rows, totalRows } = await connector.executeReadOnly(
        { connectionId, sql: query },
        {} as never,
      );
      return { headers, rows, totalRows };
    } finally {
      await connector.cleanup();
    }
  };

  return { executeQuery };
}
/**
 * Builds a query client that runs historic-SQL reads against a Snowflake connection,
 * using an already-imported Snowflake connector module.
 *
 * Each `executeQuery` call creates its own `KtxSnowflakeScanConnector`, runs the SQL via
 * `executeReadOnly`, and calls `cleanup()` on the connector afterwards whether or not
 * the query succeeded.
 *
 * @param project - Loaded project whose `config.connections` contains the connection.
 * @param connectionId - Key of the connection to query.
 * @param connectorModule - The `@ktx/connector-snowflake` module providing the type guard
 *   and the scan connector class.
 * @returns A promise of an object with `executeQuery(query)` resolving to
 *   `{ headers, rows, totalRows }`.
 * @throws Error (as a rejection) when the referenced connection is not a Snowflake
 *   connection config.
 */
async function createEphemeralSnowflakeHistoricSqlClient(
  project: KtxLocalProject,
  connectionId: string,
  connectorModule: SnowflakeConnectorModule,
) {
  const snowflakeConnection = project.config.connections[connectionId];
  if (!connectorModule.isKtxSnowflakeConnectionConfig(snowflakeConnection)) {
    const actualDriver = String(snowflakeConnection?.driver ?? 'unknown');
    throw new Error(`Historic SQL local ingest requires a Snowflake connection, got ${actualDriver}`);
  }

  const executeQuery = async (query: string) => {
    const connector = new connectorModule.KtxSnowflakeScanConnector({
      connectionId,
      connection: snowflakeConnection,
    });
    try {
      const { headers, rows, totalRows } = await connector.executeReadOnly(
        { connectionId, sql: query },
        {} as never,
      );
      return { headers, rows, totalRows };
    } finally {
      await connector.cleanup();
    }
  };

  return { executeQuery };
}
/**
 * Reads the Google Cloud project id out of a BigQuery connection's `credentials_json`.
 *
 * `credentials_json` must be a string holding either the credentials JSON itself or an
 * `env:NAME` reference, in which case the JSON is taken from `env[NAME]`.
 *
 * @param connection - BigQuery connection config to inspect.
 * @param env - Environment used to resolve `env:` references.
 * @returns The `project_id` from the credentials, with surrounding whitespace removed.
 * @throws Error when `credentials_json` is missing or empty, references an unset or
 *   empty environment variable, is not valid JSON, or lacks a non-blank string
 *   `project_id`.
 */
function bigQueryProjectId(connection: KtxBigQueryConnectionConfig, env: NodeJS.ProcessEnv): string {
  const envPrefix = 'env:';
  const raw = typeof connection.credentials_json === 'string' ? connection.credentials_json : '';

  let credentialsJson = raw;
  if (raw.startsWith(envPrefix)) {
    const variableName = raw.slice(envPrefix.length);
    const fromEnv = env[variableName];
    if (fromEnv === undefined || fromEnv.trim().length === 0) {
      // Name the variable so the operator knows what to export; never echo its value.
      throw new Error(
        `Historic SQL BigQuery connection credentials_json references unset environment variable ${variableName}`,
      );
    }
    credentialsJson = fromEnv;
  }

  if (credentialsJson.trim().length === 0) {
    throw new Error('Historic SQL BigQuery connection requires credentials_json');
  }

  let parsed: unknown;
  try {
    parsed = JSON.parse(credentialsJson);
  } catch {
    // The parser's own message is dropped on purpose: it can quote a fragment of the
    // input, and the input here is credential material.
    throw new Error('Historic SQL BigQuery connection credentials_json is not valid JSON');
  }

  // JSON.parse may legitimately yield null or a primitive; only objects can carry project_id.
  const projectId =
    typeof parsed === 'object' && parsed !== null ? (parsed as { project_id?: unknown }).project_id : undefined;
  if (typeof projectId !== 'string' || projectId.trim().length === 0) {
    throw new Error('Historic SQL BigQuery connection requires credentials_json.project_id');
  }
  return projectId.trim();
}
/**
 * Picks the BigQuery region for a connection.
 *
 * @param connection - BigQuery connection config to inspect.
 * @returns The connection's `location` with surrounding whitespace removed, or `'us'`
 *   when `location` is not a string or is blank.
 */
function bigQueryRegion(connection: KtxBigQueryConnectionConfig): string {
  const { location } = connection;
  if (typeof location !== 'string') {
    return 'us';
  }
  const trimmedLocation = location.trim();
  return trimmedLocation.length > 0 ? trimmedLocation : 'us';
}
/**
 * Assembles the historic-SQL options (analysis port, baseline directory, reader, and
 * query client) for a local CLI run.
 *
 * Returns `undefined` when no `historicSqlConnectionId` was requested, or when the
 * referenced connection does not have an enabled `historicSql` section with one of the
 * supported dialects.
 *
 * @throws Error when the dialect is `bigquery` but the connection is not a BigQuery
 *   connection config.
 */
function historicSqlOptionsForLocalRun(project: KtxLocalProject, options: KtxCliLocalIngestAdaptersOptions) {
const connectionId = options.historicSqlConnectionId;
if (!connectionId) {
return undefined;
}
const connection = project.config.connections[connectionId] as KtxPostgresConnectionConfig | undefined;
if (!isEnabledPostgresHistoricSqlConnection(connection)) {
const connection = project.config.connections[connectionId];
const dialect = enabledHistoricSqlDialect(connection);
if (!dialect) {
return undefined;
}
return {
// Settings shared by every dialect: the SQL analysis port and the baseline directory under the project.
const base = {
sqlAnalysis: ktxCliHistoricSqlAnalysis(options),
postgresQueryClient: createEphemeralPostgresHistoricSqlClient(project, connectionId),
postgresBaselineRootDir: join(project.projectDir, '.ktx/cache/historic-sql'),
};
if (dialect === 'postgres') {
return {
...base,
reader: new PostgresPgssReader() satisfies HistoricSqlReader,
queryClient: createEphemeralPostgresHistoricSqlClient(project, connectionId),
};
}
// The BigQuery reader is constructed with a project id and region, so the connection
// shape is checked here before either is derived from it.
if (dialect === 'bigquery') {
if (!isKtxBigQueryConnectionConfig(connection)) {
throw new Error(
`Historic SQL local ingest requires a BigQuery connection, got ${String(connection?.driver ?? 'unknown')}`,
);
}
return {
...base,
reader: new BigQueryHistoricSqlQueryHistoryReader({
projectId: bigQueryProjectId(connection, process.env),
region: bigQueryRegion(connection),
}) satisfies HistoricSqlReader,
queryClient: createEphemeralBigQueryHistoricSqlClient(project, connectionId),
};
}
// Only 'snowflake' remains. The connector package is imported inside executeQuery, so it
// is loaded, and the connection checked, only once a query is actually run.
return {
...base,
reader: new SnowflakeHistoricSqlQueryHistoryReader() satisfies HistoricSqlReader,
queryClient: {
async executeQuery(query: string) {
const connectorModule = await import('@ktx/connector-snowflake');
const client = await createEphemeralSnowflakeHistoricSqlClient(project, connectionId, connectorModule);
return client.executeQuery(query);
},
},
};
}
export function createKtxCliLocalIngestAdapters(