diff --git a/packages/cli/src/ingest-depth.ts b/packages/cli/src/ingest-depth.ts new file mode 100644 index 00000000..fa4cc10c --- /dev/null +++ b/packages/cli/src/ingest-depth.ts @@ -0,0 +1,77 @@ +import type { KtxProjectConfig, KtxProjectConnectionConfig } from '@ktx/context/project'; + +export type KtxDatabaseContextDepth = 'fast' | 'deep'; + +export const KTX_DATABASE_DRIVER_IDS = new Set([ + 'sqlite', + 'postgres', + 'postgresql', + 'mysql', + 'clickhouse', + 'sqlserver', + 'bigquery', + 'snowflake', +]); + +export function normalizeConnectionDriver(connection: KtxProjectConnectionConfig): string { + return String(connection.driver ?? '') + .trim() + .toLowerCase(); +} + +export function isDatabaseDriver(driver: string): boolean { + return KTX_DATABASE_DRIVER_IDS.has(driver.trim().toLowerCase()); +} + +export function connectionContextRecord(connection: KtxProjectConnectionConfig): Record { + const context = connection.context; + return typeof context === 'object' && context !== null && !Array.isArray(context) + ? (context as Record) + : {}; +} + +export function databaseContextDepth(connection: KtxProjectConnectionConfig): KtxDatabaseContextDepth | undefined { + const depth = connectionContextRecord(connection).depth; + return depth === 'fast' || depth === 'deep' ? depth : undefined; +} + +export function withDatabaseContextDepth( + connection: KtxProjectConnectionConfig, + depth: KtxDatabaseContextDepth, +): KtxProjectConnectionConfig { + return { + ...connection, + context: { + ...connectionContextRecord(connection), + depth, + }, + }; +} + +export function deepReadinessGaps(config: KtxProjectConfig): string[] { + const gaps: string[] = []; + if (config.llm.provider.backend === 'none' || !config.llm.models.default) { + gaps.push('model configuration'); + } + + if (config.scan.enrichment.mode !== 'llm') { + gaps.push('scan enrichment mode'); + } + + const embeddings = config.scan.enrichment.embeddings; + if ( + !embeddings || + embeddings.backend === 'none' || + embeddings.backend === 'deterministic' || + !embeddings.model || + embeddings.dimensions <= 0 + ) { + gaps.push('scan embeddings'); + } + + return gaps; +} + +export function recommendedDatabaseContextDepth(config: KtxProjectConfig): KtxDatabaseContextDepth { + return deepReadinessGaps(config).length === 0 ? 'deep' : 'fast'; +} diff --git a/packages/cli/src/public-ingest.test.ts b/packages/cli/src/public-ingest.test.ts index 0a5c35d3..edc4a226 100644 --- a/packages/cli/src/public-ingest.test.ts +++ b/packages/cli/src/public-ingest.test.ts @@ -34,6 +34,40 @@ function projectWithConnections(connections: KtxProjectConfig['connections']): K }; } +function deepReadyProject( + connections: KtxProjectConfig['connections'], + relationshipsEnabled = true, +): KtxPublicIngestProject { + const config = buildDefaultKtxProjectConfig('warehouse'); + return { + projectDir: '/tmp/project', + config: { + ...config, + connections, + llm: { + ...config.llm, + provider: { backend: 'gateway', gateway: { api_key: 'env:KTX_GATEWAY_API_KEY' } }, + models: { default: 'gpt-test' }, + }, + scan: { + ...config.scan, + enrichment: { + mode: 'llm', + embeddings: { + backend: 'openai', + model: 'text-embedding-3-small', + dimensions: 1536, + }, + }, + relationships: { + ...config.scan.relationships, + enabled: relationshipsEnabled, + }, + }, + }, + }; +} + describe('buildPublicIngestPlan', () => { it('plans warehouse connections as scan targets and source connections as source ingest targets', () => { const project = projectWithConnections({ @@ -52,6 +86,7 @@ describe('buildPublicIngestPlan', () => { debugCommand: 'ktx ingest warehouse --debug', steps: ['database-schema'], databaseDepth: 'fast', + detectRelationships: false, queryHistory: { enabled: false }, }, { @@ -158,12 +193,50 @@ describe('buildPublicIngestPlan', () => { }); expect(plan.warnings).toEqual(['--query-history is not supported for sqlite; running schema ingest for local.']); }); + + it('records a preflight failure for deep database ingest when readiness config is missing', () => { + const project = projectWithConnections({ + warehouse: { driver: 'postgres', context: { depth: 'deep' } }, + }); + + const plan = buildPublicIngestPlan(project, { + projectDir: '/tmp/project', + targetConnectionId: 'warehouse', + all: false, + queryHistory: 'default', + }); + + expect(plan.targets[0]).toMatchObject({ + connectionId: 'warehouse', + databaseDepth: 'deep', + preflightFailure: + 'warehouse requires deep ingest readiness: model configuration, scan enrichment mode, scan embeddings. Run ktx setup or rerun with --fast.', + }); + }); + + it('honors scan.relationships.enabled when planning deep database ingest', () => { + const plan = buildPublicIngestPlan( + deepReadyProject({ warehouse: { driver: 'postgres', context: { depth: 'deep' } } }, false), + { + projectDir: '/tmp/project', + targetConnectionId: 'warehouse', + all: false, + queryHistory: 'default', + }, + ); + + expect(plan.targets[0]).toMatchObject({ + connectionId: 'warehouse', + databaseDepth: 'deep', + detectRelationships: false, + }); + }); }); describe('runKtxPublicIngest', () => { it('maps fast and deep database targets to scan internals', async () => { const io = makeIo(); - const project = projectWithConnections({ + const project = deepReadyProject({ fast: { driver: 'postgres' }, deep: { driver: 'postgres', context: { depth: 'deep' } }, }); @@ -191,7 +264,7 @@ describe('runKtxPublicIngest', () => { it('runs query history after schema ingest with current-run window override', async () => { const io = makeIo(); - const project = projectWithConnections({ + const project = deepReadyProject({ warehouse: { driver: 'postgres', context: { queryHistory: { enabled: true, windowDays: 90 } } }, }); const runScan = vi.fn(async () => 0); @@ -278,9 +351,34 @@ describe('runKtxPublicIngest', () => { expect(io.stdout()).toContain('Debug: ktx ingest warehouse --debug'); }); + it('fails deep-readiness targets before work starts while continuing independent --all targets', async () => { + const io = makeIo(); + const project = projectWithConnections({ + warehouse: { driver: 'postgres', context: { depth: 'deep' } }, + docs: { driver: 'notion' }, + }); + const runScan = vi.fn(async () => 0); + const runIngest = vi.fn(async () => 0); + + await expect( + runKtxPublicIngest( + { command: 'run', projectDir: '/tmp/project', all: true, json: false, inputMode: 'disabled' }, + io.io, + { loadProject: vi.fn(async () => project), runScan, runIngest }, + ), + ).resolves.toBe(1); + + expect(runScan).not.toHaveBeenCalled(); + expect(runIngest).toHaveBeenCalledWith( + expect.objectContaining({ command: 'run', connectionId: 'docs', adapter: 'notion' }), + expect.anything(), + ); + expect(io.stdout()).toContain('warehouse requires deep ingest readiness'); + }); + it('can request enriched relationship scans for setup-managed context builds', async () => { const io = makeIo(); - const project = projectWithConnections({ warehouse: { driver: 'postgres' } }); + const project = deepReadyProject({ warehouse: { driver: 'postgres' } }); const runScan = vi.fn(async () => 0); await expect( diff --git a/packages/cli/src/public-ingest.ts b/packages/cli/src/public-ingest.ts index 490b1c50..a22b7704 100644 --- a/packages/cli/src/public-ingest.ts +++ b/packages/cli/src/public-ingest.ts @@ -2,6 +2,13 @@ import { type KtxLocalProject, type KtxProjectConnectionConfig, loadKtxProject } import type { KtxProgressPort } from '@ktx/context/scan'; import type { KtxCliIo } from './index.js'; import type { KtxIngestArgs, KtxIngestDeps, KtxIngestProgressUpdate } from './ingest.js'; +import { + type KtxDatabaseContextDepth, + databaseContextDepth, + deepReadinessGaps, + isDatabaseDriver, + normalizeConnectionDriver, +} from './ingest-depth.js'; import type { KtxScanArgs, KtxScanDeps } from './scan.js'; import { profileMark } from './startup-profile.js'; @@ -10,7 +17,7 @@ profileMark('module:public-ingest'); type KtxPublicIngestStepName = 'database-schema' | 'query-history' | 'source-ingest' | 'memory-update'; type KtxPublicIngestStepStatus = 'done' | 'skipped' | 'failed' | 'not-run'; type KtxPublicIngestInputMode = 'auto' | 'disabled'; -type KtxPublicIngestDepth = 'fast' | 'deep'; +type KtxPublicIngestDepth = KtxDatabaseContextDepth; type KtxPublicIngestQueryHistoryFlag = 'default' | 'enabled' | 'disabled'; type HistoricSqlDialect = 'postgres' | 'bigquery' | 'snowflake'; @@ -45,6 +52,8 @@ export interface KtxPublicIngestPlanTarget { debugCommand: string; steps: KtxPublicIngestStepName[]; databaseDepth?: KtxPublicIngestDepth; + detectRelationships?: boolean; + preflightFailure?: string; queryHistory?: { enabled: boolean; dialect?: HistoricSqlDialect; @@ -92,17 +101,6 @@ const sourceAdapterByDriver = new Map([ ['lookml', 'lookml'], ]); -const warehouseDrivers = new Set([ - 'sqlite', - 'postgres', - 'postgresql', - 'mysql', - 'clickhouse', - 'sqlserver', - 'bigquery', - 'snowflake', -]); - const queryHistoryDialectByDriver = new Map([ ['postgres', 'postgres'], ['postgresql', 'postgres'], @@ -110,24 +108,8 @@ const queryHistoryDialectByDriver = new Map([ ['snowflake', 'snowflake'], ]); -function normalizedDriver(connection: KtxProjectConnectionConfig): string { - return String(connection.driver ?? '') - .trim() - .toLowerCase(); -} - -function connectionContext(connection: KtxProjectConnectionConfig): Record { - const value = connection.context; - return typeof value === 'object' && value !== null && !Array.isArray(value) ? (value as Record) : {}; -} - -function storedDepth(connection: KtxProjectConnectionConfig): KtxPublicIngestDepth | undefined { - const value = connectionContext(connection).depth; - return value === 'fast' || value === 'deep' ? value : undefined; -} - function storedQueryHistory(connection: KtxProjectConnectionConfig): Record { - const value = connectionContext(connection).queryHistory; + const value = connection.context?.queryHistory; return typeof value === 'object' && value !== null && !Array.isArray(value) ? (value as Record) : {}; } @@ -163,7 +145,8 @@ function resolveDatabaseTargetOptions(input: { const explicitQueryHistory = input.args.queryHistory ?? 'default'; const storedEnabled = storedQh.enabled === true; const requestedQh = explicitQueryHistory === 'enabled' || (explicitQueryHistory === 'default' && storedEnabled); - let depth = input.args.depth ?? depthFromLegacyScanMode(input.args.scanMode) ?? storedDepth(input.connection) ?? 'fast'; + let depth = + input.args.depth ?? depthFromLegacyScanMode(input.args.scanMode) ?? databaseContextDepth(input.connection) ?? 'fast'; const queryHistory = { enabled: false, ...(input.args.queryHistoryWindowDays !== undefined @@ -219,6 +202,7 @@ function resolveDatabaseTargetOptions(input: { function targetForConnection( connectionId: string, connection: KtxProjectConnectionConfig, + projectConfig: KtxPublicIngestProject['config'], args: { depth?: KtxPublicIngestDepth; queryHistory?: KtxPublicIngestQueryHistoryFlag; @@ -227,7 +211,7 @@ function targetForConnection( }, warnings: string[], ): KtxPublicIngestPlanTarget { - const driver = normalizedDriver(connection); + const driver = normalizeConnectionDriver(connection); const adapter = sourceAdapterByDriver.get(driver); const sourceDir = sourceDirForConnection(connection); if (adapter) { @@ -248,13 +232,22 @@ function targetForConnection( }; } - if (warehouseDrivers.has(driver)) { + if (isDatabaseDriver(driver)) { const options = resolveDatabaseTargetOptions({ connectionId, driver, connection, args, warnings }); + const gaps = options.databaseDepth === 'deep' ? deepReadinessGaps(projectConfig) : []; return { connectionId, driver, operation: 'database-ingest', debugCommand: `ktx ingest ${connectionId} --debug`, + detectRelationships: options.databaseDepth === 'deep' && projectConfig.scan.relationships.enabled, + ...(gaps.length > 0 + ? { + preflightFailure: `${connectionId} requires deep ingest readiness: ${gaps.join( + ', ', + )}. Run ktx setup or rerun with --fast.`, + } + : {}), ...options, }; } @@ -289,7 +282,9 @@ export function buildPublicIngestPlan( } const warnings: string[] = []; - const targets = selected.map(([connectionId, connection]) => targetForConnection(connectionId, connection, args, warnings)); + const targets = selected.map(([connectionId, connection]) => + targetForConnection(connectionId, connection, project.config, args, warnings), + ); return { projectDir: args.projectDir, targets: [ @@ -411,6 +406,22 @@ export async function executePublicIngestTarget( io: KtxCliIo, deps: KtxPublicIngestDeps, ): Promise { + if (target.preflightFailure) { + return { + connectionId: target.connectionId, + driver: target.driver, + steps: defaultSteps(target).map((step) => + step.operation === 'database-schema' + ? { + ...step, + status: 'failed', + detail: target.preflightFailure, + } + : step, + ), + }; + } + if (target.operation === 'database-ingest') { const { runKtxScan } = await import('./scan.js'); const scanArgs: KtxScanArgs = { @@ -418,7 +429,7 @@ export async function executePublicIngestTarget( projectDir: args.projectDir, connectionId: target.connectionId, mode: target.databaseDepth === 'deep' ? 'enriched' : 'structural', - detectRelationships: target.databaseDepth === 'deep' ? true : false, + detectRelationships: target.detectRelationships === true, dryRun: false, }; const runScan = deps.runScan ?? runKtxScan;