diff --git a/packages/cli/src/public-ingest.test.ts b/packages/cli/src/public-ingest.test.ts index 1c133a19..458874b8 100644 --- a/packages/cli/src/public-ingest.test.ts +++ b/packages/cli/src/public-ingest.test.ts @@ -48,16 +48,18 @@ describe('buildPublicIngestPlan', () => { { connectionId: 'warehouse', driver: 'postgres', - operation: 'scan', - debugCommand: 'ktx scan warehouse --debug', - steps: ['scan'], + operation: 'database-ingest', + debugCommand: 'ktx ingest warehouse --debug', + steps: ['database-schema'], + databaseDepth: 'fast', + queryHistory: { enabled: false }, }, { connectionId: 'docs', driver: 'notion', operation: 'source-ingest', adapter: 'notion', - debugCommand: 'ktx ingest run --connection-id docs --adapter notion --debug', + debugCommand: 'ktx ingest docs --debug', steps: ['source-ingest', 'memory-update'], }, { @@ -65,10 +67,11 @@ describe('buildPublicIngestPlan', () => { driver: 'metabase', operation: 'source-ingest', adapter: 'metabase', - debugCommand: 'ktx ingest run --connection-id prod_metabase --adapter metabase --debug', + debugCommand: 'ktx ingest prod_metabase --debug', steps: ['source-ingest', 'memory-update'], }, ], + warnings: [], }); }); @@ -80,6 +83,81 @@ describe('buildPublicIngestPlan', () => { ); }); + it('resolves database depth from flags, stored context, and defaults', () => { + const project = projectWithConnections({ + fast_default: { driver: 'postgres' }, + deep_default: { driver: 'postgres', context: { depth: 'deep' } }, + docs: { driver: 'notion' }, + }); + + expect( + buildPublicIngestPlan(project, { + projectDir: '/tmp/project', + targetConnectionId: 'fast_default', + all: false, + queryHistory: 'default', + }).targets[0], + ).toMatchObject({ connectionId: 'fast_default', databaseDepth: 'fast', queryHistory: { enabled: false } }); + + expect( + buildPublicIngestPlan(project, { + projectDir: '/tmp/project', + targetConnectionId: 'deep_default', + all: false, + queryHistory: 'default', + }).targets[0], + ).toMatchObject({ connectionId: 'deep_default', databaseDepth: 'deep' }); + + expect( + buildPublicIngestPlan(project, { + projectDir: '/tmp/project', + targetConnectionId: 'docs', + all: false, + depth: 'deep', + queryHistory: 'default', + }).warnings, + ).toEqual(['--deep affects database ingest only; ignoring it for docs.']); + }); + + it('upgrades effective depth when query history is explicitly enabled', () => { + const project = projectWithConnections({ + warehouse: { driver: 'postgres', context: { queryHistory: { enabled: false } } }, + }); + + const plan = buildPublicIngestPlan(project, { + projectDir: '/tmp/project', + targetConnectionId: 'warehouse', + all: false, + depth: 'fast', + queryHistory: 'enabled', + queryHistoryWindowDays: 30, + }); + + expect(plan.targets[0]).toMatchObject({ + connectionId: 'warehouse', + databaseDepth: 'deep', + queryHistory: { enabled: true, windowDays: 30, dialect: 'postgres' }, + }); + expect(plan.warnings).toEqual(['--query-history requires deep ingest; running warehouse with --deep.']); + }); + + it('warns and skips query history for unsupported database drivers', () => { + const project = projectWithConnections({ local: { driver: 'sqlite' } }); + + const plan = buildPublicIngestPlan(project, { + projectDir: '/tmp/project', + targetConnectionId: 'local', + all: false, + queryHistory: 'enabled', + }); + + expect(plan.targets[0]).toMatchObject({ + connectionId: 'local', + databaseDepth: 'fast', + queryHistory: { enabled: false, unsupported: true }, + }); + expect(plan.warnings).toEqual(['--query-history is not supported for sqlite; running schema ingest for local.']); + }); }); describe('runKtxPublicIngest', () => { diff --git a/packages/cli/src/public-ingest.ts b/packages/cli/src/public-ingest.ts index b126e702..dd4daf38 100644 --- a/packages/cli/src/public-ingest.ts +++ b/packages/cli/src/public-ingest.ts @@ -7,9 +7,12 @@ import { profileMark } from './startup-profile.js'; profileMark('module:public-ingest'); -type KtxPublicIngestStepName = 'scan' | 'source-ingest' | 'enrich' | 'memory-update'; +type KtxPublicIngestStepName = 'database-schema' | 'query-history' | 'source-ingest' | 'memory-update'; type KtxPublicIngestStepStatus = 'done' | 'skipped' | 'failed' | 'not-run'; type KtxPublicIngestInputMode = 'auto' | 'disabled'; +type KtxPublicIngestDepth = 'fast' | 'deep'; +type KtxPublicIngestQueryHistoryFlag = 'default' | 'enabled' | 'disabled'; +type HistoricSqlDialect = 'postgres' | 'bigquery' | 'snowflake'; export type KtxPublicIngestArgs = | { @@ -19,6 +22,9 @@ export type KtxPublicIngestArgs = all: boolean; json: boolean; inputMode: KtxPublicIngestInputMode; + depth?: KtxPublicIngestDepth; + queryHistory?: KtxPublicIngestQueryHistoryFlag; + queryHistoryWindowDays?: number; scanMode?: Extract['mode']; detectRelationships?: boolean; } @@ -33,16 +39,25 @@ export type KtxPublicIngestArgs = export interface KtxPublicIngestPlanTarget { connectionId: string; driver: string; - operation: 'scan' | 'source-ingest'; + operation: 'database-ingest' | 'source-ingest'; adapter?: string; sourceDir?: string; debugCommand: string; steps: KtxPublicIngestStepName[]; + databaseDepth?: KtxPublicIngestDepth; + queryHistory?: { + enabled: boolean; + dialect?: HistoricSqlDialect; + windowDays?: number; + unsupported?: boolean; + skippedStoredByFast?: boolean; + }; } export interface KtxPublicIngestPlan { projectDir: string; targets: KtxPublicIngestPlanTarget[]; + warnings: string[]; } export interface KtxPublicIngestTargetResult { @@ -88,40 +103,159 @@ const warehouseDrivers = new Set([ 'snowflake', ]); +const queryHistoryDialectByDriver = new Map([ + ['postgres', 'postgres'], + ['postgresql', 'postgres'], + ['bigquery', 'bigquery'], + ['snowflake', 'snowflake'], +]); + function normalizedDriver(connection: KtxProjectConnectionConfig): string { return String(connection.driver ?? '') .trim() .toLowerCase(); } +function connectionContext(connection: KtxProjectConnectionConfig): Record { + const value = connection.context; + return typeof value === 'object' && value !== null && !Array.isArray(value) ? (value as Record) : {}; +} + +function storedDepth(connection: KtxProjectConnectionConfig): KtxPublicIngestDepth | undefined { + const value = connectionContext(connection).depth; + return value === 'fast' || value === 'deep' ? value : undefined; +} + +function storedQueryHistory(connection: KtxProjectConnectionConfig): Record { + const value = connectionContext(connection).queryHistory; + return typeof value === 'object' && value !== null && !Array.isArray(value) ? (value as Record) : {}; +} + +function positiveInteger(value: unknown): number | undefined { + return typeof value === 'number' && Number.isInteger(value) && value > 0 ? value : undefined; +} + +function depthFromLegacyScanMode( + mode: Extract['mode'] | undefined, +): KtxPublicIngestDepth | undefined { + return mode === 'enriched' || mode === 'relationships' ? 'deep' : undefined; +} + function sourceDirForConnection(connection: KtxProjectConnectionConfig): string | undefined { const value = connection.source_dir; return typeof value === 'string' && value.trim().length > 0 ? value.trim() : undefined; } -function targetForConnection(connectionId: string, connection: KtxProjectConnectionConfig): KtxPublicIngestPlanTarget { +function resolveDatabaseTargetOptions(input: { + connectionId: string; + driver: string; + connection: KtxProjectConnectionConfig; + args: { + depth?: KtxPublicIngestDepth; + queryHistory?: KtxPublicIngestQueryHistoryFlag; + queryHistoryWindowDays?: number; + scanMode?: Extract['mode']; + }; + warnings: string[]; +}): Pick { + const storedQh = storedQueryHistory(input.connection); + const dialect = queryHistoryDialectByDriver.get(input.driver); + const explicitQueryHistory = input.args.queryHistory ?? 'default'; + const storedEnabled = storedQh.enabled === true; + const requestedQh = explicitQueryHistory === 'enabled' || (explicitQueryHistory === 'default' && storedEnabled); + let depth = input.args.depth ?? depthFromLegacyScanMode(input.args.scanMode) ?? storedDepth(input.connection) ?? 'fast'; + const queryHistory = { + enabled: false, + ...(input.args.queryHistoryWindowDays !== undefined + ? { windowDays: input.args.queryHistoryWindowDays } + : positiveInteger(storedQh.windowDays) !== undefined + ? { windowDays: positiveInteger(storedQh.windowDays) } + : {}), + }; + + if (requestedQh && !dialect) { + input.warnings.push( + explicitQueryHistory === 'enabled' || input.args.queryHistoryWindowDays !== undefined + ? `--query-history is not supported for ${input.driver}; running schema ingest for ${input.connectionId}.` + : `${input.connectionId} has query history enabled in ktx.yaml, but ${input.driver} does not support it; running schema ingest.`, + ); + return { + databaseDepth: depth, + queryHistory: { ...queryHistory, unsupported: true }, + steps: ['database-schema'], + }; + } + + if (requestedQh && dialect) { + if (depth === 'fast') { + input.warnings.push(`--query-history requires deep ingest; running ${input.connectionId} with --deep.`); + } + depth = 'deep'; + return { + databaseDepth: depth, + queryHistory: { ...queryHistory, enabled: true, dialect }, + steps: ['database-schema', 'query-history'], + }; + } + + if (input.args.depth === 'fast' && explicitQueryHistory !== 'enabled' && storedEnabled) { + input.warnings.push( + `${input.connectionId} has query history enabled in ktx.yaml, but --fast skips query-history processing.`, + ); + return { + databaseDepth: 'fast', + queryHistory: { ...queryHistory, skippedStoredByFast: true }, + steps: ['database-schema'], + }; + } + + return { + databaseDepth: depth, + queryHistory, + steps: ['database-schema'], + }; +} + +function targetForConnection( + connectionId: string, + connection: KtxProjectConnectionConfig, + args: { + depth?: KtxPublicIngestDepth; + queryHistory?: KtxPublicIngestQueryHistoryFlag; + queryHistoryWindowDays?: number; + scanMode?: Extract['mode']; + }, + warnings: string[], +): KtxPublicIngestPlanTarget { const driver = normalizedDriver(connection); const adapter = sourceAdapterByDriver.get(driver); const sourceDir = sourceDirForConnection(connection); if (adapter) { + if (args.depth) { + warnings.push(`--${args.depth} affects database ingest only; ignoring it for ${connectionId}.`); + } + if (args.queryHistory === 'enabled' || args.queryHistoryWindowDays !== undefined) { + warnings.push(`--query-history affects database ingest only; ignoring it for ${connectionId}.`); + } return { connectionId, driver, operation: 'source-ingest', adapter, ...(sourceDir ? { sourceDir } : {}), - debugCommand: `ktx ingest run --connection-id ${connectionId} --adapter ${adapter} --debug`, + debugCommand: `ktx ingest ${connectionId} --debug`, steps: ['source-ingest', 'memory-update'], }; } if (warehouseDrivers.has(driver)) { + const options = resolveDatabaseTargetOptions({ connectionId, driver, connection, args, warnings }); return { connectionId, driver, - operation: 'scan', - debugCommand: `ktx scan ${connectionId} --debug`, - steps: ['scan'], + operation: 'database-ingest', + debugCommand: `ktx ingest ${connectionId} --debug`, + ...options, }; } @@ -130,7 +264,15 @@ function targetForConnection(connectionId: string, connection: KtxProjectConnect export function buildPublicIngestPlan( project: KtxPublicIngestProject, - args: { projectDir: string; targetConnectionId?: string; all: boolean }, + args: { + projectDir: string; + targetConnectionId?: string; + all: boolean; + depth?: KtxPublicIngestDepth; + queryHistory?: KtxPublicIngestQueryHistoryFlag; + queryHistoryWindowDays?: number; + scanMode?: Extract['mode']; + }, ): KtxPublicIngestPlan { if (!args.all && !args.targetConnectionId) { throw new Error('Context build requires a connection id or all targets'); @@ -146,10 +288,15 @@ export function buildPublicIngestPlan( throw new Error('No configured connections are eligible for ingest'); } - const targets = selected.map(([connectionId, connection]) => targetForConnection(connectionId, connection)); + const warnings: string[] = []; + const targets = selected.map(([connectionId, connection]) => targetForConnection(connectionId, connection, args, warnings)); return { projectDir: args.projectDir, - targets: [...targets.filter((t) => t.operation === 'scan'), ...targets.filter((t) => t.operation === 'source-ingest')], + targets: [ + ...targets.filter((t) => t.operation === 'database-ingest'), + ...targets.filter((t) => t.operation === 'source-ingest'), + ], + warnings, }; }