diff --git a/packages/cli/src/ingest.ts b/packages/cli/src/ingest.ts index c1096b2b..a60ceaf0 100644 --- a/packages/cli/src/ingest.ts +++ b/packages/cli/src/ingest.ts @@ -49,6 +49,8 @@ export type KtxIngestArgs = cliVersion?: string; runtimeInstallPolicy?: KtxManagedPythonInstallPolicy; debugLlmRequestFile?: string; + allowImplicitAdapter?: boolean; + historicSqlPullConfigOverride?: Record; outputMode: KtxIngestOutputMode; inputMode?: KtxIngestInputMode; } @@ -571,6 +573,19 @@ export async function runKtxIngest( const project = await loadKtxProject({ projectDir: args.projectDir }); const env = deps.env ?? process.env; if (args.command === 'run') { + const ingestProject = + args.allowImplicitAdapter && !project.config.ingest.adapters.includes(args.adapter) + ? { + ...project, + config: { + ...project.config, + ingest: { + ...project.config.ingest, + adapters: [...project.config.ingest.adapters, args.adapter], + }, + }, + } + : project; const createAdapters = deps.createAdapters ?? (deps.runLocalIngest || deps.runLocalMetabaseIngest ? () => [] : createKtxCliLocalIngestAdapters); @@ -583,11 +598,14 @@ export async function runKtxIngest( ...(args.databaseIntrospectionUrl ? { databaseIntrospectionUrl: args.databaseIntrospectionUrl } : {}), ...(managedDaemon ? { managedDaemon } : {}), ...(args.adapter === 'historic-sql' ? { historicSqlConnectionId: args.connectionId } : {}), + ...(args.historicSqlPullConfigOverride + ? { historicSqlPullConfigOverride: args.historicSqlPullConfigOverride } + : {}), logger: operationalLogger, }; const queryExecutor = localIngestOptions.queryExecutor ?? - (deps.createQueryExecutor ?? createKtxCliIngestQueryExecutor)(project); + (deps.createQueryExecutor ?? createKtxCliIngestQueryExecutor)(ingestProject); if (args.adapter === 'metabase' && args.sourceDir) { throw new Error('source-dir uploads are not supported for the Metabase fan-out adapter'); } @@ -604,8 +622,8 @@ export async function runKtxIngest( deps.progress, ); const result = await executeMetabaseFanout({ - project, - adapters: createAdapters(project, adapterOptions), + project: ingestProject, + adapters: createAdapters(ingestProject, adapterOptions), metabaseConnectionId: args.connectionId, ...localIngestOptions, queryExecutor, @@ -668,8 +686,8 @@ export async function runKtxIngest( try { const result = await executeLocalIngest({ - project, - adapters: createAdapters(project, adapterOptions), + project: ingestProject, + adapters: createAdapters(ingestProject, adapterOptions), adapter: args.adapter, connectionId: args.connectionId, sourceDir: args.sourceDir, diff --git a/packages/cli/src/public-ingest.test.ts b/packages/cli/src/public-ingest.test.ts index 458874b8..0a5c35d3 100644 --- a/packages/cli/src/public-ingest.test.ts +++ b/packages/cli/src/public-ingest.test.ts @@ -161,6 +161,75 @@ describe('buildPublicIngestPlan', () => { }); describe('runKtxPublicIngest', () => { + it('maps fast and deep database targets to scan internals', async () => { + const io = makeIo(); + const project = projectWithConnections({ + fast: { driver: 'postgres' }, + deep: { driver: 'postgres', context: { depth: 'deep' } }, + }); + const runScan = vi.fn(async () => 0); + + await expect( + runKtxPublicIngest( + { command: 'run', projectDir: '/tmp/project', all: true, json: false, inputMode: 'disabled', queryHistory: 'default' }, + io.io, + { loadProject: vi.fn(async () => project), runScan }, + ), + ).resolves.toBe(0); + + expect(runScan).toHaveBeenNthCalledWith( + 1, + expect.objectContaining({ connectionId: 'deep', mode: 'enriched', detectRelationships: true }), + expect.anything(), + ); + expect(runScan).toHaveBeenNthCalledWith( + 2, + expect.objectContaining({ connectionId: 'fast', mode: 'structural', detectRelationships: false }), + expect.anything(), + ); + }); + + it('runs query history after schema ingest with current-run window override', async () => { + const io = makeIo(); + const project = projectWithConnections({ + warehouse: { driver: 'postgres', context: { queryHistory: { enabled: true, windowDays: 90 } } }, + }); + const runScan = vi.fn(async () => 0); + const runIngest = vi.fn(async () => 0); + + await expect( + runKtxPublicIngest( + { + command: 'run', + projectDir: '/tmp/project', + targetConnectionId: 'warehouse', + all: false, + json: false, + inputMode: 'disabled', + queryHistory: 'enabled', + queryHistoryWindowDays: 30, + }, + io.io, + { loadProject: vi.fn(async () => project), runScan, runIngest }, + ), + ).resolves.toBe(0); + + expect(runScan).toHaveBeenCalledWith( + expect.objectContaining({ connectionId: 'warehouse', mode: 'enriched' }), + expect.anything(), + ); + expect(runIngest).toHaveBeenCalledWith( + expect.objectContaining({ + command: 'run', + connectionId: 'warehouse', + adapter: 'historic-sql', + allowImplicitAdapter: true, + historicSqlPullConfigOverride: expect.objectContaining({ dialect: 'postgres', windowDays: 30 }), + }), + expect.anything(), + ); + }); + it('runs all independent targets and reports partial failures', async () => { const io = makeIo(); const project = projectWithConnections({ @@ -205,8 +274,8 @@ describe('runKtxPublicIngest', () => { expect.anything(), ); expect(io.stdout()).toContain('Ingest finished with partial failures'); - expect(io.stdout()).toContain('warehouse failed at scan.'); - expect(io.stdout()).toContain('Debug: ktx scan warehouse --debug'); + expect(io.stdout()).toContain('warehouse failed at database-schema.'); + expect(io.stdout()).toContain('Debug: ktx ingest warehouse --debug'); }); it('can request enriched relationship scans for setup-managed context builds', async () => { diff --git a/packages/cli/src/public-ingest.ts b/packages/cli/src/public-ingest.ts index dd4daf38..490b1c50 100644 --- a/packages/cli/src/public-ingest.ts +++ b/packages/cli/src/public-ingest.ts @@ -303,16 +303,20 @@ export function buildPublicIngestPlan( function defaultSteps(target: KtxPublicIngestPlanTarget): KtxPublicIngestTargetResult['steps'] { return [ { - operation: 'scan', - status: target.steps.includes('scan') ? 'not-run' : 'skipped', - ...(target.operation === 'scan' ? { debugCommand: target.debugCommand } : {}), + operation: 'database-schema', + status: target.steps.includes('database-schema') ? 'not-run' : 'skipped', + ...(target.operation === 'database-ingest' ? { debugCommand: target.debugCommand } : {}), + }, + { + operation: 'query-history', + status: target.steps.includes('query-history') ? 'not-run' : 'skipped', + ...(target.operation === 'database-ingest' ? { debugCommand: target.debugCommand } : {}), }, { operation: 'source-ingest', status: target.steps.includes('source-ingest') ? 'not-run' : 'skipped', ...(target.operation === 'source-ingest' ? { debugCommand: target.debugCommand } : {}), }, - { operation: 'enrich', status: 'skipped' }, { operation: 'memory-update', status: target.steps.includes('memory-update') ? 'not-run' : 'skipped', @@ -321,8 +325,13 @@ function defaultSteps(target: KtxPublicIngestPlanTarget): KtxPublicIngestTargetR ]; } -function markTargetResult(target: KtxPublicIngestPlanTarget, status: 'done' | 'failed'): KtxPublicIngestTargetResult { - const failedOperation = target.operation === 'scan' ? 'scan' : 'source-ingest'; +function markTargetResult( + target: KtxPublicIngestPlanTarget, + status: 'done' | 'failed', + failedOperation?: KtxPublicIngestStepName, +): KtxPublicIngestTargetResult { + const selectedFailedOperation = + failedOperation ?? (target.operation === 'database-ingest' ? 'database-schema' : 'source-ingest'); return { connectionId: target.connectionId, driver: target.driver, @@ -333,8 +342,12 @@ function markTargetResult(target: KtxPublicIngestPlanTarget, status: 'done' | 'f if (status === 'done') { return { ...step, status: 'done' }; } - if (step.operation === failedOperation) { - return { ...step, status: 'failed', detail: `${target.connectionId} failed at ${failedOperation}.` }; + if (step.operation === selectedFailedOperation) { + return { + ...step, + status: 'failed', + detail: `${target.connectionId} failed at ${selectedFailedOperation}.`, + }; } return { ...step, status: 'not-run' }; }), @@ -353,13 +366,16 @@ function renderPlainResults(results: KtxPublicIngestTargetResult[], io: KtxCliIo const failures = results.filter(resultFailed); io.stdout.write(failures.length > 0 ? 'Ingest finished with partial failures\n' : 'Ingest finished\n'); io.stdout.write('\n'); - io.stdout.write('Source Scan Source ingest Enrich Memory update\n'); + io.stdout.write('Source Database schema Query history Source ingest Memory update\n'); for (const result of results) { io.stdout.write( - `${result.connectionId.padEnd(14)} ${stepStatus(result, 'scan').padEnd(9)} ${stepStatus( + `${result.connectionId.padEnd(14)} ${stepStatus(result, 'database-schema').padEnd(16)} ${stepStatus( + result, + 'query-history', + ).padEnd(14)} ${stepStatus( result, 'source-ingest', - ).padEnd(14)} ${stepStatus(result, 'enrich').padEnd(8)} ${stepStatus(result, 'memory-update')}\n`, + ).padEnd(14)} ${stepStatus(result, 'memory-update')}\n`, ); } @@ -395,21 +411,47 @@ export async function executePublicIngestTarget( io: KtxCliIo, deps: KtxPublicIngestDeps, ): Promise { - if (target.operation === 'scan') { + if (target.operation === 'database-ingest') { const { runKtxScan } = await import('./scan.js'); const scanArgs: KtxScanArgs = { command: 'run', projectDir: args.projectDir, connectionId: target.connectionId, - mode: args.scanMode ?? 'structural', - detectRelationships: args.detectRelationships ?? false, + mode: target.databaseDepth === 'deep' ? 'enriched' : 'structural', + detectRelationships: target.databaseDepth === 'deep' ? true : false, dryRun: false, }; const runScan = deps.runScan ?? runKtxScan; - const exitCode = deps.scanProgress + const scanExitCode = deps.scanProgress ? await runScan(scanArgs, io, { progress: deps.scanProgress }) : await runScan(scanArgs, io); - return markTargetResult(target, exitCode === 0 ? 'done' : 'failed'); + if (scanExitCode !== 0) { + return markTargetResult(target, 'failed', 'database-schema'); + } + + if (target.queryHistory?.enabled === true) { + const { runKtxIngest } = await import('./ingest.js'); + const runIngest = deps.runIngest ?? runKtxIngest; + const ingestArgs: KtxIngestArgs = { + command: 'run', + projectDir: args.projectDir, + connectionId: target.connectionId, + adapter: 'historic-sql', + outputMode: sourceIngestOutputMode(args, io), + inputMode: args.inputMode, + allowImplicitAdapter: true, + historicSqlPullConfigOverride: { + dialect: target.queryHistory.dialect, + ...(target.queryHistory.windowDays !== undefined ? { windowDays: target.queryHistory.windowDays } : {}), + }, + }; + const qhExitCode = await runIngest(ingestArgs, io); + if (qhExitCode !== 0) { + return markTargetResult(target, 'failed', 'query-history'); + } + } + + return markTargetResult(target, 'done'); } const { runKtxIngest } = await import('./ingest.js'); @@ -453,6 +495,12 @@ export async function runKtxPublicIngest( const plan = buildPublicIngestPlan(project, args); const results: KtxPublicIngestTargetResult[] = []; + if (!args.json && plan.warnings.length > 0) { + for (const warning of plan.warnings) { + io.stderr.write(`Warning: ${warning}\n`); + } + } + for (const target of plan.targets) { results.push(await executePublicIngestTarget(target, args, io, deps)); }