feat(ingest): read connection query history config

This commit is contained in:
Andrey Avtomonov 2026-05-13 18:00:52 +02:00
parent 7dd14bb333
commit 03d2d26e71
4 changed files with 126 additions and 6 deletions

View file

@ -68,6 +68,38 @@ describe('CLI local ingest adapters', () => {
]);
});
it('registers Postgres historic SQL from connection context query history', async () => {
await writeProject(
tempDir,
[
'project: warehouse',
'connections:',
' warehouse:',
' driver: postgres',
' url: env:WAREHOUSE_DATABASE_URL',
' readonly: true',
' context:',
' queryHistory:',
' enabled: true',
'ingest:',
' adapters:',
' - historic-sql',
'',
].join('\n'),
);
const project = await loadKtxProject({ projectDir: tempDir });
const adapters = createKtxCliLocalIngestAdapters(project, {
historicSqlConnectionId: 'warehouse',
sqlAnalysis: sqlAnalysisStub(),
});
expect(adapters.find((adapter) => adapter.source === 'historic-sql')?.skillNames).toEqual([
'historic_sql_table_digest',
'historic_sql_patterns',
]);
});
it('registers BigQuery historic SQL from the requested connection', async () => {
await writeProject(
tempDir,

View file

@ -180,12 +180,30 @@ function historicSqlRecord(connection: unknown): Record<string, unknown> | null
}
function enabledHistoricSqlDialect(connection: unknown): 'postgres' | 'bigquery' | 'snowflake' | null {
const historicSql = historicSqlRecord(connection);
if (historicSql?.enabled !== true) {
const direct = historicSqlRecord(connection);
const context =
connection && typeof connection === 'object' && !Array.isArray(connection)
? (connection as { context?: unknown }).context
: null;
const queryHistory =
context && typeof context === 'object' && !Array.isArray(context)
? (context as { queryHistory?: unknown }).queryHistory
: null;
const enabled =
queryHistory && typeof queryHistory === 'object' && !Array.isArray(queryHistory)
? (queryHistory as { enabled?: unknown }).enabled === true
: direct?.enabled === true;
if (!enabled) {
return null;
}
const dialect = String(historicSql.dialect ?? '').toLowerCase();
return dialect === 'postgres' || dialect === 'bigquery' || dialect === 'snowflake' ? dialect : null;
const driver = String((connection as { driver?: unknown })?.driver ?? '').toLowerCase();
if (driver === 'postgres' || driver === 'postgresql') return 'postgres';
if (driver === 'bigquery') return 'bigquery';
if (driver === 'snowflake') return 'snowflake';
const legacyDialect = String(direct?.dialect ?? '').toLowerCase();
return legacyDialect === 'postgres' || legacyDialect === 'bigquery' || legacyDialect === 'snowflake'
? legacyDialect
: null;
}
function createEphemeralPostgresHistoricSqlClient(project: KtxLocalProject, connectionId: string) {

View file

@ -204,6 +204,46 @@ describe('local ingest adapters', () => {
});
});
it('maps connection context.queryHistory to historic-sql pull config', async () => {
const project = projectWithConnections({
warehouse: {
driver: 'postgres',
context: {
queryHistory: {
enabled: true,
windowDays: 45,
minExecutions: 7,
filters: { dropTrivialProbes: true },
},
},
},
});
const adapter = { source: 'historic-sql' } as never;
await expect(localPullConfigForAdapter(project, adapter, 'warehouse')).resolves.toMatchObject({
dialect: 'postgres',
windowDays: 45,
minExecutions: 7,
filters: { dropTrivialProbes: true },
});
});
it('prefers context.queryHistory over legacy historicSql', async () => {
const project = projectWithConnections({
warehouse: {
driver: 'postgres',
historicSql: { enabled: true, dialect: 'postgres', windowDays: 90 },
context: { queryHistory: { enabled: true, windowDays: 30 } },
},
});
const adapter = { source: 'historic-sql' } as never;
await expect(localPullConfigForAdapter(project, adapter, 'warehouse')).resolves.toMatchObject({
dialect: 'postgres',
windowDays: 30,
});
});
it('rejects local historic-sql pulls when the connection has not enabled historic SQL', async () => {
const historicSql = createDefaultLocalIngestAdapters(project, {
historicSql: {
@ -235,7 +275,7 @@ describe('local ingest adapters', () => {
});
await expect(localPullConfigForAdapter(postgresProject, historicSql!, 'warehouse')).rejects.toThrow(
'Connection "warehouse" does not have historicSql.enabled: true',
'Connection "warehouse" does not have context.queryHistory.enabled: true',
);
});

View file

@ -52,6 +52,7 @@ export interface DefaultLocalIngestAdaptersOptions {
postgresQueryClient?: KtxPostgresQueryClient;
now?: () => Date;
};
historicSqlPullConfigOverride?: Record<string, unknown>;
looker?: {
daemonBaseUrl?: string;
client?: Pick<LookerMappingClient, 'listLookmlModels' | 'getExplore'>;
@ -155,6 +156,28 @@ function isRecord(value: unknown): value is Record<string, unknown> {
return typeof value === 'object' && value !== null && !Array.isArray(value);
}
const historicSqlDialectByDriver = new Map<string, 'postgres' | 'bigquery' | 'snowflake'>([
['postgres', 'postgres'],
['postgresql', 'postgres'],
['bigquery', 'bigquery'],
['snowflake', 'snowflake'],
]);
function queryHistoryRecord(connection: unknown): Record<string, unknown> | null {
if (!isRecord(connection)) return null;
const context = isRecord(connection.context) ? connection.context : null;
const queryHistory = isRecord(context?.queryHistory) ? context.queryHistory : null;
return queryHistory;
}
function queryHistoryPullConfig(connection: unknown): Record<string, unknown> | null {
const queryHistory = queryHistoryRecord(connection);
if (queryHistory?.enabled !== true || !isRecord(connection)) return null;
const dialect = historicSqlDialectByDriver.get(String(connection.driver ?? '').toLowerCase());
if (!dialect) return null;
return { ...queryHistory, dialect };
}
function stringField(value: unknown): string | null {
return typeof value === 'string' && value.trim().length > 0 ? value.trim() : null;
}
@ -210,9 +233,16 @@ export async function localPullConfigForAdapter(
}
const connection = project.config.connections[connectionId];
if (adapter.source === HISTORIC_SQL_SOURCE_KEY) {
if (options.historicSqlPullConfigOverride) {
return historicSqlUnifiedPullConfigSchema.parse(options.historicSqlPullConfigOverride);
}
const queryHistory = queryHistoryPullConfig(connection);
if (queryHistory) {
return historicSqlUnifiedPullConfigSchema.parse(queryHistory);
}
const historicSql = isRecord(connection?.historicSql) ? connection.historicSql : null;
if (historicSql?.enabled !== true) {
throw new Error(`Connection "${connectionId}" does not have historicSql.enabled: true`);
throw new Error(`Connection "${connectionId}" does not have context.queryHistory.enabled: true`);
}
return historicSqlUnifiedPullConfigSchema.parse({
...historicSql,