diff --git a/packages/cli/src/context-build-view.test.ts b/packages/cli/src/context-build-view.test.ts index e81948af..b126f27c 100644 --- a/packages/cli/src/context-build-view.test.ts +++ b/packages/cli/src/context-build-view.test.ts @@ -292,8 +292,8 @@ describe('renderContextBuildView', () => { const output = renderContextBuildView(state, { styled: false, showHint: true, projectDir: '/tmp/project' }); expect(output).toContain('d to detach'); - expect(output).toContain('ktx setup --project-dir /tmp/project'); - expect(output).toContain('to resume'); + expect(output).not.toContain('ktx setup --project-dir /tmp/project'); + expect(output).not.toContain('to resume'); }); it('omits detach hint when all targets are done', () => { diff --git a/packages/cli/src/context-build-view.ts b/packages/cli/src/context-build-view.ts index 7c91260c..f8014754 100644 --- a/packages/cli/src/context-build-view.ts +++ b/packages/cli/src/context-build-view.ts @@ -220,7 +220,7 @@ export function renderContextBuildView( } if (options.showHint && hasActive) { - const hintContent = options.hintText ?? `d to detach · ${resumeCommand(options.projectDir)} to resume`; + const hintContent = options.hintText ?? 'd to detach'; const hint = ` ${hintContent}`; lines.push(styled ? dim(hint) : hint); lines.push(''); diff --git a/packages/cli/src/setup-context.test.ts b/packages/cli/src/setup-context.test.ts index afab99db..cfb840ec 100644 --- a/packages/cli/src/setup-context.test.ts +++ b/packages/cli/src/setup-context.test.ts @@ -568,6 +568,48 @@ describe('setup context build state', () => { ); }); + it('shows newly configured context sources while watching an active primary scan prefetch', async () => { + await writeReadyProject(tempDir); + await writeKtxSetupContextState(tempDir, { + runId: 'setup-context-local-prefetch-watch', + status: 'detached', + startedAt: '2026-05-09T10:00:00.000Z', + updatedAt: '2026-05-09T10:00:00.000Z', + primarySourceConnectionIds: ['warehouse'], + contextSourceConnectionIds: [], + reportIds: [], + artifactPaths: [], + retryableFailedTargets: [], + commands: contextBuildCommands(tempDir, 'setup-context-local-prefetch-watch'), + sourceProgress: [ + { connectionId: 'warehouse', operation: 'scan' as const, status: 'running' as const, startedAtMs: Date.now() }, + ], + }); + const io = makeIo(); + let triggerDetach: (() => void) | null = null; + + await expect( + runKtxSetupContextStep( + { projectDir: tempDir, inputMode: 'auto', autoWatch: true }, + io.io, + { + sleep: async () => { triggerDetach?.(); }, + watchIntervalMs: 1, + setupKeystroke: (onDetach) => { + triggerDetach = onDetach; + return () => {}; + }, + }, + ), + ).resolves.toMatchObject({ status: 'detached' }); + + const output = io.stdout(); + expect(output).toContain('Primary sources:'); + expect(output).toContain('warehouse'); + expect(output).toContain('Context sources:'); + expect(output).toContain('docs'); + }); + it('renders the progress view when watching a build with sourceProgress', async () => { await writeReadyProject(tempDir); await writeKtxSetupContextState(tempDir, { diff --git a/packages/cli/src/setup-context.ts b/packages/cli/src/setup-context.ts index ba2b9c3e..41a4d85b 100644 --- a/packages/cli/src/setup-context.ts +++ b/packages/cli/src/setup-context.ts @@ -1,4 +1,4 @@ -import { mkdirSync, writeFileSync } from 'node:fs'; +import { existsSync, mkdirSync, readFileSync, writeFileSync } from 'node:fs'; import { access, mkdir, readdir, readFile, writeFile } from 'node:fs/promises'; import { join, resolve } from 'node:path'; import { cancel, isCancel, select } from '@clack/prompts'; @@ -272,6 +272,25 @@ export async function writeKtxSetupContextState(projectDir: string, state: KtxSe await writeFile(statePath(resolvedProjectDir), `${JSON.stringify(normalized, null, 2)}\n`, 'utf-8'); } +export function readKtxSetupContextStateSync(projectDir: string): KtxSetupContextState { + const resolvedProjectDir = resolve(projectDir); + const filePath = statePath(resolvedProjectDir); + if (!existsSync(filePath)) { + return notStartedState(resolvedProjectDir); + } + return normalizeState(resolvedProjectDir, JSON.parse(readFileSync(filePath, 'utf-8')) as unknown); +} + +export function writeKtxSetupContextStateSync(projectDir: string, state: KtxSetupContextState): void { + const resolvedProjectDir = resolve(projectDir); + mkdirSync(join(resolvedProjectDir, '.ktx', 'setup'), { recursive: true }); + const normalized = normalizeState(resolvedProjectDir, { + ...state, + commands: contextBuildCommands(resolvedProjectDir, state.runId), + }); + writeFileSync(statePath(resolvedProjectDir), `${JSON.stringify(normalized, null, 2)}\n`); +} + export function setupContextStatusFromState( state: KtxSetupContextState, options: { completedStep: boolean } = { completedStep: false }, @@ -307,6 +326,53 @@ function listContextTargets(project: KtxLocalProject): KtxSetupContextTargets { }; } +function sourceProgressKey(source: Pick): string { + return `${source.operation}:${source.connectionId}`; +} + +function sourceProgressWithTargets( + sourceProgress: ContextBuildSourceProgressUpdate[] | undefined, + targets: KtxSetupContextTargets, +): ContextBuildSourceProgressUpdate[] | undefined { + if (!sourceProgress || sourceProgress.length === 0) { + return undefined; + } + const merged = [...sourceProgress]; + const seen = new Set(merged.map(sourceProgressKey)); + for (const connectionId of targets.primarySourceConnectionIds) { + const key = sourceProgressKey({ connectionId, operation: 'scan' }); + if (!seen.has(key)) { + merged.push({ connectionId, operation: 'scan', status: 'queued' }); + } + } + for (const connectionId of targets.contextSourceConnectionIds) { + const key = sourceProgressKey({ connectionId, operation: 'source-ingest' }); + if (!seen.has(key)) { + merged.push({ connectionId, operation: 'source-ingest', status: 'queued' }); + } + } + return merged; +} + +async function activeStateWithCurrentTargets( + projectDir: string, + state: KtxSetupContextState, + targets: KtxSetupContextTargets, +): Promise { + const sourceProgress = sourceProgressWithTargets(state.sourceProgress, targets); + if (!sourceProgress) { + return state; + } + const nextState = { + ...state, + primarySourceConnectionIds: targets.primarySourceConnectionIds, + contextSourceConnectionIds: targets.contextSourceConnectionIds, + sourceProgress, + }; + await writeKtxSetupContextState(projectDir, nextState); + return nextState; +} + function missingCapabilities(project: KtxLocalProject): string[] { const missing: string[] = []; const llm = project.config.llm; @@ -721,6 +787,7 @@ export async function runKtxSetupContextStep( try { const project = await loadKtxProject({ projectDir: args.projectDir }); let existingState = await readKtxSetupContextState(args.projectDir); + const targets = listContextTargets(project); const completedSteps = ktxSetupCompletedSteps(project.config, await readKtxSetupState(args.projectDir)); if (completedSteps.includes('context') && existingState.status === 'completed') { return { status: 'ready', projectDir: args.projectDir, runId: existingState.runId ?? 'setup-context-completed' }; @@ -730,6 +797,7 @@ export async function runKtxSetupContextStep( (existingState.status === 'running' || existingState.status === 'detached') && args.inputMode !== 'disabled' ) { + existingState = await activeStateWithCurrentTargets(args.projectDir, existingState, targets); if (args.autoWatch) { const watched = await watchContextStatus( { @@ -790,7 +858,6 @@ export async function runKtxSetupContextStep( } } - const targets = listContextTargets(project); if (targets.primarySourceConnectionIds.length === 0 && targets.contextSourceConnectionIds.length === 0) { if (args.allowEmpty === true) { return { status: 'skipped', projectDir: args.projectDir }; diff --git a/packages/cli/src/setup-primary-scan-prefetch.test.ts b/packages/cli/src/setup-primary-scan-prefetch.test.ts new file mode 100644 index 00000000..8f73c7b5 --- /dev/null +++ b/packages/cli/src/setup-primary-scan-prefetch.test.ts @@ -0,0 +1,124 @@ +import { mkdir, mkdtemp, rm, writeFile } from 'node:fs/promises'; +import { tmpdir } from 'node:os'; +import { join } from 'node:path'; +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; + +import { + runPrimaryScanPrefetchWorker, + startPrimaryScanPrefetch, +} from './setup-primary-scan-prefetch.js'; + +function makeIo() { + let stdout = ''; + let stderr = ''; + return { + io: { + stdout: { + write: (chunk: string) => { + stdout += chunk; + }, + }, + stderr: { + write: (chunk: string) => { + stderr += chunk; + }, + }, + }, + stdout: () => stdout, + stderr: () => stderr, + }; +} + +async function writeReadyProject(projectDir: string) { + await writeFile( + join(projectDir, 'ktx.yaml'), + [ + 'project: revenue', + 'setup:', + ' database_connection_ids:', + ' - warehouse', + ' completed_steps:', + ' - project', + ' - llm', + ' - embeddings', + ' - databases', + 'connections:', + ' warehouse:', + ' driver: postgres', + ' url: env:DATABASE_URL', + 'llm:', + ' provider:', + ' backend: anthropic', + ' models:', + ' default: claude-sonnet-4-6', + 'ingest:', + ' embeddings:', + ' backend: openai', + ' model: text-embedding-3-small', + ' dimensions: 1536', + 'scan:', + ' enrichment:', + ' mode: llm', + '', + ].join('\n'), + 'utf-8', + ); +} + +describe('setup primary scan prefetch', () => { + let tempDir: string; + + beforeEach(async () => { + tempDir = await mkdtemp(join(tmpdir(), 'ktx-primary-prefetch-')); + await writeReadyProject(tempDir); + }); + + afterEach(async () => { + await rm(tempDir, { recursive: true, force: true }); + }); + + it('starts the background scan without printing a resume command', async () => { + const io = makeIo(); + + await expect( + startPrimaryScanPrefetch( + { projectDir: tempDir, inputMode: 'auto', yes: true, connectionIds: ['warehouse'] }, + io.io, + { + runIdFactory: () => 'setup-context-prefetch-test', + now: () => new Date('2026-05-09T10:00:00.000Z'), + spawnPrefetch: () => ({ logPath: join(tempDir, '.ktx', 'setup', 'context-build.log') }), + }, + ), + ).resolves.toMatchObject({ status: 'started', runId: 'setup-context-prefetch-test' }); + + expect(io.stdout()).toContain('Primary source context scan started in the background (warehouse).'); + expect(io.stdout()).not.toContain('Resume:'); + }); + + it('does not crash on progress state write failures', async () => { + const io = makeIo(); + const setupPath = join(tempDir, '.ktx', 'setup'); + const runContextBuild = vi.fn(async (_project, _args, _io, hooks) => { + await rm(setupPath, { recursive: true, force: true }); + await writeFile(setupPath, 'not a directory', 'utf-8'); + hooks.onSourceProgress?.([ + { connectionId: 'warehouse', operation: 'scan' as const, status: 'running' as const, startedAtMs: 1000 }, + ]); + await rm(setupPath, { force: true }); + await mkdir(setupPath, { recursive: true }); + return { exitCode: 0, detached: false }; + }); + + await expect( + runPrimaryScanPrefetchWorker( + { projectDir: tempDir, runId: 'setup-context-prefetch-write-failure', connectionIds: ['warehouse'] }, + io.io, + { + now: () => new Date('2026-05-09T10:00:00.000Z'), + runContextBuild, + }, + ), + ).resolves.toBe(0); + }); +}); diff --git a/packages/cli/src/setup-primary-scan-prefetch.ts b/packages/cli/src/setup-primary-scan-prefetch.ts index f4ecdeac..9d888bac 100644 --- a/packages/cli/src/setup-primary-scan-prefetch.ts +++ b/packages/cli/src/setup-primary-scan-prefetch.ts @@ -7,7 +7,9 @@ import type { KtxCliIo } from './cli-runtime.js'; import { contextBuildCommands, readKtxSetupContextState, + readKtxSetupContextStateSync, writeKtxSetupContextState, + writeKtxSetupContextStateSync, type KtxSetupContextState, } from './setup-context.js'; import { buildPublicIngestPlan } from './public-ingest.js'; @@ -158,6 +160,33 @@ function queuedProgress(connectionIds: string[]): ContextBuildSourceProgressUpda return connectionIds.map((connectionId) => ({ connectionId, operation: 'scan', status: 'queued' })); } +function sourceProgressKey(source: Pick): string { + return `${source.operation}:${source.connectionId}`; +} + +function mergeSourceProgress( + latest: ContextBuildSourceProgressUpdate[], + current: ContextBuildSourceProgressUpdate[] | undefined, +): ContextBuildSourceProgressUpdate[] { + const latestKeys = new Set(latest.map(sourceProgressKey)); + return [...latest, ...(current ?? []).filter((source) => !latestKeys.has(sourceProgressKey(source)))]; +} + +function currentStateWithProgress(projectDir: string, fallback: KtxSetupContextState, latest: ContextBuildSourceProgressUpdate[]) { + try { + const current = readKtxSetupContextStateSync(projectDir); + return { + contextSourceConnectionIds: current.contextSourceConnectionIds, + sourceProgress: mergeSourceProgress(latest, current.sourceProgress), + }; + } catch { + return { + contextSourceConnectionIds: fallback.contextSourceConnectionIds, + sourceProgress: latest, + }; + } +} + function stateForPrefetch(input: { projectDir: string; runId: string; @@ -260,7 +289,6 @@ export async function startPrimaryScanPrefetch( } await writeKtxSetupContextState(args.projectDir, initialState); io.stdout.write(`│ Primary source context scan started in the background (${connectionIds.join(', ')}).\n`); - io.stdout.write(`│ Resume: ${contextBuildCommands(args.projectDir, runId).watch}\n`); return { status: 'started', projectDir: args.projectDir, @@ -313,12 +341,18 @@ export async function runPrimaryScanPrefetchWorker( io, { onSourceProgress: (sources) => { - lastSourceProgress = sources; - void writeKtxSetupContextState(args.projectDir, { - ...runningState, - updatedAt: now().toISOString(), - sourceProgress: sources, - }); + const current = currentStateWithProgress(args.projectDir, runningState, sources); + lastSourceProgress = current.sourceProgress; + try { + writeKtxSetupContextStateSync(args.projectDir, { + ...runningState, + contextSourceConnectionIds: current.contextSourceConnectionIds, + updatedAt: now().toISOString(), + sourceProgress: current.sourceProgress, + }); + } catch { + // Progress reporting is supplementary; the worker should keep scanning. + } }, }, ); @@ -335,15 +369,17 @@ export async function runPrimaryScanPrefetchWorker( } const completedAt = now().toISOString(); + const current = currentStateWithProgress(args.projectDir, runningState, lastSourceProgress ?? []); await writeKtxSetupContextState(args.projectDir, { ...runningState, + contextSourceConnectionIds: current.contextSourceConnectionIds, status: result.exitCode === 0 ? 'paused' : 'failed', updatedAt: completedAt, reportIds: result.reportIds ?? [], artifactPaths: result.artifactPaths ?? [], retryableFailedTargets: result.exitCode === 0 ? [] : connectionIds, ...(result.exitCode === 0 ? {} : { failureReason: 'Primary source context scan failed.' }), - ...(lastSourceProgress ? { sourceProgress: lastSourceProgress } : {}), + ...(current.sourceProgress.length > 0 ? { sourceProgress: current.sourceProgress } : {}), }); return result.exitCode; }