fix(cli): preserve query-history pull config in public ingest

This commit is contained in:
Andrey Avtomonov 2026-05-13 19:56:15 +02:00
parent 27e7f214f6
commit 41c5ecdcfb
2 changed files with 103 additions and 7 deletions

View file

@ -1,6 +1,11 @@
import { buildDefaultKtxProjectConfig, type KtxProjectConfig } from '@ktx/context/project';
import { describe, expect, it, vi } from 'vitest';
import { buildPublicIngestPlan, type KtxPublicIngestProject, runKtxPublicIngest } from './public-ingest.js';
import {
buildPublicIngestPlan,
type KtxPublicIngestDeps,
type KtxPublicIngestProject,
runKtxPublicIngest,
} from './public-ingest.js';
function makeIo(options: { isTTY?: boolean; interactive?: boolean } = {}) {
let stdout = '';
@ -353,7 +358,7 @@ describe('runKtxPublicIngest', () => {
warehouse: { driver: 'postgres', context: { queryHistory: { enabled: true, windowDays: 90 } } },
});
const runScan = vi.fn(async () => 0);
const runIngest = vi.fn(async () => 0);
const runIngest = vi.fn<NonNullable<KtxPublicIngestDeps['runIngest']>>(async () => 0);
await expect(
runKtxPublicIngest(
@ -388,6 +393,73 @@ describe('runKtxPublicIngest', () => {
);
});
it('preserves configured query-history pull fields while overriding the current-run window', async () => {
const io = makeIo();
const project = deepReadyProject({
warehouse: {
driver: 'postgres',
context: {
queryHistory: {
enabled: true,
windowDays: 90,
minExecutions: 7,
concurrency: 3,
staleArchiveAfterDays: 120,
filters: {
dropTrivialProbes: true,
serviceAccounts: { patterns: ['^svc_'], mode: 'exclude' },
orchestrators: { mode: 'mark-only' },
dropFailedBelow: { errorRate: 0.5, executions: 3 },
},
redactionPatterns: ['(?i)secret'],
},
},
},
});
const runScan = vi.fn(async () => 0);
const runIngest = vi.fn(async () => 0);
await expect(
runKtxPublicIngest(
{
command: 'run',
projectDir: '/tmp/project',
targetConnectionId: 'warehouse',
all: false,
json: false,
inputMode: 'disabled',
queryHistory: 'enabled',
queryHistoryWindowDays: 30,
},
io.io,
{ loadProject: vi.fn(async () => project), runScan, runIngest },
),
).resolves.toBe(0);
const ingestArgs = runIngest.mock.calls[0]?.[0];
expect(ingestArgs).toMatchObject({
command: 'run',
connectionId: 'warehouse',
adapter: 'historic-sql',
allowImplicitAdapter: true,
historicSqlPullConfigOverride: {
dialect: 'postgres',
windowDays: 30,
minExecutions: 7,
concurrency: 3,
staleArchiveAfterDays: 120,
filters: {
dropTrivialProbes: true,
serviceAccounts: { patterns: ['^svc_'], mode: 'exclude' },
orchestrators: { mode: 'mark-only' },
dropFailedBelow: { errorRate: 0.5, executions: 3 },
},
redactionPatterns: ['(?i)secret'],
},
});
expect(ingestArgs?.historicSqlPullConfigOverride).not.toHaveProperty('enabled');
});
it('prints the schema-first notice for explicit query-history runs', async () => {
const io = makeIo();
const project = deepReadyProject({

View file

@ -58,6 +58,7 @@ export interface KtxPublicIngestPlanTarget {
enabled: boolean;
dialect?: HistoricSqlDialect;
windowDays?: number;
pullConfig?: Record<string, unknown>;
unsupported?: boolean;
skippedStoredByFast?: boolean;
};
@ -203,6 +204,19 @@ function positiveInteger(value: unknown): number | undefined {
return typeof value === 'number' && Number.isInteger(value) && value > 0 ? value : undefined;
}
function queryHistoryPullConfig(input: {
stored: Record<string, unknown>;
dialect: HistoricSqlDialect;
windowDays?: number;
}): Record<string, unknown> {
const { enabled: _enabled, dialect: _dialect, ...storedConfig } = input.stored;
return {
...storedConfig,
dialect: input.dialect,
...(input.windowDays !== undefined ? { windowDays: input.windowDays } : {}),
};
}
function depthFromLegacyScanMode(
mode: Extract<KtxScanArgs, { command: 'run' }>['mode'] | undefined,
): KtxPublicIngestDepth | undefined {
@ -265,7 +279,16 @@ function resolveDatabaseTargetOptions(input: {
depth = 'deep';
return {
databaseDepth: depth,
queryHistory: { ...queryHistory, enabled: true, dialect },
queryHistory: {
...queryHistory,
enabled: true,
dialect,
pullConfig: queryHistoryPullConfig({
stored: storedQh,
dialect,
windowDays: queryHistory.windowDays,
}),
},
steps: ['database-schema', 'query-history'],
};
}
@ -627,10 +650,11 @@ export async function executePublicIngestTarget(
outputMode: sourceIngestOutputMode(args, io),
inputMode: args.inputMode,
allowImplicitAdapter: true,
historicSqlPullConfigOverride: {
dialect: target.queryHistory.dialect,
...(target.queryHistory.windowDays !== undefined ? { windowDays: target.queryHistory.windowDays } : {}),
},
historicSqlPullConfigOverride:
target.queryHistory.pullConfig ?? {
dialect: target.queryHistory.dialect,
...(target.queryHistory.windowDays !== undefined ? { windowDays: target.queryHistory.windowDays } : {}),
},
};
const qhExitCode = await runIngest(ingestArgs, io);
if (qhExitCode !== 0) {