From a366e352e0344591d60fc617b819622522629100 Mon Sep 17 00:00:00 2001 From: Andrey Avtomonov Date: Wed, 13 May 2026 18:24:54 +0200 Subject: [PATCH] feat(setup): store query history in connection context --- packages/cli/src/setup-databases.test.ts | 144 ++++++++++++++++++----- packages/cli/src/setup-databases.ts | 113 ++++++++++++------ 2 files changed, 193 insertions(+), 64 deletions(-) diff --git a/packages/cli/src/setup-databases.test.ts b/packages/cli/src/setup-databases.test.ts index df8b947f..817c37d5 100644 --- a/packages/cli/src/setup-databases.test.ts +++ b/packages/cli/src/setup-databases.test.ts @@ -1369,20 +1369,22 @@ describe('setup databases step', () => { expect(config.connections.snowflake).toMatchObject({ driver: 'snowflake', authMethod: 'password', - historicSql: { - enabled: true, - dialect: 'snowflake', - windowDays: 30, - filters: { - dropTrivialProbes: true, - serviceAccounts: { - patterns: ['^svc_'], - mode: 'exclude', + context: { + queryHistory: { + enabled: true, + windowDays: 30, + filters: { + dropTrivialProbes: true, + serviceAccounts: { + patterns: ['^svc_'], + mode: 'exclude', + }, }, + redactionPatterns: ['(?i)secret'], }, - redactionPatterns: ['(?i)secret'], }, }); + expect(config.connections.snowflake.historicSql).toBeUndefined(); expect(configText).not.toContain('live-database'); expect(configText).not.toContain('historic-sql'); expect(configText).not.toMatch(/^\s+adapters:/m); @@ -1421,21 +1423,23 @@ describe('setup databases step', () => { driver: 'postgres', url: 'env:DATABASE_URL', schemas: ['public'], - historicSql: { - enabled: true, - dialect: 'postgres', - minExecutions: 12, - filters: { - dropTrivialProbes: true, - serviceAccounts: { - patterns: ['^svc_'], - mode: 'exclude', + context: { + queryHistory: { + enabled: true, + minExecutions: 12, + filters: { + dropTrivialProbes: true, + serviceAccounts: { + patterns: ['^svc_'], + mode: 'exclude', + }, }, }, }, }); - expect(config.connections.warehouse.historicSql).not.toHaveProperty('windowDays'); - expect(config.connections.warehouse.historicSql).not.toHaveProperty('redactionPatterns'); + expect(config.connections.warehouse.historicSql).toBeUndefined(); + expect(config.connections.warehouse.context?.queryHistory).not.toHaveProperty('windowDays'); + expect(config.connections.warehouse.context?.queryHistory).not.toHaveProperty('redactionPatterns'); expect(configText).not.toContain('live-database'); expect(configText).not.toContain('historic-sql'); expect(configText).not.toMatch(/^\s+adapters:/m); @@ -1483,16 +1487,18 @@ describe('setup databases step', () => { const configText = await readFile(join(tempDir, 'ktx.yaml'), 'utf-8'); const config = parseKtxProjectConfig(configText); expect(config.connections.analytics).toMatchObject({ - historicSql: { - enabled: true, - dialect: 'bigquery', - windowDays: 45, - filters: { - dropTrivialProbes: true, + context: { + queryHistory: { + enabled: true, + windowDays: 45, + filters: { + dropTrivialProbes: true, + }, + redactionPatterns: [], }, - redactionPatterns: [], }, }); + expect(config.connections.analytics.historicSql).toBeUndefined(); expect(configText).not.toContain('live-database'); expect(configText).not.toContain('historic-sql'); expect(configText).not.toMatch(/^\s+adapters:/m); @@ -1536,13 +1542,89 @@ describe('setup databases step', () => { expect(result.status).toBe('ready'); const config = parseKtxProjectConfig(await readFile(join(tempDir, 'ktx.yaml'), 'utf-8')); expect(config.connections.warehouse).toMatchObject({ - historicSql: { + context: { + queryHistory: { + enabled: true, + minExecutions: 8, + filters: { + dropTrivialProbes: true, + }, + }, + }, + }); + expect(config.connections.warehouse.historicSql).toBeUndefined(); + }); + + it('migrates legacy historicSql to context.queryHistory during database setup', async () => { + await writeFile( + join(tempDir, 'ktx.yaml'), + [ + 'project: warehouse', + 'connections:', + ' warehouse:', + ' driver: postgres', + ' readonly: true', + ' historicSql:', + ' enabled: true', + ' dialect: postgres', + ' windowDays: 45', + ' minExecutions: 9', + ' concurrency: 3', + ' staleArchiveAfterDays: 120', + ' filters:', + ' dropTrivialProbes: true', + ' serviceAccounts:', + ' mode: exclude', + ' patterns:', + " - '^svc_'", + ' orchestrators:', + ' mode: exclude', + ' patterns:', + ' - airflow', + ' dropFailedBelow: 2', + ' redactionPatterns:', + " - '(?i)secret'", + '', + ].join('\n'), + 'utf-8', + ); + + const io = makeIo(); + + await expect( + runKtxSetupDatabasesStep( + { + projectDir: tempDir, + inputMode: 'disabled', + databaseConnectionIds: ['warehouse'], + databaseSchemas: [], + skipDatabases: false, + }, + io.io, + { + testConnection: vi.fn(async () => 0), + scanConnection: vi.fn(async () => 0), + historicSqlProbe: vi.fn(async () => ({ ok: true, lines: [] })), + }, + ), + ).resolves.toMatchObject({ status: 'ready' }); + + const config = parseKtxProjectConfig(await readFile(join(tempDir, 'ktx.yaml'), 'utf-8')); + expect(config.connections.warehouse.historicSql).toBeUndefined(); + expect(config.connections.warehouse.context).toMatchObject({ + queryHistory: { enabled: true, - dialect: 'postgres', - minExecutions: 8, + windowDays: 45, + minExecutions: 9, + concurrency: 3, + staleArchiveAfterDays: 120, filters: { dropTrivialProbes: true, + serviceAccounts: { mode: 'exclude', patterns: ['^svc_'] }, + orchestrators: { mode: 'exclude', patterns: ['airflow'] }, + dropFailedBelow: 2, }, + redactionPatterns: ['(?i)secret'], }, }); }); diff --git a/packages/cli/src/setup-databases.ts b/packages/cli/src/setup-databases.ts index 87fa1b19..7f353b01 100644 --- a/packages/cli/src/setup-databases.ts +++ b/packages/cli/src/setup-databases.ts @@ -234,6 +234,48 @@ function historicSqlConfigRecord(connection: KtxProjectConnectionConfig | undefi : null; } +function contextRecord(connection: KtxProjectConnectionConfig | undefined): Record { + const context = connection?.context; + return context && typeof context === 'object' && !Array.isArray(context) ? (context as Record) : {}; +} + +function queryHistoryConfigRecord(connection: KtxProjectConnectionConfig | undefined): Record | null { + const queryHistory = contextRecord(connection).queryHistory; + return queryHistory && typeof queryHistory === 'object' && !Array.isArray(queryHistory) + ? (queryHistory as Record) + : null; +} + +function stripLegacyHistoricSql(connection: KtxProjectConnectionConfig): KtxProjectConnectionConfig { + const { historicSql: _historicSql, ...rest } = connection as KtxProjectConnectionConfig & { + historicSql?: unknown; + }; + return rest; +} + +function withQueryHistoryConfig( + connection: KtxProjectConnectionConfig, + queryHistory: Record, +): KtxProjectConnectionConfig { + return { + ...stripLegacyHistoricSql(connection), + context: { + ...contextRecord(connection), + queryHistory, + }, + }; +} + +function migrateLegacyHistoricSqlConnection(connection: KtxProjectConnectionConfig): KtxProjectConnectionConfig { + const existingQueryHistory = queryHistoryConfigRecord(connection); + const legacy = historicSqlConfigRecord(connection); + if (existingQueryHistory || !legacy) { + return existingQueryHistory ? stripLegacyHistoricSql(connection) : connection; + } + const { dialect: _dialect, ...queryHistory } = legacy; + return withQueryHistoryConfig(connection, queryHistory); +} + function historicSqlProbeFailureLines(error: unknown): string[] { if (error instanceof Error && error.name === 'HistoricSqlExtensionMissingError') { return [ @@ -809,40 +851,31 @@ async function maybeApplyHistoricSqlConfig(input: { return input.connection; } - const existing = - typeof input.connection.historicSql === 'object' && input.connection.historicSql !== null - ? (input.connection.historicSql as Record) - : {}; + const existingRecord = queryHistoryConfigRecord(input.connection) ?? historicSqlConfigRecord(input.connection) ?? {}; + const { dialect: _dialect, ...existing } = existingRecord; if (!enabled) { - return { ...input.connection, historicSql: { ...existing, enabled: false, dialect } }; + return withQueryHistoryConfig(input.connection, { ...existing, enabled: false }); } const common: Record = { ...existing, enabled: true, - dialect, filters: historicSqlFiltersForSetup(input.args.historicSqlServiceAccountPatterns), }; if (dialect === 'postgres') { - return { - ...input.connection, - historicSql: { - ...common, - minExecutions: input.args.historicSqlMinExecutions ?? 5, - }, - }; + return withQueryHistoryConfig(input.connection, { + ...common, + minExecutions: input.args.historicSqlMinExecutions ?? 5, + }); } - return { - ...input.connection, - historicSql: { - ...common, - windowDays: input.args.historicSqlWindowDays ?? 90, - redactionPatterns: input.args.historicSqlRedactionPatterns ?? [], - }, - }; + return withQueryHistoryConfig(input.connection, { + ...common, + windowDays: input.args.historicSqlWindowDays ?? 90, + redactionPatterns: input.args.historicSqlRedactionPatterns ?? [], + }); } function historicSqlFiltersForSetup(patterns: string[] | undefined) { @@ -1084,22 +1117,24 @@ async function writeConnectionConfig(input: { connection: KtxProjectConnectionConfig; }): Promise { const project = await loadKtxProject({ projectDir: input.projectDir }); + const migratedConnections = Object.fromEntries( + Object.entries(project.config.connections).map(([connectionId, connection]) => [ + connectionId, + migrateLegacyHistoricSqlConnection(connection), + ]), + ); + const nextConnection = migrateLegacyHistoricSqlConnection(input.connection); const config = { ...project.config, connections: { - ...project.config.connections, - [input.connectionId]: input.connection, + ...migratedConnections, + [input.connectionId]: nextConnection, }, }; await writeFile(project.configPath, serializeKtxProjectConfig(config), 'utf-8'); - const historicSql = - typeof input.connection.historicSql === 'object' && - input.connection.historicSql !== null && - !Array.isArray(input.connection.historicSql) - ? (input.connection.historicSql as Record) - : null; - if (historicSql?.enabled === true) { + const queryHistory = queryHistoryConfigRecord(nextConnection); + if (queryHistory?.enabled === true) { await ensureHistoricSqlIngestDefaults(input.projectDir); } } @@ -1394,7 +1429,18 @@ async function ensureHistoricSqlIngestDefaults(projectDir: string): Promise { const project = await loadKtxProject({ projectDir }); - const config = setKtxSetupDatabaseConnectionIds(project.config, unique(connectionIds)); + const config = setKtxSetupDatabaseConnectionIds( + { + ...project.config, + connections: Object.fromEntries( + Object.entries(project.config.connections).map(([connectionId, connection]) => [ + connectionId, + migrateLegacyHistoricSqlConnection(connection), + ]), + ), + }, + unique(connectionIds), + ); await writeFile(project.configPath, serializeKtxProjectConfig(config), 'utf-8'); await markKtxSetupStateStepComplete(projectDir, 'databases'); } @@ -1407,8 +1453,9 @@ async function maybeRunHistoricSqlSetupProbe(input: { }): Promise { const project = await loadKtxProject({ projectDir: input.projectDir }); const connection = project.config.connections[input.connectionId]; - const historicSql = historicSqlConfigRecord(connection); - if (historicSql?.enabled !== true || historicSql.dialect !== 'postgres') { + const queryHistory = queryHistoryConfigRecord(connection) ?? historicSqlConfigRecord(connection); + const driver = normalizeDriver(connection?.driver); + if (queryHistory?.enabled !== true || driver !== 'postgres') { return; }