fix(cli): preserve setup prefetch progress

This commit is contained in:
Luca Martial 2026-05-12 21:38:04 -07:00
parent 01e1fe5569
commit a834e6989f
6 changed files with 282 additions and 13 deletions

View file

@ -292,8 +292,8 @@ describe('renderContextBuildView', () => {
const output = renderContextBuildView(state, { styled: false, showHint: true, projectDir: '/tmp/project' });
expect(output).toContain('d to detach');
expect(output).toContain('ktx setup --project-dir /tmp/project');
expect(output).toContain('to resume');
expect(output).not.toContain('ktx setup --project-dir /tmp/project');
expect(output).not.toContain('to resume');
});
it('omits detach hint when all targets are done', () => {

View file

@ -220,7 +220,7 @@ export function renderContextBuildView(
}
if (options.showHint && hasActive) {
const hintContent = options.hintText ?? `d to detach · ${resumeCommand(options.projectDir)} to resume`;
const hintContent = options.hintText ?? 'd to detach';
const hint = ` ${hintContent}`;
lines.push(styled ? dim(hint) : hint);
lines.push('');

View file

@ -568,6 +568,48 @@ describe('setup context build state', () => {
);
});
it('shows newly configured context sources while watching an active primary scan prefetch', async () => {
await writeReadyProject(tempDir);
await writeKtxSetupContextState(tempDir, {
runId: 'setup-context-local-prefetch-watch',
status: 'detached',
startedAt: '2026-05-09T10:00:00.000Z',
updatedAt: '2026-05-09T10:00:00.000Z',
primarySourceConnectionIds: ['warehouse'],
contextSourceConnectionIds: [],
reportIds: [],
artifactPaths: [],
retryableFailedTargets: [],
commands: contextBuildCommands(tempDir, 'setup-context-local-prefetch-watch'),
sourceProgress: [
{ connectionId: 'warehouse', operation: 'scan' as const, status: 'running' as const, startedAtMs: Date.now() },
],
});
const io = makeIo();
let triggerDetach: (() => void) | null = null;
await expect(
runKtxSetupContextStep(
{ projectDir: tempDir, inputMode: 'auto', autoWatch: true },
io.io,
{
sleep: async () => { triggerDetach?.(); },
watchIntervalMs: 1,
setupKeystroke: (onDetach) => {
triggerDetach = onDetach;
return () => {};
},
},
),
).resolves.toMatchObject({ status: 'detached' });
const output = io.stdout();
expect(output).toContain('Primary sources:');
expect(output).toContain('warehouse');
expect(output).toContain('Context sources:');
expect(output).toContain('docs');
});
it('renders the progress view when watching a build with sourceProgress', async () => {
await writeReadyProject(tempDir);
await writeKtxSetupContextState(tempDir, {

View file

@ -1,4 +1,4 @@
import { mkdirSync, writeFileSync } from 'node:fs';
import { existsSync, mkdirSync, readFileSync, writeFileSync } from 'node:fs';
import { access, mkdir, readdir, readFile, writeFile } from 'node:fs/promises';
import { join, resolve } from 'node:path';
import { cancel, isCancel, select } from '@clack/prompts';
@ -272,6 +272,25 @@ export async function writeKtxSetupContextState(projectDir: string, state: KtxSe
await writeFile(statePath(resolvedProjectDir), `${JSON.stringify(normalized, null, 2)}\n`, 'utf-8');
}
export function readKtxSetupContextStateSync(projectDir: string): KtxSetupContextState {
const resolvedProjectDir = resolve(projectDir);
const filePath = statePath(resolvedProjectDir);
if (!existsSync(filePath)) {
return notStartedState(resolvedProjectDir);
}
return normalizeState(resolvedProjectDir, JSON.parse(readFileSync(filePath, 'utf-8')) as unknown);
}
export function writeKtxSetupContextStateSync(projectDir: string, state: KtxSetupContextState): void {
const resolvedProjectDir = resolve(projectDir);
mkdirSync(join(resolvedProjectDir, '.ktx', 'setup'), { recursive: true });
const normalized = normalizeState(resolvedProjectDir, {
...state,
commands: contextBuildCommands(resolvedProjectDir, state.runId),
});
writeFileSync(statePath(resolvedProjectDir), `${JSON.stringify(normalized, null, 2)}\n`);
}
export function setupContextStatusFromState(
state: KtxSetupContextState,
options: { completedStep: boolean } = { completedStep: false },
@ -307,6 +326,53 @@ function listContextTargets(project: KtxLocalProject): KtxSetupContextTargets {
};
}
function sourceProgressKey(source: Pick<ContextBuildSourceProgressUpdate, 'connectionId' | 'operation'>): string {
return `${source.operation}:${source.connectionId}`;
}
function sourceProgressWithTargets(
sourceProgress: ContextBuildSourceProgressUpdate[] | undefined,
targets: KtxSetupContextTargets,
): ContextBuildSourceProgressUpdate[] | undefined {
if (!sourceProgress || sourceProgress.length === 0) {
return undefined;
}
const merged = [...sourceProgress];
const seen = new Set(merged.map(sourceProgressKey));
for (const connectionId of targets.primarySourceConnectionIds) {
const key = sourceProgressKey({ connectionId, operation: 'scan' });
if (!seen.has(key)) {
merged.push({ connectionId, operation: 'scan', status: 'queued' });
}
}
for (const connectionId of targets.contextSourceConnectionIds) {
const key = sourceProgressKey({ connectionId, operation: 'source-ingest' });
if (!seen.has(key)) {
merged.push({ connectionId, operation: 'source-ingest', status: 'queued' });
}
}
return merged;
}
async function activeStateWithCurrentTargets(
projectDir: string,
state: KtxSetupContextState,
targets: KtxSetupContextTargets,
): Promise<KtxSetupContextState> {
const sourceProgress = sourceProgressWithTargets(state.sourceProgress, targets);
if (!sourceProgress) {
return state;
}
const nextState = {
...state,
primarySourceConnectionIds: targets.primarySourceConnectionIds,
contextSourceConnectionIds: targets.contextSourceConnectionIds,
sourceProgress,
};
await writeKtxSetupContextState(projectDir, nextState);
return nextState;
}
function missingCapabilities(project: KtxLocalProject): string[] {
const missing: string[] = [];
const llm = project.config.llm;
@ -721,6 +787,7 @@ export async function runKtxSetupContextStep(
try {
const project = await loadKtxProject({ projectDir: args.projectDir });
let existingState = await readKtxSetupContextState(args.projectDir);
const targets = listContextTargets(project);
const completedSteps = ktxSetupCompletedSteps(project.config, await readKtxSetupState(args.projectDir));
if (completedSteps.includes('context') && existingState.status === 'completed') {
return { status: 'ready', projectDir: args.projectDir, runId: existingState.runId ?? 'setup-context-completed' };
@ -730,6 +797,7 @@ export async function runKtxSetupContextStep(
(existingState.status === 'running' || existingState.status === 'detached') &&
args.inputMode !== 'disabled'
) {
existingState = await activeStateWithCurrentTargets(args.projectDir, existingState, targets);
if (args.autoWatch) {
const watched = await watchContextStatus(
{
@ -790,7 +858,6 @@ export async function runKtxSetupContextStep(
}
}
const targets = listContextTargets(project);
if (targets.primarySourceConnectionIds.length === 0 && targets.contextSourceConnectionIds.length === 0) {
if (args.allowEmpty === true) {
return { status: 'skipped', projectDir: args.projectDir };

View file

@ -0,0 +1,124 @@
import { mkdir, mkdtemp, rm, writeFile } from 'node:fs/promises';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
import {
runPrimaryScanPrefetchWorker,
startPrimaryScanPrefetch,
} from './setup-primary-scan-prefetch.js';
function makeIo() {
let stdout = '';
let stderr = '';
return {
io: {
stdout: {
write: (chunk: string) => {
stdout += chunk;
},
},
stderr: {
write: (chunk: string) => {
stderr += chunk;
},
},
},
stdout: () => stdout,
stderr: () => stderr,
};
}
async function writeReadyProject(projectDir: string) {
await writeFile(
join(projectDir, 'ktx.yaml'),
[
'project: revenue',
'setup:',
' database_connection_ids:',
' - warehouse',
' completed_steps:',
' - project',
' - llm',
' - embeddings',
' - databases',
'connections:',
' warehouse:',
' driver: postgres',
' url: env:DATABASE_URL',
'llm:',
' provider:',
' backend: anthropic',
' models:',
' default: claude-sonnet-4-6',
'ingest:',
' embeddings:',
' backend: openai',
' model: text-embedding-3-small',
' dimensions: 1536',
'scan:',
' enrichment:',
' mode: llm',
'',
].join('\n'),
'utf-8',
);
}
describe('setup primary scan prefetch', () => {
let tempDir: string;
beforeEach(async () => {
tempDir = await mkdtemp(join(tmpdir(), 'ktx-primary-prefetch-'));
await writeReadyProject(tempDir);
});
afterEach(async () => {
await rm(tempDir, { recursive: true, force: true });
});
it('starts the background scan without printing a resume command', async () => {
const io = makeIo();
await expect(
startPrimaryScanPrefetch(
{ projectDir: tempDir, inputMode: 'auto', yes: true, connectionIds: ['warehouse'] },
io.io,
{
runIdFactory: () => 'setup-context-prefetch-test',
now: () => new Date('2026-05-09T10:00:00.000Z'),
spawnPrefetch: () => ({ logPath: join(tempDir, '.ktx', 'setup', 'context-build.log') }),
},
),
).resolves.toMatchObject({ status: 'started', runId: 'setup-context-prefetch-test' });
expect(io.stdout()).toContain('Primary source context scan started in the background (warehouse).');
expect(io.stdout()).not.toContain('Resume:');
});
it('does not crash on progress state write failures', async () => {
const io = makeIo();
const setupPath = join(tempDir, '.ktx', 'setup');
const runContextBuild = vi.fn(async (_project, _args, _io, hooks) => {
await rm(setupPath, { recursive: true, force: true });
await writeFile(setupPath, 'not a directory', 'utf-8');
hooks.onSourceProgress?.([
{ connectionId: 'warehouse', operation: 'scan' as const, status: 'running' as const, startedAtMs: 1000 },
]);
await rm(setupPath, { force: true });
await mkdir(setupPath, { recursive: true });
return { exitCode: 0, detached: false };
});
await expect(
runPrimaryScanPrefetchWorker(
{ projectDir: tempDir, runId: 'setup-context-prefetch-write-failure', connectionIds: ['warehouse'] },
io.io,
{
now: () => new Date('2026-05-09T10:00:00.000Z'),
runContextBuild,
},
),
).resolves.toBe(0);
});
});

View file

@ -7,7 +7,9 @@ import type { KtxCliIo } from './cli-runtime.js';
import {
contextBuildCommands,
readKtxSetupContextState,
readKtxSetupContextStateSync,
writeKtxSetupContextState,
writeKtxSetupContextStateSync,
type KtxSetupContextState,
} from './setup-context.js';
import { buildPublicIngestPlan } from './public-ingest.js';
@ -158,6 +160,33 @@ function queuedProgress(connectionIds: string[]): ContextBuildSourceProgressUpda
return connectionIds.map((connectionId) => ({ connectionId, operation: 'scan', status: 'queued' }));
}
function sourceProgressKey(source: Pick<ContextBuildSourceProgressUpdate, 'connectionId' | 'operation'>): string {
return `${source.operation}:${source.connectionId}`;
}
function mergeSourceProgress(
latest: ContextBuildSourceProgressUpdate[],
current: ContextBuildSourceProgressUpdate[] | undefined,
): ContextBuildSourceProgressUpdate[] {
const latestKeys = new Set(latest.map(sourceProgressKey));
return [...latest, ...(current ?? []).filter((source) => !latestKeys.has(sourceProgressKey(source)))];
}
function currentStateWithProgress(projectDir: string, fallback: KtxSetupContextState, latest: ContextBuildSourceProgressUpdate[]) {
try {
const current = readKtxSetupContextStateSync(projectDir);
return {
contextSourceConnectionIds: current.contextSourceConnectionIds,
sourceProgress: mergeSourceProgress(latest, current.sourceProgress),
};
} catch {
return {
contextSourceConnectionIds: fallback.contextSourceConnectionIds,
sourceProgress: latest,
};
}
}
function stateForPrefetch(input: {
projectDir: string;
runId: string;
@ -260,7 +289,6 @@ export async function startPrimaryScanPrefetch(
}
await writeKtxSetupContextState(args.projectDir, initialState);
io.stdout.write(`│ Primary source context scan started in the background (${connectionIds.join(', ')}).\n`);
io.stdout.write(`│ Resume: ${contextBuildCommands(args.projectDir, runId).watch}\n`);
return {
status: 'started',
projectDir: args.projectDir,
@ -313,12 +341,18 @@ export async function runPrimaryScanPrefetchWorker(
io,
{
onSourceProgress: (sources) => {
lastSourceProgress = sources;
void writeKtxSetupContextState(args.projectDir, {
...runningState,
updatedAt: now().toISOString(),
sourceProgress: sources,
});
const current = currentStateWithProgress(args.projectDir, runningState, sources);
lastSourceProgress = current.sourceProgress;
try {
writeKtxSetupContextStateSync(args.projectDir, {
...runningState,
contextSourceConnectionIds: current.contextSourceConnectionIds,
updatedAt: now().toISOString(),
sourceProgress: current.sourceProgress,
});
} catch {
// Progress reporting is supplementary; the worker should keep scanning.
}
},
},
);
@ -335,15 +369,17 @@ export async function runPrimaryScanPrefetchWorker(
}
const completedAt = now().toISOString();
const current = currentStateWithProgress(args.projectDir, runningState, lastSourceProgress ?? []);
await writeKtxSetupContextState(args.projectDir, {
...runningState,
contextSourceConnectionIds: current.contextSourceConnectionIds,
status: result.exitCode === 0 ? 'paused' : 'failed',
updatedAt: completedAt,
reportIds: result.reportIds ?? [],
artifactPaths: result.artifactPaths ?? [],
retryableFailedTargets: result.exitCode === 0 ? [] : connectionIds,
...(result.exitCode === 0 ? {} : { failureReason: 'Primary source context scan failed.' }),
...(lastSourceProgress ? { sourceProgress: lastSourceProgress } : {}),
...(current.sourceProgress.length > 0 ? { sourceProgress: current.sourceProgress } : {}),
});
return result.exitCode;
}