refactor(setup): centralize rail-prefixed diagnostics + query-history fallback

Extract errorMessage, writePrefixedLines, and flushPrefixedBufferedCommandOutput
into clack.ts so the setup wizard, managed daemons, and embedding/agent steps
share one rail-formatted writer. setup-databases.ts also adds a
"disable query history and retry" option when the schema-context build fails
and query history is the likely culprit, surfaced via a new
failed-query-history-unavailable status.
This commit is contained in:
Andrey Avtomonov 2026-05-25 17:22:43 +02:00
parent 3a3086b7cd
commit c526601d52
10 changed files with 109 additions and 60 deletions

View file

@ -137,8 +137,10 @@ Enabling query history makes deep ingest readiness matter for later
When query history is enabled for PostgreSQL, Snowflake, or BigQuery,
`ktx setup` runs a non-blocking readiness probe after the connection test
passes. A failed probe still writes setup changes, prints the warehouse-specific
grant or extension remediation, and leaves query-history ingest disabled until
you fix the prerequisite.
grant or extension remediation, and skips query-history processing until you
fix the prerequisite. If the later schema-context build also fails, interactive
setup offers **Disable query history and retry** so you can finish database
setup with `connections.<id>.context.queryHistory.enabled: false`.
For BigQuery, the remediation tells you to grant `roles/bigquery.resourceViewer`
on the BigQuery project, or grant a custom role that contains

View file

@ -1,7 +1,30 @@
import { cancel, confirm, isCancel, log, spinner } from '@clack/prompts';
import type { KtxCliIo } from './cli-runtime.js';
const ESC = String.fromCharCode(0x1b);
export interface RailBufferedSource {
stdoutText(): string;
stderrText(): string;
}
export function errorMessage(error: unknown): string {
return error instanceof Error ? error.message : String(error);
}
export function writePrefixedLines(write: (chunk: string) => void, output: string): void {
for (const line of output.split(/\r?\n/)) {
if (line.length > 0) {
write(`${line}\n`);
}
}
}
export function flushPrefixedBufferedCommandOutput(io: KtxCliIo, buffered: RailBufferedSource): void {
writePrefixedLines((chunk) => io.stdout.write(chunk), buffered.stdoutText());
writePrefixedLines((chunk) => io.stderr.write(chunk), buffered.stderrText());
}
export interface KtxCliSpinner {
start(message: string): void;
message(message: string): void;

View file

@ -1,5 +1,6 @@
import type { KtxEmbeddingConfig } from './llm/types.js';
import type { KtxCliIo } from './cli-runtime.js';
import { writePrefixedLines } from './clack.js';
import {
ensureManagedPythonCommandRuntime,
type KtxManagedPythonInstallPolicy,
@ -73,7 +74,7 @@ export async function ensureManagedLocalEmbeddingsDaemon(
});
const verb = daemon.status === 'started' ? 'Started' : 'Using';
options.io.stderr.write(`${verb} KTX daemon: ${daemon.baseUrl}\n`);
writePrefixedLines((chunk) => options.io.stderr.write(chunk), `${verb} KTX daemon: ${daemon.baseUrl}`);
return {
baseUrl: daemon.baseUrl,

View file

@ -7,6 +7,7 @@ import type { LookerTableIdentifierParser } from './context/ingest/adapters/look
import { createHttpSqlAnalysisPort, type KtxSqlAnalysisHttpJsonRunner } from './context/sql-analysis/http-sql-analysis-port.js';
import type { SqlAnalysisPort } from './context/sql-analysis/ports.js';
import type { KtxCliIo } from './cli-runtime.js';
import { writePrefixedLines } from './clack.js';
import {
ensureManagedPythonCommandRuntime,
type KtxManagedPythonInstallPolicy,
@ -137,7 +138,7 @@ export function createManagedPythonDaemonBaseUrlResolver(
force: false,
});
const verb = daemon.status === 'started' ? 'Started' : 'Using existing';
options.io.stderr.write(`${verb} KTX daemon: ${daemon.baseUrl}\n`);
writePrefixedLines((chunk) => options.io.stderr.write(chunk), `${verb} KTX daemon: ${daemon.baseUrl}`);
cachedBaseUrl = daemon.baseUrl;
return cachedBaseUrl;
};

View file

@ -10,6 +10,7 @@ import { markKtxSetupStateStepComplete } from './context/project/setup-config.js
import { serializeKtxProjectConfig } from './context/project/config.js';
import { strToU8, zipSync } from 'fflate';
import type { KtxCliIo } from './cli-runtime.js';
import { errorMessage, writePrefixedLines } from './clack.js';
import {
createKtxSetupPromptAdapter,
createKtxSetupUiAdapter,
@ -1230,7 +1231,7 @@ export async function runKtxSetupAgentsStep(
}
return { status: 'ready', projectDir: args.projectDir, installs, nextActions };
} catch (error) {
io.stderr.write(`${error instanceof Error ? error.message : String(error)}\n`);
writePrefixedLines((chunk) => io.stderr.write(chunk), errorMessage(error));
return { status: 'failed', projectDir: args.projectDir };
}
}

View file

@ -5,6 +5,7 @@ import { type KtxLocalProject, loadKtxProject } from './context/project/project.
import { markKtxSetupStateStepComplete, readKtxSetupState } from './context/project/setup-config.js';
import { serializeKtxProjectConfig } from './context/project/config.js';
import type { KtxCliIo } from './cli-runtime.js';
import { errorMessage, writePrefixedLines } from './clack.js';
import { buildPublicIngestPlan } from './public-ingest.js';
import {
type KtxDatabaseContextDepth,
@ -745,7 +746,7 @@ export async function runKtxSetupContextStep(
return await runBuild(args, io, deps, project, targets);
} catch (error) {
io.stderr.write(`${error instanceof Error ? error.message : String(error)}\n`);
writePrefixedLines((chunk) => io.stderr.write(chunk), errorMessage(error));
return { status: 'failed', projectDir: args.projectDir };
}
}

View file

@ -16,6 +16,11 @@ import { loadKtxProject } from './context/project/project.js';
import { markKtxSetupStateStepComplete, setKtxSetupDatabaseConnectionIds } from './context/project/setup-config.js';
import type { KtxTableListEntry } from './context/scan/types.js';
import type { KtxCliIo } from './cli-runtime.js';
import {
errorMessage,
flushPrefixedBufferedCommandOutput,
writePrefixedLines,
} from './clack.js';
import { runKtxConnection } from './connection.js';
import {
pickDatabaseScope as defaultPickDatabaseScope,
@ -221,7 +226,7 @@ const SCOPE_DISCOVERY_SPECS: Partial<Record<KtxSetupDatabaseDriver, ScopeDiscove
};
type UrlDriverType = Extract<KtxSetupDatabaseDriver, 'postgres' | 'mysql' | 'clickhouse' | 'sqlserver'>;
type ConnectionSetupStatus = 'ready' | 'back' | 'failed';
type ConnectionSetupStatus = 'ready' | 'back' | 'failed' | 'failed-query-history-unavailable';
const DRIVER_CONNECTION_DEFAULTS: Record<UrlDriverType, { port: string }> = {
postgres: { port: '5432' },
@ -1017,25 +1022,6 @@ function createBufferedCommandIo(): BufferedCommandIo {
};
}
function flushBufferedCommandOutput(io: KtxCliIo, bufferedIo: BufferedCommandIo): void {
const stdout = bufferedIo.stdoutText();
const stderr = bufferedIo.stderrText();
if (stdout.length > 0) {
io.stdout.write(stdout);
}
if (stderr.length > 0) {
io.stderr.write(stderr);
}
}
function writePrefixedLines(write: (chunk: string) => void, output: string): void {
for (const line of output.split(/\r?\n/)) {
if (line.length > 0) {
write(`${line}\n`);
}
}
}
function envWithCurrentNodeFirst(env: NodeJS.ProcessEnv = process.env): NodeJS.ProcessEnv {
return {
...env,
@ -1111,11 +1097,6 @@ async function defaultRebuildNativeSqlite(io: KtxCliIo): Promise<number> {
}
}
function flushPrefixedBufferedCommandOutput(io: KtxCliIo, bufferedIo: BufferedCommandIo): void {
writePrefixedLines((chunk) => io.stdout.write(chunk), bufferedIo.stdoutText());
writePrefixedLines((chunk) => io.stderr.write(chunk), bufferedIo.stderrText());
}
function nativeSqliteAbiMismatchDetail(output: string): string | null {
const mentionsBetterSqlite = /\bbetter-sqlite3\b|better_sqlite3/i.test(output);
const mentionsAbiMismatch = /compiled against a different Node\.js version|NODE_MODULE_VERSION/i.test(output);
@ -1207,6 +1188,20 @@ async function writeConnectionConfig(input: {
}
}
async function disableConnectionQueryHistory(projectDir: string, connectionId: string): Promise<void> {
const project = await loadKtxProject({ projectDir });
const connection = project.config.connections[connectionId];
if (!connection) {
return;
}
const existing = queryHistoryConfigRecord(connection) ?? historicSqlConfigRecord(connection) ?? {};
await writeConnectionConfig({
projectDir,
connectionId,
connection: withQueryHistoryConfig(connection, { ...existing, enabled: false }),
});
}
async function createConnectionConfigRollback(projectDir: string, connectionId: string): Promise<() => Promise<void>> {
const project = await loadKtxProject({ projectDir });
const previousConnection = project.config.connections[connectionId];
@ -1408,9 +1403,9 @@ async function maybeConfigureDatabaseScope(input: {
input.connectionId,
);
} catch (error) {
const detail = error instanceof Error ? error.message : String(error);
input.io.stderr.write(
`Could not discover ${spec.promptLabel.toLowerCase()} for ${input.connectionId}; ${detail}\n`,
writePrefixedLines(
(chunk) => input.io.stderr.write(chunk),
`Could not discover ${spec.promptLabel.toLowerCase()} for ${input.connectionId}; ${errorMessage(error)}`,
);
const typed = await promptCommaSeparatedScope({
prompts: input.prompts,
@ -1462,11 +1457,12 @@ async function maybeConfigureDatabaseScope(input: {
input.io,
);
} catch (error) {
const detail = error instanceof Error ? error.message : String(error);
input.io.stderr.write(
const detail = errorMessage(error);
writePrefixedLines(
(chunk) => input.io.stderr.write(chunk),
input.forcePrompt === true
? `Could not discover tables for ${input.connectionId}; edit was not saved. ${detail}\n`
: `Could not discover tables for ${input.connectionId}; continuing without table filter. ${detail}\n`,
? `Could not discover tables for ${input.connectionId}; edit was not saved. ${detail}`
: `Could not discover tables for ${input.connectionId}; continuing without table filter. ${detail}`,
);
return input.forcePrompt === true ? 'failed' : 'ready';
}
@ -1554,19 +1550,19 @@ async function maybeRunHistoricSqlSetupProbe(input: {
connectionId: string;
io: KtxCliIo;
deps: KtxSetupDatabasesDeps;
}): Promise<void> {
}): Promise<boolean> {
const project = await loadKtxProject({ projectDir: input.projectDir });
const connection = project.config.connections[input.connectionId];
const queryHistory = queryHistoryConfigRecord(connection) ?? historicSqlConfigRecord(connection);
if (queryHistory?.enabled !== true) {
return;
return true;
}
if (!connection) {
return;
return true;
}
const dialect = queryHistoryDialectForConnection(connection);
if (!dialect) {
return;
return true;
}
input.io.stdout.write('│ Query history probe...\n');
@ -1585,6 +1581,7 @@ async function maybeRunHistoricSqlSetupProbe(input: {
if (!result.ok) {
input.io.stdout.write('│ Setup written; query history will be skipped until fixed.\n');
}
return result.ok;
}
async function applyHistoricSqlConfigToExistingConnection(input: {
@ -1674,8 +1671,11 @@ async function validateAndScanConnection(input: {
const testIo = createBufferedCommandIo();
const testCode = await testConnection(input.projectDir, input.connectionId, testIo);
if (testCode !== 0) {
flushBufferedCommandOutput(input.io, testIo);
input.io.stderr.write(`Connection test failed for ${input.connectionId}.\n`);
flushPrefixedBufferedCommandOutput(input.io, testIo);
writePrefixedLines(
(chunk) => input.io.stderr.write(chunk),
`Connection test failed for ${input.connectionId}.`,
);
return 'failed';
}
const testOutput = testIo.stdoutText();
@ -1689,7 +1689,7 @@ async function validateAndScanConnection(input: {
return scopeStatus;
}
await maybeRunHistoricSqlSetupProbe({
const queryHistoryAvailable = await maybeRunHistoricSqlSetupProbe({
projectDir: input.projectDir,
connectionId: input.connectionId,
io: input.io,
@ -1746,7 +1746,7 @@ async function validateAndScanConnection(input: {
);
}
if (scanCode !== 0) {
return 'failed';
return queryHistoryAvailable ? 'failed' : 'failed-query-history-unavailable';
}
}
const scanOutput = scanIo.stdoutText();
@ -1888,7 +1888,10 @@ async function runPrimarySourceFullEdit(input: {
const existing = project.config.connections[input.connectionId];
const driver = normalizeDriver(existing?.driver);
if (!existing || !driver) {
input.io.stderr.write(`Connection "${input.connectionId}" is not a configured database.\n`);
writePrefixedLines(
(chunk) => input.io.stderr.write(chunk),
`Connection "${input.connectionId}" is not a configured database.`,
);
return 'failed';
}
@ -1942,7 +1945,7 @@ async function runPrimarySourceFullEdit(input: {
});
if (validated !== 'ready') {
await rollback();
return validated;
return validated === 'failed-query-history-unavailable' ? 'failed' : validated;
}
return 'ready';
}
@ -2077,7 +2080,7 @@ export async function runKtxSetupDatabasesStep(
prompts,
});
} catch (error) {
io.stderr.write(`${error instanceof Error ? error.message : String(error)}\n`);
writePrefixedLines((chunk) => io.stderr.write(chunk), errorMessage(error));
return { status: 'failed', projectDir: args.projectDir };
}
if (connectionChoice === 'back') {
@ -2221,14 +2224,18 @@ export async function runKtxSetupDatabasesStep(
break;
}
if (args.inputMode === 'disabled') return { status: 'failed', projectDir: args.projectDir };
const failureOptions = [
{ value: 'retry', label: 'Retry connection test' },
{ value: 're-enter', label: 'Re-enter connection details' },
...(setupStatus === 'failed-query-history-unavailable'
? [{ value: 'disable-query-history', label: 'Disable query history and retry' }]
: []),
{ value: 'skip', label: 'Skip this database' },
{ value: 'back', label: 'Back' },
];
const action = await prompts.select({
message: `Database setup failed for ${connectionChoice.connectionId}`,
options: [
{ value: 'retry', label: 'Retry connection test' },
{ value: 're-enter', label: 'Re-enter connection details' },
{ value: 'skip', label: 'Skip this database' },
{ value: 'back', label: 'Back' },
],
options: failureOptions,
});
if (action === 'back') {
if (!canReturnToDriverSelection) return { status: 'back', projectDir: args.projectDir };
@ -2248,6 +2255,16 @@ export async function runKtxSetupDatabasesStep(
args,
prompts,
});
} else if (action === 'disable-query-history') {
await disableConnectionQueryHistory(args.projectDir, connectionChoice.connectionId);
setupStatus = await validateAndScanConnection({
projectDir: args.projectDir,
connectionId: connectionChoice.connectionId,
io,
deps,
args,
prompts,
});
} else if (action === 're-enter') {
const connection = await buildConnectionConfig({
driver,

View file

@ -6,7 +6,7 @@ import { markKtxSetupStateStepComplete, readKtxSetupState } from './context/proj
import type { KtxEmbeddingConfig } from './llm/types.js';
import { type KtxEmbeddingHealthCheckResult, runKtxEmbeddingHealthCheck } from './llm/embedding-health.js';
import type { KtxCliIo } from './cli-runtime.js';
import { createStaticCliSpinner, type KtxCliSpinner } from './clack.js';
import { createStaticCliSpinner, errorMessage, writePrefixedLines, type KtxCliSpinner } from './clack.js';
import {
ensureManagedLocalEmbeddingsDaemon,
managedLocalEmbeddingHealthConfig,
@ -420,11 +420,12 @@ export async function runKtxSetupEmbeddingsStep(
io,
});
} catch (error) {
const write = (chunk: string) => io.stderr.write(chunk);
if (error instanceof ManagedPythonDaemonStartError) {
const tail = await readLocalEmbeddingDaemonStderrTail(error.stderrLog);
io.stderr.write(`${localEmbeddingSetupMessage(error.detail, tail)}\n`);
writePrefixedLines(write, localEmbeddingSetupMessage(error.detail, tail));
} else {
io.stderr.write(`${error instanceof Error ? error.message : String(error)}\n`);
writePrefixedLines(write, errorMessage(error));
}
return { status: 'failed', projectDir: args.projectDir };
}

View file

@ -1,6 +1,7 @@
import { loadKtxProject, type KtxLocalProject } from './context/project/project.js';
import { markKtxSetupStateStepComplete } from './context/project/setup-config.js';
import type { KtxCliIo } from './cli-runtime.js';
import { errorMessage, writePrefixedLines } from './clack.js';
import {
ensureManagedLocalEmbeddingsDaemon,
type ManagedLocalEmbeddingsDaemon,
@ -88,7 +89,7 @@ export async function runKtxSetupRuntimeStep(
});
}
} catch (error) {
io.stderr.write(`${error instanceof Error ? error.message : String(error)}\n`);
writePrefixedLines((chunk) => io.stderr.write(chunk), errorMessage(error));
return { status: 'failed', projectDir: args.projectDir, requirements };
}

View file

@ -17,6 +17,7 @@ import { type KtxProjectConfig, type KtxProjectConnectionConfig, serializeKtxPro
import { loadKtxProject } from './context/project/project.js';
import { markKtxSetupStateStepComplete } from './context/project/setup-config.js';
import type { KtxCliIo } from './cli-runtime.js';
import { errorMessage, writePrefixedLines } from './clack.js';
import { pickNotionRootPages } from './notion-page-picker.js';
import { runKtxSourceMapping } from './source-mapping.js';
import { withMultiselectNavigation, withTextInputNavigation } from './prompt-navigation.js';
@ -1983,7 +1984,7 @@ export async function runKtxSetupSourcesStep(
return { status: 'ready', projectDir: args.projectDir, connectionIds: readyConnectionIds };
}
} catch (error) {
io.stderr.write(`${error instanceof Error ? error.message : String(error)}\n`);
writePrefixedLines((chunk) => io.stderr.write(chunk), errorMessage(error));
return { status: 'failed', projectDir: args.projectDir };
}
}