feat(cli): consistent connection setup recovery and build-time gate (#257)

* feat(cli): block context build when a required connection fails its live test

A context build can take several minutes, so a connection that is
unreachable or misconfigured should stop the build up front instead of
failing partway through. Before the build starts, run a live connection
test for every primary- and context-source connection the build depends
on.

Each test's output is captured in a discarded buffer so raw error text
(and database paths) never reach the user; failures are surfaced only by
connection id and connector type, with a pointer to `ktx connection test
<id>` for the underlying error.

- Interactive setup lets the user fix the connection and retry without
  restarting, re-resolving targets so an added/removed/reconfigured
  connection is honored.
- `--no-input` exits non-zero and writes a failed context state with a
  failureReason, so scripts stop early and setup never reads as ready.

Extract the buffered command IO helper out of setup-databases into
src/io/buffered-command-io.ts so both call sites share one implementation.

* feat(cli): use recovery primitive for database setup

* feat(cli): use recovery primitive for source setup

* docs: document setup connection recovery

* fix(cli): close database recovery gaps

* fix(cli): target failing project in gate hint and preserve missing-input

Address two review findings on the connection-recovery work:

- The connection-gate failure hint emitted `ktx connection test <id>` with no
  --project-dir, so a setup run started with `--project-dir ./analytics` pointed
  users at cwd/KTX_PROJECT_DIR instead of the project that just failed. Emit the
  resolved project dir, matching the contextBuildCommands convention.

- The non-interactive database configure path returned `cancelled`, which the
  recovery primitive collapses to `failed`. Sibling paths still report
  `missing-input` for absent flags, so incomplete-flag runs were
  indistinguishable from real connection failures. The database wrapper now
  tracks the configure missing-input signal and restores the `missing-input`
  step status; the shared primitive keeps its four outcomes.
This commit is contained in:
Andrey Avtomonov 2026-06-03 13:08:46 +02:00 committed by GitHub
parent f5dea9a089
commit ce1516b357
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
11 changed files with 1531 additions and 354 deletions

View file

@ -104,6 +104,6 @@ configured connection and exit non-zero if any probe fails.
| Error | Cause | Recovery | | Error | Cause | Recovery |
|-------|-------|----------| |-------|-------|----------|
| No connections configured | The project has no entries under `connections` | Run `ktx setup` and add a database or context-source connection | | No connections configured | The project has no entries under `connections` | Run `ktx setup` and add a database or context-source connection |
| Connection test fails | Credentials, network access, database, warehouse, or schema is invalid | Verify the same URL with the database's native client, then rerun `ktx setup` and reconfigure the connection | | Connection test fails | Credentials, network access, database, warehouse, or schema is invalid | Use the setup recovery menu to retry or re-enter details; if it still fails, verify the same URL with the database's native client |
| Mapping validation fails during setup | BI database mappings do not point at valid warehouse connections | Rerun `ktx setup` and update the context-source mapping selections | | Mapping validation fails during setup | BI database mappings do not point at valid warehouse connections | Use the setup recovery menu to retry validation or re-enter mapping selections; rerun `ktx setup` if you already exited |
| Notion page picker cannot run | The terminal is non-interactive or Notion discovery failed | Rerun interactive `ktx setup`, or use non-interactive setup flags with explicit root page ids | | Notion page picker cannot run | The terminal is non-interactive or Notion discovery failed | Rerun interactive `ktx setup`, or use non-interactive setup flags with explicit root page ids |

View file

@ -295,6 +295,26 @@ Context sources:
dbt_main: memory update complete dbt_main: memory update complete
``` ```
Before the build starts, **ktx** runs a live test for every connection the
build depends on. A context build can take several minutes, so if any required
connection is unreachable or misconfigured the build is blocked up front and
**ktx** names the failing connection by id and connector type:
```text
KTX cannot build context: a required connection failed its live test.
Failed connections:
warehouse (postgres)
Each connection must be reachable before KTX builds context.
Run `ktx connection test <id>` to see the error, fix the connection, then retry.
```
Run `ktx connection test <connection-id>` to see the underlying error, fix the
connection, then continue. In interactive setup you can retry without
restarting; with `--no-input` the build exits non-zero and names the failing
connection so scripts can stop early.
## Connect a coding agent ## Connect a coding agent
The setup wizard installs project-local agent rules in the last step. To The setup wizard installs project-local agent rules in the last step. To
@ -354,7 +374,8 @@ surface.
| `ktx: command not found` | Reinstall `@kaelio/ktx` and open a new shell | | `ktx: command not found` | Reinstall `@kaelio/ktx` and open a new shell |
| Setup resumes the wrong project | Pass `--project-dir <path>` | | Setup resumes the wrong project | Pass `--project-dir <path>` |
| LLM or embeddings health check fails | Rerun setup and pick a different credential, model, or backend | | LLM or embeddings health check fails | Rerun setup and pick a different credential, model, or backend |
| Database test fails | Verify the same connection with the database's native client, then rerun setup | | Database test fails | Use the setup recovery menu to retry or re-enter details; if it still fails, verify the same connection with the database's native client |
| Context build blocked: a connection failed its live test | Run `ktx connection test <connection-id>` to see the error, fix the connection, then retry the build |
| Agent integration is incomplete | Run `ktx setup --agents --target <target>` | | Agent integration is incomplete | Run `ktx setup --agents --target <target>` |
## Next steps ## Next steps

View file

@ -0,0 +1,132 @@
import type { KtxCliIo } from './cli-runtime.js';
import type { KtxSetupPromptOption } from './setup-prompts.js';
export type RecoveryOutcome = 'ready' | 'skip' | 'back' | 'failed';
/** @internal */
export interface RecoveryAction {
value: string;
label: string;
run: () => Promise<void>;
}
export type ConfigureResult = 'configured' | 'back' | 'cancelled';
export type ValidateResult =
| { status: 'ok' }
| { status: 'back' }
| { status: 'failed'; extraActions?: RecoveryAction[] };
export interface ConnectionRecoveryInput {
label: string;
interactive: boolean;
allowSkip: boolean;
io: KtxCliIo;
prompts: {
select(options: { message: string; options: KtxSetupPromptOption[] }): Promise<string>;
};
snapshot: () => Promise<() => Promise<void>>;
configure: () => Promise<ConfigureResult>;
validate: () => Promise<ValidateResult>;
}
async function runRollbackOnce(input: {
rollback: () => Promise<void>;
state: { rolledBack: boolean };
}): Promise<void> {
if (input.state.rolledBack) {
return;
}
input.state.rolledBack = true;
await input.rollback();
}
function recoveryOptions(input: {
allowSkip: boolean;
extraActions?: RecoveryAction[];
}): KtxSetupPromptOption[] {
return [
{ value: 'retry', label: 'Retry connection test' },
{ value: 're-enter', label: 'Re-enter connection details' },
...(input.extraActions ?? []).map((action) => ({
value: action.value,
label: action.label,
})),
...(input.allowSkip ? [{ value: 'skip', label: 'Skip this connection' }] : []),
{ value: 'back', label: 'Back' },
];
}
export async function runConnectionSetupWithRecovery(
input: ConnectionRecoveryInput,
): Promise<RecoveryOutcome> {
const rollback = await input.snapshot();
const rollbackState = { rolledBack: false };
const firstConfig = await input.configure();
if (firstConfig === 'back') {
await runRollbackOnce({ rollback, state: rollbackState });
return 'back';
}
if (firstConfig === 'cancelled') {
await runRollbackOnce({ rollback, state: rollbackState });
return 'failed';
}
let validation = await input.validate();
while (validation.status !== 'ok') {
if (validation.status === 'back') {
await runRollbackOnce({ rollback, state: rollbackState });
return 'back';
}
if (!input.interactive) {
return 'failed';
}
const action = await input.prompts.select({
message: `Connection setup failed for ${input.label}`,
options: recoveryOptions({
allowSkip: input.allowSkip,
extraActions: validation.extraActions,
}),
});
if (action === 'back') {
await runRollbackOnce({ rollback, state: rollbackState });
return 'back';
}
if (action === 'skip' && input.allowSkip) {
await runRollbackOnce({ rollback, state: rollbackState });
return 'skip';
}
if (action === 're-enter') {
const nextConfig = await input.configure();
if (nextConfig === 'back') {
await runRollbackOnce({ rollback, state: rollbackState });
return 'back';
}
if (nextConfig === 'cancelled') {
await runRollbackOnce({ rollback, state: rollbackState });
return 'failed';
}
validation = await input.validate();
continue;
}
if (action === 'retry') {
validation = await input.validate();
continue;
}
const extraAction = validation.extraActions?.find((candidate) => candidate.value === action);
if (extraAction) {
await extraAction.run();
validation = await input.validate();
continue;
}
validation = await input.validate();
}
return 'ready';
}

View file

@ -0,0 +1,35 @@
import type { KtxCliIo } from '../cli-runtime.js';
export interface BufferedCommandIo extends KtxCliIo {
stdoutText(): string;
stderrText(): string;
}
/**
* Captures stdout/stderr from a command (e.g. `runKtxConnection`) into buffers
* instead of the terminal. Callers decide whether to flush the captured text to
* the user or discard it.
*/
export function createBufferedCommandIo(): BufferedCommandIo {
let stdout = '';
let stderr = '';
return {
stdout: {
isTTY: false,
write(chunk: string) {
stdout += chunk;
},
},
stderr: {
write(chunk: string) {
stderr += chunk;
},
},
stdoutText() {
return stdout;
},
stderrText() {
return stderr;
},
};
}

View file

@ -8,6 +8,8 @@ import type { KtxCliIo } from './cli-runtime.js';
import { errorMessage, writePrefixedLines } from './clack.js'; import { errorMessage, writePrefixedLines } from './clack.js';
import { formatErrorDetail } from './telemetry/scrubber.js'; import { formatErrorDetail } from './telemetry/scrubber.js';
import { buildPublicIngestPlan } from './public-ingest.js'; import { buildPublicIngestPlan } from './public-ingest.js';
import { runKtxConnection } from './connection.js';
import { type BufferedCommandIo, createBufferedCommandIo } from './io/buffered-command-io.js';
import type { KtxManagedPythonInstallPolicy } from './managed-python-command.js'; import type { KtxManagedPythonInstallPolicy } from './managed-python-command.js';
import { import {
type ContextBuildSourceProgressUpdate, type ContextBuildSourceProgressUpdate,
@ -91,6 +93,7 @@ export interface KtxSetupContextDeps {
now?: () => Date; now?: () => Date;
runContextBuild?: typeof runContextBuild; runContextBuild?: typeof runContextBuild;
verifyContextReady?: (projectDir: string) => Promise<KtxSetupContextReadiness>; verifyContextReady?: (projectDir: string) => Promise<KtxSetupContextReadiness>;
testConnection?: (projectDir: string, connectionId: string, io: KtxCliIo) => Promise<number>;
} }
interface KtxSetupContextTargets { interface KtxSetupContextTargets {
@ -277,6 +280,140 @@ function listContextTargets(project: KtxLocalProject): KtxSetupContextTargets {
}; };
} }
interface ConnectionGateFailure {
connectionId: string;
driver: string;
}
type ConnectionGateResult = { ok: true } | { ok: false; failures: ConnectionGateFailure[] };
type PreparedBuild =
| { kind: 'ready'; project: KtxLocalProject; targets: KtxSetupContextTargets }
| { kind: 'result'; result: KtxSetupContextResult };
function requiredConnectionIds(targets: KtxSetupContextTargets): string[] {
return [...targets.primarySourceConnectionIds, ...targets.contextSourceConnectionIds];
}
function connectorTypeLabel(project: KtxLocalProject, connectionId: string): string {
const driver = String(project.config.connections[connectionId]?.driver ?? '')
.trim()
.toLowerCase();
return driver.length > 0 ? driver : 'unknown';
}
async function defaultGateTestConnection(
projectDir: string,
connectionId: string,
io: KtxCliIo,
): Promise<number> {
return await runKtxConnection({ command: 'test', projectDir, connectionId }, io);
}
/**
* Runs a live connection test for every connection the build depends on. Each
* test's output is captured in a buffer and discarded so raw error text never
* reaches the user callers surface only the connection id and connector type.
*/
async function testRequiredConnections(
projectDir: string,
project: KtxLocalProject,
targets: KtxSetupContextTargets,
testConnection: (projectDir: string, connectionId: string, io: KtxCliIo) => Promise<number>,
): Promise<ConnectionGateResult> {
const failures: ConnectionGateFailure[] = [];
for (const connectionId of requiredConnectionIds(targets)) {
const buffered: BufferedCommandIo = createBufferedCommandIo();
const exitCode = await testConnection(projectDir, connectionId, buffered);
if (exitCode !== 0) {
failures.push({ connectionId, driver: connectorTypeLabel(project, connectionId) });
}
}
return failures.length === 0 ? { ok: true } : { ok: false, failures };
}
/**
* Loads the project and resolves the connections the build depends on, applying
* the empty-targets and preflight-capability checks. Used both on first entry
* and on interactive retry so a fix that adds, removes, or reconfigures a
* connection is honored.
*/
async function prepareBuildTargets(args: KtxSetupContextStepArgs, io: KtxCliIo): Promise<PreparedBuild> {
const project = await loadKtxProject({ projectDir: args.projectDir });
const targets = listContextTargets(project);
if (targets.primarySourceConnectionIds.length === 0 && targets.contextSourceConnectionIds.length === 0) {
if (args.allowEmpty === true) {
return { kind: 'result', result: { status: 'skipped', projectDir: args.projectDir } };
}
io.stderr.write('No databases or context sources are configured for a KTX context build.\n');
return { kind: 'result', result: { status: 'failed', projectDir: args.projectDir } };
}
const preflightPlan = buildPublicIngestPlan(project, { projectDir: project.projectDir, all: true });
const preflightFailures = preflightPlan.targets.flatMap((target) =>
target.preflightFailure ? [`${target.connectionId}: ${target.preflightFailure}`] : [],
);
if (preflightFailures.length > 0) {
if (args.allowEmpty === true) {
return { kind: 'result', result: { status: 'skipped', projectDir: args.projectDir } };
}
writeMissingCapabilities(preflightFailures, io);
return { kind: 'result', result: { status: 'missing-input', projectDir: args.projectDir } };
}
return { kind: 'ready', project, targets };
}
function writeConnectionGateFailureLines(
io: KtxCliIo,
projectDir: string,
failures: ConnectionGateFailure[],
): void {
io.stderr.write('KTX cannot build context: a required connection failed its live test.\n\n');
io.stderr.write('Failed connections:\n');
for (const failure of failures) {
io.stderr.write(` ${failure.connectionId} (${failure.driver})\n`);
}
io.stderr.write('\nEach connection must be reachable before KTX builds context.\n');
io.stderr.write(
`Run \`ktx connection test <id> --project-dir ${resolve(projectDir)}\` to see the error, fix the connection, then retry.\n`,
);
}
function connectionGateFailureReason(failures: ConnectionGateFailure[]): string {
const names = failures.map((failure) => `${failure.connectionId} (${failure.driver})`).join(', ');
return `Required connections failed their live test: ${names}.`;
}
async function writeConnectionGateFailedState(
args: KtxSetupContextStepArgs,
deps: KtxSetupContextDeps,
targets: KtxSetupContextTargets,
failures: ConnectionGateFailure[],
): Promise<void> {
const at = (deps.now ?? (() => new Date()))().toISOString();
await writeKtxSetupContextState(args.projectDir, {
status: 'failed',
startedAt: at,
updatedAt: at,
primarySourceConnectionIds: targets.primarySourceConnectionIds,
contextSourceConnectionIds: targets.contextSourceConnectionIds,
reportIds: [],
artifactPaths: [],
retryableFailedTargets: [],
commands: contextBuildCommands(args.projectDir),
failureReason: connectionGateFailureReason(failures),
});
}
async function promptConnectionGateRetry(prompts: KtxSetupContextPromptAdapter): Promise<'retry' | 'back'> {
return (await prompts.select({
message: 'Fix the failing connection, then choose how to proceed.',
options: [
{ value: 'retry', label: 'Retry connection tests' },
{ value: 'back', label: 'Back' },
],
})) as 'retry' | 'back';
}
async function hasFileWithExtension( async function hasFileWithExtension(
root: string, root: string,
extensions: Set<string>, extensions: Set<string>,
@ -641,7 +778,6 @@ export async function runKtxSetupContextStep(
deps: KtxSetupContextDeps = {}, deps: KtxSetupContextDeps = {},
): Promise<KtxSetupContextResult> { ): Promise<KtxSetupContextResult> {
try { try {
const project = await loadKtxProject({ projectDir: args.projectDir });
const prompts = deps.prompts ?? createPromptAdapter(); const prompts = deps.prompts ?? createPromptAdapter();
const existingState = await readKtxSetupContextState(args.projectDir); const existingState = await readKtxSetupContextState(args.projectDir);
const completedSteps = (await readKtxSetupState(args.projectDir)).completed_steps; const completedSteps = (await readKtxSetupState(args.projectDir)).completed_steps;
@ -659,26 +795,12 @@ export async function runKtxSetupContextStep(
io.stdout.write('Previous context build state is stale; starting a fresh foreground build.\n'); io.stdout.write('Previous context build state is stale; starting a fresh foreground build.\n');
} }
const targets = listContextTargets(project); const prepared = await prepareBuildTargets(args, io);
if (targets.primarySourceConnectionIds.length === 0 && targets.contextSourceConnectionIds.length === 0) { if (prepared.kind === 'result') {
if (args.allowEmpty === true) { return prepared.result;
return { status: 'skipped', projectDir: args.projectDir };
}
io.stderr.write('No databases or context sources are configured for a KTX context build.\n');
return { status: 'failed', projectDir: args.projectDir };
}
const preflightPlan = buildPublicIngestPlan(project, { projectDir: project.projectDir, all: true });
const preflightFailures = preflightPlan.targets.flatMap((target) =>
target.preflightFailure ? [`${target.connectionId}: ${target.preflightFailure}`] : [],
);
if (preflightFailures.length > 0) {
if (args.allowEmpty === true) {
return { status: 'skipped', projectDir: args.projectDir };
}
writeMissingCapabilities(preflightFailures, io);
return { status: 'missing-input', projectDir: args.projectDir };
} }
let { project, targets } = prepared;
const interactive = args.inputMode !== 'disabled' && args.prompt !== false;
if (args.forcePrompt !== true && args.prompt !== false && deps.verifyContextReady === undefined) { if (args.forcePrompt !== true && args.prompt !== false && deps.verifyContextReady === undefined) {
const existingContextResult = await completeExistingContext(args, io, deps, targets); const existingContextResult = await completeExistingContext(args, io, deps, targets);
@ -687,7 +809,7 @@ export async function runKtxSetupContextStep(
} }
} }
if (args.inputMode !== 'disabled' && args.prompt !== false) { if (interactive) {
const choice = await promptForBuild(prompts); const choice = await promptForBuild(prompts);
if (choice === 'back') { if (choice === 'back') {
return { status: 'back', projectDir: args.projectDir }; return { status: 'back', projectDir: args.projectDir };
@ -698,7 +820,32 @@ export async function runKtxSetupContextStep(
} }
} }
return await runBuild(args, io, deps, project, targets); // Live-connection gate: every connection the build depends on must pass a
// live test before the (expensive) build starts. A red connection is a hard
// stop — we surface only the connection id and connector type, never raw
// error text.
const testConnection = deps.testConnection ?? defaultGateTestConnection;
while (true) {
const gate = await testRequiredConnections(args.projectDir, project, targets, testConnection);
if (gate.ok) {
return await runBuild(args, io, deps, project, targets);
}
writeConnectionGateFailureLines(io, args.projectDir, gate.failures);
if (!interactive) {
await writeConnectionGateFailedState(args, deps, targets, gate.failures);
return { status: 'failed', projectDir: args.projectDir };
}
const choice = await promptConnectionGateRetry(prompts);
if (choice === 'back') {
return { status: 'back', projectDir: args.projectDir };
}
const reprepared = await prepareBuildTargets(args, io);
if (reprepared.kind === 'result') {
return reprepared.result;
}
project = reprepared.project;
targets = reprepared.targets;
}
} catch (error) { } catch (error) {
writePrefixedLines((chunk) => io.stderr.write(chunk), errorMessage(error)); writePrefixedLines((chunk) => io.stderr.write(chunk), errorMessage(error));
return { status: 'failed', projectDir: args.projectDir, errorDetail: formatErrorDetail(error) }; return { status: 'failed', projectDir: args.projectDir, errorDetail: formatErrorDetail(error) };

View file

@ -22,6 +22,13 @@ import {
writePrefixedLines, writePrefixedLines,
} from './clack.js'; } from './clack.js';
import { runKtxConnection } from './connection.js'; import { runKtxConnection } from './connection.js';
import { createBufferedCommandIo } from './io/buffered-command-io.js';
import {
runConnectionSetupWithRecovery,
type ConfigureResult,
type RecoveryOutcome,
type ValidateResult,
} from './connection-recovery.js';
import { import {
pickDatabaseScope as defaultPickDatabaseScope, pickDatabaseScope as defaultPickDatabaseScope,
type DatabaseScopePickResult, type DatabaseScopePickResult,
@ -227,7 +234,6 @@ const SCOPE_DISCOVERY_SPECS: Partial<Record<KtxSetupDatabaseDriver, ScopeDiscove
}; };
type UrlDriverType = Extract<KtxSetupDatabaseDriver, 'postgres' | 'mysql' | 'clickhouse' | 'sqlserver'>; type UrlDriverType = Extract<KtxSetupDatabaseDriver, 'postgres' | 'mysql' | 'clickhouse' | 'sqlserver'>;
type ConnectionSetupStatus = 'ready' | 'back' | 'failed' | 'failed-query-history-unavailable';
const DRIVER_CONNECTION_DEFAULTS: Record<UrlDriverType, { port: string }> = { const DRIVER_CONNECTION_DEFAULTS: Record<UrlDriverType, { port: string }> = {
postgres: { port: '5432' }, postgres: { port: '5432' },
@ -994,35 +1000,6 @@ async function defaultScanConnection(projectDir: string, connectionId: string, i
); );
} }
interface BufferedCommandIo extends KtxCliIo {
stdoutText(): string;
stderrText(): string;
}
function createBufferedCommandIo(): BufferedCommandIo {
let stdout = '';
let stderr = '';
return {
stdout: {
isTTY: false,
write(chunk: string) {
stdout += chunk;
},
},
stderr: {
write(chunk: string) {
stderr += chunk;
},
},
stdoutText() {
return stdout;
},
stderrText() {
return stderr;
},
};
}
function envWithCurrentNodeFirst(env: NodeJS.ProcessEnv = process.env): NodeJS.ProcessEnv { function envWithCurrentNodeFirst(env: NodeJS.ProcessEnv = process.env): NodeJS.ProcessEnv {
return { return {
...env, ...env,
@ -1203,6 +1180,31 @@ async function disableConnectionQueryHistory(projectDir: string, connectionId: s
}); });
} }
function okValidateResult(): ValidateResult {
return { status: 'ok' };
}
function backValidateResult(): ValidateResult {
return { status: 'back' };
}
function failedValidateResult(): ValidateResult {
return { status: 'failed' };
}
function queryHistoryUnavailableResult(projectDir: string, connectionId: string): ValidateResult {
return {
status: 'failed',
extraActions: [
{
value: 'disable-query-history',
label: 'Disable query history and retry',
run: () => disableConnectionQueryHistory(projectDir, connectionId),
},
],
};
}
async function createConnectionConfigRollback(projectDir: string, connectionId: string): Promise<() => Promise<void>> { async function createConnectionConfigRollback(projectDir: string, connectionId: string): Promise<() => Promise<void>> {
const project = await loadKtxProject({ projectDir }); const project = await loadKtxProject({ projectDir });
const previousConnection = project.config.connections[connectionId]; const previousConnection = project.config.connections[connectionId];
@ -1330,11 +1332,11 @@ async function maybeConfigureDatabaseScope(input: {
io: KtxCliIo; io: KtxCliIo;
prompts: KtxSetupDatabasesPromptAdapter; prompts: KtxSetupDatabasesPromptAdapter;
forcePrompt?: boolean; forcePrompt?: boolean;
}): Promise<ConnectionSetupStatus> { }): Promise<ValidateResult> {
const project = await loadKtxProject({ projectDir: input.projectDir }); const project = await loadKtxProject({ projectDir: input.projectDir });
const connection = project.config.connections[input.connectionId]; const connection = project.config.connections[input.connectionId];
const driver = normalizeDriver(connection?.driver); const driver = normalizeDriver(connection?.driver);
if (!driver || driver === 'sqlite') return 'ready'; if (!driver || driver === 'sqlite') return okValidateResult();
const spec = SCOPE_DISCOVERY_SPECS[driver]; const spec = SCOPE_DISCOVERY_SPECS[driver];
const existingTables = connection?.enabled_tables; const existingTables = connection?.enabled_tables;
@ -1343,7 +1345,7 @@ async function maybeConfigureDatabaseScope(input: {
const hasExistingScope = !spec || existingScope.length > 0; const hasExistingScope = !spec || existingScope.length > 0;
if (hasExistingTables && hasExistingScope && input.forcePrompt !== true) { if (hasExistingTables && hasExistingScope && input.forcePrompt !== true) {
return 'ready'; return okValidateResult();
} }
const cliSchemas = input.args.databaseSchemas; const cliSchemas = input.args.databaseSchemas;
@ -1361,7 +1363,7 @@ async function maybeConfigureDatabaseScope(input: {
input.io.stderr.write( input.io.stderr.write(
`Could not discover ${spec.promptLabel.toLowerCase()} for ${input.connectionId}; ${detail}\n`, `Could not discover ${spec.promptLabel.toLowerCase()} for ${input.connectionId}; ${detail}\n`,
); );
return 'ready'; return okValidateResult();
} }
} }
if (scopeToWrite.length > 0) { if (scopeToWrite.length > 0) {
@ -1377,7 +1379,7 @@ async function maybeConfigureDatabaseScope(input: {
]); ]);
} }
} }
return 'ready'; return okValidateResult();
} }
if (spec && cliSchemas.length > 0) { if (spec && cliSchemas.length > 0) {
@ -1413,7 +1415,7 @@ async function maybeConfigureDatabaseScope(input: {
connectionId: input.connectionId, connectionId: input.connectionId,
spec, spec,
}); });
if (typed === undefined) return 'back'; if (typed === undefined) return backValidateResult();
effectiveCliSchemas = typed; effectiveCliSchemas = typed;
listedSchemas = typed; listedSchemas = typed;
if (typed.length > 0) { if (typed.length > 0) {
@ -1428,7 +1430,7 @@ async function maybeConfigureDatabaseScope(input: {
} }
const schemas = unique(listedSchemas); const schemas = unique(listedSchemas);
if (spec && schemas.length === 0) { if (spec && schemas.length === 0) {
return 'ready'; return okValidateResult();
} }
const schemaSuggestion = const schemaSuggestion =
effectiveCliSchemas.length > 0 effectiveCliSchemas.length > 0
@ -1465,10 +1467,10 @@ async function maybeConfigureDatabaseScope(input: {
? `Could not discover tables for ${input.connectionId}; edit was not saved. ${detail}` ? `Could not discover tables for ${input.connectionId}; edit was not saved. ${detail}`
: `Could not discover tables for ${input.connectionId}; continuing without table filter. ${detail}`, : `Could not discover tables for ${input.connectionId}; continuing without table filter. ${detail}`,
); );
return input.forcePrompt === true ? 'failed' : 'ready'; return input.forcePrompt === true ? failedValidateResult() : okValidateResult();
} }
if (pickResult.kind === 'back') { if (pickResult.kind === 'back') {
return 'back'; return backValidateResult();
} }
const enabledTables = pickResult.enabledTables; const enabledTables = pickResult.enabledTables;
const activeSchemas = pickResult.activeSchemas; const activeSchemas = pickResult.activeSchemas;
@ -1483,7 +1485,7 @@ async function maybeConfigureDatabaseScope(input: {
} }
const refreshedProject = await loadKtxProject({ projectDir: input.projectDir }); const refreshedProject = await loadKtxProject({ projectDir: input.projectDir });
const currentConnection = refreshedProject.config.connections[input.connectionId]; const currentConnection = refreshedProject.config.connections[input.connectionId];
if (!currentConnection) return 'ready'; if (!currentConnection) return okValidateResult();
await writeConnectionConfig({ await writeConnectionConfig({
projectDir: input.projectDir, projectDir: input.projectDir,
connectionId: input.connectionId, connectionId: input.connectionId,
@ -1500,7 +1502,7 @@ async function maybeConfigureDatabaseScope(input: {
writeSetupSection(input.io, `Tables enabled for ${input.connectionId}`, [ writeSetupSection(input.io, `Tables enabled for ${input.connectionId}`, [
`${enabledTables.length} tables enabled`, `${enabledTables.length} tables enabled`,
]); ]);
return 'ready'; return okValidateResult();
} }
async function ensureHistoricSqlIngestDefaults(projectDir: string): Promise<void> { async function ensureHistoricSqlIngestDefaults(projectDir: string): Promise<void> {
@ -1628,7 +1630,7 @@ async function validateAndScanConnection(input: {
args: KtxSetupDatabasesArgs; args: KtxSetupDatabasesArgs;
prompts: KtxSetupDatabasesPromptAdapter; prompts: KtxSetupDatabasesPromptAdapter;
forceScopeAndTables?: boolean; forceScopeAndTables?: boolean;
}): Promise<ConnectionSetupStatus> { }): Promise<ValidateResult> {
const testConnection = input.deps.testConnection ?? defaultTestConnection; const testConnection = input.deps.testConnection ?? defaultTestConnection;
const scanConnection = input.deps.scanConnection ?? defaultScanConnection; const scanConnection = input.deps.scanConnection ?? defaultScanConnection;
const project = await loadKtxProject({ projectDir: input.projectDir }); const project = await loadKtxProject({ projectDir: input.projectDir });
@ -1642,7 +1644,7 @@ async function validateAndScanConnection(input: {
(chunk) => input.io.stderr.write(chunk), (chunk) => input.io.stderr.write(chunk),
`Connection test failed for ${input.connectionId}.`, `Connection test failed for ${input.connectionId}.`,
); );
return 'failed'; return failedValidateResult();
} }
const testOutput = testIo.stdoutText(); const testOutput = testIo.stdoutText();
const outputDriver = normalizeDriver(readOutputValue(testOutput, 'Driver')); const outputDriver = normalizeDriver(readOutputValue(testOutput, 'Driver'));
@ -1651,7 +1653,7 @@ async function validateAndScanConnection(input: {
writeSetupSection(input.io, `Testing ${input.connectionId}`, testLines); writeSetupSection(input.io, `Testing ${input.connectionId}`, testLines);
const scopeStatus = await maybeConfigureDatabaseScope({ ...input, forcePrompt: input.forceScopeAndTables }); const scopeStatus = await maybeConfigureDatabaseScope({ ...input, forcePrompt: input.forceScopeAndTables });
if (scopeStatus !== 'ready') { if (scopeStatus.status !== 'ok') {
return scopeStatus; return scopeStatus;
} }
@ -1712,7 +1714,9 @@ async function validateAndScanConnection(input: {
); );
} }
if (scanCode !== 0) { if (scanCode !== 0) {
return queryHistoryAvailable ? 'failed' : 'failed-query-history-unavailable'; return queryHistoryAvailable
? failedValidateResult()
: queryHistoryUnavailableResult(input.projectDir, input.connectionId);
} }
} }
const scanOutput = scanIo.stdoutText(); const scanOutput = scanIo.stdoutText();
@ -1724,7 +1728,7 @@ async function validateAndScanConnection(input: {
writeSetupSection(input.io, 'Database ready', [ writeSetupSection(input.io, 'Database ready', [
`${input.connectionId} · ${driverDisplay} · schema context complete`, `${input.connectionId} · ${driverDisplay} · schema context complete`,
]); ]);
return 'ready'; return okValidateResult();
} }
async function chooseDrivers( async function chooseDrivers(
@ -1847,6 +1851,149 @@ async function choosePrimarySourceToEdit(input: {
return choice === 'back' ? 'back' : choice; return choice === 'back' ? 'back' : choice;
} }
async function configureDatabaseConnection(input: {
projectDir: string;
connectionId: string;
driver: KtxSetupDatabaseDriver;
args: KtxSetupDatabasesArgs;
prompts: KtxSetupDatabasesPromptAdapter;
io: KtxCliIo;
canReturnToDriverSelection: boolean;
editBaseline?: KtxProjectConnectionConfig;
}): Promise<ConfigureResult> {
const project = await loadKtxProject({ projectDir: input.projectDir });
const latestConnection = project.config.connections[input.connectionId];
let connection = await buildConnectionConfig({
driver: input.driver,
connectionId: input.connectionId,
args: input.args,
prompts: input.prompts,
existingConnection: latestConnection,
});
while (!connection && input.args.inputMode !== 'disabled') {
const action = await input.prompts.select(
missingConnectionDetailsPrompt(driverLabel(input.driver), input.canReturnToDriverSelection),
);
if (action === 'back') {
return 'back';
}
connection = await buildConnectionConfig({
driver: input.driver,
connectionId: input.connectionId,
args: input.args,
prompts: input.prompts,
existingConnection: latestConnection,
});
}
if (connection === 'back') {
return 'back';
}
if (!connection) {
input.io.stderr.write(`Missing connection details for ${driverLabel(input.driver)}.\n`);
return 'cancelled';
}
const withHistoricSql = await maybeApplyHistoricSqlConfig({
connection,
driver: input.driver,
args: input.args,
prompts: input.prompts,
});
if (withHistoricSql === 'back') {
return 'back';
}
await writeConnectionConfig({
projectDir: input.projectDir,
connectionId: input.connectionId,
connection: input.editBaseline
? withExistingPrimaryEditPromptDefaults({
previous: input.editBaseline,
next: withHistoricSql,
driver: input.driver,
})
: withHistoricSql,
io: input.io,
});
return 'configured';
}
async function runDatabaseConnectionSetupWithRecovery(input: {
projectDir: string;
connectionId: string;
driver: KtxSetupDatabaseDriver;
args: KtxSetupDatabasesArgs;
prompts: KtxSetupDatabasesPromptAdapter;
io: KtxCliIo;
deps: KtxSetupDatabasesDeps;
canReturnToDriverSelection: boolean;
allowSkip: boolean;
interactive?: boolean;
forceScopeAndTables?: boolean;
editBaseline?: KtxProjectConnectionConfig;
reuseExistingOnFirstConfigure?: boolean;
}): Promise<RecoveryOutcome | 'missing-input'> {
let configureCalls = 0;
// `configureDatabaseConnection` returns 'cancelled' only when required
// connection details are absent in non-interactive mode. The recovery
// primitive collapses that into 'failed', so we track it here to restore the
// distinct 'missing-input' outcome the surrounding step reports for
// incomplete flags (vs. a real connection/probe failure).
let sawMissingInput = false;
const outcome = await runConnectionSetupWithRecovery({
label: input.connectionId,
interactive: input.interactive ?? input.args.inputMode !== 'disabled',
allowSkip: input.allowSkip,
io: input.io,
prompts: input.prompts,
snapshot: () => createConnectionConfigRollback(input.projectDir, input.connectionId),
configure: async () => {
configureCalls += 1;
if (input.reuseExistingOnFirstConfigure && configureCalls === 1) {
const historicSqlResult = await applyHistoricSqlConfigToExistingConnection({
projectDir: input.projectDir,
connectionId: input.connectionId,
args: input.args,
prompts: input.prompts,
});
return historicSqlResult === 'back' ? 'back' : 'configured';
}
const configured = await configureDatabaseConnection({
projectDir: input.projectDir,
connectionId: input.connectionId,
driver: input.driver,
args: input.args,
prompts: input.prompts,
io: input.io,
canReturnToDriverSelection: input.canReturnToDriverSelection,
editBaseline: input.editBaseline,
});
if (configured === 'cancelled') {
sawMissingInput = true;
}
return configured;
},
validate: () =>
validateAndScanConnection({
projectDir: input.projectDir,
connectionId: input.connectionId,
io: input.io,
deps: input.deps,
args: input.args,
prompts: input.prompts,
forceScopeAndTables: input.forceScopeAndTables,
}),
});
if (outcome === 'failed' && sawMissingInput) {
return 'missing-input';
}
return outcome;
}
async function runPrimarySourceFullEdit(input: { async function runPrimarySourceFullEdit(input: {
projectDir: string; projectDir: string;
connectionId: string; connectionId: string;
@ -1854,7 +2001,7 @@ async function runPrimarySourceFullEdit(input: {
prompts: KtxSetupDatabasesPromptAdapter; prompts: KtxSetupDatabasesPromptAdapter;
io: KtxCliIo; io: KtxCliIo;
deps: KtxSetupDatabasesDeps; deps: KtxSetupDatabasesDeps;
}): Promise<'ready' | 'back' | 'failed'> { }): Promise<'ready' | 'back' | 'failed' | 'missing-input'> {
const project = await loadKtxProject({ projectDir: input.projectDir }); const project = await loadKtxProject({ projectDir: input.projectDir });
const existing = project.config.connections[input.connectionId]; const existing = project.config.connections[input.connectionId];
const driver = normalizeDriver(existing?.driver); const driver = normalizeDriver(existing?.driver);
@ -1866,59 +2013,21 @@ async function runPrimarySourceFullEdit(input: {
return 'failed'; return 'failed';
} }
const rollback = await createConnectionConfigRollback(input.projectDir, input.connectionId); const outcome = await runDatabaseConnectionSetupWithRecovery({
const replacement = await buildConnectionConfig({ projectDir: input.projectDir,
driver,
connectionId: input.connectionId, connectionId: input.connectionId,
args: input.args,
prompts: input.prompts,
existingConnection: existing,
});
if (replacement === 'back') {
await rollback();
return 'back';
}
if (!replacement) {
await rollback();
return 'failed';
}
const withHistoricSql = await maybeApplyHistoricSqlConfig({
connection: replacement,
driver, driver,
args: input.args, args: input.args,
prompts: input.prompts, prompts: input.prompts,
});
if (withHistoricSql === 'back') {
await rollback();
return 'back';
}
await writeConnectionConfig({
projectDir: input.projectDir,
connectionId: input.connectionId,
connection: withExistingPrimaryEditPromptDefaults({
previous: existing,
next: withHistoricSql,
driver,
}),
io: input.io,
});
const validated = await validateAndScanConnection({
projectDir: input.projectDir,
connectionId: input.connectionId,
io: input.io, io: input.io,
deps: input.deps, deps: input.deps,
args: input.args, canReturnToDriverSelection: true,
prompts: input.prompts, allowSkip: false,
forceScopeAndTables: true, forceScopeAndTables: true,
editBaseline: existing,
}); });
if (validated !== 'ready') {
await rollback(); return outcome === 'skip' ? 'back' : outcome;
return validated === 'failed-query-history-unavailable' ? 'failed' : validated;
}
return 'ready';
} }
export async function runKtxSetupDatabasesStep( export async function runKtxSetupDatabasesStep(
@ -1936,28 +2045,37 @@ export async function runKtxSetupDatabasesStep(
if (args.databaseConnectionIds && args.databaseConnectionIds.length > 0) { if (args.databaseConnectionIds && args.databaseConnectionIds.length > 0) {
const selectedConnectionIds: string[] = []; const selectedConnectionIds: string[] = [];
for (const connectionId of unique(args.databaseConnectionIds)) { for (const connectionId of unique(args.databaseConnectionIds)) {
const historicSqlResult = await applyHistoricSqlConfigToExistingConnection({ const project = await loadKtxProject({ projectDir: args.projectDir });
projectDir: args.projectDir, const driver = normalizeDriver(project.config.connections[connectionId]?.driver);
connectionId, if (!driver) {
args, writePrefixedLines((chunk) => io.stderr.write(chunk), `Connection "${connectionId}" is not configured.`);
prompts,
});
if (historicSqlResult === 'back') return { status: 'back', projectDir: args.projectDir };
const setupStatus = await validateAndScanConnection({
projectDir: args.projectDir,
connectionId,
io,
deps,
args,
prompts,
});
if (setupStatus === 'back') {
return { status: 'back', projectDir: args.projectDir };
}
if (setupStatus === 'failed') {
return { status: 'failed', projectDir: args.projectDir }; return { status: 'failed', projectDir: args.projectDir };
} }
selectedConnectionIds.push(connectionId); const setupOutcome = await runDatabaseConnectionSetupWithRecovery({
projectDir: args.projectDir,
connectionId,
driver,
args,
prompts,
io,
deps,
canReturnToDriverSelection: false,
allowSkip: false,
interactive: false,
reuseExistingOnFirstConfigure: true,
});
if (setupOutcome === 'back') {
return { status: 'back', projectDir: args.projectDir };
}
if (setupOutcome === 'missing-input') {
return { status: 'missing-input', projectDir: args.projectDir };
}
if (setupOutcome === 'failed') {
return { status: 'failed', projectDir: args.projectDir };
}
if (setupOutcome === 'ready') {
selectedConnectionIds.push(connectionId);
}
} }
await markDatabasesComplete(args.projectDir, selectedConnectionIds); await markDatabasesComplete(args.projectDir, selectedConnectionIds);
return { status: 'ready', projectDir: args.projectDir, connectionIds: selectedConnectionIds }; return { status: 'ready', projectDir: args.projectDir, connectionIds: selectedConnectionIds };
@ -2009,6 +2127,9 @@ export async function runKtxSetupDatabasesStep(
showConfiguredPrimaryMenu = true; showConfiguredPrimaryMenu = true;
continue; continue;
} }
if (editResult === 'missing-input') {
return { status: 'missing-input', projectDir: args.projectDir };
}
if (editResult === 'failed') { if (editResult === 'failed') {
return { status: 'failed', projectDir: args.projectDir }; return { status: 'failed', projectDir: args.projectDir };
} }
@ -2064,7 +2185,6 @@ export async function runKtxSetupDatabasesStep(
return { status: 'missing-input', projectDir: args.projectDir }; return { status: 'missing-input', projectDir: args.projectDir };
} }
let connectionAlreadyValidated = false;
if (connectionChoice.kind === 'edit') { if (connectionChoice.kind === 'edit') {
const editResult = await runPrimarySourceFullEdit({ const editResult = await runPrimarySourceFullEdit({
projectDir: args.projectDir, projectDir: args.projectDir,
@ -2079,176 +2199,41 @@ export async function runKtxSetupDatabasesStep(
returnToDriverSelection = true; returnToDriverSelection = true;
break; break;
} }
if (editResult === 'missing-input') {
return { status: 'missing-input', projectDir: args.projectDir };
}
if (editResult === 'failed') { if (editResult === 'failed') {
return { status: 'failed', projectDir: args.projectDir }; return { status: 'failed', projectDir: args.projectDir };
} }
connectionAlreadyValidated = true; } else {
} else if (connectionChoice.kind === 'new') { const setupOutcome = await runDatabaseConnectionSetupWithRecovery({
let connection = await buildConnectionConfig({ projectDir: args.projectDir,
driver,
connectionId: connectionChoice.connectionId, connectionId: connectionChoice.connectionId,
driver,
args, args,
prompts, prompts,
io,
deps,
canReturnToDriverSelection,
allowSkip: true,
reuseExistingOnFirstConfigure: connectionChoice.kind === 'existing',
}); });
if (connection === 'back') { if (setupOutcome === 'back') {
if (!canReturnToDriverSelection) return { status: 'back', projectDir: args.projectDir }; if (!canReturnToDriverSelection) return { status: 'back', projectDir: args.projectDir };
returnToDriverSelection = true; returnToDriverSelection = true;
break; break;
} }
while (!connection && args.inputMode !== 'disabled') { if (setupOutcome === 'missing-input') {
const label = driverLabel(driver);
const action = await prompts.select(missingConnectionDetailsPrompt(label, canReturnToDriverSelection));
if (action === 'back') {
if (!canReturnToDriverSelection) return { status: 'back', projectDir: args.projectDir };
returnToDriverSelection = true;
break;
}
connection = await buildConnectionConfig({
driver,
connectionId: connectionChoice.connectionId,
args,
prompts,
});
if (connection === 'back') {
if (!canReturnToDriverSelection) return { status: 'back', projectDir: args.projectDir };
returnToDriverSelection = true;
break;
}
}
if (returnToDriverSelection) {
break;
}
if (connection === 'back') {
break;
}
if (!connection) {
io.stderr.write(`Missing connection details for ${driverLabel(driver)}.\n`);
return { status: 'missing-input', projectDir: args.projectDir }; return { status: 'missing-input', projectDir: args.projectDir };
} }
const withHistoricSql = await maybeApplyHistoricSqlConfig({ connection, driver, args, prompts }); if (setupOutcome === 'failed') {
if (withHistoricSql === 'back') { return { status: 'failed', projectDir: args.projectDir };
if (!canReturnToDriverSelection) return { status: 'back', projectDir: args.projectDir };
returnToDriverSelection = true;
break;
} }
await writeConnectionConfig({ if (setupOutcome === 'skip') {
projectDir: args.projectDir, continue;
connectionId: connectionChoice.connectionId,
connection: withHistoricSql,
io,
});
} else {
const existing = project.config.connections[connectionChoice.connectionId];
const withHistoricSql = await maybeApplyHistoricSqlConfig({ connection: existing, driver, args, prompts });
if (withHistoricSql === 'back') {
if (!canReturnToDriverSelection) return { status: 'back', projectDir: args.projectDir };
returnToDriverSelection = true;
break;
}
await writeConnectionConfig({
projectDir: args.projectDir,
connectionId: connectionChoice.connectionId,
connection: withHistoricSql,
io,
});
}
let connectionSkipped = false;
let setupStatus: ConnectionSetupStatus = connectionAlreadyValidated
? 'ready'
: await validateAndScanConnection({
projectDir: args.projectDir,
connectionId: connectionChoice.connectionId,
io,
deps,
args,
prompts,
});
while (!connectionAlreadyValidated && setupStatus !== 'ready') {
if (setupStatus === 'back') {
if (!canReturnToDriverSelection) return { status: 'back', projectDir: args.projectDir };
returnToDriverSelection = true;
break;
}
if (args.inputMode === 'disabled') return { status: 'failed', projectDir: args.projectDir };
const failureOptions = [
{ value: 'retry', label: 'Retry connection test' },
{ value: 're-enter', label: 'Re-enter connection details' },
...(setupStatus === 'failed-query-history-unavailable'
? [{ value: 'disable-query-history', label: 'Disable query history and retry' }]
: []),
{ value: 'skip', label: 'Skip this database' },
{ value: 'back', label: 'Back' },
];
const action = await prompts.select({
message: `Database setup failed for ${connectionChoice.connectionId}`,
options: failureOptions,
});
if (action === 'back') {
if (!canReturnToDriverSelection) return { status: 'back', projectDir: args.projectDir };
returnToDriverSelection = true;
break;
}
if (action === 'skip') {
connectionSkipped = true;
break;
}
if (action === 'retry') {
setupStatus = await validateAndScanConnection({
projectDir: args.projectDir,
connectionId: connectionChoice.connectionId,
io,
deps,
args,
prompts,
});
} else if (action === 'disable-query-history') {
await disableConnectionQueryHistory(args.projectDir, connectionChoice.connectionId);
setupStatus = await validateAndScanConnection({
projectDir: args.projectDir,
connectionId: connectionChoice.connectionId,
io,
deps,
args,
prompts,
});
} else if (action === 're-enter') {
const connection = await buildConnectionConfig({
driver,
connectionId: connectionChoice.connectionId,
args,
prompts,
});
if (connection === 'back') {
if (!canReturnToDriverSelection) return { status: 'back', projectDir: args.projectDir };
returnToDriverSelection = true;
break;
}
if (!connection) continue;
const withHistoricSql = await maybeApplyHistoricSqlConfig({ connection, driver, args, prompts });
if (withHistoricSql === 'back') {
if (!canReturnToDriverSelection) return { status: 'back', projectDir: args.projectDir };
returnToDriverSelection = true;
break;
}
await writeConnectionConfig({
projectDir: args.projectDir,
connectionId: connectionChoice.connectionId,
connection: withHistoricSql,
io,
});
setupStatus = await validateAndScanConnection({
projectDir: args.projectDir,
connectionId: connectionChoice.connectionId,
io,
deps,
args,
prompts,
});
} }
} }
if (returnToDriverSelection) break; if (returnToDriverSelection) break;
if (connectionSkipped) continue;
pushUniqueConnectionId(selectedConnectionIds, connectionChoice.connectionId); pushUniqueConnectionId(selectedConnectionIds, connectionChoice.connectionId);
} }

View file

@ -20,6 +20,12 @@ import type { KtxCliIo } from './cli-runtime.js';
import { errorMessage, writePrefixedLines } from './clack.js'; import { errorMessage, writePrefixedLines } from './clack.js';
import { pickNotionRootPages } from './notion-page-picker.js'; import { pickNotionRootPages } from './notion-page-picker.js';
import { runKtxSourceMapping } from './source-mapping.js'; import { runKtxSourceMapping } from './source-mapping.js';
import {
runConnectionSetupWithRecovery,
type ConfigureResult,
type RecoveryOutcome,
type ValidateResult,
} from './connection-recovery.js';
import { withMultiselectNavigation, withTextInputNavigation } from './prompt-navigation.js'; import { withMultiselectNavigation, withTextInputNavigation } from './prompt-navigation.js';
import { runKtxPublicIngest } from './public-ingest.js'; import { runKtxPublicIngest } from './public-ingest.js';
import { writeProjectLocalSecretReference } from './setup-secrets.js'; import { writeProjectLocalSecretReference } from './setup-secrets.js';
@ -866,8 +872,7 @@ type InteractiveSourceConnectionChoice =
type SourceSetupChoiceResult = type SourceSetupChoiceResult =
| { status: 'ready'; connectionId: string } | { status: 'ready'; connectionId: string }
| { status: 'back' } | { status: Exclude<RecoveryOutcome, 'ready'> };
| { status: 'failed' };
async function runSourcePromptSteps( async function runSourcePromptSteps(
initialState: SourcePromptState, initialState: SourcePromptState,
@ -1758,6 +1763,58 @@ async function validateSource(
return await (deps.validateNotion ?? defaultValidateNotion)(args.connection); return await (deps.validateNotion ?? defaultValidateNotion)(args.connection);
} }
async function createSourceSetupRollback(projectDir: string): Promise<() => Promise<void>> {
const project = await loadKtxProject({ projectDir });
const previousConfig = project.config;
const configPath = project.configPath;
return async () => {
await writeFile(configPath, serializeKtxProjectConfig(previousConfig), 'utf-8');
};
}
function sourceConnectionId(input: {
source: KtxSetupSourceType;
sourceChoice: Exclude<InteractiveSourceConnectionChoice, 'back'>;
}): string {
return input.sourceChoice.kind === 'existing' || input.sourceChoice.kind === 'edited'
? input.sourceChoice.connectionId
: (input.sourceChoice.args.sourceConnectionId ?? `${input.source}-main`);
}
async function validateSourceConnectionAndMapping(input: {
args: KtxSetupSourcesArgs;
source: KtxSetupSourceType;
connectionId: string;
connection: KtxProjectConnectionConfig;
prompts: KtxSetupSourcesPromptAdapter;
io: KtxCliIo;
deps: KtxSetupSourcesDeps;
}): Promise<ValidateResult> {
const validation = await validateSource(
input.source,
{ projectDir: input.args.projectDir, connectionId: input.connectionId, connection: input.connection },
input.deps,
);
if (!validation.ok) {
input.io.stderr.write(`${validation.message}\n`);
return { status: 'failed' };
}
if (input.source === 'metabase' || input.source === 'looker') {
input.prompts.log?.(`Validating ${sourceLabel(input.source)} mapping...`);
const mappingCode = await (input.deps.runMapping ?? defaultRunMapping)(
input.args.projectDir,
input.connectionId,
createSetupPrefixedIo(input.io),
);
if (mappingCode !== 0) {
return { status: 'failed' };
}
}
return { status: 'ok' };
}
async function saveValidateAndMaybeBuildSource(input: { async function saveValidateAndMaybeBuildSource(input: {
args: KtxSetupSourcesArgs; args: KtxSetupSourcesArgs;
source: KtxSetupSourceType; source: KtxSetupSourceType;
@ -1766,76 +1823,121 @@ async function saveValidateAndMaybeBuildSource(input: {
io: KtxCliIo; io: KtxCliIo;
deps: KtxSetupSourcesDeps; deps: KtxSetupSourcesDeps;
}): Promise<SourceSetupChoiceResult> { }): Promise<SourceSetupChoiceResult> {
const connectionId = let latestChoice = input.sourceChoice;
input.sourceChoice.kind === 'existing' let latestConnectionId = sourceConnectionId({ source: input.source, sourceChoice: latestChoice });
? input.sourceChoice.connectionId let latestConnection =
: input.sourceChoice.kind === 'edited' latestChoice.kind === 'existing'
? input.sourceChoice.connectionId ? latestChoice.connection
: (input.sourceChoice.args.sourceConnectionId ?? `${input.source}-main`); : buildConnection(input.source, latestChoice.args);
const connection = let configureCount = 0;
input.sourceChoice.kind === 'existing' let rollbackAfterConfigure: (() => Promise<void>) | undefined;
? input.sourceChoice.connection
: buildConnection(input.source, input.sourceChoice.args);
const rollback =
input.sourceChoice.kind === 'existing'
? undefined
: await writeSourceConnection(
input.args.projectDir,
connectionId,
connection,
sourceAdapter(input.source),
input.io,
);
if (input.sourceChoice.kind === 'existing') { const outcome = await runConnectionSetupWithRecovery({
await ensureSourceAdapterEnabled(input.args.projectDir, input.source); label: latestConnectionId,
} interactive: input.args.inputMode !== 'disabled',
allowSkip: true,
io: input.io,
prompts: input.prompts,
snapshot: async () => {
rollbackAfterConfigure = await createSourceSetupRollback(input.args.projectDir);
return rollbackAfterConfigure;
},
configure: async (): Promise<ConfigureResult> => {
configureCount += 1;
if (latestChoice.kind === 'existing' && configureCount === 1) {
await ensureSourceAdapterEnabled(input.args.projectDir, input.source);
return 'configured';
}
const validation = await validateSource( const project = await loadKtxProject({ projectDir: input.args.projectDir });
input.source, const currentConnection = project.config.connections[latestConnectionId] ?? latestConnection;
{ projectDir: input.args.projectDir, connectionId, connection }, const useAlreadyPromptedArgs = configureCount === 1 && latestChoice.kind !== 'existing';
input.deps, const sourceArgs =
); useAlreadyPromptedArgs && latestChoice.kind !== 'existing'
if (!validation.ok) { ? latestChoice.args
await rollback?.(); : input.args.inputMode === 'disabled'
input.io.stderr.write(`${validation.message}\n`); ? sourceArgsFromExistingConnection({
return { status: 'failed' }; args: input.args,
} source: input.source,
connectionId: latestConnectionId,
connection: currentConnection,
})
: await promptForInteractiveSource(
sourceArgsFromExistingConnection({
args: input.args,
source: input.source,
connectionId: latestConnectionId,
connection: currentConnection,
}),
input.source,
input.prompts,
input.io,
{
pickNotionRootPages: input.deps.pickNotionRootPages,
discoverMetabaseDatabases: input.deps.discoverMetabaseDatabases,
},
latestConnectionId,
input.deps.testGitRepo,
input.deps.discoverMetabaseDatabases,
);
if (input.source === 'metabase' || input.source === 'looker') { if (sourceArgs === 'back') {
input.prompts.log?.(`Validating ${sourceLabel(input.source)} mapping…`); return 'back';
const mappingCode = await (input.deps.runMapping ?? defaultRunMapping)( }
input.args.projectDir,
connectionId, latestConnectionId = sourceArgs.sourceConnectionId ?? latestConnectionId;
createSetupPrefixedIo(input.io), latestConnection = buildConnection(input.source, sourceArgs);
); latestChoice =
if (mappingCode !== 0) { latestChoice.kind === 'new'
await rollback?.(); ? { kind: 'new', args: sourceArgs }
return { status: 'failed' }; : { kind: 'edited', connectionId: latestConnectionId, args: sourceArgs };
}
await writeSourceConnection(
input.args.projectDir,
latestConnectionId,
latestConnection,
sourceAdapter(input.source),
input.io,
);
return 'configured';
},
validate: () =>
validateSourceConnectionAndMapping({
args: input.args,
source: input.source,
connectionId: latestConnectionId,
connection: latestConnection,
prompts: input.prompts,
io: input.io,
deps: input.deps,
}),
});
if (outcome !== 'ready') {
return { status: outcome };
} }
if (input.args.runInitialSourceIngest) { if (input.args.runInitialSourceIngest) {
const ingestResult = await runInitialSourceIngestWithRecovery({ const ingestResult = await runInitialSourceIngestWithRecovery({
args: input.args, args: input.args,
connectionId, connectionId: latestConnectionId,
io: input.io, io: input.io,
prompts: input.prompts, prompts: input.prompts,
deps: input.deps, deps: input.deps,
}); });
if (ingestResult === 'failed') { if (ingestResult === 'failed') {
await rollback?.(); await rollbackAfterConfigure?.();
return { status: 'failed' }; return { status: 'failed' };
} }
if (ingestResult === 'back') { if (ingestResult === 'back') {
await rollback?.(); await rollbackAfterConfigure?.();
return { status: 'back' }; return { status: 'back' };
} }
} else { } else {
input.io.stdout.write(`│ Context source ${connectionId} saved. It will be built during the context build step.\n`); input.io.stdout.write(`│ Context source ${latestConnectionId} saved. It will be built during the context build step.\n`);
} }
return { status: 'ready', connectionId }; return { status: 'ready', connectionId: latestConnectionId };
} }
export async function runKtxSetupSourcesStep( export async function runKtxSetupSourcesStep(
@ -1942,8 +2044,13 @@ export async function runKtxSetupSourcesStep(
returnToSourceSelection = true; returnToSourceSelection = true;
break; break;
} }
if (!readyConnectionIds.includes(choiceResult.connectionId)) { if (choiceResult.status === 'skip') {
readyConnectionIds.push(choiceResult.connectionId); continue;
}
if (choiceResult.status === 'ready') {
if (!readyConnectionIds.includes(choiceResult.connectionId)) {
readyConnectionIds.push(choiceResult.connectionId);
}
} }
} }
@ -2005,8 +2112,13 @@ export async function runKtxSetupSourcesStep(
if (choiceResult.status === 'back') { if (choiceResult.status === 'back') {
continue; continue;
} }
if (!readyConnectionIds.includes(choiceResult.connectionId)) { if (choiceResult.status === 'skip') {
readyConnectionIds.push(choiceResult.connectionId); continue;
}
if (choiceResult.status === 'ready') {
if (!readyConnectionIds.includes(choiceResult.connectionId)) {
readyConnectionIds.push(choiceResult.connectionId);
}
} }
continue; continue;
} }

View file

@ -0,0 +1,171 @@
import { describe, expect, it, vi } from 'vitest';
import {
runConnectionSetupWithRecovery,
type ConfigureResult,
type RecoveryAction,
type ValidateResult,
} from '../src/connection-recovery.js';
function input(overrides: {
interactive?: boolean;
allowSkip?: boolean;
configure?: () => Promise<ConfigureResult>;
validate?: () => Promise<ValidateResult>;
selectValues?: string[];
extraActions?: RecoveryAction[];
}) {
const selectValues = [...(overrides.selectValues ?? [])];
const rollback = vi.fn(async () => {});
const select = vi.fn(async () => selectValues.shift() ?? 'back');
const validate = overrides.validate ?? vi.fn(async () => ({ status: 'ok' as const }));
return {
rollback,
select,
validate,
run: () =>
runConnectionSetupWithRecovery({
label: 'warehouse',
interactive: overrides.interactive ?? true,
allowSkip: overrides.allowSkip ?? true,
io: {
stdout: { write: vi.fn() },
stderr: { write: vi.fn() },
},
prompts: { select },
snapshot: vi.fn(async () => rollback),
configure: overrides.configure ?? vi.fn(async () => 'configured' as const),
validate,
}),
};
}
describe('runConnectionSetupWithRecovery', () => {
it('returns ready without opening the menu when first validation passes', async () => {
const setup = input({});
await expect(setup.run()).resolves.toBe('ready');
expect(setup.select).not.toHaveBeenCalled();
expect(setup.rollback).not.toHaveBeenCalled();
});
it('fails fast without prompting or rollback when noninteractive validation fails', async () => {
const setup = input({
interactive: false,
validate: vi.fn(async () => ({ status: 'failed' as const })),
});
await expect(setup.run()).resolves.toBe('failed');
expect(setup.select).not.toHaveBeenCalled();
expect(setup.rollback).not.toHaveBeenCalled();
});
it('retries the same config after Retry and returns ready', async () => {
let calls = 0;
const setup = input({
selectValues: ['retry'],
validate: vi.fn(async () => {
calls += 1;
return calls === 1 ? { status: 'failed' as const } : { status: 'ok' as const };
}),
});
await expect(setup.run()).resolves.toBe('ready');
expect(setup.validate).toHaveBeenCalledTimes(2);
expect(setup.rollback).not.toHaveBeenCalled();
});
it('re-enters config and validates the new attempt', async () => {
let calls = 0;
const configure = vi.fn(async () => 'configured' as const);
const setup = input({
configure,
selectValues: ['re-enter'],
validate: vi.fn(async () => {
calls += 1;
return calls === 1 ? { status: 'failed' as const } : { status: 'ok' as const };
}),
});
await expect(setup.run()).resolves.toBe('ready');
expect(configure).toHaveBeenCalledTimes(2);
expect(setup.validate).toHaveBeenCalledTimes(2);
expect(setup.rollback).not.toHaveBeenCalled();
});
it('rolls back once and returns skip when Skip is selected', async () => {
const setup = input({
selectValues: ['skip'],
validate: vi.fn(async () => ({ status: 'failed' as const })),
});
await expect(setup.run()).resolves.toBe('skip');
expect(setup.rollback).toHaveBeenCalledTimes(1);
});
it('omits Skip when allowSkip is false and rolls back on Back', async () => {
const setup = input({
allowSkip: false,
selectValues: ['back'],
validate: vi.fn(async () => ({ status: 'failed' as const })),
});
await expect(setup.run()).resolves.toBe('back');
expect(setup.select).toHaveBeenCalledWith({
message: 'Connection setup failed for warehouse',
options: [
{ value: 'retry', label: 'Retry connection test' },
{ value: 're-enter', label: 'Re-enter connection details' },
{ value: 'back', label: 'Back' },
],
});
expect(setup.rollback).toHaveBeenCalledTimes(1);
});
it('runs an extra action and then revalidates', async () => {
const action = vi.fn(async () => {});
let calls = 0;
const setup = input({
selectValues: ['disable-query-history'],
validate: vi.fn(async () => {
calls += 1;
return calls === 1
? {
status: 'failed' as const,
extraActions: [
{ value: 'disable-query-history', label: 'Disable query history and retry', run: action },
],
}
: { status: 'ok' as const };
}),
});
await expect(setup.run()).resolves.toBe('ready');
expect(action).toHaveBeenCalledTimes(1);
expect(setup.validate).toHaveBeenCalledTimes(2);
});
it('rolls back when re-enter returns back or cancelled', async () => {
const backSetup = input({
selectValues: ['re-enter'],
configure: vi.fn(async () => 'back' as const),
validate: vi.fn(async () => ({ status: 'failed' as const })),
});
await expect(backSetup.run()).resolves.toBe('back');
expect(backSetup.rollback).toHaveBeenCalledTimes(1);
const cancelledSetup = input({
selectValues: ['re-enter'],
configure: vi.fn(async () => 'cancelled' as const),
validate: vi.fn(async () => ({ status: 'failed' as const })),
});
await expect(cancelledSetup.run()).resolves.toBe('failed');
expect(cancelledSetup.rollback).toHaveBeenCalledTimes(1);
});
});

View file

@ -264,6 +264,7 @@ describe('setup context build state', () => {
now: () => new Date('2026-05-09T10:00:00.000Z'), now: () => new Date('2026-05-09T10:00:00.000Z'),
runContextBuild: runContextBuildMock, runContextBuild: runContextBuildMock,
verifyContextReady, verifyContextReady,
testConnection: async () => 0,
}, },
), ),
).resolves.toEqual({ status: 'ready', projectDir: tempDir, runId: 'setup-context-local-abc123' }); ).resolves.toEqual({ status: 'ready', projectDir: tempDir, runId: 'setup-context-local-abc123' });
@ -315,6 +316,7 @@ describe('setup context build state', () => {
runIdFactory: () => 'setup-context-local-failed', runIdFactory: () => 'setup-context-local-failed',
now: () => new Date('2026-05-09T10:00:00.000Z'), now: () => new Date('2026-05-09T10:00:00.000Z'),
runContextBuild: runContextBuildMock, runContextBuild: runContextBuildMock,
testConnection: async () => 0,
}, },
), ),
).resolves.toEqual({ status: 'failed', projectDir: tempDir }); ).resolves.toEqual({ status: 'failed', projectDir: tempDir });
@ -347,6 +349,7 @@ describe('setup context build state', () => {
runIdFactory: () => 'setup-context-local-throw', runIdFactory: () => 'setup-context-local-throw',
now: () => new Date('2026-05-09T10:00:00.000Z'), now: () => new Date('2026-05-09T10:00:00.000Z'),
runContextBuild: runContextBuildMock, runContextBuild: runContextBuildMock,
testConnection: async () => 0,
}, },
), ),
).resolves.toEqual({ ).resolves.toEqual({
@ -423,6 +426,7 @@ describe('setup context build state', () => {
runIdFactory: () => 'setup-context-local-enriched-scan', runIdFactory: () => 'setup-context-local-enriched-scan',
now: () => new Date('2026-05-09T10:00:00.000Z'), now: () => new Date('2026-05-09T10:00:00.000Z'),
runContextBuild: runContextBuildMock, runContextBuild: runContextBuildMock,
testConnection: async () => 0,
}, },
), ),
).resolves.toEqual({ status: 'ready', projectDir: tempDir, runId: 'setup-context-local-enriched-scan' }); ).resolves.toEqual({ status: 'ready', projectDir: tempDir, runId: 'setup-context-local-enriched-scan' });
@ -457,7 +461,7 @@ describe('setup context build state', () => {
runKtxSetupContextStep( runKtxSetupContextStep(
{ projectDir: tempDir, inputMode: 'disabled' }, { projectDir: tempDir, inputMode: 'disabled' },
io.io, io.io,
{ runContextBuild: runContextBuildMock }, { runContextBuild: runContextBuildMock, testConnection: async () => 0 },
), ),
).resolves.toMatchObject({ status: 'ready' }); ).resolves.toMatchObject({ status: 'ready' });
@ -552,10 +556,119 @@ describe('setup context build state', () => {
runKtxSetupContextStep( runKtxSetupContextStep(
{ projectDir: tempDir, inputMode: 'disabled' }, { projectDir: tempDir, inputMode: 'disabled' },
io.io, io.io,
{ runContextBuild: runContextBuildMock, verifyContextReady }, { runContextBuild: runContextBuildMock, verifyContextReady, testConnection: async () => 0 },
), ),
).resolves.toMatchObject({ status: 'ready' }); ).resolves.toMatchObject({ status: 'ready' });
expect(runContextBuildMock).toHaveBeenCalledOnce(); expect(runContextBuildMock).toHaveBeenCalledOnce();
}); });
it('blocks the build and names the failing connection without leaking raw error text', async () => {
const missingDbPath = join(tempDir, 'missing-warehouse.sqlite');
await writeReadyProject(tempDir, {
connections: { warehouse: { driver: 'sqlite', path: missingDbPath } },
});
const io = makeIo();
const runContextBuildMock = vi.fn(async () => ({ exitCode: 0 }));
await expect(
runKtxSetupContextStep(
{ projectDir: tempDir, inputMode: 'disabled' },
io.io,
{
runIdFactory: () => 'setup-context-local-gate',
now: () => new Date('2026-05-09T10:00:00.000Z'),
runContextBuild: runContextBuildMock,
},
),
).resolves.toEqual({ status: 'failed', projectDir: tempDir });
expect(runContextBuildMock).not.toHaveBeenCalled();
// Names the failing connection by id + connector type, with remediation.
expect(io.stderr()).toContain('warehouse (sqlite)');
expect(io.stderr()).toContain('ktx connection test');
// The remediation command targets the project that just failed, not cwd.
expect(io.stderr()).toContain(`ktx connection test <id> --project-dir ${tempDir}`);
// Never surfaces raw connection error text (or the database path) to the user.
expect(io.stderr()).not.toContain('File not found');
expect(io.stderr()).not.toContain(missingDbPath);
// The failed context state forces context.ready=false so setup cannot read as ready.
await expect(readKtxSetupContextState(tempDir)).resolves.toMatchObject({
status: 'failed',
failureReason: 'Required connections failed their live test: warehouse (sqlite).',
});
expect((await readKtxSetupState(tempDir)).completed_steps).not.toContain('context');
});
it('retries connection tests after a fix and then builds in interactive mode', async () => {
await writeReadyProject(tempDir, {
connections: { warehouse: { driver: 'postgres', readonly: true } },
});
const io = makeIo();
const runContextBuildMock = vi.fn(async () => ({ exitCode: 0 }));
const verifyContextReady = vi.fn(async () => ({
ready: true,
agentContextReady: true,
semanticSearchReady: true,
details: ['ready'],
}));
let gateRounds = 0;
const testConnection = vi.fn(async () => (++gateRounds === 1 ? 1 : 0));
let selectCalls = 0;
const select = vi.fn(async () => {
selectCalls += 1;
return selectCalls === 1 ? 'build' : 'retry';
});
await expect(
runKtxSetupContextStep(
{ projectDir: tempDir, inputMode: 'auto' },
io.io,
{
prompts: { select, cancel: vi.fn() },
runContextBuild: runContextBuildMock,
verifyContextReady,
testConnection,
},
),
).resolves.toMatchObject({ status: 'ready' });
expect(testConnection).toHaveBeenCalledTimes(2);
expect(runContextBuildMock).toHaveBeenCalledOnce();
expect(io.stderr()).toContain('warehouse (postgres)');
});
it('returns to setup when the user backs out of a failing connection in interactive mode', async () => {
await writeReadyProject(tempDir, {
connections: { warehouse: { driver: 'postgres', readonly: true } },
});
const io = makeIo();
const runContextBuildMock = vi.fn(async () => ({ exitCode: 0 }));
const verifyContextReady = vi.fn(async () => ({
ready: true,
agentContextReady: true,
semanticSearchReady: true,
details: ['ready'],
}));
let selectCalls = 0;
const select = vi.fn(async () => {
selectCalls += 1;
return selectCalls === 1 ? 'build' : 'back';
});
await expect(
runKtxSetupContextStep(
{ projectDir: tempDir, inputMode: 'auto' },
io.io,
{
prompts: { select, cancel: vi.fn() },
runContextBuild: runContextBuildMock,
verifyContextReady,
testConnection: async () => 1,
},
),
).resolves.toEqual({ status: 'back', projectDir: tempDir });
expect(runContextBuildMock).not.toHaveBeenCalled();
});
}); });

View file

@ -1261,11 +1261,16 @@ describe('setup databases step', () => {
const prompts = makePromptAdapter({ const prompts = makePromptAdapter({
textValues: ['env:DATABASE_URL'], textValues: ['env:DATABASE_URL'],
}); });
let primaryMenuCount = 0;
vi.mocked(prompts.select).mockImplementation(async (options) => { vi.mocked(prompts.select).mockImplementation(async (options) => {
if (options.message === 'Databases configured: warehouse\nWhat would you like to do?') return 'edit'; if (options.message === 'Databases configured: warehouse\nWhat would you like to do?') {
primaryMenuCount += 1;
return primaryMenuCount === 1 ? 'edit' : 'continue';
}
if (options.message === 'Database to edit') return 'warehouse'; if (options.message === 'Database to edit') return 'warehouse';
if (options.message === 'How do you want to connect to PostgreSQL?') return 'url'; if (options.message === 'How do you want to connect to PostgreSQL?') return 'url';
if (options.message.startsWith('Enable query-history ingest')) return 'no'; if (options.message.startsWith('Enable query-history ingest')) return 'no';
if (options.message === 'Connection setup failed for warehouse') return 'back';
return 'back'; return 'back';
}); });
const listTables = vi.fn(async () => [ const listTables = vi.fn(async () => [
@ -1286,13 +1291,283 @@ describe('setup databases step', () => {
}, },
); );
expect(result).toEqual({ status: 'failed', projectDir: tempDir }); expect(result).toEqual({ status: 'ready', projectDir: tempDir, connectionIds: ['warehouse'] });
const config = parseKtxProjectConfig(await readFile(join(tempDir, 'ktx.yaml'), 'utf-8')); const config = parseKtxProjectConfig(await readFile(join(tempDir, 'ktx.yaml'), 'utf-8'));
expect(config.connections.warehouse).toMatchObject({ expect(config.connections.warehouse).toMatchObject({
enabled_tables: ['public.orders'], enabled_tables: ['public.orders'],
}); });
}); });
it('recovers from an interactive database edit failure by re-entering details', async () => {
await writeFile(
join(tempDir, 'ktx.yaml'),
[
'connections:',
' analytics:',
' driver: postgres',
' url: env:OLD_DATABASE_URL',
'setup:',
' database_connection_ids:',
' - analytics',
'',
].join('\n'),
'utf-8',
);
const io = makeIo();
const prompts = makePromptAdapter({
selectValues: ['edit', 'analytics', 'url', 'no', 're-enter', 'url', 'no', 'continue'],
textValues: ['env:BAD_DATABASE_URL', 'env:FIXED_DATABASE_URL'],
});
let attempts = 0;
const result = await runKtxSetupDatabasesStep(
{
projectDir: tempDir,
inputMode: 'auto',
databaseSchemas: [],
skipDatabases: false,
},
io.io,
{
prompts,
testConnection: vi.fn(async () => {
attempts += 1;
return attempts === 1 ? 1 : 0;
}),
scanConnection: vi.fn(async () => 0),
listSchemas: vi.fn(async () => ['public']),
listTables: vi.fn(async () => [{ catalog: null, schema: 'public', name: 'orders', kind: 'table' as const }]),
},
);
expect(result.status).toBe('ready');
expect(vi.mocked(prompts.select)).toHaveBeenCalledWith(
expect.objectContaining({
message: 'Connection setup failed for analytics',
options: expect.arrayContaining([
{ value: 'retry', label: 'Retry connection test' },
{ value: 're-enter', label: 'Re-enter connection details' },
{ value: 'back', label: 'Back' },
]),
}),
);
const config = parseKtxProjectConfig(await readFile(join(tempDir, 'ktx.yaml'), 'utf-8'));
expect(config.connections.analytics).toMatchObject({
driver: 'postgres',
url: 'env:FIXED_DATABASE_URL',
});
});
it('re-enters details after an interactive existing database validation failure', async () => {
await writeFile(
join(tempDir, 'ktx.yaml'),
[
'connections:',
' warehouse:',
' driver: postgres',
' url: env:OLD_DATABASE_URL',
'',
].join('\n'),
'utf-8',
);
const io = makeIo();
const prompts = makePromptAdapter({
selectValues: ['existing:warehouse', 'no', 're-enter', 'url', 'no'],
textValues: ['env:FIXED_DATABASE_URL'],
});
let attempts = 0;
const result = await runKtxSetupDatabasesStep(
{
projectDir: tempDir,
inputMode: 'auto',
databaseDrivers: ['postgres'],
databaseSchemas: [],
skipDatabases: false,
},
io.io,
{
prompts,
testConnection: vi.fn(async () => {
attempts += 1;
return attempts === 1 ? 1 : 0;
}),
scanConnection: vi.fn(async () => 0),
listSchemas: vi.fn(async () => ['public']),
listTables: vi.fn(async () => [
{ catalog: null, schema: 'public', name: 'orders', kind: 'table' as const },
]),
},
);
expect(result.status).toBe('ready');
expect(vi.mocked(prompts.select)).toHaveBeenCalledWith({
message: 'How do you want to connect to PostgreSQL?',
options: [
{ value: 'url', label: 'Paste a connection URL' },
{ value: 'fields', label: 'Enter connection details (host, port, database, user)' },
{ value: 'back', label: 'Back' },
],
});
expect(vi.mocked(prompts.select)).toHaveBeenCalledWith(
expect.objectContaining({
message: 'Connection setup failed for warehouse',
options: expect.arrayContaining([
{ value: 'retry', label: 'Retry connection test' },
{ value: 're-enter', label: 'Re-enter connection details' },
{ value: 'skip', label: 'Skip this connection' },
{ value: 'back', label: 'Back' },
]),
}),
);
const config = parseKtxProjectConfig(await readFile(join(tempDir, 'ktx.yaml'), 'utf-8'));
expect(config.connections.warehouse).toMatchObject({
driver: 'postgres',
url: 'env:FIXED_DATABASE_URL',
});
});
it('restores the previous database config when backing out of a failed edit', async () => {
await writeFile(
join(tempDir, 'ktx.yaml'),
[
'connections:',
' analytics:',
' driver: postgres',
' url: env:OLD_DATABASE_URL',
'setup:',
' database_connection_ids:',
' - analytics',
'',
].join('\n'),
'utf-8',
);
const io = makeIo();
const prompts = makePromptAdapter({
selectValues: ['edit', 'analytics', 'url', 'no', 'back', 'continue'],
textValues: ['env:BAD_DATABASE_URL'],
});
const result = await runKtxSetupDatabasesStep(
{
projectDir: tempDir,
inputMode: 'auto',
databaseSchemas: [],
skipDatabases: false,
},
io.io,
{
prompts,
testConnection: vi.fn(async () => 1),
scanConnection: vi.fn(async () => 0),
},
);
expect(result.status).toBe('ready');
const config = parseKtxProjectConfig(await readFile(join(tempDir, 'ktx.yaml'), 'utf-8'));
expect(config.connections.analytics).toMatchObject({
driver: 'postgres',
url: 'env:OLD_DATABASE_URL',
});
});
it('keeps scripted database setup fail-fast without rolling back attempted config', async () => {
await writeFile(
join(tempDir, 'ktx.yaml'),
[
'connections:',
' analytics:',
' driver: postgres',
' url: env:OLD_DATABASE_URL',
'',
].join('\n'),
'utf-8',
);
const io = makeIo();
const result = await runKtxSetupDatabasesStep(
{
projectDir: tempDir,
inputMode: 'disabled',
databaseConnectionIds: ['analytics'],
databaseSchemas: [],
enableQueryHistory: true,
skipDatabases: false,
},
io.io,
{
testConnection: vi.fn(async () => 1),
scanConnection: vi.fn(async () => 0),
},
);
expect(result.status).toBe('failed');
const config = parseKtxProjectConfig(await readFile(join(tempDir, 'ktx.yaml'), 'utf-8'));
expect(config.connections.analytics).toMatchObject({
driver: 'postgres',
url: 'env:OLD_DATABASE_URL',
context: {
queryHistory: {
enabled: true,
},
},
});
});
it('keeps scripted database ids fail-fast even when input mode is auto', async () => {
await writeFile(
join(tempDir, 'ktx.yaml'),
[
'connections:',
' analytics:',
' driver: postgres',
' url: env:OLD_DATABASE_URL',
'',
].join('\n'),
'utf-8',
);
const io = makeIo();
const prompts = makePromptAdapter({});
vi.mocked(prompts.select).mockImplementation(async ({ message }) => {
if (message === 'Connection setup failed for analytics') {
throw new Error('scripted selected-id setup opened the recovery menu');
}
return 'finish';
});
const result = await runKtxSetupDatabasesStep(
{
projectDir: tempDir,
inputMode: 'auto',
databaseConnectionIds: ['analytics'],
databaseSchemas: [],
enableQueryHistory: true,
skipDatabases: false,
},
io.io,
{
prompts,
testConnection: vi.fn(async () => 1),
scanConnection: vi.fn(async () => 0),
},
);
expect(result.status).toBe('failed');
expect(prompts.select).not.toHaveBeenCalledWith(
expect.objectContaining({ message: 'Connection setup failed for analytics' }),
);
const config = parseKtxProjectConfig(await readFile(join(tempDir, 'ktx.yaml'), 'utf-8'));
expect(config.connections.analytics).toMatchObject({
driver: 'postgres',
url: 'env:OLD_DATABASE_URL',
context: {
queryHistory: {
enabled: true,
},
},
});
});
it('lets Escape from connection fields return to connection method selection', async () => { it('lets Escape from connection fields return to connection method selection', async () => {
const prompts = makePromptAdapter({ const prompts = makePromptAdapter({
selectValues: ['fields', 'url'], selectValues: ['fields', 'url'],
@ -2517,7 +2792,7 @@ describe('setup databases step', () => {
vi.mocked(prompts.select).mockImplementation(async ({ message, options }) => { vi.mocked(prompts.select).mockImplementation(async ({ message, options }) => {
if (message.startsWith('Enable query-history ingest')) return 'yes'; if (message.startsWith('Enable query-history ingest')) return 'yes';
if (message.includes('How much database context should KTX build?')) return 'fast'; if (message.includes('How much database context should KTX build?')) return 'fast';
if (message.startsWith('Database setup failed for analytics')) { if (message.startsWith('Connection setup failed for analytics')) {
failurePromptCount += 1; failurePromptCount += 1;
failurePromptOptions.push(options); failurePromptOptions.push(options);
if (failurePromptCount === 1) return 'disable-query-history'; if (failurePromptCount === 1) return 'disable-query-history';
@ -2874,6 +3149,25 @@ describe('setup databases step', () => {
expect(io.stderr()).toContain('Missing database connection id'); expect(io.stderr()).toContain('Missing database connection id');
}); });
it('returns missing input when a non-interactive new connection is missing required details', async () => {
const io = makeIo();
const result = await runKtxSetupDatabasesStep(
{
projectDir: tempDir,
inputMode: 'disabled',
databaseDrivers: ['postgres'],
databaseConnectionId: 'warehouse',
databaseSchemas: [],
skipDatabases: false,
},
io.io,
);
expect(result.status).toBe('missing-input');
expect(io.stderr()).toContain('Missing connection details');
});
it('accepts former ingest subcommand names as non-interactive database connection ids', async () => { it('accepts former ingest subcommand names as non-interactive database connection ids', async () => {
const io = makeIo(); const io = makeIo();

View file

@ -706,7 +706,18 @@ describe('setup sources step', () => {
); );
expect(io.stderr()).toContain('1: Metabase database does not match KTX connection database'); expect(io.stderr()).toContain('1: Metabase database does not match KTX connection database');
expect(io.stderr()).not.toContain('Metabase mapping validation failed'); expect(io.stderr()).not.toContain('Metabase mapping validation failed');
expect(testPrompts.log).toHaveBeenCalledWith('Edit the connection or pick a different source to continue.'); expect(testPrompts.log).toHaveBeenCalledWith('Validating Metabase mapping...');
expect(testPrompts.select).toHaveBeenCalledWith(
expect.objectContaining({
message: 'Connection setup failed for metabase-main',
options: expect.arrayContaining([
{ value: 'retry', label: 'Retry connection test' },
{ value: 're-enter', label: 'Re-enter connection details' },
{ value: 'skip', label: 'Skip this connection' },
{ value: 'back', label: 'Back' },
]),
}),
);
}); });
it('does not mark sources complete when validation fails', async () => { it('does not mark sources complete when validation fails', async () => {
@ -961,7 +972,153 @@ describe('setup sources step', () => {
expect(result.status).not.toBe('failed'); expect(result.status).not.toBe('failed');
expect(io.stderr()).toContain('Failed to clone https://github.com/acme/private-repo: Authentication failed'); expect(io.stderr()).toContain('Failed to clone https://github.com/acme/private-repo: Authentication failed');
expect(testPrompts.log).toHaveBeenCalledWith('Edit the connection or pick a different source to continue.'); expect(testPrompts.select).toHaveBeenCalledWith(
expect.objectContaining({
message: 'Connection setup failed for dbt-main',
options: expect.arrayContaining([
{ value: 'retry', label: 'Retry connection test' },
{ value: 're-enter', label: 'Re-enter connection details' },
{ value: 'skip', label: 'Skip this connection' },
{ value: 'back', label: 'Back' },
]),
}),
);
});
it('recovers from an existing context-source validation failure by re-entering details', async () => {
await addPrimarySource();
await addConnection('dbt-main', {
driver: 'dbt',
source_dir: '/repo/bad-dbt',
project_name: 'analytics',
});
let attempts = 0;
const validateDbt = vi.fn(async () => {
attempts += 1;
return attempts === 1
? { ok: false as const, message: 'dbt project not found' }
: { ok: true as const, detail: 'project=analytics' };
});
const testPrompts = prompts({
multiselect: [['dbt']],
select: ['existing:dbt-main', 're-enter', 'path', 'done'],
text: ['/repo/fixed-dbt', ''],
});
const io = makeIo();
const result = await runKtxSetupSourcesStep(
{ projectDir, inputMode: 'auto', runInitialSourceIngest: false, skipSources: false },
io.io,
{ prompts: testPrompts, validateDbt },
);
expect(result.status).toBe('ready');
expect(validateDbt).toHaveBeenCalledTimes(2);
expect(vi.mocked(testPrompts.select)).toHaveBeenCalledWith(
expect.objectContaining({
message: 'Connection setup failed for dbt-main',
options: expect.arrayContaining([
{ value: 'retry', label: 'Retry connection test' },
{ value: 're-enter', label: 'Re-enter connection details' },
{ value: 'skip', label: 'Skip this connection' },
{ value: 'back', label: 'Back' },
]),
}),
);
expect((await readConfig()).connections['dbt-main']).toMatchObject({
driver: 'dbt',
source_dir: '/repo/fixed-dbt',
});
});
it('restores a context-source edit and adapter enablement when recovery goes back', async () => {
await addPrimarySource();
await addConnection('dbt-main', {
driver: 'dbt',
source_dir: '/repo/existing-dbt',
project_name: 'analytics',
});
const testPrompts = prompts({
multiselect: [['dbt']],
select: ['edit:dbt-main', 'path', 'back'],
text: ['/repo/bad-dbt', ''],
});
const io = makeIo();
const result = await runKtxSetupSourcesStep(
{ projectDir, inputMode: 'auto', runInitialSourceIngest: false, skipSources: false },
io.io,
{
prompts: testPrompts,
validateDbt: vi.fn(async () => ({ ok: false as const, message: 'dbt project not found' })),
},
);
expect(result.status).toBe('skipped');
const config = await readConfig();
expect(config.connections['dbt-main']).toMatchObject({
driver: 'dbt',
source_dir: '/repo/existing-dbt',
project_name: 'analytics',
});
expect(config.ingest.adapters).not.toContain('dbt');
});
it('lets Metabase mapping failure retry through source recovery', async () => {
await addPrimarySource();
let mappingAttempts = 0;
const runMapping = vi.fn(async () => {
mappingAttempts += 1;
return mappingAttempts === 1 ? 1 : 0;
});
const testPrompts = prompts({
multiselect: [['metabase']],
select: ['env', 'retry', 'done'],
text: ['metabase-main', 'https://metabase.example.com'],
});
const io = makeIo();
const result = await runKtxSetupSourcesStep(
{ projectDir, inputMode: 'auto', runInitialSourceIngest: false, skipSources: false },
io.io,
{
prompts: testPrompts,
discoverMetabaseDatabases: vi.fn(async () => [
{ id: 1, name: 'Analytics', engine: 'postgres', host: 'db.example.com', dbName: 'analytics' },
]),
runMapping,
},
);
expect(result.status).toBe('ready');
expect(runMapping).toHaveBeenCalledTimes(2);
});
it('keeps noninteractive source setup fail-fast without rolling back attempted config', async () => {
await addPrimarySource();
const io = makeIo();
const result = await runKtxSetupSourcesStep(
{
projectDir,
inputMode: 'disabled',
source: 'lookml',
sourceConnectionId: 'looker-repo',
sourceGitUrl: 'https://github.com/acme/lookml.git',
runInitialSourceIngest: false,
skipSources: false,
},
io.io,
{
validateLookml: vi.fn(async () => ({ ok: false as const, message: 'No LookML files found' })),
},
);
expect(result.status).toBe('failed');
expect((await readConfig()).connections['looker-repo']).toMatchObject({
driver: 'lookml',
repoUrl: 'https://github.com/acme/lookml.git',
});
}); });
it('adds a dbt source connection and enables its adapter', async () => { it('adds a dbt source connection and enables its adapter', async () => {
@ -1371,7 +1528,17 @@ describe('setup sources step', () => {
source_dir: '/repo/new-dbt', source_dir: '/repo/new-dbt',
})); }));
expect(io.stderr()).toContain('dbt project not found'); expect(io.stderr()).toContain('dbt project not found');
expect(testPrompts.log).toHaveBeenCalledWith('Edit the connection or pick a different source to continue.'); expect(testPrompts.select).toHaveBeenCalledWith(
expect.objectContaining({
message: 'Connection setup failed for dbt-main',
options: expect.arrayContaining([
{ value: 'retry', label: 'Retry connection test' },
{ value: 're-enter', label: 'Re-enter connection details' },
{ value: 'skip', label: 'Skip this connection' },
{ value: 'back', label: 'Back' },
]),
}),
);
const config = await readConfig(); const config = await readConfig();
expect(config.connections['dbt-main']).toMatchObject({ expect(config.connections['dbt-main']).toMatchObject({
driver: 'dbt', driver: 'dbt',