feat(setup): store query history in connection context

This commit is contained in:
Andrey Avtomonov 2026-05-13 18:24:54 +02:00
parent f2b1ecbb61
commit a366e352e0
2 changed files with 193 additions and 64 deletions

View file

@ -1369,20 +1369,22 @@ describe('setup databases step', () => {
expect(config.connections.snowflake).toMatchObject({
driver: 'snowflake',
authMethod: 'password',
historicSql: {
enabled: true,
dialect: 'snowflake',
windowDays: 30,
filters: {
dropTrivialProbes: true,
serviceAccounts: {
patterns: ['^svc_'],
mode: 'exclude',
context: {
queryHistory: {
enabled: true,
windowDays: 30,
filters: {
dropTrivialProbes: true,
serviceAccounts: {
patterns: ['^svc_'],
mode: 'exclude',
},
},
redactionPatterns: ['(?i)secret'],
},
redactionPatterns: ['(?i)secret'],
},
});
expect(config.connections.snowflake.historicSql).toBeUndefined();
expect(configText).not.toContain('live-database');
expect(configText).not.toContain('historic-sql');
expect(configText).not.toMatch(/^\s+adapters:/m);
@ -1421,21 +1423,23 @@ describe('setup databases step', () => {
driver: 'postgres',
url: 'env:DATABASE_URL',
schemas: ['public'],
historicSql: {
enabled: true,
dialect: 'postgres',
minExecutions: 12,
filters: {
dropTrivialProbes: true,
serviceAccounts: {
patterns: ['^svc_'],
mode: 'exclude',
context: {
queryHistory: {
enabled: true,
minExecutions: 12,
filters: {
dropTrivialProbes: true,
serviceAccounts: {
patterns: ['^svc_'],
mode: 'exclude',
},
},
},
},
});
expect(config.connections.warehouse.historicSql).not.toHaveProperty('windowDays');
expect(config.connections.warehouse.historicSql).not.toHaveProperty('redactionPatterns');
expect(config.connections.warehouse.historicSql).toBeUndefined();
expect(config.connections.warehouse.context?.queryHistory).not.toHaveProperty('windowDays');
expect(config.connections.warehouse.context?.queryHistory).not.toHaveProperty('redactionPatterns');
expect(configText).not.toContain('live-database');
expect(configText).not.toContain('historic-sql');
expect(configText).not.toMatch(/^\s+adapters:/m);
@ -1483,16 +1487,18 @@ describe('setup databases step', () => {
const configText = await readFile(join(tempDir, 'ktx.yaml'), 'utf-8');
const config = parseKtxProjectConfig(configText);
expect(config.connections.analytics).toMatchObject({
historicSql: {
enabled: true,
dialect: 'bigquery',
windowDays: 45,
filters: {
dropTrivialProbes: true,
context: {
queryHistory: {
enabled: true,
windowDays: 45,
filters: {
dropTrivialProbes: true,
},
redactionPatterns: [],
},
redactionPatterns: [],
},
});
expect(config.connections.analytics.historicSql).toBeUndefined();
expect(configText).not.toContain('live-database');
expect(configText).not.toContain('historic-sql');
expect(configText).not.toMatch(/^\s+adapters:/m);
@ -1536,13 +1542,89 @@ describe('setup databases step', () => {
expect(result.status).toBe('ready');
const config = parseKtxProjectConfig(await readFile(join(tempDir, 'ktx.yaml'), 'utf-8'));
expect(config.connections.warehouse).toMatchObject({
historicSql: {
context: {
queryHistory: {
enabled: true,
minExecutions: 8,
filters: {
dropTrivialProbes: true,
},
},
},
});
expect(config.connections.warehouse.historicSql).toBeUndefined();
});
it('migrates legacy historicSql to context.queryHistory during database setup', async () => {
await writeFile(
join(tempDir, 'ktx.yaml'),
[
'project: warehouse',
'connections:',
' warehouse:',
' driver: postgres',
' readonly: true',
' historicSql:',
' enabled: true',
' dialect: postgres',
' windowDays: 45',
' minExecutions: 9',
' concurrency: 3',
' staleArchiveAfterDays: 120',
' filters:',
' dropTrivialProbes: true',
' serviceAccounts:',
' mode: exclude',
' patterns:',
" - '^svc_'",
' orchestrators:',
' mode: exclude',
' patterns:',
' - airflow',
' dropFailedBelow: 2',
' redactionPatterns:',
" - '(?i)secret'",
'',
].join('\n'),
'utf-8',
);
const io = makeIo();
await expect(
runKtxSetupDatabasesStep(
{
projectDir: tempDir,
inputMode: 'disabled',
databaseConnectionIds: ['warehouse'],
databaseSchemas: [],
skipDatabases: false,
},
io.io,
{
testConnection: vi.fn(async () => 0),
scanConnection: vi.fn(async () => 0),
historicSqlProbe: vi.fn(async () => ({ ok: true, lines: [] })),
},
),
).resolves.toMatchObject({ status: 'ready' });
const config = parseKtxProjectConfig(await readFile(join(tempDir, 'ktx.yaml'), 'utf-8'));
expect(config.connections.warehouse.historicSql).toBeUndefined();
expect(config.connections.warehouse.context).toMatchObject({
queryHistory: {
enabled: true,
dialect: 'postgres',
minExecutions: 8,
windowDays: 45,
minExecutions: 9,
concurrency: 3,
staleArchiveAfterDays: 120,
filters: {
dropTrivialProbes: true,
serviceAccounts: { mode: 'exclude', patterns: ['^svc_'] },
orchestrators: { mode: 'exclude', patterns: ['airflow'] },
dropFailedBelow: 2,
},
redactionPatterns: ['(?i)secret'],
},
});
});

View file

@ -234,6 +234,48 @@ function historicSqlConfigRecord(connection: KtxProjectConnectionConfig | undefi
: null;
}
function contextRecord(connection: KtxProjectConnectionConfig | undefined): Record<string, unknown> {
const context = connection?.context;
return context && typeof context === 'object' && !Array.isArray(context) ? (context as Record<string, unknown>) : {};
}
function queryHistoryConfigRecord(connection: KtxProjectConnectionConfig | undefined): Record<string, unknown> | null {
const queryHistory = contextRecord(connection).queryHistory;
return queryHistory && typeof queryHistory === 'object' && !Array.isArray(queryHistory)
? (queryHistory as Record<string, unknown>)
: null;
}
function stripLegacyHistoricSql(connection: KtxProjectConnectionConfig): KtxProjectConnectionConfig {
const { historicSql: _historicSql, ...rest } = connection as KtxProjectConnectionConfig & {
historicSql?: unknown;
};
return rest;
}
function withQueryHistoryConfig(
connection: KtxProjectConnectionConfig,
queryHistory: Record<string, unknown>,
): KtxProjectConnectionConfig {
return {
...stripLegacyHistoricSql(connection),
context: {
...contextRecord(connection),
queryHistory,
},
};
}
function migrateLegacyHistoricSqlConnection(connection: KtxProjectConnectionConfig): KtxProjectConnectionConfig {
const existingQueryHistory = queryHistoryConfigRecord(connection);
const legacy = historicSqlConfigRecord(connection);
if (existingQueryHistory || !legacy) {
return existingQueryHistory ? stripLegacyHistoricSql(connection) : connection;
}
const { dialect: _dialect, ...queryHistory } = legacy;
return withQueryHistoryConfig(connection, queryHistory);
}
function historicSqlProbeFailureLines(error: unknown): string[] {
if (error instanceof Error && error.name === 'HistoricSqlExtensionMissingError') {
return [
@ -809,40 +851,31 @@ async function maybeApplyHistoricSqlConfig(input: {
return input.connection;
}
const existing =
typeof input.connection.historicSql === 'object' && input.connection.historicSql !== null
? (input.connection.historicSql as Record<string, unknown>)
: {};
const existingRecord = queryHistoryConfigRecord(input.connection) ?? historicSqlConfigRecord(input.connection) ?? {};
const { dialect: _dialect, ...existing } = existingRecord;
if (!enabled) {
return { ...input.connection, historicSql: { ...existing, enabled: false, dialect } };
return withQueryHistoryConfig(input.connection, { ...existing, enabled: false });
}
const common: Record<string, unknown> = {
...existing,
enabled: true,
dialect,
filters: historicSqlFiltersForSetup(input.args.historicSqlServiceAccountPatterns),
};
if (dialect === 'postgres') {
return {
...input.connection,
historicSql: {
...common,
minExecutions: input.args.historicSqlMinExecutions ?? 5,
},
};
return withQueryHistoryConfig(input.connection, {
...common,
minExecutions: input.args.historicSqlMinExecutions ?? 5,
});
}
return {
...input.connection,
historicSql: {
...common,
windowDays: input.args.historicSqlWindowDays ?? 90,
redactionPatterns: input.args.historicSqlRedactionPatterns ?? [],
},
};
return withQueryHistoryConfig(input.connection, {
...common,
windowDays: input.args.historicSqlWindowDays ?? 90,
redactionPatterns: input.args.historicSqlRedactionPatterns ?? [],
});
}
function historicSqlFiltersForSetup(patterns: string[] | undefined) {
@ -1084,22 +1117,24 @@ async function writeConnectionConfig(input: {
connection: KtxProjectConnectionConfig;
}): Promise<void> {
const project = await loadKtxProject({ projectDir: input.projectDir });
const migratedConnections = Object.fromEntries(
Object.entries(project.config.connections).map(([connectionId, connection]) => [
connectionId,
migrateLegacyHistoricSqlConnection(connection),
]),
);
const nextConnection = migrateLegacyHistoricSqlConnection(input.connection);
const config = {
...project.config,
connections: {
...project.config.connections,
[input.connectionId]: input.connection,
...migratedConnections,
[input.connectionId]: nextConnection,
},
};
await writeFile(project.configPath, serializeKtxProjectConfig(config), 'utf-8');
const historicSql =
typeof input.connection.historicSql === 'object' &&
input.connection.historicSql !== null &&
!Array.isArray(input.connection.historicSql)
? (input.connection.historicSql as Record<string, unknown>)
: null;
if (historicSql?.enabled === true) {
const queryHistory = queryHistoryConfigRecord(nextConnection);
if (queryHistory?.enabled === true) {
await ensureHistoricSqlIngestDefaults(input.projectDir);
}
}
@ -1394,7 +1429,18 @@ async function ensureHistoricSqlIngestDefaults(projectDir: string): Promise<void
async function markDatabasesComplete(projectDir: string, connectionIds: string[]): Promise<void> {
const project = await loadKtxProject({ projectDir });
const config = setKtxSetupDatabaseConnectionIds(project.config, unique(connectionIds));
const config = setKtxSetupDatabaseConnectionIds(
{
...project.config,
connections: Object.fromEntries(
Object.entries(project.config.connections).map(([connectionId, connection]) => [
connectionId,
migrateLegacyHistoricSqlConnection(connection),
]),
),
},
unique(connectionIds),
);
await writeFile(project.configPath, serializeKtxProjectConfig(config), 'utf-8');
await markKtxSetupStateStepComplete(projectDir, 'databases');
}
@ -1407,8 +1453,9 @@ async function maybeRunHistoricSqlSetupProbe(input: {
}): Promise<void> {
const project = await loadKtxProject({ projectDir: input.projectDir });
const connection = project.config.connections[input.connectionId];
const historicSql = historicSqlConfigRecord(connection);
if (historicSql?.enabled !== true || historicSql.dialect !== 'postgres') {
const queryHistory = queryHistoryConfigRecord(connection) ?? historicSqlConfigRecord(connection);
const driver = normalizeDriver(connection?.driver);
if (queryHistory?.enabled !== true || driver !== 'postgres') {
return;
}