From 01e1fe55694639dd4e4d81a5d5dd9a1aa3b28e6b Mon Sep 17 00:00:00 2001 From: Luca Martial Date: Tue, 12 May 2026 18:25:14 -0700 Subject: [PATCH] feat(cli): prefetch primary scans during setup --- packages/cli/src/commands/setup-commands.ts | 19 + packages/cli/src/context-build-view.test.ts | 33 ++ packages/cli/src/context-build-view.ts | 40 +- packages/cli/src/setup-context.test.ts | 86 ++++- packages/cli/src/setup-context.ts | 98 +++-- .../cli/src/setup-primary-scan-prefetch.ts | 349 ++++++++++++++++++ packages/cli/src/setup.test.ts | 82 +++- packages/cli/src/setup.ts | 42 +++ 8 files changed, 693 insertions(+), 56 deletions(-) create mode 100644 packages/cli/src/setup-primary-scan-prefetch.ts diff --git a/packages/cli/src/commands/setup-commands.ts b/packages/cli/src/commands/setup-commands.ts index 90251ae1..7036292b 100644 --- a/packages/cli/src/commands/setup-commands.ts +++ b/packages/cli/src/commands/setup-commands.ts @@ -311,6 +311,14 @@ export function registerSetupCommands(program: Command, context: KtxCliCommandCo ) .option('--skip-initial-source-ingest', 'Validate source setup without building source context during setup', false) .option('--skip-sources', 'Mark optional source setup complete with no sources', false) + .addOption(new Option('--internal-primary-scan-prefetch', 'Run the internal setup primary scan prefetch worker').hideHelp().default(false)) + .addOption(new Option('--primary-scan-prefetch-run-id ', 'Internal setup primary scan prefetch run id').hideHelp()) + .addOption( + new Option('--primary-scan-prefetch-connection-id ', 'Internal setup primary scan target connection id') + .hideHelp() + .argParser((value, previous: string[]) => [...previous, value]) + .default([] as string[]), + ) .showHelpAfterError(); setup.hook('preAction', (_thisCommand, actionCommand) => { @@ -318,6 +326,17 @@ export function registerSetupCommands(program: Command, context: KtxCliCommandCo }); setup.action(async (options, command) => { + if (options.internalPrimaryScanPrefetch) { + await runSetupArgs(context, { + command: 'primary-scan-prefetch', + projectDir: resolveCommandProjectDir(command), + ...(options.primaryScanPrefetchRunId ? { runId: options.primaryScanPrefetchRunId } : {}), + ...(options.primaryScanPrefetchConnectionId.length > 0 + ? { connectionIds: options.primaryScanPrefetchConnectionId } + : {}), + }); + return; + } if (options.anthropicApiKeyEnv && options.anthropicApiKeyFile) { context.io.stderr.write( 'Choose only one Anthropic credential source: --anthropic-api-key-env or --anthropic-api-key-file.\n', diff --git a/packages/cli/src/context-build-view.test.ts b/packages/cli/src/context-build-view.test.ts index 647357a7..e81948af 100644 --- a/packages/cli/src/context-build-view.test.ts +++ b/packages/cli/src/context-build-view.test.ts @@ -474,6 +474,39 @@ describe('runContextBuild', () => { ); }); + it('skips targets already completed by an earlier primary scan prefetch', async () => { + const io = makeIo(); + const project = projectWithConnections({ + warehouse: { driver: 'postgres' }, + dbt_main: { driver: 'dbt' }, + }); + const executeTarget = vi.fn(async (target) => successResult(target.connectionId, target.driver, target.operation)); + + const result = await runContextBuild( + project, + { + projectDir: '/tmp/project', + inputMode: 'disabled', + completedSourceProgress: [ + { connectionId: 'warehouse', operation: 'scan', status: 'done', elapsedMs: 120000, summaryText: '42 tables' }, + ], + }, + io.io, + { executeTarget, now: () => 1000 }, + ); + + expect(result).toEqual({ exitCode: 0, detached: false }); + expect(executeTarget).toHaveBeenCalledTimes(1); + expect(executeTarget).toHaveBeenCalledWith( + expect.objectContaining({ connectionId: 'dbt_main', operation: 'source-ingest' }), + expect.anything(), + expect.anything(), + {}, + ); + expect(io.stdout()).toContain('warehouse'); + expect(io.stdout()).toContain('42 tables'); + }); + it('exits immediately with paused message when d is pressed', async () => { const mockExit = vi.spyOn(process, 'exit').mockImplementation(() => { throw new Error('process.exit'); diff --git a/packages/cli/src/context-build-view.ts b/packages/cli/src/context-build-view.ts index 7457f9b5..7c91260c 100644 --- a/packages/cli/src/context-build-view.ts +++ b/packages/cli/src/context-build-view.ts @@ -40,6 +40,9 @@ export interface ContextBuildArgs { inputMode: 'auto' | 'disabled'; scanMode?: 'structural' | 'enriched'; detectRelationships?: boolean; + targetOperations?: Array<'scan' | 'source-ingest'>; + targetConnectionIds?: string[]; + completedSourceProgress?: ContextBuildSourceProgressUpdate[]; } export interface ContextBuildResult { @@ -523,10 +526,33 @@ function failureTextForTarget(input: { return input.fallback ?? `${input.target.connectionId} failed.`; } -export function initViewState(targets: KtxPublicIngestPlanTarget[]): ContextBuildViewState { +function progressKey(input: Pick): string { + return `${input.operation}:${input.connectionId}`; +} + +export function initViewState( + targets: KtxPublicIngestPlanTarget[], + completedSourceProgress: ContextBuildSourceProgressUpdate[] = [], +): ContextBuildViewState { + const completedByKey = new Map( + completedSourceProgress.filter((source) => source.status === 'done').map((source) => [progressKey(source), source]), + ); + const makeTargetWithProgress = (target: KtxPublicIngestPlanTarget): ContextBuildTargetState => { + const completed = completedByKey.get(progressKey(target)); + const state = makeTargetState(target); + if (!completed) { + return state; + } + return { + ...state, + status: 'done', + elapsedMs: completed.elapsedMs ?? 0, + summaryText: completed.summaryText ?? null, + }; + }; return { - primarySources: targets.filter((t) => t.operation === 'scan').map(makeTargetState), - contextSources: targets.filter((t) => t.operation === 'source-ingest').map(makeTargetState), + primarySources: targets.filter((t) => t.operation === 'scan').map(makeTargetWithProgress), + contextSources: targets.filter((t) => t.operation === 'source-ingest').map(makeTargetWithProgress), frame: 0, startedAt: null, totalElapsedMs: 0, @@ -540,7 +566,12 @@ export async function runContextBuild( deps: ContextBuildDeps = {}, ): Promise { const plan = buildPublicIngestPlan(project, { projectDir: args.projectDir, all: true }); - const state = initViewState(plan.targets); + const targetOperations = new Set(args.targetOperations ?? ['scan', 'source-ingest']); + const targetConnectionIds = args.targetConnectionIds ? new Set(args.targetConnectionIds) : null; + const targets = plan.targets.filter( + (target) => targetOperations.has(target.operation) && (!targetConnectionIds || targetConnectionIds.has(target.connectionId)), + ); + const state = initViewState(targets, args.completedSourceProgress); const isTTY = io.stdout.isTTY === true; const nowFn = deps.now ?? (() => Date.now()); @@ -618,6 +649,7 @@ export async function runContextBuild( try { for (const targetState of orderedTargets) { if (detached) break; + if (targetState.status === 'done') continue; targetState.status = 'running'; targetState.startedAt = nowFn(); diff --git a/packages/cli/src/setup-context.test.ts b/packages/cli/src/setup-context.test.ts index 7012edb6..afab99db 100644 --- a/packages/cli/src/setup-context.test.ts +++ b/packages/cli/src/setup-context.test.ts @@ -3,6 +3,7 @@ import { tmpdir } from 'node:os'; import { join } from 'node:path'; import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; +import { readKtxSetupState } from '@ktx/context/project'; import { contextBuildCommands, readKtxSetupContextState, @@ -203,7 +204,7 @@ describe('setup context build state', () => { expect.objectContaining({ onDetach: expect.any(Function) }), ); expect(verifyContextReady).toHaveBeenCalledWith(tempDir); - expect(await readFile(join(tempDir, 'ktx.yaml'), 'utf-8')).toContain(' - context'); + expect(await readKtxSetupState(tempDir)).toMatchObject({ completed_steps: expect.arrayContaining(['context']) }); await expect(readKtxSetupContextState(tempDir)).resolves.toMatchObject({ runId: 'setup-context-local-abc123', status: 'completed', @@ -284,7 +285,7 @@ describe('setup context build state', () => { ).resolves.toEqual({ status: 'ready', projectDir: tempDir, runId: 'setup-context-local-existing' }); expect(runContextBuildMock).not.toHaveBeenCalled(); - expect(await readFile(join(tempDir, 'ktx.yaml'), 'utf-8')).toContain(' - context'); + expect(await readKtxSetupState(tempDir)).toMatchObject({ completed_steps: expect.arrayContaining(['context']) }); await expect(readKtxSetupContextState(tempDir)).resolves.toMatchObject({ runId: 'setup-context-local-existing', status: 'completed', @@ -486,6 +487,87 @@ describe('setup context build state', () => { expect(io.stdout()).toContain('KTX context built: yes'); }); + it('continues the full context build after auto-watching a completed primary scan prefetch', async () => { + await writeReadyProject(tempDir); + await writeKtxSetupContextState(tempDir, { + runId: 'setup-context-local-prefetch', + status: 'detached', + startedAt: '2026-05-09T10:00:00.000Z', + updatedAt: '2026-05-09T10:00:00.000Z', + primarySourceConnectionIds: ['warehouse'], + contextSourceConnectionIds: [], + reportIds: [], + artifactPaths: [], + retryableFailedTargets: [], + commands: contextBuildCommands(tempDir, 'setup-context-local-prefetch'), + sourceProgress: [ + { connectionId: 'warehouse', operation: 'scan' as const, status: 'running' as const, startedAtMs: Date.now() }, + ], + }); + const io = makeIo(); + const completePrefetch = async () => { + await writeKtxSetupContextState(tempDir, { + runId: 'setup-context-local-prefetch', + status: 'paused', + startedAt: '2026-05-09T10:00:00.000Z', + updatedAt: '2026-05-09T10:02:00.000Z', + primarySourceConnectionIds: ['warehouse'], + contextSourceConnectionIds: [], + reportIds: ['warehouse-report'], + artifactPaths: ['raw-sources/warehouse/live-database/sync-1/scan-report.json'], + retryableFailedTargets: [], + commands: contextBuildCommands(tempDir, 'setup-context-local-prefetch'), + sourceProgress: [ + { connectionId: 'warehouse', operation: 'scan' as const, status: 'done' as const, elapsedMs: 120000 }, + ], + }); + }; + const runContextBuildMock = vi.fn(async () => ({ + exitCode: 0, + detached: false, + reportIds: ['docs-report'], + artifactPaths: ['raw-sources/docs/notion/sync-1/ingest-report.json'], + })); + const verifyContextReady = vi.fn(async () => ({ + ready: true, + agentContextReady: true, + semanticSearchReady: true, + details: ['warehouse: enriched scan complete', 'docs: memory update complete'], + })); + const select = vi.fn(async () => { + throw new Error('should not prompt while auto-watching a prefetch'); + }); + + await expect( + runKtxSetupContextStep( + { projectDir: tempDir, inputMode: 'auto', autoWatch: true }, + io.io, + { + prompts: { select, cancel: vi.fn() }, + sleep: completePrefetch, + watchIntervalMs: 1, + runIdFactory: () => 'setup-context-local-final', + now: () => new Date('2026-05-09T10:03:00.000Z'), + runContextBuild: runContextBuildMock, + verifyContextReady, + }, + ), + ).resolves.toEqual({ status: 'ready', projectDir: tempDir, runId: 'setup-context-local-final' }); + + expect(select).not.toHaveBeenCalled(); + expect(runContextBuildMock).toHaveBeenCalledWith( + expect.objectContaining({ projectDir: tempDir }), + expect.objectContaining({ + projectDir: tempDir, + completedSourceProgress: [ + { connectionId: 'warehouse', operation: 'scan', status: 'done', elapsedMs: 120000 }, + ], + }), + io.io, + expect.anything(), + ); + }); + it('renders the progress view when watching a build with sourceProgress', async () => { await writeReadyProject(tempDir); await writeKtxSetupContextState(tempDir, { diff --git a/packages/cli/src/setup-context.ts b/packages/cli/src/setup-context.ts index efcd35f1..ba2b9c3e 100644 --- a/packages/cli/src/setup-context.ts +++ b/packages/cli/src/setup-context.ts @@ -551,10 +551,12 @@ async function runBuild( deps: KtxSetupContextDeps, project: KtxLocalProject, targets: KtxSetupContextTargets, + existingState?: KtxSetupContextState, ): Promise { const now = deps.now ?? (() => new Date()); const runId = deps.runIdFactory?.() ?? runIdFactory(); const startedAt = now().toISOString(); + const completedSourceProgress = existingState?.sourceProgress?.filter((source) => source.status === 'done') ?? []; const runningState: KtxSetupContextState = { runId, status: 'running', @@ -566,10 +568,12 @@ async function runBuild( artifactPaths: [], retryableFailedTargets: [], commands: contextBuildCommands(args.projectDir, runId), + ...(completedSourceProgress.length > 0 ? { sourceProgress: completedSourceProgress } : {}), }; await writeKtxSetupContextState(args.projectDir, runningState); - let lastSourceProgress: ContextBuildSourceProgressUpdate[] | undefined; + let lastSourceProgress: ContextBuildSourceProgressUpdate[] | undefined = + completedSourceProgress.length > 0 ? completedSourceProgress : undefined; const contextBuild = deps.runContextBuild ?? runContextBuild; const buildResult = await contextBuild( project, @@ -578,6 +582,7 @@ async function runBuild( inputMode: args.inputMode, scanMode: 'enriched', detectRelationships: true, + ...(completedSourceProgress.length > 0 ? { completedSourceProgress } : {}), }, io, { @@ -609,8 +614,10 @@ async function runBuild( }, }, ); - const completedReportIds = buildResult.reportIds ?? []; - const completedArtifactPaths = buildResult.artifactPaths ?? []; + const completedReportIds = [...new Set([...(existingState?.reportIds ?? []), ...(buildResult.reportIds ?? [])])]; + const completedArtifactPaths = [ + ...new Set([...(existingState?.artifactPaths ?? []), ...(buildResult.artifactPaths ?? [])]), + ]; if (buildResult.detached) { const updatedAt = now().toISOString(); await writeKtxSetupContextState(args.projectDir, { @@ -713,7 +720,7 @@ export async function runKtxSetupContextStep( ): Promise { try { const project = await loadKtxProject({ projectDir: args.projectDir }); - const existingState = await readKtxSetupContextState(args.projectDir); + let existingState = await readKtxSetupContextState(args.projectDir); const completedSteps = ktxSetupCompletedSteps(project.config, await readKtxSetupState(args.projectDir)); if (completedSteps.includes('context') && existingState.status === 'completed') { return { status: 'ready', projectDir: args.projectDir, runId: existingState.runId ?? 'setup-context-completed' }; @@ -734,41 +741,52 @@ export async function runKtxSetupContextStep( io, deps, ); - return setupResultFromWatchedState(args.projectDir, watched.state); + if (watched.state.status !== 'paused') { + return setupResultFromWatchedState(args.projectDir, watched.state); + } + existingState = watched.state; } - const prompts = deps.prompts ?? createPromptAdapter(); - const choice = await prompts.select({ - message: - 'A context build is running in the background.\n\n' + - 'You can watch it until it finishes, check its status once, or start a fresh build.', - options: [ - { value: 'watch', label: 'Watch progress' }, - { value: 'status', label: 'Check status' }, - { value: 'rebuild', label: 'Start a fresh context build' }, - { value: 'back', label: 'Back' }, - ], - }); - if (choice === 'watch') { - const watched = await watchContextStatus( - { - projectDir: args.projectDir, - ...(existingState.runId ? { runId: existingState.runId } : {}), - inputMode: args.inputMode, - }, - existingState, - io, - deps, - ); - return setupResultFromWatchedState(args.projectDir, watched.state); - } - if (choice === 'status') { - const commands = contextBuildCommands(args.projectDir, existingState.runId); - io.stdout.write(`\nRun: ${commands.status}\n`); - io.stdout.write(`Log: ${join(resolve(args.projectDir), '.ktx', 'setup', 'context-build.log')}\n`); - return { status: 'detached', projectDir: args.projectDir, runId: existingState.runId ?? '' }; - } - if (choice === 'back') { - return { status: 'back', projectDir: args.projectDir }; + if (existingState.status === 'running' || existingState.status === 'detached') { + const prompts = deps.prompts ?? createPromptAdapter(); + const choice = await prompts.select({ + message: + 'A context build is running in the background.\n\n' + + 'You can watch it until it finishes, check its status once, or start a fresh build.', + options: [ + { value: 'watch', label: 'Watch progress' }, + { value: 'status', label: 'Check status' }, + { value: 'rebuild', label: 'Start a fresh context build' }, + { value: 'back', label: 'Back' }, + ], + }); + if (choice === 'watch') { + const watched = await watchContextStatus( + { + projectDir: args.projectDir, + ...(existingState.runId ? { runId: existingState.runId } : {}), + inputMode: args.inputMode, + }, + existingState, + io, + deps, + ); + if (watched.state.status !== 'paused') { + return setupResultFromWatchedState(args.projectDir, watched.state); + } + existingState = watched.state; + } + if (choice === 'status') { + const commands = contextBuildCommands(args.projectDir, existingState.runId); + io.stdout.write(`\nRun: ${commands.status}\n`); + io.stdout.write(`Log: ${join(resolve(args.projectDir), '.ktx', 'setup', 'context-build.log')}\n`); + return { status: 'detached', projectDir: args.projectDir, runId: existingState.runId ?? '' }; + } + if (choice === 'back') { + return { status: 'back', projectDir: args.projectDir }; + } + if (choice === 'rebuild') { + existingState = notStartedState(args.projectDir); + } } } @@ -797,7 +815,7 @@ export async function runKtxSetupContextStep( } } - if (args.inputMode !== 'disabled' && args.prompt !== false) { + if (args.inputMode !== 'disabled' && args.prompt !== false && existingState.status !== 'paused') { const choice = await promptForBuild(deps.prompts ?? createPromptAdapter()); if (choice === 'back') { return { status: 'back', projectDir: args.projectDir }; @@ -808,7 +826,7 @@ export async function runKtxSetupContextStep( } } - return await runBuild(args, io, deps, project, targets); + return await runBuild(args, io, deps, project, targets, existingState); } catch (error) { io.stderr.write(`${error instanceof Error ? error.message : String(error)}\n`); return { status: 'failed', projectDir: args.projectDir }; diff --git a/packages/cli/src/setup-primary-scan-prefetch.ts b/packages/cli/src/setup-primary-scan-prefetch.ts new file mode 100644 index 00000000..f4ecdeac --- /dev/null +++ b/packages/cli/src/setup-primary-scan-prefetch.ts @@ -0,0 +1,349 @@ +import { spawn } from 'node:child_process'; +import { mkdirSync, openSync } from 'node:fs'; +import { join, resolve } from 'node:path'; +import { cancel, isCancel, select } from '@clack/prompts'; +import { loadKtxProject } from '@ktx/context/project'; +import type { KtxCliIo } from './cli-runtime.js'; +import { + contextBuildCommands, + readKtxSetupContextState, + writeKtxSetupContextState, + type KtxSetupContextState, +} from './setup-context.js'; +import { buildPublicIngestPlan } from './public-ingest.js'; +import { + type ContextBuildSourceProgressUpdate, + type ContextBuildResult, + runContextBuild, +} from './context-build-view.js'; +import { withMenuOptionsSpacing } from './prompt-navigation.js'; +import { withSetupInterruptConfirmation } from './setup-interrupt.js'; + +export interface KtxPrimaryScanPrefetchArgs { + projectDir: string; + inputMode: 'auto' | 'disabled'; + yes: boolean; + connectionIds?: string[]; +} + +export interface KtxPrimaryScanPrefetchWorkerArgs { + projectDir: string; + runId?: string; + connectionIds?: string[]; +} + +export type KtxPrimaryScanPrefetchResult = + | { status: 'started'; projectDir: string; runId: string; logPath?: string } + | { status: 'running'; projectDir: string; runId?: string } + | { status: 'skipped'; projectDir: string; reason: string } + | { status: 'failed'; projectDir: string; reason: string }; + +export interface KtxPrimaryScanPrefetchPromptAdapter { + select(options: { message: string; options: Array<{ value: string; label: string }> }): Promise; + cancel(message: string): void; +} + +export interface KtxPrimaryScanPrefetchDeps { + prompts?: KtxPrimaryScanPrefetchPromptAdapter; + runIdFactory?: () => string; + now?: () => Date; + spawnPrefetch?: (args: { projectDir: string; runId: string; connectionIds: string[] }) => { logPath?: string } | null; + runContextBuild?: typeof runContextBuild; +} + +const ACTIVE_CONTEXT_STATUSES = new Set(['running', 'detached']); + +function createPromptAdapter(): KtxPrimaryScanPrefetchPromptAdapter { + return { + async select(options) { + const value = await withSetupInterruptConfirmation(() => select(withMenuOptionsSpacing(options))); + if (isCancel(value)) { + cancel('Setup cancelled.'); + return 'wait'; + } + return String(value); + }, + cancel(message) { + cancel(message); + }, + }; +} + +function runIdFactory(): string { + return `setup-context-prefetch-${Date.now().toString(36)}-${Math.random().toString(36).slice(2, 8)}`; +} + +function resolveKtxEntryScript(): string | null { + const argv1 = process.argv[1]; + if (argv1 && (argv1.endsWith('.js') || argv1.endsWith('.ts') || argv1.endsWith('.mjs'))) { + return argv1; + } + return null; +} + +function spawnPrimaryScanPrefetch(input: { + projectDir: string; + runId: string; + connectionIds: string[]; +}): { logPath: string } | null { + const entryScript = resolveKtxEntryScript(); + if (!entryScript) return null; + + const resolvedDir = resolve(input.projectDir); + const logDir = join(resolvedDir, '.ktx', 'setup'); + mkdirSync(logDir, { recursive: true }); + const logPath = join(logDir, 'context-build.log'); + const logFd = openSync(logPath, 'a'); + const connectionArgs = input.connectionIds.flatMap((connectionId) => [ + '--primary-scan-prefetch-connection-id', + connectionId, + ]); + + const child = spawn( + process.execPath, + [ + entryScript, + 'setup', + '--project-dir', + resolvedDir, + '--no-input', + '--internal-primary-scan-prefetch', + '--primary-scan-prefetch-run-id', + input.runId, + ...connectionArgs, + ], + { detached: true, stdio: ['ignore', logFd, logFd] }, + ); + child.unref(); + return { logPath }; +} + +function missingPrimaryScanCapabilities(config: Awaited>['config']): string[] { + const missing: string[] = []; + if (config.llm.provider.backend === 'none' || !config.llm.models.default) { + missing.push('models'); + } + const embeddings = config.ingest.embeddings; + if ( + embeddings.backend === 'none' || + embeddings.backend === 'deterministic' || + !embeddings.model || + embeddings.dimensions <= 0 + ) { + missing.push('embeddings'); + } + if (config.scan.enrichment.mode === 'none') { + missing.push('scan enrichment'); + } + return missing; +} + +function primaryScanConnectionIds( + project: Awaited>, + preferredConnectionIds: string[] | undefined, +): string[] { + const preferred = preferredConnectionIds && preferredConnectionIds.length > 0 ? new Set(preferredConnectionIds) : null; + try { + const plan = buildPublicIngestPlan(project, { projectDir: project.projectDir, all: true }); + return plan.targets + .filter((target) => target.operation === 'scan') + .filter((target) => !preferred || preferred.has(target.connectionId)) + .map((target) => target.connectionId); + } catch { + return []; + } +} + +function queuedProgress(connectionIds: string[]): ContextBuildSourceProgressUpdate[] { + return connectionIds.map((connectionId) => ({ connectionId, operation: 'scan', status: 'queued' })); +} + +function stateForPrefetch(input: { + projectDir: string; + runId: string; + status: KtxSetupContextState['status']; + now: Date; + primarySourceConnectionIds: string[]; + sourceProgress?: ContextBuildSourceProgressUpdate[]; + reportIds?: string[]; + artifactPaths?: string[]; + failureReason?: string; +}): KtxSetupContextState { + const timestamp = input.now.toISOString(); + return { + runId: input.runId, + status: input.status, + startedAt: timestamp, + updatedAt: timestamp, + primarySourceConnectionIds: input.primarySourceConnectionIds, + contextSourceConnectionIds: [], + reportIds: input.reportIds ?? [], + artifactPaths: input.artifactPaths ?? [], + retryableFailedTargets: input.status === 'failed' ? input.primarySourceConnectionIds : [], + commands: contextBuildCommands(input.projectDir, input.runId), + ...(input.failureReason ? { failureReason: input.failureReason } : {}), + ...(input.sourceProgress ? { sourceProgress: input.sourceProgress } : {}), + }; +} + +async function chooseStartPrefetch( + args: KtxPrimaryScanPrefetchArgs, + io: KtxCliIo, + deps: KtxPrimaryScanPrefetchDeps, +): Promise { + if (args.yes) { + return true; + } + if (args.inputMode === 'disabled') { + return false; + } + if (io.stdout.isTTY !== true && !deps.prompts) { + return false; + } + const prompts = deps.prompts ?? createPromptAdapter(); + const choice = await prompts.select({ + message: + 'Prepare primary source context while you finish setup?\n\n' + + 'KTX can start the enriched primary-source scan now, then finish context sources later.', + options: [ + { value: 'start', label: 'Start in background (recommended)' }, + { value: 'wait', label: 'Wait until Build Context' }, + ], + }); + return choice === 'start'; +} + +export async function startPrimaryScanPrefetch( + args: KtxPrimaryScanPrefetchArgs, + io: KtxCliIo, + deps: KtxPrimaryScanPrefetchDeps = {}, +): Promise { + const existingState = await readKtxSetupContextState(args.projectDir); + if (ACTIVE_CONTEXT_STATUSES.has(existingState.status)) { + return { status: 'running', projectDir: args.projectDir, runId: existingState.runId }; + } + if (existingState.status === 'completed') { + return { status: 'skipped', projectDir: args.projectDir, reason: 'context already built' }; + } + + const project = await loadKtxProject({ projectDir: args.projectDir }); + const missing = missingPrimaryScanCapabilities(project.config); + if (missing.length > 0) { + return { status: 'skipped', projectDir: args.projectDir, reason: `missing ${missing.join(', ')}` }; + } + + const connectionIds = primaryScanConnectionIds(project, args.connectionIds); + if (connectionIds.length === 0) { + return { status: 'skipped', projectDir: args.projectDir, reason: 'no primary sources' }; + } + if (!(await chooseStartPrefetch(args, io, deps))) { + return { status: 'skipped', projectDir: args.projectDir, reason: 'user deferred' }; + } + + const runId = deps.runIdFactory?.() ?? runIdFactory(); + const now = deps.now?.() ?? new Date(); + const initialState = stateForPrefetch({ + projectDir: args.projectDir, + runId, + status: 'detached', + now, + primarySourceConnectionIds: connectionIds, + sourceProgress: queuedProgress(connectionIds), + }); + const spawned = (deps.spawnPrefetch ?? spawnPrimaryScanPrefetch)({ + projectDir: args.projectDir, + runId, + connectionIds, + }); + if (!spawned) { + return { status: 'skipped', projectDir: args.projectDir, reason: 'background runner unavailable' }; + } + await writeKtxSetupContextState(args.projectDir, initialState); + io.stdout.write(`│ Primary source context scan started in the background (${connectionIds.join(', ')}).\n`); + io.stdout.write(`│ Resume: ${contextBuildCommands(args.projectDir, runId).watch}\n`); + return { + status: 'started', + projectDir: args.projectDir, + runId, + ...(spawned.logPath ? { logPath: spawned.logPath } : {}), + }; +} + +export async function runPrimaryScanPrefetchWorker( + args: KtxPrimaryScanPrefetchWorkerArgs, + io: KtxCliIo, + deps: KtxPrimaryScanPrefetchDeps = {}, +): Promise { + const project = await loadKtxProject({ projectDir: args.projectDir }); + const connectionIds = primaryScanConnectionIds( + project, + args.connectionIds ?? project.config.setup?.database_connection_ids, + ); + if (connectionIds.length === 0) { + return 0; + } + + const runId = args.runId ?? deps.runIdFactory?.() ?? runIdFactory(); + const now = deps.now ?? (() => new Date()); + const startedAt = now(); + const runningState = stateForPrefetch({ + projectDir: args.projectDir, + runId, + status: 'running', + now: startedAt, + primarySourceConnectionIds: connectionIds, + sourceProgress: queuedProgress(connectionIds), + }); + await writeKtxSetupContextState(args.projectDir, runningState); + + let lastSourceProgress: ContextBuildSourceProgressUpdate[] | undefined = runningState.sourceProgress; + const contextBuild = deps.runContextBuild ?? runContextBuild; + let result: ContextBuildResult; + try { + result = await contextBuild( + project, + { + projectDir: args.projectDir, + inputMode: 'disabled', + scanMode: 'enriched', + detectRelationships: true, + targetOperations: ['scan'], + targetConnectionIds: connectionIds, + }, + io, + { + onSourceProgress: (sources) => { + lastSourceProgress = sources; + void writeKtxSetupContextState(args.projectDir, { + ...runningState, + updatedAt: now().toISOString(), + sourceProgress: sources, + }); + }, + }, + ); + } catch (error) { + await writeKtxSetupContextState(args.projectDir, { + ...runningState, + status: 'failed', + updatedAt: now().toISOString(), + retryableFailedTargets: connectionIds, + failureReason: error instanceof Error ? error.message : String(error), + ...(lastSourceProgress ? { sourceProgress: lastSourceProgress } : {}), + }); + return 1; + } + + const completedAt = now().toISOString(); + await writeKtxSetupContextState(args.projectDir, { + ...runningState, + status: result.exitCode === 0 ? 'paused' : 'failed', + updatedAt: completedAt, + reportIds: result.reportIds ?? [], + artifactPaths: result.artifactPaths ?? [], + retryableFailedTargets: result.exitCode === 0 ? [] : connectionIds, + ...(result.exitCode === 0 ? {} : { failureReason: 'Primary source context scan failed.' }), + ...(lastSourceProgress ? { sourceProgress: lastSourceProgress } : {}), + }); + return result.exitCode; +} diff --git a/packages/cli/src/setup.test.ts b/packages/cli/src/setup.test.ts index e74dca5d..9d121b48 100644 --- a/packages/cli/src/setup.test.ts +++ b/packages/cli/src/setup.test.ts @@ -584,13 +584,13 @@ describe('setup status', () => { expect(projectPrompts.select).toHaveBeenCalledWith( expect.objectContaining({ - message: 'Which KTX project should setup use?', + message: 'Where should KTX create the project?', options: expect.arrayContaining([expect.objectContaining({ value: 'back', label: 'Back' })]), }), ); expect(projectPrompts.select).toHaveBeenCalledWith( expect.objectContaining({ - message: 'Which KTX project should setup use?', + message: 'Where should KTX create the project?', options: expect.not.arrayContaining([expect.objectContaining({ value: 'exit', label: 'Exit' })]), }), ); @@ -920,7 +920,7 @@ describe('setup status', () => { inputMode: 'disabled', yes: false, cliVersion: '0.2.0', - anthropicApiKeyEnv: 'ANTHROPIC_API_KEY', + anthropicApiKeyEnv: 'ANTHROPIC_API_KEY', // pragma: allowlist secret anthropicModel: 'claude-sonnet-4-6', skipLlm: false, skipEmbeddings: true, @@ -937,7 +937,7 @@ describe('setup status', () => { expect.objectContaining({ projectDir: tempDir, inputMode: 'disabled', - anthropicApiKeyEnv: 'ANTHROPIC_API_KEY', + anthropicApiKeyEnv: 'ANTHROPIC_API_KEY', // pragma: allowlist secret anthropicModel: 'claude-sonnet-4-6', skipLlm: false, }), @@ -961,11 +961,11 @@ describe('setup status', () => { inputMode: 'disabled', yes: true, cliVersion: '0.2.0', - anthropicApiKeyEnv: 'ANTHROPIC_API_KEY', + anthropicApiKeyEnv: 'ANTHROPIC_API_KEY', // pragma: allowlist secret anthropicModel: 'claude-sonnet-4-6', skipLlm: false, embeddingBackend: 'openai', - embeddingApiKeyEnv: 'OPENAI_API_KEY', + embeddingApiKeyEnv: 'OPENAI_API_KEY', // pragma: allowlist secret skipEmbeddings: false, databaseSchemas: [], skipDatabases: true, @@ -983,7 +983,7 @@ describe('setup status', () => { cliVersion: '0.2.0', runtimeInstallPolicy: 'auto', embeddingBackend: 'openai', - embeddingApiKeyEnv: 'OPENAI_API_KEY', + embeddingApiKeyEnv: 'OPENAI_API_KEY', // pragma: allowlist secret skipEmbeddings: false, }), testIo.io, @@ -1181,11 +1181,11 @@ describe('setup status', () => { inputMode: 'disabled', yes: false, cliVersion: '0.2.0', - anthropicApiKeyEnv: 'ANTHROPIC_API_KEY', + anthropicApiKeyEnv: 'ANTHROPIC_API_KEY', // pragma: allowlist secret anthropicModel: 'claude-sonnet-4-6', skipLlm: false, embeddingBackend: 'openai', - embeddingApiKeyEnv: 'OPENAI_API_KEY', + embeddingApiKeyEnv: 'OPENAI_API_KEY', // pragma: allowlist secret skipEmbeddings: false, databaseDrivers: ['postgres'], databaseConnectionId: 'warehouse', @@ -1261,6 +1261,68 @@ describe('setup status', () => { expect(calls).toEqual(['model', 'embeddings', 'databases', 'sources']); }); + it('starts primary source context prefetch after database setup before context source setup', async () => { + const calls: string[] = []; + const io = makeIo(); + await writeFile(join(tempDir, 'ktx.yaml'), ['project: revenue', 'connections: {}', ''].join('\n'), 'utf-8'); + + const primaryScanPrefetch = vi.fn(async () => { + calls.push('prefetch'); + return { status: 'started' as const, projectDir: tempDir, runId: 'setup-context-prefetch-test' }; + }); + + await expect( + runKtxSetup( + { + command: 'run', + projectDir: tempDir, + mode: 'existing', + agents: false, + inputMode: 'auto', + yes: true, + cliVersion: '0.2.0', + skipLlm: true, + skipEmbeddings: true, + skipDatabases: false, + skipSources: true, + skipAgents: true, + databaseSchemas: [], + }, + io.io, + { + model: async () => { + calls.push('model'); + return { status: 'skipped', projectDir: tempDir }; + }, + embeddings: async () => { + calls.push('embeddings'); + return { status: 'skipped', projectDir: tempDir }; + }, + databases: async () => { + calls.push('databases'); + return { status: 'ready', projectDir: tempDir, connectionIds: ['warehouse'] }; + }, + primaryScanPrefetch, + sources: async () => { + calls.push('sources'); + return { status: 'skipped', projectDir: tempDir }; + }, + }, + ), + ).resolves.toBe(0); + + expect(primaryScanPrefetch).toHaveBeenCalledWith( + expect.objectContaining({ + projectDir: tempDir, + inputMode: 'auto', + yes: true, + connectionIds: ['warehouse'], + }), + io.io, + ); + expect(calls).toEqual(['model', 'embeddings', 'databases', 'prefetch', 'sources']); + }); + it.each([ { backend: 'vertex', @@ -2041,7 +2103,7 @@ describe('setup status', () => { inputMode: 'disabled', yes: false, cliVersion: '0.2.0', - anthropicApiKeyEnv: 'ANTHROPIC_API_KEY', + anthropicApiKeyEnv: 'ANTHROPIC_API_KEY', // pragma: allowlist secret anthropicModel: 'claude-sonnet-4-6', skipLlm: false, skipEmbeddings: false, diff --git a/packages/cli/src/setup.ts b/packages/cli/src/setup.ts index dec0f4d7..83bf6f4d 100644 --- a/packages/cli/src/setup.ts +++ b/packages/cli/src/setup.ts @@ -26,6 +26,13 @@ import { } from './setup-databases.js'; import { type KtxSetupEmbeddingsDeps, runKtxSetupEmbeddingsStep } from './setup-embeddings.js'; import { type KtxSetupModelDeps, isKtxSetupLlmConfigReady, runKtxSetupAnthropicModelStep } from './setup-models.js'; +import { + type KtxPrimaryScanPrefetchArgs, + type KtxPrimaryScanPrefetchResult, + type KtxPrimaryScanPrefetchDeps, + runPrimaryScanPrefetchWorker, + startPrimaryScanPrefetch, +} from './setup-primary-scan-prefetch.js'; import { type KtxSetupProjectDeps, runKtxSetupProjectStep } from './setup-project.js'; import { isKtxPreAgentSetupReady, @@ -55,6 +62,12 @@ export interface KtxSetupStatus { } export type KtxSetupArgs = + | { + command: 'primary-scan-prefetch'; + projectDir: string; + runId?: string; + connectionIds?: string[]; + } | { command: 'run'; projectDir: string; @@ -137,6 +150,8 @@ export interface KtxSetupDeps { io: KtxCliIo, ) => Promise>>; agentsDeps?: KtxSetupAgentsDeps; + primaryScanPrefetch?: (args: KtxPrimaryScanPrefetchArgs, io: KtxCliIo) => Promise; + primaryScanPrefetchDeps?: KtxPrimaryScanPrefetchDeps; context?: (args: Parameters[0], io: KtxCliIo) => Promise; contextDeps?: KtxSetupContextDeps; readyMenuDeps?: KtxSetupReadyMenuDeps; @@ -442,6 +457,10 @@ export async function runKtxSetup(args: KtxSetupArgs, io: KtxCliIo, deps: KtxSet } async function runKtxSetupInner(args: KtxSetupArgs, io: KtxCliIo, deps: KtxSetupDeps = {}): Promise { + if (args.command === 'primary-scan-prefetch') { + return await runPrimaryScanPrefetchWorker(args, io, deps.primaryScanPrefetchDeps); + } + io.stdout.write('KTX setup\n'); let entryAction: KtxSetupEntryAction | undefined; let projectResult: Awaited>; @@ -549,6 +568,7 @@ async function runKtxSetupInner(args: KtxSetupArgs, io: KtxCliIo, deps: KtxSetup } const forcePromptSteps = new Set(); + let autoWatchPrefetchAtContext = false; const isNavigableSetupStep = (step: KtxSetupFlowStep): boolean => { if (step === 'models') return !args.skipLlm && shouldRunModels; if (step === 'embeddings') return !args.skipEmbeddings && shouldRunEmbeddings; @@ -677,9 +697,11 @@ async function runKtxSetupInner(args: KtxSetupArgs, io: KtxCliIo, deps: KtxSetup inputMode: args.inputMode, forcePrompt: forcePromptSteps.has('context') || runOnly === 'context', allowEmpty: true, + ...(autoWatchPrefetchAtContext ? { autoWatch: true } : {}), }, io, ); + autoWatchPrefetchAtContext = false; } else { const agentsRunner = deps.agents ?? ((agentArgs, agentIo) => runKtxSetupAgentsStep(agentArgs, agentIo, deps.agentsDeps)); @@ -725,6 +747,26 @@ async function runKtxSetupInner(args: KtxSetupArgs, io: KtxCliIo, deps: KtxSetup return 0; } } + if (step === 'databases' && stepResult.status === 'ready' && shouldRunContext) { + const databaseResult = stepResult as Awaited>; + const connectionIds = 'connectionIds' in databaseResult ? databaseResult.connectionIds : []; + const primaryScanPrefetch = + deps.primaryScanPrefetch ?? + ((prefetchArgs, prefetchIo) => + startPrimaryScanPrefetch(prefetchArgs, prefetchIo, deps.primaryScanPrefetchDeps)); + const prefetchResult = await primaryScanPrefetch( + { + projectDir: projectResult.projectDir, + inputMode: args.inputMode, + yes: args.yes, + connectionIds, + }, + io, + ); + if (prefetchResult.status === 'started' || prefetchResult.status === 'running') { + autoWatchPrefetchAtContext = true; + } + } forcePromptSteps.delete(step); stepIndex += 1;