fix(ingest): polish foreground retry copy

This commit is contained in:
Andrey Avtomonov 2026-05-13 19:08:23 +02:00
parent 8bffd2829b
commit f54107ac70
4 changed files with 111 additions and 8 deletions

View file

@ -528,6 +528,57 @@ describe('runContextBuild', () => {
expect(io.stdout()).toContain('connection reset (ECONNRESET)');
});
it('uses direct ingest retry guidance for public ingest failures', async () => {
const io = makeIo();
const project = projectWithConnections({
warehouse: { driver: 'postgres' },
});
const executeTarget = vi.fn(async (target) => failedResult(target.connectionId, target.driver, target.operation));
await runContextBuild(
project,
{
projectDir: '/tmp/project',
inputMode: 'disabled',
targetConnectionId: 'warehouse',
all: false,
entrypoint: 'ingest',
},
io.io,
{ executeTarget, now: () => 1000 },
);
expect(io.stdout()).toContain('Retry: ktx ingest warehouse --project-dir /tmp/project');
expect(io.stdout()).not.toContain('Retry: ktx setup');
});
it('renders query-history progress without the historic-sql adapter key', async () => {
const io = makeIo();
const project = projectWithConnections({
warehouse: { driver: 'postgres', context: { queryHistory: { enabled: true } } },
});
const executeTarget = vi.fn(async (target, _args, _targetIo, deps) => {
deps.ingestProgress?.({ percent: 5, message: 'Fetching source files for warehouse/historic-sql' });
return successResult(target.connectionId, target.driver, target.operation);
});
await runContextBuild(
project,
{
projectDir: '/tmp/project',
inputMode: 'disabled',
targetConnectionId: 'warehouse',
all: false,
entrypoint: 'ingest',
},
io.io,
{ executeTarget, now: () => 1000, sourceProgressThrottleMs: 0 },
);
expect(io.stdout()).toContain('Fetching query history for warehouse');
expect(io.stdout()).not.toContain('historic-sql');
});
it('renders final view for non-TTY output', async () => {
const io = makeIo();
const project = projectWithConnections({

View file

@ -41,6 +41,7 @@ export interface ContextBuildArgs {
inputMode: 'auto' | 'disabled';
targetConnectionId?: string;
all?: boolean;
entrypoint?: 'setup' | 'ingest';
depth?: Extract<KtxPublicIngestArgs, { command: 'run' }>['depth'];
queryHistory?: Extract<KtxPublicIngestArgs, { command: 'run' }>['queryHistory'];
queryHistoryWindowDays?: number;
@ -197,8 +198,18 @@ function renderTargetGroup(
return ['', ` ${label}:`, ...targets.map((t) => renderTargetLine(t, frame, styled, width))];
}
function resumeCommand(projectDir?: string): string {
return projectDir ? `ktx setup --project-dir ${projectDir}` : 'ktx setup';
function retryCommand(input: {
projectDir?: string;
entrypoint?: 'setup' | 'ingest';
connectionId?: string;
depth?: 'fast' | 'deep';
}): string {
const projectPart = input.projectDir ? ` --project-dir ${input.projectDir}` : '';
if (input.entrypoint === 'ingest' && input.connectionId) {
const depthPart = input.depth ? ` --${input.depth}` : '';
return `ktx ingest ${input.connectionId}${projectPart}${depthPart}`;
}
return input.projectDir ? `ktx setup --project-dir ${input.projectDir}` : 'ktx setup';
}
export function renderContextBuildView(
@ -510,6 +521,7 @@ function failedStepDetail(result: KtxPublicIngestTargetResult): string | null {
function failureTextForTarget(input: {
target: KtxPublicIngestPlanTarget;
projectDir: string;
entrypoint?: 'setup' | 'ingest';
capturedOutput?: string;
error?: unknown;
fallback?: string | null;
@ -520,10 +532,24 @@ function failureTextForTarget(input: {
return [
`KTX lost its connection to ${friendlyDriverName(input.target.driver)} while ${operation} ${input.target.connectionId}.`,
`Reason: ${NETWORK_ERROR_REASONS[code]} (${code}).`,
`Retry: ${resumeCommand(input.projectDir)}`,
`Retry: ${retryCommand({
projectDir: input.projectDir,
entrypoint: input.entrypoint,
connectionId: input.target.connectionId,
depth: input.target.databaseDepth,
})}`,
].join(' ');
}
return input.fallback ?? `${input.target.connectionId} failed.`;
const fallback = input.fallback ?? `${input.target.connectionId} failed.`;
if (input.entrypoint === 'ingest') {
return `${fallback} Retry: ${retryCommand({
projectDir: input.projectDir,
entrypoint: input.entrypoint,
connectionId: input.target.connectionId,
depth: input.target.databaseDepth,
})}`;
}
return fallback;
}
export function initViewState(targets: KtxPublicIngestPlanTarget[]): ContextBuildViewState {
@ -536,9 +562,26 @@ export function initViewState(targets: KtxPublicIngestPlanTarget[]): ContextBuil
};
}
function formatProgressDetail(update: Pick<KtxIngestProgressUpdate, 'percent' | 'message'>): string {
function publicProgressMessage(message: string, target: KtxPublicIngestPlanTarget): string {
if (!target.steps.includes('query-history')) {
return message;
}
return message
.replace(
new RegExp(`Fetching source files for ${target.connectionId}/historic-sql`, 'i'),
`Fetching query history for ${target.connectionId}`,
)
.replace(`${target.connectionId}/historic-sql`, `${target.connectionId} query history`)
.replace(/\bhistoric-sql\b/g, 'query history')
.replace(/\bhistoric SQL\b/gi, 'query history');
}
function formatProgressDetail(
update: Pick<KtxIngestProgressUpdate, 'percent' | 'message'>,
target: KtxPublicIngestPlanTarget,
): string {
const percent = Math.max(0, Math.min(100, Math.round(update.percent)));
return `[${percent}%] ${update.message}`;
return `[${percent}%] ${publicProgressMessage(update.message, target)}`;
}
function createContextBuildProgressPort(
@ -649,16 +692,22 @@ export async function runContextBuild(
let hasPendingProgressPublish = false;
const updateTargetProgress = (update: KtxIngestProgressUpdate) => {
targetState.detailLine = formatProgressDetail(update);
targetState.detailLine = formatProgressDetail(update, targetState.target);
targetState.progressUpdatedAtMs = nowFn();
if (!repainter) {
io.stdout.write(`${targetState.detailLine}\n`);
}
paint(true);
hasPendingProgressPublish = !publishSourceProgress(false);
};
const capture = createCaptureIo(
(message) => {
targetState.detailLine = message;
targetState.detailLine = publicProgressMessage(message, targetState.target);
targetState.progressUpdatedAtMs = nowFn();
if (!repainter) {
io.stdout.write(`${targetState.detailLine}\n`);
}
paint(true);
hasPendingProgressPublish = !publishSourceProgress(false);
},
@ -698,6 +747,7 @@ export async function runContextBuild(
targetState.failureText = failureTextForTarget({
target: targetState.target,
projectDir: args.projectDir,
entrypoint: args.entrypoint,
capturedOutput,
error: thrownError,
fallback: result ? failedStepDetail(result) : null,

View file

@ -436,6 +436,7 @@ describe('runKtxPublicIngest', () => {
projectDir: '/tmp/project',
targetConnectionId: 'warehouse',
all: false,
entrypoint: 'ingest',
depth: 'fast',
queryHistory: 'default',
}),

View file

@ -630,6 +630,7 @@ export async function runKtxPublicIngest(
projectDir: args.projectDir,
...(args.targetConnectionId ? { targetConnectionId: args.targetConnectionId } : {}),
all: args.all,
entrypoint: 'ingest',
inputMode: args.inputMode,
...(args.depth ? { depth: args.depth } : {}),
...(args.queryHistory ? { queryHistory: args.queryHistory } : {}),