feat(cli): prefetch primary scans during setup

This commit is contained in:
Luca Martial 2026-05-12 18:25:14 -07:00
parent fefbabab5f
commit 01e1fe5569
8 changed files with 693 additions and 56 deletions

View file

@ -311,6 +311,14 @@ export function registerSetupCommands(program: Command, context: KtxCliCommandCo
)
.option('--skip-initial-source-ingest', 'Validate source setup without building source context during setup', false)
.option('--skip-sources', 'Mark optional source setup complete with no sources', false)
.addOption(new Option('--internal-primary-scan-prefetch', 'Run the internal setup primary scan prefetch worker').hideHelp().default(false))
.addOption(new Option('--primary-scan-prefetch-run-id <id>', 'Internal setup primary scan prefetch run id').hideHelp())
.addOption(
new Option('--primary-scan-prefetch-connection-id <id>', 'Internal setup primary scan target connection id')
.hideHelp()
.argParser((value, previous: string[]) => [...previous, value])
.default([] as string[]),
)
.showHelpAfterError();
setup.hook('preAction', (_thisCommand, actionCommand) => {
@ -318,6 +326,17 @@ export function registerSetupCommands(program: Command, context: KtxCliCommandCo
});
setup.action(async (options, command) => {
if (options.internalPrimaryScanPrefetch) {
await runSetupArgs(context, {
command: 'primary-scan-prefetch',
projectDir: resolveCommandProjectDir(command),
...(options.primaryScanPrefetchRunId ? { runId: options.primaryScanPrefetchRunId } : {}),
...(options.primaryScanPrefetchConnectionId.length > 0
? { connectionIds: options.primaryScanPrefetchConnectionId }
: {}),
});
return;
}
if (options.anthropicApiKeyEnv && options.anthropicApiKeyFile) {
context.io.stderr.write(
'Choose only one Anthropic credential source: --anthropic-api-key-env or --anthropic-api-key-file.\n',

View file

@ -474,6 +474,39 @@ describe('runContextBuild', () => {
);
});
it('skips targets already completed by an earlier primary scan prefetch', async () => {
const io = makeIo();
const project = projectWithConnections({
warehouse: { driver: 'postgres' },
dbt_main: { driver: 'dbt' },
});
const executeTarget = vi.fn(async (target) => successResult(target.connectionId, target.driver, target.operation));
const result = await runContextBuild(
project,
{
projectDir: '/tmp/project',
inputMode: 'disabled',
completedSourceProgress: [
{ connectionId: 'warehouse', operation: 'scan', status: 'done', elapsedMs: 120000, summaryText: '42 tables' },
],
},
io.io,
{ executeTarget, now: () => 1000 },
);
expect(result).toEqual({ exitCode: 0, detached: false });
expect(executeTarget).toHaveBeenCalledTimes(1);
expect(executeTarget).toHaveBeenCalledWith(
expect.objectContaining({ connectionId: 'dbt_main', operation: 'source-ingest' }),
expect.anything(),
expect.anything(),
{},
);
expect(io.stdout()).toContain('warehouse');
expect(io.stdout()).toContain('42 tables');
});
it('exits immediately with paused message when d is pressed', async () => {
const mockExit = vi.spyOn(process, 'exit').mockImplementation(() => {
throw new Error('process.exit');

View file

@ -40,6 +40,9 @@ export interface ContextBuildArgs {
inputMode: 'auto' | 'disabled';
scanMode?: 'structural' | 'enriched';
detectRelationships?: boolean;
targetOperations?: Array<'scan' | 'source-ingest'>;
targetConnectionIds?: string[];
completedSourceProgress?: ContextBuildSourceProgressUpdate[];
}
export interface ContextBuildResult {
@ -523,10 +526,33 @@ function failureTextForTarget(input: {
return input.fallback ?? `${input.target.connectionId} failed.`;
}
export function initViewState(targets: KtxPublicIngestPlanTarget[]): ContextBuildViewState {
function progressKey(input: Pick<ContextBuildSourceProgressUpdate, 'connectionId' | 'operation'>): string {
return `${input.operation}:${input.connectionId}`;
}
export function initViewState(
targets: KtxPublicIngestPlanTarget[],
completedSourceProgress: ContextBuildSourceProgressUpdate[] = [],
): ContextBuildViewState {
const completedByKey = new Map(
completedSourceProgress.filter((source) => source.status === 'done').map((source) => [progressKey(source), source]),
);
const makeTargetWithProgress = (target: KtxPublicIngestPlanTarget): ContextBuildTargetState => {
const completed = completedByKey.get(progressKey(target));
const state = makeTargetState(target);
if (!completed) {
return state;
}
return {
...state,
status: 'done',
elapsedMs: completed.elapsedMs ?? 0,
summaryText: completed.summaryText ?? null,
};
};
return {
primarySources: targets.filter((t) => t.operation === 'scan').map(makeTargetState),
contextSources: targets.filter((t) => t.operation === 'source-ingest').map(makeTargetState),
primarySources: targets.filter((t) => t.operation === 'scan').map(makeTargetWithProgress),
contextSources: targets.filter((t) => t.operation === 'source-ingest').map(makeTargetWithProgress),
frame: 0,
startedAt: null,
totalElapsedMs: 0,
@ -540,7 +566,12 @@ export async function runContextBuild(
deps: ContextBuildDeps = {},
): Promise<ContextBuildResult> {
const plan = buildPublicIngestPlan(project, { projectDir: args.projectDir, all: true });
const state = initViewState(plan.targets);
const targetOperations = new Set(args.targetOperations ?? ['scan', 'source-ingest']);
const targetConnectionIds = args.targetConnectionIds ? new Set(args.targetConnectionIds) : null;
const targets = plan.targets.filter(
(target) => targetOperations.has(target.operation) && (!targetConnectionIds || targetConnectionIds.has(target.connectionId)),
);
const state = initViewState(targets, args.completedSourceProgress);
const isTTY = io.stdout.isTTY === true;
const nowFn = deps.now ?? (() => Date.now());
@ -618,6 +649,7 @@ export async function runContextBuild(
try {
for (const targetState of orderedTargets) {
if (detached) break;
if (targetState.status === 'done') continue;
targetState.status = 'running';
targetState.startedAt = nowFn();

View file

@ -3,6 +3,7 @@ import { tmpdir } from 'node:os';
import { join } from 'node:path';
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
import { readKtxSetupState } from '@ktx/context/project';
import {
contextBuildCommands,
readKtxSetupContextState,
@ -203,7 +204,7 @@ describe('setup context build state', () => {
expect.objectContaining({ onDetach: expect.any(Function) }),
);
expect(verifyContextReady).toHaveBeenCalledWith(tempDir);
expect(await readFile(join(tempDir, 'ktx.yaml'), 'utf-8')).toContain(' - context');
expect(await readKtxSetupState(tempDir)).toMatchObject({ completed_steps: expect.arrayContaining(['context']) });
await expect(readKtxSetupContextState(tempDir)).resolves.toMatchObject({
runId: 'setup-context-local-abc123',
status: 'completed',
@ -284,7 +285,7 @@ describe('setup context build state', () => {
).resolves.toEqual({ status: 'ready', projectDir: tempDir, runId: 'setup-context-local-existing' });
expect(runContextBuildMock).not.toHaveBeenCalled();
expect(await readFile(join(tempDir, 'ktx.yaml'), 'utf-8')).toContain(' - context');
expect(await readKtxSetupState(tempDir)).toMatchObject({ completed_steps: expect.arrayContaining(['context']) });
await expect(readKtxSetupContextState(tempDir)).resolves.toMatchObject({
runId: 'setup-context-local-existing',
status: 'completed',
@ -486,6 +487,87 @@ describe('setup context build state', () => {
expect(io.stdout()).toContain('KTX context built: yes');
});
it('continues the full context build after auto-watching a completed primary scan prefetch', async () => {
await writeReadyProject(tempDir);
await writeKtxSetupContextState(tempDir, {
runId: 'setup-context-local-prefetch',
status: 'detached',
startedAt: '2026-05-09T10:00:00.000Z',
updatedAt: '2026-05-09T10:00:00.000Z',
primarySourceConnectionIds: ['warehouse'],
contextSourceConnectionIds: [],
reportIds: [],
artifactPaths: [],
retryableFailedTargets: [],
commands: contextBuildCommands(tempDir, 'setup-context-local-prefetch'),
sourceProgress: [
{ connectionId: 'warehouse', operation: 'scan' as const, status: 'running' as const, startedAtMs: Date.now() },
],
});
const io = makeIo();
const completePrefetch = async () => {
await writeKtxSetupContextState(tempDir, {
runId: 'setup-context-local-prefetch',
status: 'paused',
startedAt: '2026-05-09T10:00:00.000Z',
updatedAt: '2026-05-09T10:02:00.000Z',
primarySourceConnectionIds: ['warehouse'],
contextSourceConnectionIds: [],
reportIds: ['warehouse-report'],
artifactPaths: ['raw-sources/warehouse/live-database/sync-1/scan-report.json'],
retryableFailedTargets: [],
commands: contextBuildCommands(tempDir, 'setup-context-local-prefetch'),
sourceProgress: [
{ connectionId: 'warehouse', operation: 'scan' as const, status: 'done' as const, elapsedMs: 120000 },
],
});
};
const runContextBuildMock = vi.fn(async () => ({
exitCode: 0,
detached: false,
reportIds: ['docs-report'],
artifactPaths: ['raw-sources/docs/notion/sync-1/ingest-report.json'],
}));
const verifyContextReady = vi.fn(async () => ({
ready: true,
agentContextReady: true,
semanticSearchReady: true,
details: ['warehouse: enriched scan complete', 'docs: memory update complete'],
}));
const select = vi.fn(async () => {
throw new Error('should not prompt while auto-watching a prefetch');
});
await expect(
runKtxSetupContextStep(
{ projectDir: tempDir, inputMode: 'auto', autoWatch: true },
io.io,
{
prompts: { select, cancel: vi.fn() },
sleep: completePrefetch,
watchIntervalMs: 1,
runIdFactory: () => 'setup-context-local-final',
now: () => new Date('2026-05-09T10:03:00.000Z'),
runContextBuild: runContextBuildMock,
verifyContextReady,
},
),
).resolves.toEqual({ status: 'ready', projectDir: tempDir, runId: 'setup-context-local-final' });
expect(select).not.toHaveBeenCalled();
expect(runContextBuildMock).toHaveBeenCalledWith(
expect.objectContaining({ projectDir: tempDir }),
expect.objectContaining({
projectDir: tempDir,
completedSourceProgress: [
{ connectionId: 'warehouse', operation: 'scan', status: 'done', elapsedMs: 120000 },
],
}),
io.io,
expect.anything(),
);
});
it('renders the progress view when watching a build with sourceProgress', async () => {
await writeReadyProject(tempDir);
await writeKtxSetupContextState(tempDir, {

View file

@ -551,10 +551,12 @@ async function runBuild(
deps: KtxSetupContextDeps,
project: KtxLocalProject,
targets: KtxSetupContextTargets,
existingState?: KtxSetupContextState,
): Promise<KtxSetupContextResult> {
const now = deps.now ?? (() => new Date());
const runId = deps.runIdFactory?.() ?? runIdFactory();
const startedAt = now().toISOString();
const completedSourceProgress = existingState?.sourceProgress?.filter((source) => source.status === 'done') ?? [];
const runningState: KtxSetupContextState = {
runId,
status: 'running',
@ -566,10 +568,12 @@ async function runBuild(
artifactPaths: [],
retryableFailedTargets: [],
commands: contextBuildCommands(args.projectDir, runId),
...(completedSourceProgress.length > 0 ? { sourceProgress: completedSourceProgress } : {}),
};
await writeKtxSetupContextState(args.projectDir, runningState);
let lastSourceProgress: ContextBuildSourceProgressUpdate[] | undefined;
let lastSourceProgress: ContextBuildSourceProgressUpdate[] | undefined =
completedSourceProgress.length > 0 ? completedSourceProgress : undefined;
const contextBuild = deps.runContextBuild ?? runContextBuild;
const buildResult = await contextBuild(
project,
@ -578,6 +582,7 @@ async function runBuild(
inputMode: args.inputMode,
scanMode: 'enriched',
detectRelationships: true,
...(completedSourceProgress.length > 0 ? { completedSourceProgress } : {}),
},
io,
{
@ -609,8 +614,10 @@ async function runBuild(
},
},
);
const completedReportIds = buildResult.reportIds ?? [];
const completedArtifactPaths = buildResult.artifactPaths ?? [];
const completedReportIds = [...new Set([...(existingState?.reportIds ?? []), ...(buildResult.reportIds ?? [])])];
const completedArtifactPaths = [
...new Set([...(existingState?.artifactPaths ?? []), ...(buildResult.artifactPaths ?? [])]),
];
if (buildResult.detached) {
const updatedAt = now().toISOString();
await writeKtxSetupContextState(args.projectDir, {
@ -713,7 +720,7 @@ export async function runKtxSetupContextStep(
): Promise<KtxSetupContextResult> {
try {
const project = await loadKtxProject({ projectDir: args.projectDir });
const existingState = await readKtxSetupContextState(args.projectDir);
let existingState = await readKtxSetupContextState(args.projectDir);
const completedSteps = ktxSetupCompletedSteps(project.config, await readKtxSetupState(args.projectDir));
if (completedSteps.includes('context') && existingState.status === 'completed') {
return { status: 'ready', projectDir: args.projectDir, runId: existingState.runId ?? 'setup-context-completed' };
@ -734,41 +741,52 @@ export async function runKtxSetupContextStep(
io,
deps,
);
return setupResultFromWatchedState(args.projectDir, watched.state);
if (watched.state.status !== 'paused') {
return setupResultFromWatchedState(args.projectDir, watched.state);
}
existingState = watched.state;
}
const prompts = deps.prompts ?? createPromptAdapter();
const choice = await prompts.select({
message:
'A context build is running in the background.\n\n' +
'You can watch it until it finishes, check its status once, or start a fresh build.',
options: [
{ value: 'watch', label: 'Watch progress' },
{ value: 'status', label: 'Check status' },
{ value: 'rebuild', label: 'Start a fresh context build' },
{ value: 'back', label: 'Back' },
],
});
if (choice === 'watch') {
const watched = await watchContextStatus(
{
projectDir: args.projectDir,
...(existingState.runId ? { runId: existingState.runId } : {}),
inputMode: args.inputMode,
},
existingState,
io,
deps,
);
return setupResultFromWatchedState(args.projectDir, watched.state);
}
if (choice === 'status') {
const commands = contextBuildCommands(args.projectDir, existingState.runId);
io.stdout.write(`\nRun: ${commands.status}\n`);
io.stdout.write(`Log: ${join(resolve(args.projectDir), '.ktx', 'setup', 'context-build.log')}\n`);
return { status: 'detached', projectDir: args.projectDir, runId: existingState.runId ?? '' };
}
if (choice === 'back') {
return { status: 'back', projectDir: args.projectDir };
if (existingState.status === 'running' || existingState.status === 'detached') {
const prompts = deps.prompts ?? createPromptAdapter();
const choice = await prompts.select({
message:
'A context build is running in the background.\n\n' +
'You can watch it until it finishes, check its status once, or start a fresh build.',
options: [
{ value: 'watch', label: 'Watch progress' },
{ value: 'status', label: 'Check status' },
{ value: 'rebuild', label: 'Start a fresh context build' },
{ value: 'back', label: 'Back' },
],
});
if (choice === 'watch') {
const watched = await watchContextStatus(
{
projectDir: args.projectDir,
...(existingState.runId ? { runId: existingState.runId } : {}),
inputMode: args.inputMode,
},
existingState,
io,
deps,
);
if (watched.state.status !== 'paused') {
return setupResultFromWatchedState(args.projectDir, watched.state);
}
existingState = watched.state;
}
if (choice === 'status') {
const commands = contextBuildCommands(args.projectDir, existingState.runId);
io.stdout.write(`\nRun: ${commands.status}\n`);
io.stdout.write(`Log: ${join(resolve(args.projectDir), '.ktx', 'setup', 'context-build.log')}\n`);
return { status: 'detached', projectDir: args.projectDir, runId: existingState.runId ?? '' };
}
if (choice === 'back') {
return { status: 'back', projectDir: args.projectDir };
}
if (choice === 'rebuild') {
existingState = notStartedState(args.projectDir);
}
}
}
@ -797,7 +815,7 @@ export async function runKtxSetupContextStep(
}
}
if (args.inputMode !== 'disabled' && args.prompt !== false) {
if (args.inputMode !== 'disabled' && args.prompt !== false && existingState.status !== 'paused') {
const choice = await promptForBuild(deps.prompts ?? createPromptAdapter());
if (choice === 'back') {
return { status: 'back', projectDir: args.projectDir };
@ -808,7 +826,7 @@ export async function runKtxSetupContextStep(
}
}
return await runBuild(args, io, deps, project, targets);
return await runBuild(args, io, deps, project, targets, existingState);
} catch (error) {
io.stderr.write(`${error instanceof Error ? error.message : String(error)}\n`);
return { status: 'failed', projectDir: args.projectDir };

View file

@ -0,0 +1,349 @@
import { spawn } from 'node:child_process';
import { mkdirSync, openSync } from 'node:fs';
import { join, resolve } from 'node:path';
import { cancel, isCancel, select } from '@clack/prompts';
import { loadKtxProject } from '@ktx/context/project';
import type { KtxCliIo } from './cli-runtime.js';
import {
contextBuildCommands,
readKtxSetupContextState,
writeKtxSetupContextState,
type KtxSetupContextState,
} from './setup-context.js';
import { buildPublicIngestPlan } from './public-ingest.js';
import {
type ContextBuildSourceProgressUpdate,
type ContextBuildResult,
runContextBuild,
} from './context-build-view.js';
import { withMenuOptionsSpacing } from './prompt-navigation.js';
import { withSetupInterruptConfirmation } from './setup-interrupt.js';
export interface KtxPrimaryScanPrefetchArgs {
projectDir: string;
inputMode: 'auto' | 'disabled';
yes: boolean;
connectionIds?: string[];
}
export interface KtxPrimaryScanPrefetchWorkerArgs {
projectDir: string;
runId?: string;
connectionIds?: string[];
}
export type KtxPrimaryScanPrefetchResult =
| { status: 'started'; projectDir: string; runId: string; logPath?: string }
| { status: 'running'; projectDir: string; runId?: string }
| { status: 'skipped'; projectDir: string; reason: string }
| { status: 'failed'; projectDir: string; reason: string };
export interface KtxPrimaryScanPrefetchPromptAdapter {
select(options: { message: string; options: Array<{ value: string; label: string }> }): Promise<string>;
cancel(message: string): void;
}
export interface KtxPrimaryScanPrefetchDeps {
prompts?: KtxPrimaryScanPrefetchPromptAdapter;
runIdFactory?: () => string;
now?: () => Date;
spawnPrefetch?: (args: { projectDir: string; runId: string; connectionIds: string[] }) => { logPath?: string } | null;
runContextBuild?: typeof runContextBuild;
}
const ACTIVE_CONTEXT_STATUSES = new Set(['running', 'detached']);
function createPromptAdapter(): KtxPrimaryScanPrefetchPromptAdapter {
return {
async select(options) {
const value = await withSetupInterruptConfirmation(() => select(withMenuOptionsSpacing(options)));
if (isCancel(value)) {
cancel('Setup cancelled.');
return 'wait';
}
return String(value);
},
cancel(message) {
cancel(message);
},
};
}
function runIdFactory(): string {
return `setup-context-prefetch-${Date.now().toString(36)}-${Math.random().toString(36).slice(2, 8)}`;
}
function resolveKtxEntryScript(): string | null {
const argv1 = process.argv[1];
if (argv1 && (argv1.endsWith('.js') || argv1.endsWith('.ts') || argv1.endsWith('.mjs'))) {
return argv1;
}
return null;
}
function spawnPrimaryScanPrefetch(input: {
projectDir: string;
runId: string;
connectionIds: string[];
}): { logPath: string } | null {
const entryScript = resolveKtxEntryScript();
if (!entryScript) return null;
const resolvedDir = resolve(input.projectDir);
const logDir = join(resolvedDir, '.ktx', 'setup');
mkdirSync(logDir, { recursive: true });
const logPath = join(logDir, 'context-build.log');
const logFd = openSync(logPath, 'a');
const connectionArgs = input.connectionIds.flatMap((connectionId) => [
'--primary-scan-prefetch-connection-id',
connectionId,
]);
const child = spawn(
process.execPath,
[
entryScript,
'setup',
'--project-dir',
resolvedDir,
'--no-input',
'--internal-primary-scan-prefetch',
'--primary-scan-prefetch-run-id',
input.runId,
...connectionArgs,
],
{ detached: true, stdio: ['ignore', logFd, logFd] },
);
child.unref();
return { logPath };
}
function missingPrimaryScanCapabilities(config: Awaited<ReturnType<typeof loadKtxProject>>['config']): string[] {
const missing: string[] = [];
if (config.llm.provider.backend === 'none' || !config.llm.models.default) {
missing.push('models');
}
const embeddings = config.ingest.embeddings;
if (
embeddings.backend === 'none' ||
embeddings.backend === 'deterministic' ||
!embeddings.model ||
embeddings.dimensions <= 0
) {
missing.push('embeddings');
}
if (config.scan.enrichment.mode === 'none') {
missing.push('scan enrichment');
}
return missing;
}
function primaryScanConnectionIds(
project: Awaited<ReturnType<typeof loadKtxProject>>,
preferredConnectionIds: string[] | undefined,
): string[] {
const preferred = preferredConnectionIds && preferredConnectionIds.length > 0 ? new Set(preferredConnectionIds) : null;
try {
const plan = buildPublicIngestPlan(project, { projectDir: project.projectDir, all: true });
return plan.targets
.filter((target) => target.operation === 'scan')
.filter((target) => !preferred || preferred.has(target.connectionId))
.map((target) => target.connectionId);
} catch {
return [];
}
}
function queuedProgress(connectionIds: string[]): ContextBuildSourceProgressUpdate[] {
return connectionIds.map((connectionId) => ({ connectionId, operation: 'scan', status: 'queued' }));
}
function stateForPrefetch(input: {
projectDir: string;
runId: string;
status: KtxSetupContextState['status'];
now: Date;
primarySourceConnectionIds: string[];
sourceProgress?: ContextBuildSourceProgressUpdate[];
reportIds?: string[];
artifactPaths?: string[];
failureReason?: string;
}): KtxSetupContextState {
const timestamp = input.now.toISOString();
return {
runId: input.runId,
status: input.status,
startedAt: timestamp,
updatedAt: timestamp,
primarySourceConnectionIds: input.primarySourceConnectionIds,
contextSourceConnectionIds: [],
reportIds: input.reportIds ?? [],
artifactPaths: input.artifactPaths ?? [],
retryableFailedTargets: input.status === 'failed' ? input.primarySourceConnectionIds : [],
commands: contextBuildCommands(input.projectDir, input.runId),
...(input.failureReason ? { failureReason: input.failureReason } : {}),
...(input.sourceProgress ? { sourceProgress: input.sourceProgress } : {}),
};
}
async function chooseStartPrefetch(
args: KtxPrimaryScanPrefetchArgs,
io: KtxCliIo,
deps: KtxPrimaryScanPrefetchDeps,
): Promise<boolean> {
if (args.yes) {
return true;
}
if (args.inputMode === 'disabled') {
return false;
}
if (io.stdout.isTTY !== true && !deps.prompts) {
return false;
}
const prompts = deps.prompts ?? createPromptAdapter();
const choice = await prompts.select({
message:
'Prepare primary source context while you finish setup?\n\n' +
'KTX can start the enriched primary-source scan now, then finish context sources later.',
options: [
{ value: 'start', label: 'Start in background (recommended)' },
{ value: 'wait', label: 'Wait until Build Context' },
],
});
return choice === 'start';
}
export async function startPrimaryScanPrefetch(
args: KtxPrimaryScanPrefetchArgs,
io: KtxCliIo,
deps: KtxPrimaryScanPrefetchDeps = {},
): Promise<KtxPrimaryScanPrefetchResult> {
const existingState = await readKtxSetupContextState(args.projectDir);
if (ACTIVE_CONTEXT_STATUSES.has(existingState.status)) {
return { status: 'running', projectDir: args.projectDir, runId: existingState.runId };
}
if (existingState.status === 'completed') {
return { status: 'skipped', projectDir: args.projectDir, reason: 'context already built' };
}
const project = await loadKtxProject({ projectDir: args.projectDir });
const missing = missingPrimaryScanCapabilities(project.config);
if (missing.length > 0) {
return { status: 'skipped', projectDir: args.projectDir, reason: `missing ${missing.join(', ')}` };
}
const connectionIds = primaryScanConnectionIds(project, args.connectionIds);
if (connectionIds.length === 0) {
return { status: 'skipped', projectDir: args.projectDir, reason: 'no primary sources' };
}
if (!(await chooseStartPrefetch(args, io, deps))) {
return { status: 'skipped', projectDir: args.projectDir, reason: 'user deferred' };
}
const runId = deps.runIdFactory?.() ?? runIdFactory();
const now = deps.now?.() ?? new Date();
const initialState = stateForPrefetch({
projectDir: args.projectDir,
runId,
status: 'detached',
now,
primarySourceConnectionIds: connectionIds,
sourceProgress: queuedProgress(connectionIds),
});
const spawned = (deps.spawnPrefetch ?? spawnPrimaryScanPrefetch)({
projectDir: args.projectDir,
runId,
connectionIds,
});
if (!spawned) {
return { status: 'skipped', projectDir: args.projectDir, reason: 'background runner unavailable' };
}
await writeKtxSetupContextState(args.projectDir, initialState);
io.stdout.write(`│ Primary source context scan started in the background (${connectionIds.join(', ')}).\n`);
io.stdout.write(`│ Resume: ${contextBuildCommands(args.projectDir, runId).watch}\n`);
return {
status: 'started',
projectDir: args.projectDir,
runId,
...(spawned.logPath ? { logPath: spawned.logPath } : {}),
};
}
export async function runPrimaryScanPrefetchWorker(
args: KtxPrimaryScanPrefetchWorkerArgs,
io: KtxCliIo,
deps: KtxPrimaryScanPrefetchDeps = {},
): Promise<number> {
const project = await loadKtxProject({ projectDir: args.projectDir });
const connectionIds = primaryScanConnectionIds(
project,
args.connectionIds ?? project.config.setup?.database_connection_ids,
);
if (connectionIds.length === 0) {
return 0;
}
const runId = args.runId ?? deps.runIdFactory?.() ?? runIdFactory();
const now = deps.now ?? (() => new Date());
const startedAt = now();
const runningState = stateForPrefetch({
projectDir: args.projectDir,
runId,
status: 'running',
now: startedAt,
primarySourceConnectionIds: connectionIds,
sourceProgress: queuedProgress(connectionIds),
});
await writeKtxSetupContextState(args.projectDir, runningState);
let lastSourceProgress: ContextBuildSourceProgressUpdate[] | undefined = runningState.sourceProgress;
const contextBuild = deps.runContextBuild ?? runContextBuild;
let result: ContextBuildResult;
try {
result = await contextBuild(
project,
{
projectDir: args.projectDir,
inputMode: 'disabled',
scanMode: 'enriched',
detectRelationships: true,
targetOperations: ['scan'],
targetConnectionIds: connectionIds,
},
io,
{
onSourceProgress: (sources) => {
lastSourceProgress = sources;
void writeKtxSetupContextState(args.projectDir, {
...runningState,
updatedAt: now().toISOString(),
sourceProgress: sources,
});
},
},
);
} catch (error) {
await writeKtxSetupContextState(args.projectDir, {
...runningState,
status: 'failed',
updatedAt: now().toISOString(),
retryableFailedTargets: connectionIds,
failureReason: error instanceof Error ? error.message : String(error),
...(lastSourceProgress ? { sourceProgress: lastSourceProgress } : {}),
});
return 1;
}
const completedAt = now().toISOString();
await writeKtxSetupContextState(args.projectDir, {
...runningState,
status: result.exitCode === 0 ? 'paused' : 'failed',
updatedAt: completedAt,
reportIds: result.reportIds ?? [],
artifactPaths: result.artifactPaths ?? [],
retryableFailedTargets: result.exitCode === 0 ? [] : connectionIds,
...(result.exitCode === 0 ? {} : { failureReason: 'Primary source context scan failed.' }),
...(lastSourceProgress ? { sourceProgress: lastSourceProgress } : {}),
});
return result.exitCode;
}

View file

@ -584,13 +584,13 @@ describe('setup status', () => {
expect(projectPrompts.select).toHaveBeenCalledWith(
expect.objectContaining({
message: 'Which KTX project should setup use?',
message: 'Where should KTX create the project?',
options: expect.arrayContaining([expect.objectContaining({ value: 'back', label: 'Back' })]),
}),
);
expect(projectPrompts.select).toHaveBeenCalledWith(
expect.objectContaining({
message: 'Which KTX project should setup use?',
message: 'Where should KTX create the project?',
options: expect.not.arrayContaining([expect.objectContaining({ value: 'exit', label: 'Exit' })]),
}),
);
@ -920,7 +920,7 @@ describe('setup status', () => {
inputMode: 'disabled',
yes: false,
cliVersion: '0.2.0',
anthropicApiKeyEnv: 'ANTHROPIC_API_KEY',
anthropicApiKeyEnv: 'ANTHROPIC_API_KEY', // pragma: allowlist secret
anthropicModel: 'claude-sonnet-4-6',
skipLlm: false,
skipEmbeddings: true,
@ -937,7 +937,7 @@ describe('setup status', () => {
expect.objectContaining({
projectDir: tempDir,
inputMode: 'disabled',
anthropicApiKeyEnv: 'ANTHROPIC_API_KEY',
anthropicApiKeyEnv: 'ANTHROPIC_API_KEY', // pragma: allowlist secret
anthropicModel: 'claude-sonnet-4-6',
skipLlm: false,
}),
@ -961,11 +961,11 @@ describe('setup status', () => {
inputMode: 'disabled',
yes: true,
cliVersion: '0.2.0',
anthropicApiKeyEnv: 'ANTHROPIC_API_KEY',
anthropicApiKeyEnv: 'ANTHROPIC_API_KEY', // pragma: allowlist secret
anthropicModel: 'claude-sonnet-4-6',
skipLlm: false,
embeddingBackend: 'openai',
embeddingApiKeyEnv: 'OPENAI_API_KEY',
embeddingApiKeyEnv: 'OPENAI_API_KEY', // pragma: allowlist secret
skipEmbeddings: false,
databaseSchemas: [],
skipDatabases: true,
@ -983,7 +983,7 @@ describe('setup status', () => {
cliVersion: '0.2.0',
runtimeInstallPolicy: 'auto',
embeddingBackend: 'openai',
embeddingApiKeyEnv: 'OPENAI_API_KEY',
embeddingApiKeyEnv: 'OPENAI_API_KEY', // pragma: allowlist secret
skipEmbeddings: false,
}),
testIo.io,
@ -1181,11 +1181,11 @@ describe('setup status', () => {
inputMode: 'disabled',
yes: false,
cliVersion: '0.2.0',
anthropicApiKeyEnv: 'ANTHROPIC_API_KEY',
anthropicApiKeyEnv: 'ANTHROPIC_API_KEY', // pragma: allowlist secret
anthropicModel: 'claude-sonnet-4-6',
skipLlm: false,
embeddingBackend: 'openai',
embeddingApiKeyEnv: 'OPENAI_API_KEY',
embeddingApiKeyEnv: 'OPENAI_API_KEY', // pragma: allowlist secret
skipEmbeddings: false,
databaseDrivers: ['postgres'],
databaseConnectionId: 'warehouse',
@ -1261,6 +1261,68 @@ describe('setup status', () => {
expect(calls).toEqual(['model', 'embeddings', 'databases', 'sources']);
});
it('starts primary source context prefetch after database setup before context source setup', async () => {
const calls: string[] = [];
const io = makeIo();
await writeFile(join(tempDir, 'ktx.yaml'), ['project: revenue', 'connections: {}', ''].join('\n'), 'utf-8');
const primaryScanPrefetch = vi.fn(async () => {
calls.push('prefetch');
return { status: 'started' as const, projectDir: tempDir, runId: 'setup-context-prefetch-test' };
});
await expect(
runKtxSetup(
{
command: 'run',
projectDir: tempDir,
mode: 'existing',
agents: false,
inputMode: 'auto',
yes: true,
cliVersion: '0.2.0',
skipLlm: true,
skipEmbeddings: true,
skipDatabases: false,
skipSources: true,
skipAgents: true,
databaseSchemas: [],
},
io.io,
{
model: async () => {
calls.push('model');
return { status: 'skipped', projectDir: tempDir };
},
embeddings: async () => {
calls.push('embeddings');
return { status: 'skipped', projectDir: tempDir };
},
databases: async () => {
calls.push('databases');
return { status: 'ready', projectDir: tempDir, connectionIds: ['warehouse'] };
},
primaryScanPrefetch,
sources: async () => {
calls.push('sources');
return { status: 'skipped', projectDir: tempDir };
},
},
),
).resolves.toBe(0);
expect(primaryScanPrefetch).toHaveBeenCalledWith(
expect.objectContaining({
projectDir: tempDir,
inputMode: 'auto',
yes: true,
connectionIds: ['warehouse'],
}),
io.io,
);
expect(calls).toEqual(['model', 'embeddings', 'databases', 'prefetch', 'sources']);
});
it.each([
{
backend: 'vertex',
@ -2041,7 +2103,7 @@ describe('setup status', () => {
inputMode: 'disabled',
yes: false,
cliVersion: '0.2.0',
anthropicApiKeyEnv: 'ANTHROPIC_API_KEY',
anthropicApiKeyEnv: 'ANTHROPIC_API_KEY', // pragma: allowlist secret
anthropicModel: 'claude-sonnet-4-6',
skipLlm: false,
skipEmbeddings: false,

View file

@ -26,6 +26,13 @@ import {
} from './setup-databases.js';
import { type KtxSetupEmbeddingsDeps, runKtxSetupEmbeddingsStep } from './setup-embeddings.js';
import { type KtxSetupModelDeps, isKtxSetupLlmConfigReady, runKtxSetupAnthropicModelStep } from './setup-models.js';
import {
type KtxPrimaryScanPrefetchArgs,
type KtxPrimaryScanPrefetchResult,
type KtxPrimaryScanPrefetchDeps,
runPrimaryScanPrefetchWorker,
startPrimaryScanPrefetch,
} from './setup-primary-scan-prefetch.js';
import { type KtxSetupProjectDeps, runKtxSetupProjectStep } from './setup-project.js';
import {
isKtxPreAgentSetupReady,
@ -55,6 +62,12 @@ export interface KtxSetupStatus {
}
export type KtxSetupArgs =
| {
command: 'primary-scan-prefetch';
projectDir: string;
runId?: string;
connectionIds?: string[];
}
| {
command: 'run';
projectDir: string;
@ -137,6 +150,8 @@ export interface KtxSetupDeps {
io: KtxCliIo,
) => Promise<Awaited<ReturnType<typeof runKtxSetupAgentsStep>>>;
agentsDeps?: KtxSetupAgentsDeps;
primaryScanPrefetch?: (args: KtxPrimaryScanPrefetchArgs, io: KtxCliIo) => Promise<KtxPrimaryScanPrefetchResult>;
primaryScanPrefetchDeps?: KtxPrimaryScanPrefetchDeps;
context?: (args: Parameters<typeof runKtxSetupContextStep>[0], io: KtxCliIo) => Promise<KtxSetupContextResult>;
contextDeps?: KtxSetupContextDeps;
readyMenuDeps?: KtxSetupReadyMenuDeps;
@ -442,6 +457,10 @@ export async function runKtxSetup(args: KtxSetupArgs, io: KtxCliIo, deps: KtxSet
}
async function runKtxSetupInner(args: KtxSetupArgs, io: KtxCliIo, deps: KtxSetupDeps = {}): Promise<number> {
if (args.command === 'primary-scan-prefetch') {
return await runPrimaryScanPrefetchWorker(args, io, deps.primaryScanPrefetchDeps);
}
io.stdout.write('KTX setup\n');
let entryAction: KtxSetupEntryAction | undefined;
let projectResult: Awaited<ReturnType<typeof runKtxSetupProjectStep>>;
@ -549,6 +568,7 @@ async function runKtxSetupInner(args: KtxSetupArgs, io: KtxCliIo, deps: KtxSetup
}
const forcePromptSteps = new Set<KtxSetupFlowStep>();
let autoWatchPrefetchAtContext = false;
const isNavigableSetupStep = (step: KtxSetupFlowStep): boolean => {
if (step === 'models') return !args.skipLlm && shouldRunModels;
if (step === 'embeddings') return !args.skipEmbeddings && shouldRunEmbeddings;
@ -677,9 +697,11 @@ async function runKtxSetupInner(args: KtxSetupArgs, io: KtxCliIo, deps: KtxSetup
inputMode: args.inputMode,
forcePrompt: forcePromptSteps.has('context') || runOnly === 'context',
allowEmpty: true,
...(autoWatchPrefetchAtContext ? { autoWatch: true } : {}),
},
io,
);
autoWatchPrefetchAtContext = false;
} else {
const agentsRunner =
deps.agents ?? ((agentArgs, agentIo) => runKtxSetupAgentsStep(agentArgs, agentIo, deps.agentsDeps));
@ -725,6 +747,26 @@ async function runKtxSetupInner(args: KtxSetupArgs, io: KtxCliIo, deps: KtxSetup
return 0;
}
}
if (step === 'databases' && stepResult.status === 'ready' && shouldRunContext) {
const databaseResult = stepResult as Awaited<ReturnType<typeof runKtxSetupDatabasesStep>>;
const connectionIds = 'connectionIds' in databaseResult ? databaseResult.connectionIds : [];
const primaryScanPrefetch =
deps.primaryScanPrefetch ??
((prefetchArgs, prefetchIo) =>
startPrimaryScanPrefetch(prefetchArgs, prefetchIo, deps.primaryScanPrefetchDeps));
const prefetchResult = await primaryScanPrefetch(
{
projectDir: projectResult.projectDir,
inputMode: args.inputMode,
yes: args.yes,
connectionIds,
},
io,
);
if (prefetchResult.status === 'started' || prefetchResult.status === 'running') {
autoWatchPrefetchAtContext = true;
}
}
forcePromptSteps.delete(step);
stepIndex += 1;