Handle Postgres network scan failures

This commit is contained in:
Luca Martial 2026-05-11 14:35:39 -07:00
parent 72a4ace13c
commit 8b342b760c
6 changed files with 216 additions and 10 deletions

View file

@ -267,10 +267,11 @@ describe('renderContextBuildView', () => {
{ connectionId: 'warehouse', driver: 'postgres', operation: 'scan', debugCommand: '', steps: ['scan'] },
]);
state.primarySources[0].status = 'failed';
state.primarySources[0].failureText = 'KTX lost its connection to PostgreSQL while scanning warehouse.';
const output = renderContextBuildView(state, { styled: false });
expect(output).toContain('✗');
expect(output).toContain('failed');
expect(output).toContain('KTX lost its connection to PostgreSQL while scanning warehouse.');
});
it('omits empty groups', () => {
@ -384,6 +385,52 @@ describe('runContextBuild', () => {
expect(result).toEqual({ exitCode: 1, detached: false });
});
it('renders a friendly network failure when target output contains a network error code', async () => {
const io = makeIo();
const project = projectWithConnections({
warehouse: { driver: 'postgres' },
});
const executeTarget = vi.fn(async (target, _args, targetIo) => {
targetIo.stderr.write('Error: read EADDRNOTAVAIL\n');
return failedResult(target.connectionId, target.driver, target.operation);
});
const result = await runContextBuild(
project,
{ projectDir: '/tmp/project', inputMode: 'disabled' },
io.io,
{ executeTarget, now: () => 1000 },
);
expect(result).toEqual({ exitCode: 1, detached: false });
expect(io.stdout()).toContain('KTX lost its connection to PostgreSQL while scanning warehouse.');
expect(io.stdout()).toContain('network address unavailable (EADDRNOTAVAIL)');
expect(io.stdout()).toContain('Retry: ktx setup --project-dir /tmp/project');
expect(io.stdout()).not.toContain('BoundPool');
});
it('renders a friendly network failure when target execution throws', async () => {
const io = makeIo();
const project = projectWithConnections({
warehouse: { driver: 'postgres' },
});
const error = Object.assign(new Error('read ECONNRESET'), { code: 'ECONNRESET' });
const executeTarget = vi.fn(async () => {
throw error;
});
const result = await runContextBuild(
project,
{ projectDir: '/tmp/project', inputMode: 'disabled' },
io.io,
{ executeTarget, now: () => 1000 },
);
expect(result).toEqual({ exitCode: 1, detached: false });
expect(io.stdout()).toContain('KTX lost its connection to PostgreSQL while scanning warehouse.');
expect(io.stdout()).toContain('connection reset (ECONNRESET)');
});
it('renders final view for non-TTY output', async () => {
const io = makeIo();
const project = projectWithConnections({

View file

@ -22,6 +22,7 @@ export interface ContextBuildTargetState {
status: 'queued' | 'running' | 'done' | 'failed';
detailLine: string | null;
summaryText: string | null;
failureText: string | null;
startedAt: number | null;
elapsedMs: number;
}
@ -133,7 +134,8 @@ function targetDetail(target: ContextBuildTargetState, styled: boolean): string
return parts.join(' · ');
}
if (target.status === 'failed') {
return styled ? red('failed') : 'failed';
const failureText = target.failureText ?? 'failed';
return styled ? red(failureText) : failureText;
}
if (target.status === 'running') {
const percent = extractPercent(target.detailLine);
@ -327,6 +329,7 @@ export function viewStateFromSourceProgress(
status: s.status,
detailLine: null,
summaryText: s.summaryText ?? null,
failureText: null,
startedAt: s.startedAtMs ?? null,
elapsedMs: s.status === 'running' && s.startedAtMs ? now - s.startedAtMs : (s.elapsedMs ?? 0),
});
@ -441,7 +444,83 @@ export function defaultSetupKeystroke(onDetach: () => void, onCtrlC: () => void)
// --- Orchestration ---
function makeTargetState(target: KtxPublicIngestPlanTarget): ContextBuildTargetState {
return { target, status: 'queued', detailLine: null, summaryText: null, startedAt: null, elapsedMs: 0 };
return {
target,
status: 'queued',
detailLine: null,
summaryText: null,
failureText: null,
startedAt: null,
elapsedMs: 0,
};
}
const NETWORK_ERROR_REASONS: Record<string, string> = {
EADDRNOTAVAIL: 'network address unavailable',
ECONNRESET: 'connection reset',
ECONNREFUSED: 'connection refused',
ENETUNREACH: 'network unreachable',
ENOTFOUND: 'host not found',
ETIMEDOUT: 'connection timed out',
EHOSTUNREACH: 'host unreachable',
};
function unknownErrorMessage(error: unknown): string {
return error instanceof Error ? error.message : String(error);
}
function networkErrorCodeFromText(text: string): string | null {
for (const code of Object.keys(NETWORK_ERROR_REASONS)) {
if (new RegExp(`\\b${code}\\b`).test(text)) {
return code;
}
}
return null;
}
function networkErrorCode(error: unknown, capturedOutput = ''): string | null {
const directCode = typeof (error as { code?: unknown })?.code === 'string'
? (error as { code: string }).code
: null;
if (directCode && NETWORK_ERROR_REASONS[directCode]) {
return directCode;
}
return networkErrorCodeFromText(`${unknownErrorMessage(error)}\n${capturedOutput}`);
}
function friendlyDriverName(driver: string): string {
const normalized = driver.toLowerCase();
if (normalized === 'postgres' || normalized === 'postgresql') return 'PostgreSQL';
if (normalized === 'mysql') return 'MySQL';
if (normalized === 'sqlserver') return 'SQL Server';
if (normalized === 'bigquery') return 'BigQuery';
if (normalized === 'snowflake') return 'Snowflake';
if (normalized === 'clickhouse') return 'ClickHouse';
if (normalized === 'sqlite') return 'SQLite';
return driver || 'the source';
}
function failedStepDetail(result: KtxPublicIngestTargetResult): string | null {
return result.steps.find((step) => step.status === 'failed')?.detail ?? null;
}
function failureTextForTarget(input: {
target: KtxPublicIngestPlanTarget;
projectDir: string;
capturedOutput?: string;
error?: unknown;
fallback?: string | null;
}): string {
const code = networkErrorCode(input.error, input.capturedOutput);
if (code) {
const operation = input.target.operation === 'scan' ? 'scanning' : 'ingesting';
return [
`KTX lost its connection to ${friendlyDriverName(input.target.driver)} while ${operation} ${input.target.connectionId}.`,
`Reason: ${NETWORK_ERROR_REASONS[code]} (${code}).`,
`Retry: ${resumeCommand(input.projectDir)}`,
].join(' ');
}
return input.fallback ?? `${input.target.connectionId} failed.`;
}
export function initViewState(targets: KtxPublicIngestPlanTarget[]): ContextBuildViewState {
@ -494,6 +573,7 @@ export async function runContextBuild(
const artifactPaths = new Set<string>();
let detached = false;
let exiting = false;
let cleanupKeystroke: (() => void) | null = null;
if (isTTY || deps.setupKeystroke) {
@ -503,6 +583,7 @@ export async function runContextBuild(
};
cleanupKeystroke = (deps.setupKeystroke ?? defaultSetupKeystroke)(
() => {
detached = true;
cleanup();
deps.onDetach?.();
const bg = spawnBackgroundBuild(args.projectDir);
@ -510,12 +591,14 @@ export async function runContextBuild(
if (bg) io.stdout.write(`Log: ${bg.logPath}\n`);
io.stdout.write(`Resume: ${resumeCommand(args.projectDir)}\n`);
io.stdout.write(`Status: ktx setup context status --project-dir ${resolve(args.projectDir)}\n`);
exiting = true;
process.exit(0);
},
() => {
cleanup();
io.stdout.write('\n\nContext build stopped. Nothing is running in the background.\n');
io.stdout.write(`Resume: ${resumeCommand(args.projectDir)}\n`);
exiting = true;
process.exit(130);
},
);
@ -549,14 +632,23 @@ export async function runContextBuild(
false,
);
const result = await execTarget(targetState.target, runArgs, capture.io, {});
let result: KtxPublicIngestTargetResult | null = null;
let thrownError: unknown = null;
try {
result = await execTarget(targetState.target, runArgs, capture.io, {});
} catch (error) {
if (exiting) {
throw error;
}
thrownError = error;
}
targetState.elapsedMs = nowFn() - (targetState.startedAt ?? nowFn());
const failed = result.steps.some((s) => s.status === 'failed');
const failed = thrownError !== null || result?.steps.some((s) => s.status === 'failed') === true;
targetState.status = failed ? 'failed' : 'done';
targetState.detailLine = null;
const capturedOutput = capture.captured();
if (!failed) {
const capturedOutput = capture.captured();
const metadata = collectOutputMetadata(capturedOutput, targetState.target.operation);
for (const reportId of metadata.reportIds) reportIds.add(reportId);
for (const artifactPath of metadata.artifactPaths) artifactPaths.add(artifactPath);
@ -564,6 +656,14 @@ export async function runContextBuild(
targetState.target.operation === 'scan'
? parseScanSummary(capturedOutput)
: parseIngestSummary(capturedOutput);
} else {
targetState.failureText = failureTextForTarget({
target: targetState.target,
projectDir: args.projectDir,
capturedOutput,
error: thrownError,
fallback: result ? failedStepDetail(result) : null,
});
}
if (failed) hasFailure = true;

View file

@ -923,7 +923,13 @@ describe('setup databases step', () => {
expect(io.stdout()).toContain(
[
'◇ Scanning postgres-warehouse',
'│ ✓ Structural scan completed',
'│ Running structural scan…',
'│',
].join('\n'),
);
expect(io.stdout()).toContain(
[
'◇ Scan complete for postgres-warehouse',
'│ Changes: 2 new tables',
'│ Report: raw-sources/postgres-warehouse/live-database/.../scan-report.json',
'│',

View file

@ -1008,7 +1008,8 @@ async function maybeConfigureSchemaScope(input: {
let selected: string[];
if (input.args.inputMode === 'disabled' || discovered.length === 1) {
selected = discovered;
const preconfigured = configuredScopeValues(connection, spec).filter((v) => discovered.includes(v));
selected = preconfigured.length > 0 ? preconfigured : discovered;
} else {
const preconfigured = configuredScopeValues(connection, spec).filter((v) => discovered.includes(v));
const initialValues = preconfigured.length > 0 ? preconfigured : spec.defaultSelection(discovered);
@ -1161,6 +1162,9 @@ async function validateAndScanConnection(input: {
io: input.io,
deps: input.deps,
});
writeSetupSection(input.io, `Scanning ${input.connectionId}`, [
'Running structural scan…',
]);
const scanIo = createBufferedCommandIo();
const scanCode = await scanConnection(input.projectDir, input.connectionId, scanIo);
if (scanCode !== 0) {
@ -1173,9 +1177,8 @@ async function validateAndScanConnection(input: {
const reportPath = readOutputValue(scanOutput, 'Report');
writeSetupSection(
input.io,
`Scanning ${input.connectionId}`,
`Scan complete for ${input.connectionId}`,
[
'✓ Structural scan completed',
`Changes: ${summarizeScanChanges(scanOutput)}`,
...(reportPath ? [`Report: ${shortenScanReportPath(reportPath)}`] : []),
],

View file

@ -339,4 +339,38 @@ describe('KtxPostgresScanConnector', () => {
expect(snapshot.tables.length).toBeGreaterThan(0);
expect(endCalled).toBe(true);
});
it('attaches an error listener to the pg pool', async () => {
const on = vi.fn();
const poolFactory: KtxPostgresPoolFactory = {
createPool() {
return {
on,
async connect() {
return {
query: vi.fn(async () => ({ rows: [{ '?column?': 1 }], fields: [{ name: '?column?', dataTypeID: 23 }] })),
release: vi.fn(),
};
},
end: vi.fn(async () => undefined),
};
},
};
const connector = new KtxPostgresScanConnector({
connectionId: 'warehouse',
connection: {
driver: 'postgres',
host: 'db.example.test',
database: 'analytics',
username: 'reader',
password: 'test-password', // pragma: allowlist secret
readonly: true,
},
poolFactory,
});
await expect(connector.testConnection()).resolves.toEqual({ success: true });
expect(on).toHaveBeenCalledWith('error', expect.any(Function));
});
});

View file

@ -89,6 +89,7 @@ interface KtxPostgresClient {
interface KtxPostgresPool {
connect(): Promise<KtxPostgresClient>;
end(): Promise<void>;
on?(event: 'error', listener: (error: Error) => void): void;
}
export interface KtxPostgresPoolFactory {
@ -349,6 +350,7 @@ export class KtxPostgresScanConnector implements KtxScanConnector {
private readonly now: () => Date;
private readonly dialect = new KtxPostgresDialect();
private pool: KtxPostgresPool | null = null;
private lastIdlePoolError: Error | null = null;
private resolvedEndpoint: KtxPostgresResolvedEndpoint | null = null;
constructor(options: KtxPostgresScanConnectorOptions) {
@ -667,11 +669,15 @@ export class KtxPostgresScanConnector implements KtxScanConnector {
config = { ...config, host: endpoint.host, port: endpoint.port };
}
this.pool = this.poolFactory.createPool(config);
this.pool.on?.('error', (error) => {
this.lastIdlePoolError = error;
});
}
return this.pool;
}
private async queryRaw<T>(sql: string, params?: unknown[]): Promise<T[]> {
this.throwIdlePoolErrorIfPresent();
const pool = await this.getPool();
const client = await pool.connect();
try {
@ -683,6 +689,7 @@ export class KtxPostgresScanConnector implements KtxScanConnector {
}
private async query(sql: string, params?: Record<string, unknown> | unknown[]): Promise<KtxQueryResult> {
this.throwIdlePoolErrorIfPresent();
const pool = await this.getPool();
const client = await pool.connect();
try {
@ -704,4 +711,13 @@ export class KtxPostgresScanConnector implements KtxScanConnector {
throw new Error(`PostgreSQL connector ${this.connectionId} cannot run scan for ${connectionId}`);
}
}
private throwIdlePoolErrorIfPresent(): void {
if (!this.lastIdlePoolError) {
return;
}
const error = this.lastIdlePoolError;
this.lastIdlePoolError = null;
throw error;
}
}