From c526601d5297edc2380dd7b48d1028f3be4c7893 Mon Sep 17 00:00:00 2001 From: Andrey Avtomonov Date: Mon, 25 May 2026 17:22:43 +0200 Subject: [PATCH] refactor(setup): centralize rail-prefixed diagnostics + query-history fallback Extract errorMessage, writePrefixedLines, and flushPrefixedBufferedCommandOutput into clack.ts so the setup wizard, managed daemons, and embedding/agent steps share one rail-formatted writer. setup-databases.ts also adds a "disable query history and retry" option when the schema-context build fails and query history is the likely culprit, surfaced via a new failed-query-history-unavailable status. --- .../content/docs/cli-reference/ktx-setup.mdx | 6 +- packages/cli/src/clack.ts | 23 ++++ packages/cli/src/managed-local-embeddings.ts | 3 +- packages/cli/src/managed-python-http.ts | 3 +- packages/cli/src/setup-agents.ts | 3 +- packages/cli/src/setup-context.ts | 3 +- packages/cli/src/setup-databases.ts | 115 ++++++++++-------- packages/cli/src/setup-embeddings.ts | 7 +- packages/cli/src/setup-runtime.ts | 3 +- packages/cli/src/setup-sources.ts | 3 +- 10 files changed, 109 insertions(+), 60 deletions(-) diff --git a/docs-site/content/docs/cli-reference/ktx-setup.mdx b/docs-site/content/docs/cli-reference/ktx-setup.mdx index 17423534..2c19bd07 100644 --- a/docs-site/content/docs/cli-reference/ktx-setup.mdx +++ b/docs-site/content/docs/cli-reference/ktx-setup.mdx @@ -137,8 +137,10 @@ Enabling query history makes deep ingest readiness matter for later When query history is enabled for PostgreSQL, Snowflake, or BigQuery, `ktx setup` runs a non-blocking readiness probe after the connection test passes. A failed probe still writes setup changes, prints the warehouse-specific -grant or extension remediation, and leaves query-history ingest disabled until -you fix the prerequisite. +grant or extension remediation, and skips query-history processing until you +fix the prerequisite. If the later schema-context build also fails, interactive +setup offers **Disable query history and retry** so you can finish database +setup with `connections..context.queryHistory.enabled: false`. For BigQuery, the remediation tells you to grant `roles/bigquery.resourceViewer` on the BigQuery project, or grant a custom role that contains diff --git a/packages/cli/src/clack.ts b/packages/cli/src/clack.ts index 55d3e802..2ad51e6c 100644 --- a/packages/cli/src/clack.ts +++ b/packages/cli/src/clack.ts @@ -1,7 +1,30 @@ import { cancel, confirm, isCancel, log, spinner } from '@clack/prompts'; +import type { KtxCliIo } from './cli-runtime.js'; const ESC = String.fromCharCode(0x1b); +export interface RailBufferedSource { + stdoutText(): string; + stderrText(): string; +} + +export function errorMessage(error: unknown): string { + return error instanceof Error ? error.message : String(error); +} + +export function writePrefixedLines(write: (chunk: string) => void, output: string): void { + for (const line of output.split(/\r?\n/)) { + if (line.length > 0) { + write(`│ ${line}\n`); + } + } +} + +export function flushPrefixedBufferedCommandOutput(io: KtxCliIo, buffered: RailBufferedSource): void { + writePrefixedLines((chunk) => io.stdout.write(chunk), buffered.stdoutText()); + writePrefixedLines((chunk) => io.stderr.write(chunk), buffered.stderrText()); +} + export interface KtxCliSpinner { start(message: string): void; message(message: string): void; diff --git a/packages/cli/src/managed-local-embeddings.ts b/packages/cli/src/managed-local-embeddings.ts index b178be47..768648c1 100644 --- a/packages/cli/src/managed-local-embeddings.ts +++ b/packages/cli/src/managed-local-embeddings.ts @@ -1,5 +1,6 @@ import type { KtxEmbeddingConfig } from './llm/types.js'; import type { KtxCliIo } from './cli-runtime.js'; +import { writePrefixedLines } from './clack.js'; import { ensureManagedPythonCommandRuntime, type KtxManagedPythonInstallPolicy, @@ -73,7 +74,7 @@ export async function ensureManagedLocalEmbeddingsDaemon( }); const verb = daemon.status === 'started' ? 'Started' : 'Using'; - options.io.stderr.write(`${verb} KTX daemon: ${daemon.baseUrl}\n`); + writePrefixedLines((chunk) => options.io.stderr.write(chunk), `${verb} KTX daemon: ${daemon.baseUrl}`); return { baseUrl: daemon.baseUrl, diff --git a/packages/cli/src/managed-python-http.ts b/packages/cli/src/managed-python-http.ts index 0c9b24b3..728aa3ca 100644 --- a/packages/cli/src/managed-python-http.ts +++ b/packages/cli/src/managed-python-http.ts @@ -7,6 +7,7 @@ import type { LookerTableIdentifierParser } from './context/ingest/adapters/look import { createHttpSqlAnalysisPort, type KtxSqlAnalysisHttpJsonRunner } from './context/sql-analysis/http-sql-analysis-port.js'; import type { SqlAnalysisPort } from './context/sql-analysis/ports.js'; import type { KtxCliIo } from './cli-runtime.js'; +import { writePrefixedLines } from './clack.js'; import { ensureManagedPythonCommandRuntime, type KtxManagedPythonInstallPolicy, @@ -137,7 +138,7 @@ export function createManagedPythonDaemonBaseUrlResolver( force: false, }); const verb = daemon.status === 'started' ? 'Started' : 'Using existing'; - options.io.stderr.write(`${verb} KTX daemon: ${daemon.baseUrl}\n`); + writePrefixedLines((chunk) => options.io.stderr.write(chunk), `${verb} KTX daemon: ${daemon.baseUrl}`); cachedBaseUrl = daemon.baseUrl; return cachedBaseUrl; }; diff --git a/packages/cli/src/setup-agents.ts b/packages/cli/src/setup-agents.ts index 99d510c5..a671ba4b 100644 --- a/packages/cli/src/setup-agents.ts +++ b/packages/cli/src/setup-agents.ts @@ -10,6 +10,7 @@ import { markKtxSetupStateStepComplete } from './context/project/setup-config.js import { serializeKtxProjectConfig } from './context/project/config.js'; import { strToU8, zipSync } from 'fflate'; import type { KtxCliIo } from './cli-runtime.js'; +import { errorMessage, writePrefixedLines } from './clack.js'; import { createKtxSetupPromptAdapter, createKtxSetupUiAdapter, @@ -1230,7 +1231,7 @@ export async function runKtxSetupAgentsStep( } return { status: 'ready', projectDir: args.projectDir, installs, nextActions }; } catch (error) { - io.stderr.write(`${error instanceof Error ? error.message : String(error)}\n`); + writePrefixedLines((chunk) => io.stderr.write(chunk), errorMessage(error)); return { status: 'failed', projectDir: args.projectDir }; } } diff --git a/packages/cli/src/setup-context.ts b/packages/cli/src/setup-context.ts index aa519111..dc289278 100644 --- a/packages/cli/src/setup-context.ts +++ b/packages/cli/src/setup-context.ts @@ -5,6 +5,7 @@ import { type KtxLocalProject, loadKtxProject } from './context/project/project. import { markKtxSetupStateStepComplete, readKtxSetupState } from './context/project/setup-config.js'; import { serializeKtxProjectConfig } from './context/project/config.js'; import type { KtxCliIo } from './cli-runtime.js'; +import { errorMessage, writePrefixedLines } from './clack.js'; import { buildPublicIngestPlan } from './public-ingest.js'; import { type KtxDatabaseContextDepth, @@ -745,7 +746,7 @@ export async function runKtxSetupContextStep( return await runBuild(args, io, deps, project, targets); } catch (error) { - io.stderr.write(`${error instanceof Error ? error.message : String(error)}\n`); + writePrefixedLines((chunk) => io.stderr.write(chunk), errorMessage(error)); return { status: 'failed', projectDir: args.projectDir }; } } diff --git a/packages/cli/src/setup-databases.ts b/packages/cli/src/setup-databases.ts index 3dcf678a..0704ecd2 100644 --- a/packages/cli/src/setup-databases.ts +++ b/packages/cli/src/setup-databases.ts @@ -16,6 +16,11 @@ import { loadKtxProject } from './context/project/project.js'; import { markKtxSetupStateStepComplete, setKtxSetupDatabaseConnectionIds } from './context/project/setup-config.js'; import type { KtxTableListEntry } from './context/scan/types.js'; import type { KtxCliIo } from './cli-runtime.js'; +import { + errorMessage, + flushPrefixedBufferedCommandOutput, + writePrefixedLines, +} from './clack.js'; import { runKtxConnection } from './connection.js'; import { pickDatabaseScope as defaultPickDatabaseScope, @@ -221,7 +226,7 @@ const SCOPE_DISCOVERY_SPECS: Partial; -type ConnectionSetupStatus = 'ready' | 'back' | 'failed'; +type ConnectionSetupStatus = 'ready' | 'back' | 'failed' | 'failed-query-history-unavailable'; const DRIVER_CONNECTION_DEFAULTS: Record = { postgres: { port: '5432' }, @@ -1017,25 +1022,6 @@ function createBufferedCommandIo(): BufferedCommandIo { }; } -function flushBufferedCommandOutput(io: KtxCliIo, bufferedIo: BufferedCommandIo): void { - const stdout = bufferedIo.stdoutText(); - const stderr = bufferedIo.stderrText(); - if (stdout.length > 0) { - io.stdout.write(stdout); - } - if (stderr.length > 0) { - io.stderr.write(stderr); - } -} - -function writePrefixedLines(write: (chunk: string) => void, output: string): void { - for (const line of output.split(/\r?\n/)) { - if (line.length > 0) { - write(`│ ${line}\n`); - } - } -} - function envWithCurrentNodeFirst(env: NodeJS.ProcessEnv = process.env): NodeJS.ProcessEnv { return { ...env, @@ -1111,11 +1097,6 @@ async function defaultRebuildNativeSqlite(io: KtxCliIo): Promise { } } -function flushPrefixedBufferedCommandOutput(io: KtxCliIo, bufferedIo: BufferedCommandIo): void { - writePrefixedLines((chunk) => io.stdout.write(chunk), bufferedIo.stdoutText()); - writePrefixedLines((chunk) => io.stderr.write(chunk), bufferedIo.stderrText()); -} - function nativeSqliteAbiMismatchDetail(output: string): string | null { const mentionsBetterSqlite = /\bbetter-sqlite3\b|better_sqlite3/i.test(output); const mentionsAbiMismatch = /compiled against a different Node\.js version|NODE_MODULE_VERSION/i.test(output); @@ -1207,6 +1188,20 @@ async function writeConnectionConfig(input: { } } +async function disableConnectionQueryHistory(projectDir: string, connectionId: string): Promise { + const project = await loadKtxProject({ projectDir }); + const connection = project.config.connections[connectionId]; + if (!connection) { + return; + } + const existing = queryHistoryConfigRecord(connection) ?? historicSqlConfigRecord(connection) ?? {}; + await writeConnectionConfig({ + projectDir, + connectionId, + connection: withQueryHistoryConfig(connection, { ...existing, enabled: false }), + }); +} + async function createConnectionConfigRollback(projectDir: string, connectionId: string): Promise<() => Promise> { const project = await loadKtxProject({ projectDir }); const previousConnection = project.config.connections[connectionId]; @@ -1408,9 +1403,9 @@ async function maybeConfigureDatabaseScope(input: { input.connectionId, ); } catch (error) { - const detail = error instanceof Error ? error.message : String(error); - input.io.stderr.write( - `Could not discover ${spec.promptLabel.toLowerCase()} for ${input.connectionId}; ${detail}\n`, + writePrefixedLines( + (chunk) => input.io.stderr.write(chunk), + `Could not discover ${spec.promptLabel.toLowerCase()} for ${input.connectionId}; ${errorMessage(error)}`, ); const typed = await promptCommaSeparatedScope({ prompts: input.prompts, @@ -1462,11 +1457,12 @@ async function maybeConfigureDatabaseScope(input: { input.io, ); } catch (error) { - const detail = error instanceof Error ? error.message : String(error); - input.io.stderr.write( + const detail = errorMessage(error); + writePrefixedLines( + (chunk) => input.io.stderr.write(chunk), input.forcePrompt === true - ? `Could not discover tables for ${input.connectionId}; edit was not saved. ${detail}\n` - : `Could not discover tables for ${input.connectionId}; continuing without table filter. ${detail}\n`, + ? `Could not discover tables for ${input.connectionId}; edit was not saved. ${detail}` + : `Could not discover tables for ${input.connectionId}; continuing without table filter. ${detail}`, ); return input.forcePrompt === true ? 'failed' : 'ready'; } @@ -1554,19 +1550,19 @@ async function maybeRunHistoricSqlSetupProbe(input: { connectionId: string; io: KtxCliIo; deps: KtxSetupDatabasesDeps; -}): Promise { +}): Promise { const project = await loadKtxProject({ projectDir: input.projectDir }); const connection = project.config.connections[input.connectionId]; const queryHistory = queryHistoryConfigRecord(connection) ?? historicSqlConfigRecord(connection); if (queryHistory?.enabled !== true) { - return; + return true; } if (!connection) { - return; + return true; } const dialect = queryHistoryDialectForConnection(connection); if (!dialect) { - return; + return true; } input.io.stdout.write('│ Query history probe...\n'); @@ -1585,6 +1581,7 @@ async function maybeRunHistoricSqlSetupProbe(input: { if (!result.ok) { input.io.stdout.write('│ Setup written; query history will be skipped until fixed.\n'); } + return result.ok; } async function applyHistoricSqlConfigToExistingConnection(input: { @@ -1674,8 +1671,11 @@ async function validateAndScanConnection(input: { const testIo = createBufferedCommandIo(); const testCode = await testConnection(input.projectDir, input.connectionId, testIo); if (testCode !== 0) { - flushBufferedCommandOutput(input.io, testIo); - input.io.stderr.write(`Connection test failed for ${input.connectionId}.\n`); + flushPrefixedBufferedCommandOutput(input.io, testIo); + writePrefixedLines( + (chunk) => input.io.stderr.write(chunk), + `Connection test failed for ${input.connectionId}.`, + ); return 'failed'; } const testOutput = testIo.stdoutText(); @@ -1689,7 +1689,7 @@ async function validateAndScanConnection(input: { return scopeStatus; } - await maybeRunHistoricSqlSetupProbe({ + const queryHistoryAvailable = await maybeRunHistoricSqlSetupProbe({ projectDir: input.projectDir, connectionId: input.connectionId, io: input.io, @@ -1746,7 +1746,7 @@ async function validateAndScanConnection(input: { ); } if (scanCode !== 0) { - return 'failed'; + return queryHistoryAvailable ? 'failed' : 'failed-query-history-unavailable'; } } const scanOutput = scanIo.stdoutText(); @@ -1888,7 +1888,10 @@ async function runPrimarySourceFullEdit(input: { const existing = project.config.connections[input.connectionId]; const driver = normalizeDriver(existing?.driver); if (!existing || !driver) { - input.io.stderr.write(`Connection "${input.connectionId}" is not a configured database.\n`); + writePrefixedLines( + (chunk) => input.io.stderr.write(chunk), + `Connection "${input.connectionId}" is not a configured database.`, + ); return 'failed'; } @@ -1942,7 +1945,7 @@ async function runPrimarySourceFullEdit(input: { }); if (validated !== 'ready') { await rollback(); - return validated; + return validated === 'failed-query-history-unavailable' ? 'failed' : validated; } return 'ready'; } @@ -2077,7 +2080,7 @@ export async function runKtxSetupDatabasesStep( prompts, }); } catch (error) { - io.stderr.write(`${error instanceof Error ? error.message : String(error)}\n`); + writePrefixedLines((chunk) => io.stderr.write(chunk), errorMessage(error)); return { status: 'failed', projectDir: args.projectDir }; } if (connectionChoice === 'back') { @@ -2221,14 +2224,18 @@ export async function runKtxSetupDatabasesStep( break; } if (args.inputMode === 'disabled') return { status: 'failed', projectDir: args.projectDir }; + const failureOptions = [ + { value: 'retry', label: 'Retry connection test' }, + { value: 're-enter', label: 'Re-enter connection details' }, + ...(setupStatus === 'failed-query-history-unavailable' + ? [{ value: 'disable-query-history', label: 'Disable query history and retry' }] + : []), + { value: 'skip', label: 'Skip this database' }, + { value: 'back', label: 'Back' }, + ]; const action = await prompts.select({ message: `Database setup failed for ${connectionChoice.connectionId}`, - options: [ - { value: 'retry', label: 'Retry connection test' }, - { value: 're-enter', label: 'Re-enter connection details' }, - { value: 'skip', label: 'Skip this database' }, - { value: 'back', label: 'Back' }, - ], + options: failureOptions, }); if (action === 'back') { if (!canReturnToDriverSelection) return { status: 'back', projectDir: args.projectDir }; @@ -2248,6 +2255,16 @@ export async function runKtxSetupDatabasesStep( args, prompts, }); + } else if (action === 'disable-query-history') { + await disableConnectionQueryHistory(args.projectDir, connectionChoice.connectionId); + setupStatus = await validateAndScanConnection({ + projectDir: args.projectDir, + connectionId: connectionChoice.connectionId, + io, + deps, + args, + prompts, + }); } else if (action === 're-enter') { const connection = await buildConnectionConfig({ driver, diff --git a/packages/cli/src/setup-embeddings.ts b/packages/cli/src/setup-embeddings.ts index f703980a..8f49bcf1 100644 --- a/packages/cli/src/setup-embeddings.ts +++ b/packages/cli/src/setup-embeddings.ts @@ -6,7 +6,7 @@ import { markKtxSetupStateStepComplete, readKtxSetupState } from './context/proj import type { KtxEmbeddingConfig } from './llm/types.js'; import { type KtxEmbeddingHealthCheckResult, runKtxEmbeddingHealthCheck } from './llm/embedding-health.js'; import type { KtxCliIo } from './cli-runtime.js'; -import { createStaticCliSpinner, type KtxCliSpinner } from './clack.js'; +import { createStaticCliSpinner, errorMessage, writePrefixedLines, type KtxCliSpinner } from './clack.js'; import { ensureManagedLocalEmbeddingsDaemon, managedLocalEmbeddingHealthConfig, @@ -420,11 +420,12 @@ export async function runKtxSetupEmbeddingsStep( io, }); } catch (error) { + const write = (chunk: string) => io.stderr.write(chunk); if (error instanceof ManagedPythonDaemonStartError) { const tail = await readLocalEmbeddingDaemonStderrTail(error.stderrLog); - io.stderr.write(`${localEmbeddingSetupMessage(error.detail, tail)}\n`); + writePrefixedLines(write, localEmbeddingSetupMessage(error.detail, tail)); } else { - io.stderr.write(`${error instanceof Error ? error.message : String(error)}\n`); + writePrefixedLines(write, errorMessage(error)); } return { status: 'failed', projectDir: args.projectDir }; } diff --git a/packages/cli/src/setup-runtime.ts b/packages/cli/src/setup-runtime.ts index 25612065..19f09a53 100644 --- a/packages/cli/src/setup-runtime.ts +++ b/packages/cli/src/setup-runtime.ts @@ -1,6 +1,7 @@ import { loadKtxProject, type KtxLocalProject } from './context/project/project.js'; import { markKtxSetupStateStepComplete } from './context/project/setup-config.js'; import type { KtxCliIo } from './cli-runtime.js'; +import { errorMessage, writePrefixedLines } from './clack.js'; import { ensureManagedLocalEmbeddingsDaemon, type ManagedLocalEmbeddingsDaemon, @@ -88,7 +89,7 @@ export async function runKtxSetupRuntimeStep( }); } } catch (error) { - io.stderr.write(`${error instanceof Error ? error.message : String(error)}\n`); + writePrefixedLines((chunk) => io.stderr.write(chunk), errorMessage(error)); return { status: 'failed', projectDir: args.projectDir, requirements }; } diff --git a/packages/cli/src/setup-sources.ts b/packages/cli/src/setup-sources.ts index 410de812..dea1cd43 100644 --- a/packages/cli/src/setup-sources.ts +++ b/packages/cli/src/setup-sources.ts @@ -17,6 +17,7 @@ import { type KtxProjectConfig, type KtxProjectConnectionConfig, serializeKtxPro import { loadKtxProject } from './context/project/project.js'; import { markKtxSetupStateStepComplete } from './context/project/setup-config.js'; import type { KtxCliIo } from './cli-runtime.js'; +import { errorMessage, writePrefixedLines } from './clack.js'; import { pickNotionRootPages } from './notion-page-picker.js'; import { runKtxSourceMapping } from './source-mapping.js'; import { withMultiselectNavigation, withTextInputNavigation } from './prompt-navigation.js'; @@ -1983,7 +1984,7 @@ export async function runKtxSetupSourcesStep( return { status: 'ready', projectDir: args.projectDir, connectionIds: readyConnectionIds }; } } catch (error) { - io.stderr.write(`${error instanceof Error ? error.message : String(error)}\n`); + writePrefixedLines((chunk) => io.stderr.write(chunk), errorMessage(error)); return { status: 'failed', projectDir: args.projectDir }; } }