fix(ingest): honor query history window intent

This commit is contained in:
Andrey Avtomonov 2026-05-13 18:49:55 +02:00
parent 9cb0bdbeac
commit 6c4bf2a52a
2 changed files with 120 additions and 7 deletions

View file

@ -194,6 +194,68 @@ describe('buildPublicIngestPlan', () => {
expect(plan.warnings).toEqual(['--query-history is not supported for sqlite; running schema ingest for local.']);
});
it('treats query-history window override as current-run query-history enablement', () => {
const project = deepReadyProject({
warehouse: { driver: 'postgres', context: { queryHistory: { enabled: false, windowDays: 90 } } },
});
const plan = buildPublicIngestPlan(project, {
projectDir: '/tmp/project',
targetConnectionId: 'warehouse',
all: false,
queryHistory: 'default',
queryHistoryWindowDays: 30,
});
expect(plan.targets[0]).toMatchObject({
connectionId: 'warehouse',
databaseDepth: 'deep',
queryHistory: { enabled: true, dialect: 'postgres', windowDays: 30 },
steps: ['database-schema', 'query-history'],
});
});
it('warns and skips query-history window override for unsupported database drivers', () => {
const plan = buildPublicIngestPlan(
projectWithConnections({
local: { driver: 'sqlite' },
}),
{
projectDir: '/tmp/project',
targetConnectionId: 'local',
all: false,
queryHistory: 'default',
queryHistoryWindowDays: 30,
},
);
expect(plan.targets[0]).toMatchObject({
connectionId: 'local',
databaseDepth: 'fast',
queryHistory: { enabled: false, windowDays: 30, unsupported: true },
steps: ['database-schema'],
});
expect(plan.warnings).toEqual(['--query-history is not supported for sqlite; running schema ingest for local.']);
});
it('aggregates ignored database-depth warnings for all source targets', () => {
const plan = buildPublicIngestPlan(
projectWithConnections({
warehouse: { driver: 'postgres' },
docs: { driver: 'notion' },
dbt: { driver: 'dbt' },
}),
{
projectDir: '/tmp/project',
all: true,
depth: 'deep',
queryHistory: 'default',
},
);
expect(plan.warnings).toEqual(['--deep ignored for 2 non-database sources.']);
});
it('records a preflight failure for deep database ingest when readiness config is missing', () => {
const project = projectWithConnections({
warehouse: { driver: 'postgres', context: { depth: 'deep' } },

View file

@ -108,6 +108,54 @@ const queryHistoryDialectByDriver = new Map<string, HistoricSqlDialect>([
['snowflake', 'snowflake'],
]);
interface KtxPublicIngestWarningAccumulator {
warnings: string[];
ignoredDepthForSources: string[];
ignoredQueryHistoryForSources: string[];
}
function createWarningAccumulator(): KtxPublicIngestWarningAccumulator {
return {
warnings: [],
ignoredDepthForSources: [],
ignoredQueryHistoryForSources: [],
};
}
function sourceIgnoredWarning(option: string, connectionIds: string[], all: boolean): string | null {
if (connectionIds.length === 0) {
return null;
}
if (all) {
const sourceLabel =
connectionIds.length === 1 ? '1 non-database source' : `${connectionIds.length} non-database sources`;
return `${option} ignored for ${sourceLabel}.`;
}
return `${option} affects database ingest only; ignoring it for ${connectionIds[0]}.`;
}
function finalizeWarnings(
accumulator: KtxPublicIngestWarningAccumulator,
args: {
all: boolean;
depth?: KtxPublicIngestDepth;
queryHistory?: KtxPublicIngestQueryHistoryFlag;
queryHistoryWindowDays?: number;
},
): string[] {
const warnings = [...accumulator.warnings];
const depthOption = args.depth ? `--${args.depth}` : null;
if (depthOption) {
const warning = sourceIgnoredWarning(depthOption, accumulator.ignoredDepthForSources, args.all);
if (warning) warnings.push(warning);
}
if (args.queryHistory === 'enabled' || args.queryHistoryWindowDays !== undefined) {
const warning = sourceIgnoredWarning('--query-history', accumulator.ignoredQueryHistoryForSources, args.all);
if (warning) warnings.push(warning);
}
return warnings;
}
function storedQueryHistory(connection: KtxProjectConnectionConfig): Record<string, unknown> {
const context = connection.context;
const contextRecord =
@ -147,7 +195,10 @@ function resolveDatabaseTargetOptions(input: {
const dialect = queryHistoryDialectByDriver.get(input.driver);
const explicitQueryHistory = input.args.queryHistory ?? 'default';
const storedEnabled = storedQh.enabled === true;
const requestedQh = explicitQueryHistory === 'enabled' || (explicitQueryHistory === 'default' && storedEnabled);
const windowOverrideRequested = input.args.queryHistoryWindowDays !== undefined;
const requestedQh =
explicitQueryHistory === 'enabled' ||
(explicitQueryHistory !== 'disabled' && (windowOverrideRequested || storedEnabled));
let depth =
input.args.depth ?? depthFromLegacyScanMode(input.args.scanMode) ?? databaseContextDepth(input.connection) ?? 'fast';
const queryHistory = {
@ -212,17 +263,17 @@ function targetForConnection(
queryHistoryWindowDays?: number;
scanMode?: Extract<KtxScanArgs, { command: 'run' }>['mode'];
},
warnings: string[],
warnings: KtxPublicIngestWarningAccumulator,
): KtxPublicIngestPlanTarget {
const driver = normalizeConnectionDriver(connection);
const adapter = sourceAdapterByDriver.get(driver);
const sourceDir = sourceDirForConnection(connection);
if (adapter) {
if (args.depth) {
warnings.push(`--${args.depth} affects database ingest only; ignoring it for ${connectionId}.`);
warnings.ignoredDepthForSources.push(connectionId);
}
if (args.queryHistory === 'enabled' || args.queryHistoryWindowDays !== undefined) {
warnings.push(`--query-history affects database ingest only; ignoring it for ${connectionId}.`);
warnings.ignoredQueryHistoryForSources.push(connectionId);
}
return {
connectionId,
@ -236,7 +287,7 @@ function targetForConnection(
}
if (isDatabaseDriver(driver)) {
const options = resolveDatabaseTargetOptions({ connectionId, driver, connection, args, warnings });
const options = resolveDatabaseTargetOptions({ connectionId, driver, connection, args, warnings: warnings.warnings });
const gaps = options.databaseDepth === 'deep' ? deepReadinessGaps(projectConfig) : [];
return {
connectionId,
@ -284,7 +335,7 @@ export function buildPublicIngestPlan(
throw new Error('No configured connections are eligible for ingest');
}
const warnings: string[] = [];
const warnings = createWarningAccumulator();
const targets = selected.map(([connectionId, connection]) =>
targetForConnection(connectionId, connection, project.config, args, warnings),
);
@ -294,7 +345,7 @@ export function buildPublicIngestPlan(
...targets.filter((t) => t.operation === 'database-ingest'),
...targets.filter((t) => t.operation === 'source-ingest'),
],
warnings,
warnings: finalizeWarnings(warnings, args),
};
}