From 03d2d26e7197b65eb60cc3d08b5cf607ce4e1169 Mon Sep 17 00:00:00 2001 From: Andrey Avtomonov Date: Wed, 13 May 2026 18:00:52 +0200 Subject: [PATCH] feat(ingest): read connection query history config --- packages/cli/src/local-adapters.test.ts | 32 ++++++++++++++ packages/cli/src/local-adapters.ts | 26 ++++++++++-- .../context/src/ingest/local-adapters.test.ts | 42 ++++++++++++++++++- packages/context/src/ingest/local-adapters.ts | 32 +++++++++++++- 4 files changed, 126 insertions(+), 6 deletions(-) diff --git a/packages/cli/src/local-adapters.test.ts b/packages/cli/src/local-adapters.test.ts index 517c0588..62c46c03 100644 --- a/packages/cli/src/local-adapters.test.ts +++ b/packages/cli/src/local-adapters.test.ts @@ -68,6 +68,38 @@ describe('CLI local ingest adapters', () => { ]); }); + it('registers Postgres historic SQL from connection context query history', async () => { + await writeProject( + tempDir, + [ + 'project: warehouse', + 'connections:', + ' warehouse:', + ' driver: postgres', + ' url: env:WAREHOUSE_DATABASE_URL', + ' readonly: true', + ' context:', + ' queryHistory:', + ' enabled: true', + 'ingest:', + ' adapters:', + ' - historic-sql', + '', + ].join('\n'), + ); + const project = await loadKtxProject({ projectDir: tempDir }); + + const adapters = createKtxCliLocalIngestAdapters(project, { + historicSqlConnectionId: 'warehouse', + sqlAnalysis: sqlAnalysisStub(), + }); + + expect(adapters.find((adapter) => adapter.source === 'historic-sql')?.skillNames).toEqual([ + 'historic_sql_table_digest', + 'historic_sql_patterns', + ]); + }); + it('registers BigQuery historic SQL from the requested connection', async () => { await writeProject( tempDir, diff --git a/packages/cli/src/local-adapters.ts b/packages/cli/src/local-adapters.ts index 9a6915c2..a99af7df 100644 --- a/packages/cli/src/local-adapters.ts +++ b/packages/cli/src/local-adapters.ts @@ -180,12 +180,30 @@ function historicSqlRecord(connection: unknown): Record | null } function enabledHistoricSqlDialect(connection: unknown): 'postgres' | 'bigquery' | 'snowflake' | null { - const historicSql = historicSqlRecord(connection); - if (historicSql?.enabled !== true) { + const direct = historicSqlRecord(connection); + const context = + connection && typeof connection === 'object' && !Array.isArray(connection) + ? (connection as { context?: unknown }).context + : null; + const queryHistory = + context && typeof context === 'object' && !Array.isArray(context) + ? (context as { queryHistory?: unknown }).queryHistory + : null; + const enabled = + queryHistory && typeof queryHistory === 'object' && !Array.isArray(queryHistory) + ? (queryHistory as { enabled?: unknown }).enabled === true + : direct?.enabled === true; + if (!enabled) { return null; } - const dialect = String(historicSql.dialect ?? '').toLowerCase(); - return dialect === 'postgres' || dialect === 'bigquery' || dialect === 'snowflake' ? dialect : null; + const driver = String((connection as { driver?: unknown })?.driver ?? '').toLowerCase(); + if (driver === 'postgres' || driver === 'postgresql') return 'postgres'; + if (driver === 'bigquery') return 'bigquery'; + if (driver === 'snowflake') return 'snowflake'; + const legacyDialect = String(direct?.dialect ?? '').toLowerCase(); + return legacyDialect === 'postgres' || legacyDialect === 'bigquery' || legacyDialect === 'snowflake' + ? legacyDialect + : null; } function createEphemeralPostgresHistoricSqlClient(project: KtxLocalProject, connectionId: string) { diff --git a/packages/context/src/ingest/local-adapters.test.ts b/packages/context/src/ingest/local-adapters.test.ts index a962763d..6ea245b2 100644 --- a/packages/context/src/ingest/local-adapters.test.ts +++ b/packages/context/src/ingest/local-adapters.test.ts @@ -204,6 +204,46 @@ describe('local ingest adapters', () => { }); }); + it('maps connection context.queryHistory to historic-sql pull config', async () => { + const project = projectWithConnections({ + warehouse: { + driver: 'postgres', + context: { + queryHistory: { + enabled: true, + windowDays: 45, + minExecutions: 7, + filters: { dropTrivialProbes: true }, + }, + }, + }, + }); + const adapter = { source: 'historic-sql' } as never; + + await expect(localPullConfigForAdapter(project, adapter, 'warehouse')).resolves.toMatchObject({ + dialect: 'postgres', + windowDays: 45, + minExecutions: 7, + filters: { dropTrivialProbes: true }, + }); + }); + + it('prefers context.queryHistory over legacy historicSql', async () => { + const project = projectWithConnections({ + warehouse: { + driver: 'postgres', + historicSql: { enabled: true, dialect: 'postgres', windowDays: 90 }, + context: { queryHistory: { enabled: true, windowDays: 30 } }, + }, + }); + const adapter = { source: 'historic-sql' } as never; + + await expect(localPullConfigForAdapter(project, adapter, 'warehouse')).resolves.toMatchObject({ + dialect: 'postgres', + windowDays: 30, + }); + }); + it('rejects local historic-sql pulls when the connection has not enabled historic SQL', async () => { const historicSql = createDefaultLocalIngestAdapters(project, { historicSql: { @@ -235,7 +275,7 @@ describe('local ingest adapters', () => { }); await expect(localPullConfigForAdapter(postgresProject, historicSql!, 'warehouse')).rejects.toThrow( - 'Connection "warehouse" does not have historicSql.enabled: true', + 'Connection "warehouse" does not have context.queryHistory.enabled: true', ); }); diff --git a/packages/context/src/ingest/local-adapters.ts b/packages/context/src/ingest/local-adapters.ts index 0bf5fd42..3c503192 100644 --- a/packages/context/src/ingest/local-adapters.ts +++ b/packages/context/src/ingest/local-adapters.ts @@ -52,6 +52,7 @@ export interface DefaultLocalIngestAdaptersOptions { postgresQueryClient?: KtxPostgresQueryClient; now?: () => Date; }; + historicSqlPullConfigOverride?: Record; looker?: { daemonBaseUrl?: string; client?: Pick; @@ -155,6 +156,28 @@ function isRecord(value: unknown): value is Record { return typeof value === 'object' && value !== null && !Array.isArray(value); } +const historicSqlDialectByDriver = new Map([ + ['postgres', 'postgres'], + ['postgresql', 'postgres'], + ['bigquery', 'bigquery'], + ['snowflake', 'snowflake'], +]); + +function queryHistoryRecord(connection: unknown): Record | null { + if (!isRecord(connection)) return null; + const context = isRecord(connection.context) ? connection.context : null; + const queryHistory = isRecord(context?.queryHistory) ? context.queryHistory : null; + return queryHistory; +} + +function queryHistoryPullConfig(connection: unknown): Record | null { + const queryHistory = queryHistoryRecord(connection); + if (queryHistory?.enabled !== true || !isRecord(connection)) return null; + const dialect = historicSqlDialectByDriver.get(String(connection.driver ?? '').toLowerCase()); + if (!dialect) return null; + return { ...queryHistory, dialect }; +} + function stringField(value: unknown): string | null { return typeof value === 'string' && value.trim().length > 0 ? value.trim() : null; } @@ -210,9 +233,16 @@ export async function localPullConfigForAdapter( } const connection = project.config.connections[connectionId]; if (adapter.source === HISTORIC_SQL_SOURCE_KEY) { + if (options.historicSqlPullConfigOverride) { + return historicSqlUnifiedPullConfigSchema.parse(options.historicSqlPullConfigOverride); + } + const queryHistory = queryHistoryPullConfig(connection); + if (queryHistory) { + return historicSqlUnifiedPullConfigSchema.parse(queryHistory); + } const historicSql = isRecord(connection?.historicSql) ? connection.historicSql : null; if (historicSql?.enabled !== true) { - throw new Error(`Connection "${connectionId}" does not have historicSql.enabled: true`); + throw new Error(`Connection "${connectionId}" does not have context.queryHistory.enabled: true`); } return historicSqlUnifiedPullConfigSchema.parse({ ...historicSql,