From 98a475d6c585df7b8edd574750c61f01bc531225 Mon Sep 17 00:00:00 2001 From: Andrey Avtomonov <7889985+andreybavt@users.noreply.github.com> Date: Mon, 11 May 2026 19:27:05 +0200 Subject: [PATCH] feat: wire historic sql readers for bigquery and snowflake --- packages/cli/src/local-adapters.test.ts | 141 ++++++++++++++++++++ packages/cli/src/local-adapters.ts | 168 +++++++++++++++++++++--- 2 files changed, 293 insertions(+), 16 deletions(-) create mode 100644 packages/cli/src/local-adapters.test.ts diff --git a/packages/cli/src/local-adapters.test.ts b/packages/cli/src/local-adapters.test.ts new file mode 100644 index 00000000..517c0588 --- /dev/null +++ b/packages/cli/src/local-adapters.test.ts @@ -0,0 +1,141 @@ +import { mkdtemp, rm, writeFile } from 'node:fs/promises'; +import { tmpdir } from 'node:os'; +import { join } from 'node:path'; +import { loadKtxProject } from '@ktx/context/project'; +import { afterEach, beforeEach, describe, expect, it } from 'vitest'; +import { createKtxCliLocalIngestAdapters } from './local-adapters.js'; + +function sqlAnalysisStub() { + return { + async analyzeForFingerprint(sql: string) { + return { + fingerprint: 'fp', + normalizedSql: sql, + tablesTouched: [], + literalSlots: [], + }; + }, + async analyzeBatch() { + return new Map(); + }, + }; +} + +async function writeProject(projectDir: string, body: string): Promise { + await writeFile(join(projectDir, 'ktx.yaml'), body, 'utf-8'); +} + +describe('CLI local ingest adapters', () => { + let tempDir: string; + + beforeEach(async () => { + tempDir = await mkdtemp(join(tmpdir(), 'ktx-cli-local-adapters-')); + }); + + afterEach(async () => { + await rm(tempDir, { recursive: true, force: true }); + }); + + it('registers Postgres historic SQL from the requested connection', async () => { + await writeProject( + tempDir, + [ + 'project: warehouse', + 'connections:', + ' warehouse:', + ' driver: postgres', + ' url: env:WAREHOUSE_DATABASE_URL', + ' readonly: true', + ' historicSql:', + ' enabled: true', + ' dialect: postgres', + 'ingest:', + ' adapters:', + ' - historic-sql', + '', + ].join('\n'), + ); + const project = await loadKtxProject({ projectDir: tempDir }); + + const adapters = createKtxCliLocalIngestAdapters(project, { + historicSqlConnectionId: 'warehouse', + sqlAnalysis: sqlAnalysisStub(), + }); + + expect(adapters.find((adapter) => adapter.source === 'historic-sql')?.skillNames).toEqual([ + 'historic_sql_table_digest', + 'historic_sql_patterns', + ]); + }); + + it('registers BigQuery historic SQL from the requested connection', async () => { + await writeProject( + tempDir, + [ + 'project: warehouse', + 'connections:', + ' bq:', + ' driver: bigquery', + ' readonly: true', + ' dataset_id: analytics', + ' location: us', + ' credentials_json: \'{"project_id":"demo-project"}\'', + ' historicSql:', + ' enabled: true', + ' dialect: bigquery', + 'ingest:', + ' adapters:', + ' - historic-sql', + '', + ].join('\n'), + ); + const project = await loadKtxProject({ projectDir: tempDir }); + + const adapters = createKtxCliLocalIngestAdapters(project, { + historicSqlConnectionId: 'bq', + sqlAnalysis: sqlAnalysisStub(), + }); + + expect(adapters.find((adapter) => adapter.source === 'historic-sql')?.skillNames).toEqual([ + 'historic_sql_table_digest', + 'historic_sql_patterns', + ]); + }); + + it('registers Snowflake historic SQL from the requested connection', async () => { + await writeProject( + tempDir, + [ + 'project: warehouse', + 'connections:', + ' sf:', + ' driver: snowflake', + ' readonly: true', + ' account: acct', + ' warehouse: wh', + ' database: ANALYTICS', + ' schema_name: PUBLIC', + ' username: reader', + ' password: env:SNOWFLAKE_PASSWORD', + ' historicSql:', + ' enabled: true', + ' dialect: snowflake', + 'ingest:', + ' adapters:', + ' - historic-sql', + '', + ].join('\n'), + ); + const project = await loadKtxProject({ projectDir: tempDir }); + + const adapters = createKtxCliLocalIngestAdapters(project, { + historicSqlConnectionId: 'sf', + sqlAnalysis: sqlAnalysisStub(), + }); + + expect(adapters.find((adapter) => adapter.source === 'historic-sql')?.skillNames).toEqual([ + 'historic_sql_table_digest', + 'historic_sql_patterns', + ]); + }); +}); diff --git a/packages/cli/src/local-adapters.ts b/packages/cli/src/local-adapters.ts index 90d17306..d0a5f571 100644 --- a/packages/cli/src/local-adapters.ts +++ b/packages/cli/src/local-adapters.ts @@ -1,5 +1,10 @@ import { join } from 'node:path'; -import { createBigQueryLiveDatabaseIntrospection, isKtxBigQueryConnectionConfig } from '@ktx/connector-bigquery'; +import { + createBigQueryLiveDatabaseIntrospection, + isKtxBigQueryConnectionConfig, + KtxBigQueryScanConnector, + type KtxBigQueryConnectionConfig, +} from '@ktx/connector-bigquery'; import { createClickHouseLiveDatabaseIntrospection, isKtxClickHouseConnectionConfig } from '@ktx/connector-clickhouse'; import { createMysqlLiveDatabaseIntrospection, isKtxMysqlConnectionConfig } from '@ktx/connector-mysql'; import { @@ -11,15 +16,19 @@ import { import { createSqliteLiveDatabaseIntrospection, isKtxSqliteConnectionConfig } from '@ktx/connector-sqlite'; import { createSqlServerLiveDatabaseIntrospection, isKtxSqlServerConnectionConfig } from '@ktx/connector-sqlserver'; import { + BigQueryHistoricSqlQueryHistoryReader, createDaemonLiveDatabaseIntrospection, createDefaultLocalIngestAdapters, type DefaultLocalIngestAdaptersOptions, + type HistoricSqlReader, type LiveDatabaseIntrospectionPort, LiveDatabaseSourceAdapter, + PostgresPgssReader, + SnowflakeHistoricSqlQueryHistoryReader, type SourceAdapter, } from '@ktx/context/ingest'; import type { KtxLocalProject } from '@ktx/context/project'; -import { createHttpSqlAnalysisPort } from '@ktx/context/sql-analysis'; +import { createHttpSqlAnalysisPort, type SqlAnalysisPort } from '@ktx/context/sql-analysis'; import { createManagedDaemonLookerTableIdentifierParser, createManagedDaemonSqlAnalysisPort, @@ -35,6 +44,8 @@ function hasSnowflakeDriver(connection: unknown): boolean { ); } +type SnowflakeConnectorModule = typeof import('@ktx/connector-snowflake'); + function ktxCliDaemonDatabaseIntrospectionOptions( options: KtxCliLocalIngestAdaptersOptions, ): DefaultLocalIngestAdaptersOptions['databaseIntrospection'] { @@ -61,6 +72,9 @@ function ktxCliLookerOptions( } function ktxCliHistoricSqlAnalysis(options: KtxCliLocalIngestAdaptersOptions) { + if (options.sqlAnalysis) { + return options.sqlAnalysis; + } if (options.sqlAnalysisUrl) { return createHttpSqlAnalysisPort({ baseUrl: options.sqlAnalysisUrl }); } @@ -145,21 +159,32 @@ function createKtxCliLiveDatabaseIntrospection( export interface KtxCliLocalIngestAdaptersOptions extends DefaultLocalIngestAdaptersOptions { historicSqlConnectionId?: string; + sqlAnalysis?: SqlAnalysisPort; sqlAnalysisUrl?: string; managedDaemon?: ManagedPythonCoreDaemonOptions; } -function isEnabledPostgresHistoricSqlConnection(connection: KtxPostgresConnectionConfig | undefined): boolean { - if (!connection || !isKtxPostgresConnectionConfig(connection)) { - return false; +function historicSqlRecord(connection: unknown): Record | null { + if ( + connection && + typeof connection === 'object' && + 'historicSql' in connection && + typeof (connection as { historicSql?: unknown }).historicSql === 'object' && + (connection as { historicSql?: unknown }).historicSql !== null && + !Array.isArray((connection as { historicSql?: unknown }).historicSql) + ) { + return (connection as { historicSql: Record }).historicSql; } - const historicSql = - typeof connection.historicSql === 'object' && - connection.historicSql !== null && - !Array.isArray(connection.historicSql) - ? (connection.historicSql as Record) - : null; - return historicSql?.enabled === true && historicSql.dialect === 'postgres'; + return null; +} + +function enabledHistoricSqlDialect(connection: unknown): 'postgres' | 'bigquery' | 'snowflake' | null { + const historicSql = historicSqlRecord(connection); + if (historicSql?.enabled !== true) { + return null; + } + const dialect = String(historicSql.dialect ?? '').toLowerCase(); + return dialect === 'postgres' || dialect === 'bigquery' || dialect === 'snowflake' ? dialect : null; } function createEphemeralPostgresHistoricSqlClient(project: KtxLocalProject, connectionId: string) { @@ -184,20 +209,131 @@ function createEphemeralPostgresHistoricSqlClient(project: KtxLocalProject, conn }; } +function createEphemeralBigQueryHistoricSqlClient(project: KtxLocalProject, connectionId: string) { + const connection = project.config.connections[connectionId] as KtxBigQueryConnectionConfig | undefined; + if (!isKtxBigQueryConnectionConfig(connection)) { + throw new Error( + `Historic SQL local ingest requires a BigQuery connection, got ${String(connection?.driver ?? 'unknown')}`, + ); + } + return { + async executeQuery(query: string) { + const connector = new KtxBigQueryScanConnector({ + connectionId, + connection, + }); + try { + const result = await connector.executeReadOnly({ connectionId, sql: query }, {} as never); + return { + headers: result.headers, + rows: result.rows, + totalRows: result.totalRows, + }; + } finally { + await connector.cleanup(); + } + }, + }; +} + +async function createEphemeralSnowflakeHistoricSqlClient( + project: KtxLocalProject, + connectionId: string, + connectorModule: SnowflakeConnectorModule, +) { + const connection = project.config.connections[connectionId]; + if (!connectorModule.isKtxSnowflakeConnectionConfig(connection)) { + throw new Error( + `Historic SQL local ingest requires a Snowflake connection, got ${String(connection?.driver ?? 'unknown')}`, + ); + } + return { + async executeQuery(query: string) { + const connector = new connectorModule.KtxSnowflakeScanConnector({ + connectionId, + connection, + }); + try { + const result = await connector.executeReadOnly({ connectionId, sql: query }, {} as never); + return { + headers: result.headers, + rows: result.rows, + totalRows: result.totalRows, + }; + } finally { + await connector.cleanup(); + } + }, + }; +} + +function bigQueryProjectId(connection: KtxBigQueryConnectionConfig, env: NodeJS.ProcessEnv): string { + const raw = typeof connection.credentials_json === 'string' ? connection.credentials_json : ''; + const resolved = raw.startsWith('env:') ? env[raw.slice('env:'.length)] ?? '' : raw; + const parsed = JSON.parse(resolved) as { project_id?: unknown }; + if (typeof parsed.project_id !== 'string' || parsed.project_id.trim().length === 0) { + throw new Error('Historic SQL BigQuery connection requires credentials_json.project_id'); + } + return parsed.project_id; +} + +function bigQueryRegion(connection: KtxBigQueryConnectionConfig): string { + return typeof connection.location === 'string' && connection.location.trim().length > 0 + ? connection.location.trim() + : 'us'; +} + function historicSqlOptionsForLocalRun(project: KtxLocalProject, options: KtxCliLocalIngestAdaptersOptions) { const connectionId = options.historicSqlConnectionId; if (!connectionId) { return undefined; } - const connection = project.config.connections[connectionId] as KtxPostgresConnectionConfig | undefined; - if (!isEnabledPostgresHistoricSqlConnection(connection)) { + const connection = project.config.connections[connectionId]; + const dialect = enabledHistoricSqlDialect(connection); + if (!dialect) { return undefined; } - return { + + const base = { sqlAnalysis: ktxCliHistoricSqlAnalysis(options), - postgresQueryClient: createEphemeralPostgresHistoricSqlClient(project, connectionId), postgresBaselineRootDir: join(project.projectDir, '.ktx/cache/historic-sql'), }; + + if (dialect === 'postgres') { + return { + ...base, + reader: new PostgresPgssReader() satisfies HistoricSqlReader, + queryClient: createEphemeralPostgresHistoricSqlClient(project, connectionId), + }; + } + + if (dialect === 'bigquery') { + if (!isKtxBigQueryConnectionConfig(connection)) { + throw new Error( + `Historic SQL local ingest requires a BigQuery connection, got ${String(connection?.driver ?? 'unknown')}`, + ); + } + return { + ...base, + reader: new BigQueryHistoricSqlQueryHistoryReader({ + projectId: bigQueryProjectId(connection, process.env), + region: bigQueryRegion(connection), + }) satisfies HistoricSqlReader, + queryClient: createEphemeralBigQueryHistoricSqlClient(project, connectionId), + }; + } + + return { + ...base, + reader: new SnowflakeHistoricSqlQueryHistoryReader() satisfies HistoricSqlReader, + queryClient: { + async executeQuery(query: string) { + const connectorModule = await import('@ktx/connector-snowflake'); + const client = await createEphemeralSnowflakeHistoricSqlClient(project, connectionId, connectorModule); + return client.executeQuery(query); + }, + }, + }; } export function createKtxCliLocalIngestAdapters(