diff --git a/packages/cli/src/context-build-view.test.ts b/packages/cli/src/context-build-view.test.ts index a2519d34..3e0c0049 100644 --- a/packages/cli/src/context-build-view.test.ts +++ b/packages/cli/src/context-build-view.test.ts @@ -267,10 +267,11 @@ describe('renderContextBuildView', () => { { connectionId: 'warehouse', driver: 'postgres', operation: 'scan', debugCommand: '', steps: ['scan'] }, ]); state.primarySources[0].status = 'failed'; + state.primarySources[0].failureText = 'KTX lost its connection to PostgreSQL while scanning warehouse.'; const output = renderContextBuildView(state, { styled: false }); expect(output).toContain('✗'); - expect(output).toContain('failed'); + expect(output).toContain('KTX lost its connection to PostgreSQL while scanning warehouse.'); }); it('omits empty groups', () => { @@ -384,6 +385,52 @@ describe('runContextBuild', () => { expect(result).toEqual({ exitCode: 1, detached: false }); }); + it('renders a friendly network failure when target output contains a network error code', async () => { + const io = makeIo(); + const project = projectWithConnections({ + warehouse: { driver: 'postgres' }, + }); + const executeTarget = vi.fn(async (target, _args, targetIo) => { + targetIo.stderr.write('Error: read EADDRNOTAVAIL\n'); + return failedResult(target.connectionId, target.driver, target.operation); + }); + + const result = await runContextBuild( + project, + { projectDir: '/tmp/project', inputMode: 'disabled' }, + io.io, + { executeTarget, now: () => 1000 }, + ); + + expect(result).toEqual({ exitCode: 1, detached: false }); + expect(io.stdout()).toContain('KTX lost its connection to PostgreSQL while scanning warehouse.'); + expect(io.stdout()).toContain('network address unavailable (EADDRNOTAVAIL)'); + expect(io.stdout()).toContain('Retry: ktx setup --project-dir /tmp/project'); + expect(io.stdout()).not.toContain('BoundPool'); + }); + + it('renders a friendly network failure when target execution throws', async () => { + const io = makeIo(); + const project = projectWithConnections({ + warehouse: { driver: 'postgres' }, + }); + const error = Object.assign(new Error('read ECONNRESET'), { code: 'ECONNRESET' }); + const executeTarget = vi.fn(async () => { + throw error; + }); + + const result = await runContextBuild( + project, + { projectDir: '/tmp/project', inputMode: 'disabled' }, + io.io, + { executeTarget, now: () => 1000 }, + ); + + expect(result).toEqual({ exitCode: 1, detached: false }); + expect(io.stdout()).toContain('KTX lost its connection to PostgreSQL while scanning warehouse.'); + expect(io.stdout()).toContain('connection reset (ECONNRESET)'); + }); + it('renders final view for non-TTY output', async () => { const io = makeIo(); const project = projectWithConnections({ diff --git a/packages/cli/src/context-build-view.ts b/packages/cli/src/context-build-view.ts index 0a8d8c04..96a5c173 100644 --- a/packages/cli/src/context-build-view.ts +++ b/packages/cli/src/context-build-view.ts @@ -22,6 +22,7 @@ export interface ContextBuildTargetState { status: 'queued' | 'running' | 'done' | 'failed'; detailLine: string | null; summaryText: string | null; + failureText: string | null; startedAt: number | null; elapsedMs: number; } @@ -133,7 +134,8 @@ function targetDetail(target: ContextBuildTargetState, styled: boolean): string return parts.join(' · '); } if (target.status === 'failed') { - return styled ? red('failed') : 'failed'; + const failureText = target.failureText ?? 'failed'; + return styled ? red(failureText) : failureText; } if (target.status === 'running') { const percent = extractPercent(target.detailLine); @@ -327,6 +329,7 @@ export function viewStateFromSourceProgress( status: s.status, detailLine: null, summaryText: s.summaryText ?? null, + failureText: null, startedAt: s.startedAtMs ?? null, elapsedMs: s.status === 'running' && s.startedAtMs ? now - s.startedAtMs : (s.elapsedMs ?? 0), }); @@ -441,7 +444,83 @@ export function defaultSetupKeystroke(onDetach: () => void, onCtrlC: () => void) // --- Orchestration --- function makeTargetState(target: KtxPublicIngestPlanTarget): ContextBuildTargetState { - return { target, status: 'queued', detailLine: null, summaryText: null, startedAt: null, elapsedMs: 0 }; + return { + target, + status: 'queued', + detailLine: null, + summaryText: null, + failureText: null, + startedAt: null, + elapsedMs: 0, + }; +} + +const NETWORK_ERROR_REASONS: Record = { + EADDRNOTAVAIL: 'network address unavailable', + ECONNRESET: 'connection reset', + ECONNREFUSED: 'connection refused', + ENETUNREACH: 'network unreachable', + ENOTFOUND: 'host not found', + ETIMEDOUT: 'connection timed out', + EHOSTUNREACH: 'host unreachable', +}; + +function unknownErrorMessage(error: unknown): string { + return error instanceof Error ? error.message : String(error); +} + +function networkErrorCodeFromText(text: string): string | null { + for (const code of Object.keys(NETWORK_ERROR_REASONS)) { + if (new RegExp(`\\b${code}\\b`).test(text)) { + return code; + } + } + return null; +} + +function networkErrorCode(error: unknown, capturedOutput = ''): string | null { + const directCode = typeof (error as { code?: unknown })?.code === 'string' + ? (error as { code: string }).code + : null; + if (directCode && NETWORK_ERROR_REASONS[directCode]) { + return directCode; + } + return networkErrorCodeFromText(`${unknownErrorMessage(error)}\n${capturedOutput}`); +} + +function friendlyDriverName(driver: string): string { + const normalized = driver.toLowerCase(); + if (normalized === 'postgres' || normalized === 'postgresql') return 'PostgreSQL'; + if (normalized === 'mysql') return 'MySQL'; + if (normalized === 'sqlserver') return 'SQL Server'; + if (normalized === 'bigquery') return 'BigQuery'; + if (normalized === 'snowflake') return 'Snowflake'; + if (normalized === 'clickhouse') return 'ClickHouse'; + if (normalized === 'sqlite') return 'SQLite'; + return driver || 'the source'; +} + +function failedStepDetail(result: KtxPublicIngestTargetResult): string | null { + return result.steps.find((step) => step.status === 'failed')?.detail ?? null; +} + +function failureTextForTarget(input: { + target: KtxPublicIngestPlanTarget; + projectDir: string; + capturedOutput?: string; + error?: unknown; + fallback?: string | null; +}): string { + const code = networkErrorCode(input.error, input.capturedOutput); + if (code) { + const operation = input.target.operation === 'scan' ? 'scanning' : 'ingesting'; + return [ + `KTX lost its connection to ${friendlyDriverName(input.target.driver)} while ${operation} ${input.target.connectionId}.`, + `Reason: ${NETWORK_ERROR_REASONS[code]} (${code}).`, + `Retry: ${resumeCommand(input.projectDir)}`, + ].join(' '); + } + return input.fallback ?? `${input.target.connectionId} failed.`; } export function initViewState(targets: KtxPublicIngestPlanTarget[]): ContextBuildViewState { @@ -494,6 +573,7 @@ export async function runContextBuild( const artifactPaths = new Set(); let detached = false; + let exiting = false; let cleanupKeystroke: (() => void) | null = null; if (isTTY || deps.setupKeystroke) { @@ -503,6 +583,7 @@ export async function runContextBuild( }; cleanupKeystroke = (deps.setupKeystroke ?? defaultSetupKeystroke)( () => { + detached = true; cleanup(); deps.onDetach?.(); const bg = spawnBackgroundBuild(args.projectDir); @@ -510,12 +591,14 @@ export async function runContextBuild( if (bg) io.stdout.write(`Log: ${bg.logPath}\n`); io.stdout.write(`Resume: ${resumeCommand(args.projectDir)}\n`); io.stdout.write(`Status: ktx setup context status --project-dir ${resolve(args.projectDir)}\n`); + exiting = true; process.exit(0); }, () => { cleanup(); io.stdout.write('\n\nContext build stopped. Nothing is running in the background.\n'); io.stdout.write(`Resume: ${resumeCommand(args.projectDir)}\n`); + exiting = true; process.exit(130); }, ); @@ -549,14 +632,23 @@ export async function runContextBuild( false, ); - const result = await execTarget(targetState.target, runArgs, capture.io, {}); + let result: KtxPublicIngestTargetResult | null = null; + let thrownError: unknown = null; + try { + result = await execTarget(targetState.target, runArgs, capture.io, {}); + } catch (error) { + if (exiting) { + throw error; + } + thrownError = error; + } targetState.elapsedMs = nowFn() - (targetState.startedAt ?? nowFn()); - const failed = result.steps.some((s) => s.status === 'failed'); + const failed = thrownError !== null || result?.steps.some((s) => s.status === 'failed') === true; targetState.status = failed ? 'failed' : 'done'; targetState.detailLine = null; + const capturedOutput = capture.captured(); if (!failed) { - const capturedOutput = capture.captured(); const metadata = collectOutputMetadata(capturedOutput, targetState.target.operation); for (const reportId of metadata.reportIds) reportIds.add(reportId); for (const artifactPath of metadata.artifactPaths) artifactPaths.add(artifactPath); @@ -564,6 +656,14 @@ export async function runContextBuild( targetState.target.operation === 'scan' ? parseScanSummary(capturedOutput) : parseIngestSummary(capturedOutput); + } else { + targetState.failureText = failureTextForTarget({ + target: targetState.target, + projectDir: args.projectDir, + capturedOutput, + error: thrownError, + fallback: result ? failedStepDetail(result) : null, + }); } if (failed) hasFailure = true; diff --git a/packages/cli/src/setup-databases.test.ts b/packages/cli/src/setup-databases.test.ts index ea7717f4..b0fbe585 100644 --- a/packages/cli/src/setup-databases.test.ts +++ b/packages/cli/src/setup-databases.test.ts @@ -923,7 +923,13 @@ describe('setup databases step', () => { expect(io.stdout()).toContain( [ '◇ Scanning postgres-warehouse', - '│ ✓ Structural scan completed', + '│ Running structural scan…', + '│', + ].join('\n'), + ); + expect(io.stdout()).toContain( + [ + '◇ Scan complete for postgres-warehouse', '│ Changes: 2 new tables', '│ Report: raw-sources/postgres-warehouse/live-database/.../scan-report.json', '│', diff --git a/packages/cli/src/setup-databases.ts b/packages/cli/src/setup-databases.ts index 820dbd12..9c196e2a 100644 --- a/packages/cli/src/setup-databases.ts +++ b/packages/cli/src/setup-databases.ts @@ -1008,7 +1008,8 @@ async function maybeConfigureSchemaScope(input: { let selected: string[]; if (input.args.inputMode === 'disabled' || discovered.length === 1) { - selected = discovered; + const preconfigured = configuredScopeValues(connection, spec).filter((v) => discovered.includes(v)); + selected = preconfigured.length > 0 ? preconfigured : discovered; } else { const preconfigured = configuredScopeValues(connection, spec).filter((v) => discovered.includes(v)); const initialValues = preconfigured.length > 0 ? preconfigured : spec.defaultSelection(discovered); @@ -1161,6 +1162,9 @@ async function validateAndScanConnection(input: { io: input.io, deps: input.deps, }); + writeSetupSection(input.io, `Scanning ${input.connectionId}`, [ + 'Running structural scan…', + ]); const scanIo = createBufferedCommandIo(); const scanCode = await scanConnection(input.projectDir, input.connectionId, scanIo); if (scanCode !== 0) { @@ -1173,9 +1177,8 @@ async function validateAndScanConnection(input: { const reportPath = readOutputValue(scanOutput, 'Report'); writeSetupSection( input.io, - `Scanning ${input.connectionId}`, + `Scan complete for ${input.connectionId}`, [ - '✓ Structural scan completed', `Changes: ${summarizeScanChanges(scanOutput)}`, ...(reportPath ? [`Report: ${shortenScanReportPath(reportPath)}`] : []), ], diff --git a/packages/connector-postgres/src/connector.test.ts b/packages/connector-postgres/src/connector.test.ts index 3bdfc109..2d94430b 100644 --- a/packages/connector-postgres/src/connector.test.ts +++ b/packages/connector-postgres/src/connector.test.ts @@ -339,4 +339,38 @@ describe('KtxPostgresScanConnector', () => { expect(snapshot.tables.length).toBeGreaterThan(0); expect(endCalled).toBe(true); }); + + it('attaches an error listener to the pg pool', async () => { + const on = vi.fn(); + const poolFactory: KtxPostgresPoolFactory = { + createPool() { + return { + on, + async connect() { + return { + query: vi.fn(async () => ({ rows: [{ '?column?': 1 }], fields: [{ name: '?column?', dataTypeID: 23 }] })), + release: vi.fn(), + }; + }, + end: vi.fn(async () => undefined), + }; + }, + }; + const connector = new KtxPostgresScanConnector({ + connectionId: 'warehouse', + connection: { + driver: 'postgres', + host: 'db.example.test', + database: 'analytics', + username: 'reader', + password: 'test-password', // pragma: allowlist secret + readonly: true, + }, + poolFactory, + }); + + await expect(connector.testConnection()).resolves.toEqual({ success: true }); + + expect(on).toHaveBeenCalledWith('error', expect.any(Function)); + }); }); diff --git a/packages/connector-postgres/src/connector.ts b/packages/connector-postgres/src/connector.ts index a780663f..096b8704 100644 --- a/packages/connector-postgres/src/connector.ts +++ b/packages/connector-postgres/src/connector.ts @@ -89,6 +89,7 @@ interface KtxPostgresClient { interface KtxPostgresPool { connect(): Promise; end(): Promise; + on?(event: 'error', listener: (error: Error) => void): void; } export interface KtxPostgresPoolFactory { @@ -349,6 +350,7 @@ export class KtxPostgresScanConnector implements KtxScanConnector { private readonly now: () => Date; private readonly dialect = new KtxPostgresDialect(); private pool: KtxPostgresPool | null = null; + private lastIdlePoolError: Error | null = null; private resolvedEndpoint: KtxPostgresResolvedEndpoint | null = null; constructor(options: KtxPostgresScanConnectorOptions) { @@ -667,11 +669,15 @@ export class KtxPostgresScanConnector implements KtxScanConnector { config = { ...config, host: endpoint.host, port: endpoint.port }; } this.pool = this.poolFactory.createPool(config); + this.pool.on?.('error', (error) => { + this.lastIdlePoolError = error; + }); } return this.pool; } private async queryRaw(sql: string, params?: unknown[]): Promise { + this.throwIdlePoolErrorIfPresent(); const pool = await this.getPool(); const client = await pool.connect(); try { @@ -683,6 +689,7 @@ export class KtxPostgresScanConnector implements KtxScanConnector { } private async query(sql: string, params?: Record | unknown[]): Promise { + this.throwIdlePoolErrorIfPresent(); const pool = await this.getPool(); const client = await pool.connect(); try { @@ -704,4 +711,13 @@ export class KtxPostgresScanConnector implements KtxScanConnector { throw new Error(`PostgreSQL connector ${this.connectionId} cannot run scan for ${connectionId}`); } } + + private throwIdlePoolErrorIfPresent(): void { + if (!this.lastIdlePoolError) { + return; + } + const error = this.lastIdlePoolError; + this.lastIdlePoolError = null; + throw error; + } }