import { type KtxLocalProject, type KtxProjectConnectionConfig, loadKtxProject } from '@ktx/context/project';
import type { KtxProgressPort } from '@ktx/context/scan';
import type { KtxCliIo } from './index.js';
import type { KtxIngestArgs, KtxIngestDeps, KtxIngestProgressUpdate } from './ingest.js';
import type { KtxScanArgs, KtxScanDeps } from './scan.js';
import { profileMark } from './startup-profile.js';

profileMark('module:public-ingest');

/** Steps a public ingest target can plan and report on. */
type KtxPublicIngestStepName = 'database-schema' | 'query-history' | 'source-ingest' | 'memory-update';
type KtxPublicIngestStepStatus = 'done' | 'skipped' | 'failed' | 'not-run';
type KtxPublicIngestInputMode = 'auto' | 'disabled';
type KtxPublicIngestDepth = 'fast' | 'deep';
type KtxPublicIngestQueryHistoryFlag = 'default' | 'enabled' | 'disabled';
type HistoricSqlDialect = 'postgres' | 'bigquery' | 'snowflake';
/** Scan mode accepted by the `run` form of the scan command. */
type KtxScanMode = Extract<KtxScanArgs, { command: 'run' }>['mode'];
/** The `run` variant of the public ingest arguments. */
type KtxPublicIngestRunArgs = Extract<KtxPublicIngestArgs, { command: 'run' }>;

/** CLI options that influence how each connection is planned. */
interface KtxPublicIngestPlanOptions {
  depth?: KtxPublicIngestDepth;
  queryHistory?: KtxPublicIngestQueryHistoryFlag;
  queryHistoryWindowDays?: number;
  scanMode?: KtxScanMode;
}

/**
 * Arguments for the public `ktx ingest` entry point: either a `run` over one/all
 * connections, or a `status`/`watch` request that is delegated to the ingest command.
 */
export type KtxPublicIngestArgs =
  | {
      command: 'run';
      projectDir: string;
      targetConnectionId?: string;
      all: boolean;
      json: boolean;
      inputMode: KtxPublicIngestInputMode;
      depth?: KtxPublicIngestDepth;
      queryHistory?: KtxPublicIngestQueryHistoryFlag;
      queryHistoryWindowDays?: number;
      scanMode?: KtxScanMode;
      detectRelationships?: boolean;
    }
  | {
      command: 'status' | 'watch';
      projectDir: string;
      runId?: string;
      json: boolean;
      inputMode: KtxPublicIngestInputMode;
    };

/** One planned unit of work: a single configured connection and the steps to run for it. */
export interface KtxPublicIngestPlanTarget {
  connectionId: string;
  driver: string;
  operation: 'database-ingest' | 'source-ingest';
  adapter?: string;
  sourceDir?: string;
  debugCommand: string;
  steps: KtxPublicIngestStepName[];
  databaseDepth?: KtxPublicIngestDepth;
  queryHistory?: {
    enabled: boolean;
    dialect?: HistoricSqlDialect;
    windowDays?: number;
    unsupported?: boolean;
    skippedStoredByFast?: boolean;
  };
}

/** Ordered targets (database targets first) plus planning warnings for the user. */
export interface KtxPublicIngestPlan {
  projectDir: string;
  targets: KtxPublicIngestPlanTarget[];
  warnings: string[];
}

/** Per-connection outcome, one entry per known step. */
export interface KtxPublicIngestTargetResult {
  connectionId: string;
  driver: string;
  steps: Array<{
    operation: KtxPublicIngestStepName;
    status: KtxPublicIngestStepStatus;
    detail?: string;
    debugCommand?: string;
  }>;
}

type KtxPublicIngestStepResult = KtxPublicIngestTargetResult['steps'][number];

/** The subset of a loaded project that planning needs. */
export type KtxPublicIngestProject = Pick<KtxLocalProject, 'config'>;

/** Injection points, primarily for tests; each defaults to the real implementation. */
export interface KtxPublicIngestDeps {
  loadProject?: (options: Parameters<typeof loadKtxProject>[0]) => Promise<KtxPublicIngestProject>;
  runScan?: (args: KtxScanArgs, io: KtxCliIo, deps?: KtxScanDeps) => Promise<number>;
  runIngest?: (args: KtxIngestArgs, io: KtxCliIo, deps?: KtxIngestDeps) => Promise<number>;
  scanProgress?: KtxProgressPort;
  ingestProgress?: (update: KtxIngestProgressUpdate) => void;
}

/** Drivers handled by a source adapter, mapped to that adapter's name. */
const sourceAdapterByDriver = new Map<string, string>([
  ['metabase', 'metabase'],
  ['local_metabase', 'metabase'],
  ['looker', 'looker'],
  ['local_looker', 'looker'],
  ['notion', 'notion'],
  ['metricflow', 'metricflow'],
  ['dbt', 'dbt'],
  ['lookml', 'lookml'],
]);

/** Drivers handled by the database (scan) path. */
const warehouseDrivers = new Set<string>([
  'sqlite',
  'postgres',
  'postgresql',
  'mysql',
  'clickhouse',
  'sqlserver',
  'bigquery',
  'snowflake',
]);

/** Drivers that support query-history processing, mapped to their SQL dialect. */
const queryHistoryDialectByDriver = new Map<string, HistoricSqlDialect>([
  ['postgres', 'postgres'],
  ['postgresql', 'postgres'],
  ['bigquery', 'bigquery'],
  ['snowflake', 'snowflake'],
]);

/** Every step in reporting order; results always contain one entry per step. */
const publicIngestStepOrder: readonly KtxPublicIngestStepName[] = [
  'database-schema',
  'query-history',
  'source-ingest',
  'memory-update',
];

/** Columns of the plain-text results table, in display order. */
const plainResultColumns: ReadonlyArray<{ step: KtxPublicIngestStepName; title: string }> = [
  { step: 'database-schema', title: 'Schema' },
  { step: 'query-history', title: 'Query history' },
  { step: 'source-ingest', title: 'Source ingest' },
  { step: 'memory-update', title: 'Memory update' },
];

/** Width of the longest status value ('not-run' / 'skipped'). */
const plainStatusWidth = 'not-run'.length;
/** Minimum width of the connection id column in the plain table. */
const plainSourceMinWidth = 14;

/** Lower-cased, trimmed driver name; empty string when the connection has none. */
function normalizedDriver(connection: KtxProjectConnectionConfig): string {
  return String(connection.driver ?? '')
    .trim()
    .toLowerCase();
}

/** The connection's `context` object, or `{}` when absent or not a plain object. */
function connectionContext(connection: KtxProjectConnectionConfig): Record<string, unknown> {
  const value = connection.context;
  return typeof value === 'object' && value !== null && !Array.isArray(value)
    ? (value as Record<string, unknown>)
    : {};
}

/** Depth stored in the connection context, if it is a recognised value. */
function storedDepth(connection: KtxProjectConnectionConfig): KtxPublicIngestDepth | undefined {
  const value = connectionContext(connection).depth;
  return value === 'fast' || value === 'deep' ? value : undefined;
}

/** The `context.queryHistory` object, or `{}` when absent or not a plain object. */
function storedQueryHistory(connection: KtxProjectConnectionConfig): Record<string, unknown> {
  const value = connectionContext(connection).queryHistory;
  return typeof value === 'object' && value !== null && !Array.isArray(value)
    ? (value as Record<string, unknown>)
    : {};
}

/** `value` when it is a positive integer, otherwise `undefined`. */
function positiveInteger(value: unknown): number | undefined {
  return typeof value === 'number' && Number.isInteger(value) && value > 0 ? value : undefined;
}

/** Legacy `--scan-mode` values 'enriched'/'relationships' imply a deep ingest. */
function depthFromLegacyScanMode(mode: KtxScanMode | undefined): KtxPublicIngestDepth | undefined {
  return mode === 'enriched' || mode === 'relationships' ? 'deep' : undefined;
}

/**
 * Scan mode used for a database target when no legacy `scanMode` was passed.
 * NOTE(review): 'deep' → 'enriched' mirrors depthFromLegacyScanMode (which treats
 * 'enriched' as deep); confirm this is the intended scan mode for deep ingest.
 */
function scanModeForDepth(depth: KtxPublicIngestDepth | undefined): KtxScanMode {
  return depth === 'deep' ? 'enriched' : 'structural';
}

/** Trimmed `source_dir` for the connection, or `undefined` when missing/blank. */
function sourceDirForConnection(connection: KtxProjectConnectionConfig): string | undefined {
  const value = connection.source_dir;
  return typeof value === 'string' && value.trim().length > 0 ? value.trim() : undefined;
}

/**
 * Decides depth, query-history settings and steps for a database connection,
 * combining CLI flags with the values stored in the connection context.
 * Pushes user-facing explanations onto `input.warnings`.
 */
function resolveDatabaseTargetOptions(input: {
  connectionId: string;
  driver: string;
  connection: KtxProjectConnectionConfig;
  args: KtxPublicIngestPlanOptions;
  warnings: string[];
}): Pick<KtxPublicIngestPlanTarget, 'databaseDepth' | 'queryHistory' | 'steps'> {
  const storedQh = storedQueryHistory(input.connection);
  const dialect = queryHistoryDialectByDriver.get(input.driver);
  const explicitQueryHistory = input.args.queryHistory ?? 'default';
  const storedEnabled = storedQh.enabled === true;
  const windowDays = input.args.queryHistoryWindowDays ?? positiveInteger(storedQh.windowDays);
  const queryHistory = { enabled: false, ...(windowDays !== undefined ? { windowDays } : {}) };
  const depth: KtxPublicIngestDepth =
    input.args.depth ?? depthFromLegacyScanMode(input.args.scanMode) ?? storedDepth(input.connection) ?? 'fast';

  // An explicit --fast wins over query history that is only enabled in ktx.yaml.
  // This must be checked before the "requested" branches below, which would
  // otherwise upgrade the run to deep.
  if (input.args.depth === 'fast' && explicitQueryHistory === 'default' && storedEnabled) {
    input.warnings.push(
      `${input.connectionId} has query history enabled in ktx.yaml, but --fast skips query-history processing.`,
    );
    return {
      databaseDepth: 'fast',
      queryHistory: { ...queryHistory, skippedStoredByFast: true },
      steps: ['database-schema'],
    };
  }

  const requestedQh = explicitQueryHistory === 'enabled' || (explicitQueryHistory === 'default' && storedEnabled);
  if (!requestedQh) {
    return { databaseDepth: depth, queryHistory, steps: ['database-schema'] };
  }

  if (!dialect) {
    input.warnings.push(
      explicitQueryHistory === 'enabled' || input.args.queryHistoryWindowDays !== undefined
        ? `--query-history is not supported for ${input.driver}; running schema ingest for ${input.connectionId}.`
        : `${input.connectionId} has query history enabled in ktx.yaml, but ${input.driver} does not support it; running schema ingest.`,
    );
    return {
      databaseDepth: depth,
      queryHistory: { ...queryHistory, unsupported: true },
      steps: ['database-schema'],
    };
  }

  // Query history is only processed by a deep ingest.
  if (depth === 'fast') {
    input.warnings.push(`--query-history requires deep ingest; running ${input.connectionId} with --deep.`);
  }
  return {
    databaseDepth: 'deep',
    queryHistory: { ...queryHistory, enabled: true, dialect },
    steps: ['database-schema', 'query-history'],
  };
}

/**
 * Builds the plan entry for one connection.
 * @throws Error when the driver is neither a source adapter nor a supported warehouse.
 */
function targetForConnection(
  connectionId: string,
  connection: KtxProjectConnectionConfig,
  args: KtxPublicIngestPlanOptions,
  warnings: string[],
): KtxPublicIngestPlanTarget {
  const driver = normalizedDriver(connection);
  const adapter = sourceAdapterByDriver.get(driver);
  const sourceDir = sourceDirForConnection(connection);
  if (adapter) {
    if (args.depth) {
      warnings.push(`--${args.depth} affects database ingest only; ignoring it for ${connectionId}.`);
    }
    if (args.queryHistory === 'enabled' || args.queryHistoryWindowDays !== undefined) {
      warnings.push(`--query-history affects database ingest only; ignoring it for ${connectionId}.`);
    }
    return {
      connectionId,
      driver,
      operation: 'source-ingest',
      adapter,
      ...(sourceDir ? { sourceDir } : {}),
      debugCommand: `ktx ingest ${connectionId} --debug`,
      steps: ['source-ingest', 'memory-update'],
    };
  }
  if (warehouseDrivers.has(driver)) {
    const options = resolveDatabaseTargetOptions({ connectionId, driver, connection, args, warnings });
    return {
      connectionId,
      driver,
      operation: 'database-ingest',
      debugCommand: `ktx ingest ${connectionId} --debug`,
      ...options,
    };
  }
  throw new Error(`Connection "${connectionId}" uses unsupported public ingest driver "${driver || 'unknown'}"`);
}

/**
 * Plans an ingest over one connection (`targetConnectionId`) or all of them (`all`).
 * Connections are visited in id order; database targets are placed before source targets.
 * @throws Error when no target was requested, the requested id is not configured,
 *   nothing is configured, or a connection uses an unsupported driver.
 */
export function buildPublicIngestPlan(
  project: KtxPublicIngestProject,
  args: KtxPublicIngestPlanOptions & {
    projectDir: string;
    targetConnectionId?: string;
    all: boolean;
  },
): KtxPublicIngestPlan {
  if (!args.all && !args.targetConnectionId) {
    throw new Error('Context build requires a connection id or all targets');
  }
  const entries = Object.entries(project.config.connections).sort(([a], [b]) => a.localeCompare(b));
  const selected = args.all ? entries : entries.filter(([connectionId]) => connectionId === args.targetConnectionId);
  if (!args.all && selected.length === 0) {
    throw new Error(`Connection "${args.targetConnectionId}" is not configured in ktx.yaml`);
  }
  if (selected.length === 0) {
    throw new Error('No configured connections are eligible for ingest');
  }
  const warnings: string[] = [];
  const targets = selected.map(([connectionId, connection]) => targetForConnection(connectionId, connection, args, warnings));
  return {
    projectDir: args.projectDir,
    targets: [
      ...targets.filter((t) => t.operation === 'database-ingest'),
      ...targets.filter((t) => t.operation === 'source-ingest'),
    ],
    warnings,
  };
}

/** One result entry per known step: planned steps start 'not-run' (with debug hint), others 'skipped'. */
function defaultSteps(target: KtxPublicIngestPlanTarget): KtxPublicIngestTargetResult['steps'] {
  return publicIngestStepOrder.map(
    (operation): KtxPublicIngestStepResult =>
      target.steps.includes(operation)
        ? { operation, status: 'not-run', debugCommand: target.debugCommand }
        : { operation, status: 'skipped' },
  );
}

/**
 * Converts the delegated command's outcome into step results. On success every
 * planned step is reported 'done'; on failure the target's entry step is
 * 'failed' and the remaining planned steps stay 'not-run'.
 * NOTE(review): the database path passes no query-history options to scan, yet a
 * successful scan marks a planned 'query-history' step as done — confirm scan
 * actually performs query-history processing.
 */
function markTargetResult(target: KtxPublicIngestPlanTarget, status: 'done' | 'failed'): KtxPublicIngestTargetResult {
  const failedOperation: KtxPublicIngestStepName =
    target.operation === 'database-ingest' ? 'database-schema' : 'source-ingest';
  return {
    connectionId: target.connectionId,
    driver: target.driver,
    steps: defaultSteps(target).map((step): KtxPublicIngestStepResult => {
      if (!target.steps.includes(step.operation)) {
        return step;
      }
      if (status === 'done') {
        return { ...step, status: 'done' };
      }
      if (step.operation === failedOperation) {
        return { ...step, status: 'failed', detail: `${target.connectionId} failed at ${failedOperation}.` };
      }
      return { ...step, status: 'not-run' };
    }),
  };
}

/** True when any step of the result failed. */
function resultFailed(result: KtxPublicIngestTargetResult): boolean {
  return result.steps.some((step) => step.status === 'failed');
}

/** Status of `operation` in `result`, defaulting to 'not-run'. */
function stepStatus(result: KtxPublicIngestTargetResult, operation: KtxPublicIngestStepName): string {
  return result.steps.find((step) => step.operation === operation)?.status ?? 'not-run';
}

/** Formats one table row; every cell except the last is padded to its column width. */
function formatPlainRow(source: string, sourceWidth: number, cells: readonly string[]): string {
  const padded = cells.map((cell, index) => {
    const column = plainResultColumns[index];
    if (index === cells.length - 1 || !column) {
      return cell;
    }
    return cell.padEnd(Math.max(column.title.length, plainStatusWidth));
  });
  return [source.padEnd(sourceWidth), ...padded].join(' ');
}

/** Writes the human-readable summary table and failure details to stdout. */
function renderPlainResults(results: KtxPublicIngestTargetResult[], io: KtxCliIo): void {
  const failures = results.filter(resultFailed);
  io.stdout.write(failures.length > 0 ? 'Ingest finished with partial failures\n' : 'Ingest finished\n');
  io.stdout.write('\n');
  const sourceWidth = Math.max(plainSourceMinWidth, ...results.map((result) => result.connectionId.length));
  io.stdout.write(`${formatPlainRow('Source', sourceWidth, plainResultColumns.map((column) => column.title))}\n`);
  for (const result of results) {
    const statuses = plainResultColumns.map((column) => stepStatus(result, column.step));
    io.stdout.write(`${formatPlainRow(result.connectionId, sourceWidth, statuses)}\n`);
  }
  if (failures.length === 0) {
    return;
  }
  io.stdout.write('\nFailed sources:\n');
  for (const result of failures) {
    const failedStep = result.steps.find((step) => step.status === 'failed');
    if (!failedStep) {
      continue;
    }
    io.stdout.write(` ${failedStep.detail ?? `${result.connectionId} failed.`}\n`);
    if (failedStep.debugCommand) {
      io.stdout.write(` Debug: ${failedStep.debugCommand}\n`);
    }
  }
}

/** True when `io.stdin` is a TTY that supports raw mode. */
function hasInteractiveInput(io: KtxCliIo): boolean {
  const stdin = (io as { stdin?: { isTTY?: boolean; setRawMode?: (value: boolean) => void } }).stdin;
  return stdin?.isTTY === true && typeof stdin.setRawMode === 'function';
}

/** Use the visual ingest UI only for interactive TTY sessions with input mode 'auto'. */
function sourceIngestOutputMode(args: KtxPublicIngestRunArgs, io: KtxCliIo): 'plain' | 'viz' {
  return args.inputMode === 'auto' && io.stdout.isTTY === true && hasInteractiveInput(io) ? 'viz' : 'plain';
}

/**
 * Runs one planned target: database targets through the scan command, source
 * targets through the ingest command. The command modules are imported lazily.
 * @returns per-step results derived from the delegated command's exit code.
 */
export async function executePublicIngestTarget(
  target: KtxPublicIngestPlanTarget,
  args: KtxPublicIngestRunArgs,
  io: KtxCliIo,
  deps: KtxPublicIngestDeps,
): Promise<KtxPublicIngestTargetResult> {
  if (target.operation === 'database-ingest') {
    const { runKtxScan } = await import('./scan.js');
    const scanArgs: KtxScanArgs = {
      command: 'run',
      projectDir: args.projectDir,
      connectionId: target.connectionId,
      // An explicit legacy scan mode wins; otherwise derive it from the planned depth.
      mode: args.scanMode ?? scanModeForDepth(target.databaseDepth),
      detectRelationships: args.detectRelationships ?? false,
      dryRun: false,
    };
    const runScan = deps.runScan ?? runKtxScan;
    const exitCode = deps.scanProgress
      ? await runScan(scanArgs, io, { progress: deps.scanProgress })
      : await runScan(scanArgs, io);
    return markTargetResult(target, exitCode === 0 ? 'done' : 'failed');
  }

  const { runKtxIngest } = await import('./ingest.js');
  const ingestArgs: KtxIngestArgs = {
    command: 'run',
    projectDir: args.projectDir,
    connectionId: target.connectionId,
    adapter: target.adapter ?? target.driver,
    ...(target.sourceDir ? { sourceDir: target.sourceDir } : {}),
    outputMode: sourceIngestOutputMode(args, io),
    inputMode: args.inputMode,
  };
  const runIngest = deps.runIngest ?? runKtxIngest;
  const exitCode = deps.ingestProgress
    ? await runIngest(ingestArgs, io, { progress: deps.ingestProgress })
    : await runIngest(ingestArgs, io);
  return markTargetResult(target, exitCode === 0 ? 'done' : 'failed');
}

/**
 * Entry point for `ktx ingest`.
 * - `status` / `watch`: delegated to the ingest command; its exit code is returned.
 * - `run`: loads the project, plans targets, runs them sequentially, then prints
 *   `{ plan, results }` as JSON or a plain summary (plan warnings are printed first
 *   in plain mode).
 * @returns 1 when any target failed, otherwise 0 (for `run`).
 */
export async function runKtxPublicIngest(
  args: KtxPublicIngestArgs,
  io: KtxCliIo,
  deps: KtxPublicIngestDeps = {},
): Promise<number> {
  if (args.command !== 'run') {
    const { runKtxIngest } = await import('./ingest.js');
    return await (deps.runIngest ?? runKtxIngest)(
      {
        command: args.command,
        projectDir: args.projectDir,
        ...(args.runId ? { runId: args.runId } : {}),
        outputMode: args.json ? 'json' : args.command === 'watch' ? 'viz' : 'plain',
        inputMode: args.inputMode,
      },
      io,
    );
  }

  const loadProject = deps.loadProject ?? loadKtxProject;
  const project = await loadProject({ projectDir: args.projectDir });
  const plan = buildPublicIngestPlan(project, args);

  // JSON output carries the warnings inside `plan`; plain output would otherwise drop them.
  if (!args.json) {
    for (const warning of plan.warnings) {
      io.stdout.write(`Warning: ${warning}\n`);
    }
  }

  const results: KtxPublicIngestTargetResult[] = [];
  for (const target of plan.targets) {
    results.push(await executePublicIngestTarget(target, args, io, deps));
  }

  if (args.json) {
    io.stdout.write(`${JSON.stringify({ plan, results }, null, 2)}\n`);
  } else {
    renderPlainResults(results, io);
  }
  return results.some(resultFailed) ? 1 : 0;
}