From 3677b1fb0c25a8e7ac288918d9624e4825cf8de5 Mon Sep 17 00:00:00 2001 From: Andrey Avtomonov Date: Sun, 24 May 2026 01:28:42 +0200 Subject: [PATCH] feat(historic-sql): use shared readiness probes --- .../ingest/historic-sql-probes.test.ts | 157 +++++++++ .../src/context/ingest/historic-sql-probes.ts | 141 ++++++++ .../bigquery-runner.test.ts | 110 ++++++ .../historic-sql-probes/bigquery-runner.ts | 160 +++++++++ .../postgres-runner.test.ts | 113 +++++++ .../historic-sql-probes/postgres-runner.ts | 111 ++++++ .../snowflake-runner.test.ts | 82 +++++ .../historic-sql-probes/snowflake-runner.ts | 96 ++++++ packages/cli/src/setup-databases.test.ts | 235 +++++++++++-- packages/cli/src/setup-databases.ts | 161 ++------- packages/cli/src/status-project.test.ts | 107 ++++-- packages/cli/src/status-project.ts | 320 ++++-------------- 12 files changed, 1337 insertions(+), 456 deletions(-) create mode 100644 packages/cli/src/context/ingest/historic-sql-probes.test.ts create mode 100644 packages/cli/src/context/ingest/historic-sql-probes.ts create mode 100644 packages/cli/src/context/ingest/historic-sql-probes/bigquery-runner.test.ts create mode 100644 packages/cli/src/context/ingest/historic-sql-probes/bigquery-runner.ts create mode 100644 packages/cli/src/context/ingest/historic-sql-probes/postgres-runner.test.ts create mode 100644 packages/cli/src/context/ingest/historic-sql-probes/postgres-runner.ts create mode 100644 packages/cli/src/context/ingest/historic-sql-probes/snowflake-runner.test.ts create mode 100644 packages/cli/src/context/ingest/historic-sql-probes/snowflake-runner.ts diff --git a/packages/cli/src/context/ingest/historic-sql-probes.test.ts b/packages/cli/src/context/ingest/historic-sql-probes.test.ts new file mode 100644 index 00000000..e96b261f --- /dev/null +++ b/packages/cli/src/context/ingest/historic-sql-probes.test.ts @@ -0,0 +1,157 @@ +import { describe, expect, it, vi } from 'vitest'; +import type { HistoricSqlDialect } from './adapters/historic-sql/types.js'; +import { + historicSqlProbeCatalogName, + runHistoricSqlReadinessProbe, + type HistoricSqlProbeRunner, + type HistoricSqlProbeRunnerFactoryEntry, +} from './historic-sql-probes.js'; + +function fakeRunner( + dialect: HistoricSqlDialect, + catalogName: string, + options: { result?: unknown; error?: unknown } = {}, +): HistoricSqlProbeRunner & { runCalls: () => number } { + let calls = 0; + return { + dialect, + catalogName, + async run() { + calls += 1; + if (options.error) { + throw options.error; + } + return options.result ?? { warnings: [], info: [] }; + }, + formatSuccessDetail() { + return { detail: `${catalogName} ready`, warnings: [] }; + }, + fixAdvice(error) { + return { + failHeadline: error instanceof Error ? error.message : String(error), + remediation: 'Fix the test probe.', + }; + }, + runCalls: () => calls, + }; +} + +function factories( + overrides: Partial>, +): Record { + const postgres = overrides.postgres ?? fakeRunner('postgres', 'pg_stat_statements'); + const snowflake = + overrides.snowflake ?? + fakeRunner('snowflake', 'SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY'); + const bigquery = + overrides.bigquery ?? fakeRunner('bigquery', 'INFORMATION_SCHEMA.JOBS_BY_PROJECT'); + + return { + postgres: { + catalogName: 'pg_stat_statements', + load: vi.fn(async () => postgres), + }, + snowflake: { + catalogName: 'SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY', + load: vi.fn(async () => snowflake), + }, + bigquery: { + catalogName: 'INFORMATION_SCHEMA.JOBS_BY_PROJECT', + load: vi.fn(async () => bigquery), + }, + }; +} + +describe('historic-SQL probe registry', () => { + it('returns null when the connection has no query-history dialect', async () => { + const deps = { factories: factories({}), cache: new Map() }; + + await expect( + runHistoricSqlReadinessProbe( + { + projectDir: '/work/project', + connectionId: 'mysql', + connection: { + driver: 'mysql', + context: { queryHistory: { enabled: true } }, + }, + env: {}, + }, + deps, + ), + ).resolves.toBeNull(); + + expect(deps.factories.postgres.load).not.toHaveBeenCalled(); + expect(deps.factories.snowflake.load).not.toHaveBeenCalled(); + expect(deps.factories.bigquery.load).not.toHaveBeenCalled(); + }); + + it('dispatches to the dialect runner and caches the runner instance', async () => { + const runner = fakeRunner('postgres', 'pg_stat_statements', { + result: { pgServerVersion: 'PostgreSQL 16.4', warnings: [], info: [] }, + }); + const deps = { factories: factories({ postgres: runner }), cache: new Map() }; + const input = { + projectDir: '/work/project', + connectionId: 'warehouse', + connection: { + driver: 'postgres', + url: 'env:DATABASE_URL', + context: { queryHistory: { enabled: true } }, + }, + env: {}, + }; + + const first = await runHistoricSqlReadinessProbe(input, deps); + const second = await runHistoricSqlReadinessProbe(input, deps); + + expect(first).toMatchObject({ ok: true, dialect: 'postgres', runner }); + expect(second).toMatchObject({ ok: true, dialect: 'postgres', runner }); + expect(deps.factories.postgres.load).toHaveBeenCalledTimes(1); + expect(runner.runCalls()).toBe(2); + }); + + it('normalizes runner errors into a failed outcome', async () => { + const error = new Error('missing grants'); + const runner = fakeRunner('bigquery', 'INFORMATION_SCHEMA.JOBS_BY_PROJECT', { + error, + }); + const deps = { factories: factories({ bigquery: runner }), cache: new Map() }; + + await expect( + runHistoricSqlReadinessProbe( + { + projectDir: '/work/project', + connectionId: 'bq', + connection: { + driver: 'bigquery', + credentials_json: '{"project_id":"project-1"}', + context: { queryHistory: { enabled: true } }, + }, + env: {}, + }, + deps, + ), + ).resolves.toEqual({ + ok: false, + dialect: 'bigquery', + runner, + error, + }); + }); + + it('returns catalog names without loading runner modules', () => { + const deps = { factories: factories({}), cache: new Map() }; + + expect(historicSqlProbeCatalogName('postgres', deps)).toBe('pg_stat_statements'); + expect(historicSqlProbeCatalogName('snowflake', deps)).toBe( + 'SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY', + ); + expect(historicSqlProbeCatalogName('bigquery', deps)).toBe( + 'INFORMATION_SCHEMA.JOBS_BY_PROJECT', + ); + expect(deps.factories.postgres.load).not.toHaveBeenCalled(); + expect(deps.factories.snowflake.load).not.toHaveBeenCalled(); + expect(deps.factories.bigquery.load).not.toHaveBeenCalled(); + }); +}); diff --git a/packages/cli/src/context/ingest/historic-sql-probes.ts b/packages/cli/src/context/ingest/historic-sql-probes.ts new file mode 100644 index 00000000..07204f3a --- /dev/null +++ b/packages/cli/src/context/ingest/historic-sql-probes.ts @@ -0,0 +1,141 @@ +import type { KtxProjectConnectionConfig } from '../project/config.js'; +import { queryHistoryDialectForConnection } from './adapters/historic-sql/connection-dialect.js'; +import type { HistoricSqlDialect } from './adapters/historic-sql/types.js'; + +export interface HistoricSqlFixAdvice { + failHeadline: string; + remediation: string; +} + +export interface HistoricSqlSuccessDetail { + detail: string; + warnings: string[]; +} + +export interface HistoricSqlProbeInput { + projectDir: string; + connectionId: string; + connection: KtxProjectConnectionConfig; + env?: NodeJS.ProcessEnv; +} + +export interface HistoricSqlProbeRunner { + readonly dialect: HistoricSqlDialect; + readonly catalogName: string; + run(input: HistoricSqlProbeInput): Promise; + formatSuccessDetail(result: unknown): HistoricSqlSuccessDetail; + fixAdvice(error: unknown): HistoricSqlFixAdvice; +} + +/** @internal */ +export interface HistoricSqlProbeRunnerFactoryEntry { + readonly catalogName: string; + load(): Promise; +} + +export type HistoricSqlProbeOutcome = + | { + ok: true; + dialect: HistoricSqlDialect; + runner: HistoricSqlProbeRunner; + result: unknown; + } + | { + ok: false; + dialect: HistoricSqlDialect; + runner: HistoricSqlProbeRunner; + error: unknown; + }; + +export type HistoricSqlReadinessProbe = ( + input: HistoricSqlProbeInput, +) => Promise; + +export interface HistoricSqlProbeRegistryDeps { + factories?: Record; + cache?: Map; +} + +const defaultHistoricSqlProbeRunnerFactories: Record< + HistoricSqlDialect, + HistoricSqlProbeRunnerFactoryEntry +> = { + postgres: { + catalogName: 'pg_stat_statements', + load: async () => { + const { PostgresPgssProbeRunner } = await import( + './historic-sql-probes/postgres-runner.js' + ); + return new PostgresPgssProbeRunner(); + }, + }, + snowflake: { + catalogName: 'SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY', + load: async () => { + const { SnowflakeAccountUsageProbeRunner } = await import( + './historic-sql-probes/snowflake-runner.js' + ); + return new SnowflakeAccountUsageProbeRunner(); + }, + }, + bigquery: { + catalogName: 'INFORMATION_SCHEMA.JOBS_BY_PROJECT', + load: async () => { + const { BigQueryJobsByProjectProbeRunner } = await import( + './historic-sql-probes/bigquery-runner.js' + ); + return new BigQueryJobsByProjectProbeRunner(); + }, + }, +}; + +const DEFAULT_RUNNER_CACHE = new Map(); + +function registryDeps(input: HistoricSqlProbeRegistryDeps) { + return { + factories: input.factories ?? defaultHistoricSqlProbeRunnerFactories, + cache: input.cache ?? DEFAULT_RUNNER_CACHE, + }; +} + +export function historicSqlProbeCatalogName( + dialect: HistoricSqlDialect, + deps: HistoricSqlProbeRegistryDeps = {}, +): string { + return registryDeps(deps).factories[dialect].catalogName; +} + +async function loadHistoricSqlProbeRunner( + dialect: HistoricSqlDialect, + deps: HistoricSqlProbeRegistryDeps = {}, +): Promise { + const { factories, cache } = registryDeps(deps); + const cached = cache.get(dialect); + if (cached) { + return cached; + } + const runner = await factories[dialect].load(); + cache.set(dialect, runner); + return runner; +} + +export async function runHistoricSqlReadinessProbe( + input: HistoricSqlProbeInput, + deps: HistoricSqlProbeRegistryDeps = {}, +): Promise { + const dialect = queryHistoryDialectForConnection(input.connection); + if (!dialect) { + return null; + } + const runner = await loadHistoricSqlProbeRunner(dialect, deps); + try { + return { + ok: true, + dialect, + runner, + result: await runner.run(input), + }; + } catch (error) { + return { ok: false, dialect, runner, error }; + } +} diff --git a/packages/cli/src/context/ingest/historic-sql-probes/bigquery-runner.test.ts b/packages/cli/src/context/ingest/historic-sql-probes/bigquery-runner.test.ts new file mode 100644 index 00000000..7a2db117 --- /dev/null +++ b/packages/cli/src/context/ingest/historic-sql-probes/bigquery-runner.test.ts @@ -0,0 +1,110 @@ +import { describe, expect, it, vi } from 'vitest'; +import { HistoricSqlGrantsMissingError } from '../adapters/historic-sql/errors.js'; +import { BigQueryJobsByProjectProbeRunner } from './bigquery-runner.js'; + +describe('BigQueryJobsByProjectProbeRunner', () => { + it('creates a region-scoped reader, runs it, and cleans up the connector', async () => { + const cleanup = vi.fn(async () => undefined); + const reader = { + probe: vi.fn(async () => ({ warnings: [], info: ['region: eu'] })), + }; + const createReader = vi.fn(() => reader); + const runner = new BigQueryJobsByProjectProbeRunner({ + createReader, + createClient: () => ({ client: { executeQuery: vi.fn() }, cleanup }), + resolveReference: () => '{"project_id":"project-1"}', + }); + + await expect( + runner.run({ + projectDir: '/work/project', + connectionId: 'bq', + connection: { + driver: 'bigquery', + credentials_json: 'env:BQ_CREDENTIALS_JSON', + location: 'EU', + }, + env: {}, + }), + ).resolves.toEqual({ warnings: [], info: ['region: eu'] }); + expect(createReader).toHaveBeenCalledWith({ projectId: 'project-1', region: 'EU' }); + expect(reader.probe).toHaveBeenCalledOnce(); + expect(cleanup).toHaveBeenCalledOnce(); + }); + + it('uses us as the default BigQuery region', async () => { + const createReader = vi.fn(() => ({ + probe: vi.fn(async () => ({ warnings: [], info: [] })), + })); + const runner = new BigQueryJobsByProjectProbeRunner({ + createReader, + createClient: () => ({ client: {}, cleanup: vi.fn(async () => undefined) }), + resolveReference: () => '{"project_id":"project-1"}', + }); + + await runner.run({ + projectDir: '/work/project', + connectionId: 'bq', + connection: { + driver: 'bigquery', + credentials_json: '{"project_id":"project-1"}', + }, + env: {}, + }); + + expect(createReader).toHaveBeenCalledWith({ projectId: 'project-1', region: 'us' }); + }); + + it('rejects missing BigQuery credentials_json.project_id', async () => { + const runner = new BigQueryJobsByProjectProbeRunner({ + createReader: vi.fn(), + createClient: () => ({ client: {}, cleanup: vi.fn() }), + resolveReference: () => '{"client_email":"svc@example.test"}', + }); + + await expect( + runner.run({ + projectDir: '/work/project', + connectionId: 'bq', + connection: { + driver: 'bigquery', + credentials_json: 'env:BQ_CREDENTIALS_JSON', + }, + env: {}, + }), + ).rejects.toThrow('Query history BigQuery connection bq requires credentials_json.project_id'); + }); + + it('formats successful BigQuery details', () => { + const runner = new BigQueryJobsByProjectProbeRunner(); + + expect( + runner.formatSuccessDetail({ + warnings: ['JOBS_BY_PROJECT is delayed'], + info: ['region: us'], + }), + ).toEqual({ + detail: 'INFORMATION_SCHEMA.JOBS_BY_PROJECT ready; region: us', + warnings: ['JOBS_BY_PROJECT is delayed'], + }); + }); + + it('maps BigQuery grant errors to runner advice', () => { + const runner = new BigQueryJobsByProjectProbeRunner(); + + expect( + runner.fixAdvice( + new HistoricSqlGrantsMissingError({ + dialect: 'bigquery', + message: 'principal cannot query JOBS_BY_PROJECT', + remediation: + 'Grant roles/bigquery.resourceViewer on the BigQuery project, or grant a custom role containing bigquery.jobs.listAll.', + }), + ), + ).toEqual({ + failHeadline: 'BigQuery principal cannot read INFORMATION_SCHEMA.JOBS_BY_PROJECT', + remediation: + 'Grant roles/bigquery.resourceViewer on the BigQuery project, or grant a custom role containing bigquery.jobs.listAll.', + }); + }); +}); diff --git a/packages/cli/src/context/ingest/historic-sql-probes/bigquery-runner.ts b/packages/cli/src/context/ingest/historic-sql-probes/bigquery-runner.ts new file mode 100644 index 00000000..09ad65d5 --- /dev/null +++ b/packages/cli/src/context/ingest/historic-sql-probes/bigquery-runner.ts @@ -0,0 +1,160 @@ +import { HistoricSqlGrantsMissingError } from '../adapters/historic-sql/errors.js'; +import { BigQueryHistoricSqlQueryHistoryReader } from '../adapters/historic-sql/bigquery-query-history-reader.js'; +import { + type HistoricSqlFixAdvice, + type HistoricSqlProbeInput, + type HistoricSqlProbeRunner, + type HistoricSqlSuccessDetail, +} from '../historic-sql-probes.js'; +import { resolveKtxConfigReference } from '../../core/config-reference.js'; +import { + isKtxBigQueryConnectionConfig, + KtxBigQueryScanConnector, + type KtxBigQueryConnectionConfig, +} from '../../../connectors/bigquery/connector.js'; + +interface GenericProbeResult { + warnings: string[]; + info?: string[]; +} + +interface ClientHandle { + client: unknown; + cleanup(): Promise; +} + +interface BigQueryJobsByProjectProbeRunnerOptions { + createReader?: (options: { projectId: string; region: string }) => { + probe(client: unknown): Promise; + }; + createClient?: ( + input: HistoricSqlProbeInput & { connection: KtxBigQueryConnectionConfig }, + ) => ClientHandle; + resolveReference?: (value: string | undefined, env: NodeJS.ProcessEnv) => string | undefined; +} + +function bigQueryProjectId( + connectionId: string, + connection: KtxBigQueryConnectionConfig, + env: NodeJS.ProcessEnv, + resolveReference: (value: string | undefined, env: NodeJS.ProcessEnv) => string | undefined, +): string { + const rawCredentials = + typeof connection.credentials_json === 'string' ? connection.credentials_json : ''; + const resolvedCredentials = resolveReference(rawCredentials, env); + if (!resolvedCredentials) { + throw new Error(`Query history BigQuery connection ${connectionId} requires credentials_json`); + } + const parsed = JSON.parse(resolvedCredentials) as { project_id?: unknown }; + if (typeof parsed.project_id !== 'string' || parsed.project_id.trim().length === 0) { + throw new Error( + `Query history BigQuery connection ${connectionId} requires credentials_json.project_id`, + ); + } + return parsed.project_id; +} + +function bigQueryRegion(connection: KtxBigQueryConnectionConfig): string { + return typeof connection.location === 'string' && connection.location.trim().length > 0 + ? connection.location.trim() + : 'us'; +} + +function infoSuffix(info: readonly string[] | undefined): string { + return info && info.length > 0 ? `; ${info.join('; ')}` : ''; +} + +export class BigQueryJobsByProjectProbeRunner implements HistoricSqlProbeRunner { + readonly dialect = 'bigquery' as const; + readonly catalogName = 'INFORMATION_SCHEMA.JOBS_BY_PROJECT'; + + private readonly createReader: (options: { projectId: string; region: string }) => { + probe(client: unknown): Promise; + }; + private readonly createClient: ( + input: HistoricSqlProbeInput & { connection: KtxBigQueryConnectionConfig }, + ) => ClientHandle; + private readonly resolveReference: ( + value: string | undefined, + env: NodeJS.ProcessEnv, + ) => string | undefined; + + constructor(options: BigQueryJobsByProjectProbeRunnerOptions = {}) { + this.createReader = + options.createReader ?? + ((readerOptions) => new BigQueryHistoricSqlQueryHistoryReader(readerOptions)); + this.createClient = + options.createClient ?? + ((input) => { + const connector = new KtxBigQueryScanConnector({ + connectionId: input.connectionId, + connection: input.connection, + env: input.env, + }); + return { + client: { + async executeQuery(sql: string) { + const result = await connector.executeReadOnly( + { connectionId: input.connectionId, sql }, + {} as never, + ); + return { + headers: result.headers, + rows: result.rows, + totalRows: result.totalRows, + }; + }, + }, + cleanup: () => connector.cleanup(), + }; + }); + this.resolveReference = options.resolveReference ?? resolveKtxConfigReference; + } + + async run(input: HistoricSqlProbeInput): Promise { + const inputDriver = input.connection.driver ?? 'unknown'; + if (!isKtxBigQueryConnectionConfig(input.connection)) { + throw new Error(`Native BigQuery connector cannot run driver "${inputDriver}"`); + } + const projectId = bigQueryProjectId( + input.connectionId, + input.connection, + input.env ?? process.env, + this.resolveReference, + ); + const reader = this.createReader({ + projectId, + region: bigQueryRegion(input.connection), + }); + const handle = this.createClient({ + ...input, + connection: input.connection, + }); + try { + return await reader.probe(handle.client); + } finally { + await handle.cleanup(); + } + } + + formatSuccessDetail(result: unknown): HistoricSqlSuccessDetail { + const probeResult = result as GenericProbeResult; + return { + detail: `${this.catalogName} ready${infoSuffix(probeResult.info)}`, + warnings: probeResult.warnings, + }; + } + + fixAdvice(error: unknown): HistoricSqlFixAdvice { + if (error instanceof HistoricSqlGrantsMissingError) { + return { + failHeadline: 'BigQuery principal cannot read INFORMATION_SCHEMA.JOBS_BY_PROJECT', + remediation: error.remediation, + }; + } + return { + failHeadline: `${this.catalogName} readiness check failed`, + remediation: error instanceof Error ? error.message : String(error), + }; + } +} diff --git a/packages/cli/src/context/ingest/historic-sql-probes/postgres-runner.test.ts b/packages/cli/src/context/ingest/historic-sql-probes/postgres-runner.test.ts new file mode 100644 index 00000000..bcd6d187 --- /dev/null +++ b/packages/cli/src/context/ingest/historic-sql-probes/postgres-runner.test.ts @@ -0,0 +1,113 @@ +import { describe, expect, it, vi } from 'vitest'; +import { + HistoricSqlExtensionMissingError, + HistoricSqlGrantsMissingError, + HistoricSqlVersionUnsupportedError, +} from '../adapters/historic-sql/errors.js'; +import { PostgresPgssProbeRunner } from './postgres-runner.js'; + +describe('PostgresPgssProbeRunner', () => { + it('runs the pg_stat_statements reader and cleans up the client', async () => { + const cleanup = vi.fn(async () => undefined); + const reader = { + probe: vi.fn(async () => ({ + pgServerVersion: 'PostgreSQL 16.4', + warnings: [], + info: ['tracked statements: 12'], + })), + }; + const runner = new PostgresPgssProbeRunner({ + reader, + createClient: () => ({ client: { executeQuery: vi.fn() }, cleanup }), + }); + + await expect( + runner.run({ + projectDir: '/work/project', + connectionId: 'warehouse', + connection: { driver: 'postgres', url: 'env:DATABASE_URL' }, + env: {}, + }), + ).resolves.toEqual({ + pgServerVersion: 'PostgreSQL 16.4', + warnings: [], + info: ['tracked statements: 12'], + }); + expect(reader.probe).toHaveBeenCalledOnce(); + expect(cleanup).toHaveBeenCalledOnce(); + }); + + it('rejects non-Postgres connections', async () => { + const runner = new PostgresPgssProbeRunner({ + reader: { probe: vi.fn() }, + createClient: () => ({ client: {}, cleanup: vi.fn() }), + }); + + await expect( + runner.run({ + projectDir: '/work/project', + connectionId: 'warehouse', + connection: { driver: 'snowflake' }, + env: {}, + }), + ).rejects.toThrow('Native PostgreSQL connector cannot run driver "snowflake"'); + }); + + it('formats successful Postgres details', () => { + const runner = new PostgresPgssProbeRunner(); + + expect( + runner.formatSuccessDetail({ + pgServerVersion: 'PostgreSQL 16.4', + warnings: ['pg_stat_statements.track is top'], + info: ['tracked statements: 12'], + }), + ).toEqual({ + detail: 'pg_stat_statements ready (PostgreSQL 16.4); tracked statements: 12', + warnings: ['pg_stat_statements.track is top'], + }); + }); + + it('maps Postgres probe errors to actionable advice', () => { + const runner = new PostgresPgssProbeRunner(); + + expect( + runner.fixAdvice( + new HistoricSqlExtensionMissingError({ + dialect: 'postgres', + message: 'pg_stat_statements missing', + remediation: 'CREATE EXTENSION pg_stat_statements;', + }), + ), + ).toEqual({ + failHeadline: 'pg_stat_statements extension is missing', + remediation: 'CREATE EXTENSION pg_stat_statements;', + }); + + expect( + runner.fixAdvice( + new HistoricSqlGrantsMissingError({ + dialect: 'postgres', + message: 'missing grants', + remediation: 'GRANT pg_read_all_stats TO ;', + }), + ), + ).toEqual({ + failHeadline: 'Postgres connection role lacks pg_read_all_stats', + remediation: 'GRANT pg_read_all_stats TO ;', + }); + + expect( + runner.fixAdvice( + new HistoricSqlVersionUnsupportedError({ + dialect: 'postgres', + detectedVersion: 'PostgreSQL 13.12', + minimumVersion: 'PostgreSQL 14', + }), + ), + ).toEqual({ + failHeadline: 'Postgres version too old', + remediation: 'Use PostgreSQL 14 or newer, or disable query history for this connection', + }); + }); +}); diff --git a/packages/cli/src/context/ingest/historic-sql-probes/postgres-runner.ts b/packages/cli/src/context/ingest/historic-sql-probes/postgres-runner.ts new file mode 100644 index 00000000..7ebf9721 --- /dev/null +++ b/packages/cli/src/context/ingest/historic-sql-probes/postgres-runner.ts @@ -0,0 +1,111 @@ +import { + HistoricSqlExtensionMissingError, + HistoricSqlGrantsMissingError, + HistoricSqlVersionUnsupportedError, +} from '../adapters/historic-sql/errors.js'; +import { PostgresPgssReader } from '../adapters/historic-sql/postgres-pgss-reader.js'; +import type { PostgresPgssProbeResult } from '../adapters/historic-sql/types.js'; +import { + type HistoricSqlFixAdvice, + type HistoricSqlProbeInput, + type HistoricSqlProbeRunner, + type HistoricSqlSuccessDetail, +} from '../historic-sql-probes.js'; +import { + isKtxPostgresConnectionConfig, + type KtxPostgresConnectionConfig, +} from '../../../connectors/postgres/connector.js'; +import { KtxPostgresHistoricSqlQueryClient } from '../../../connectors/postgres/historic-sql-query-client.js'; + +interface ClientHandle { + client: unknown; + cleanup(): Promise; +} + +interface PostgresPgssProbeRunnerOptions { + reader?: { probe(client: unknown): Promise }; + createClient?: ( + input: HistoricSqlProbeInput & { connection: KtxPostgresConnectionConfig }, + ) => ClientHandle; +} + +function genericAdvice(error: unknown, catalogName: string): HistoricSqlFixAdvice { + return { + failHeadline: `${catalogName} readiness check failed`, + remediation: error instanceof Error ? error.message : String(error), + }; +} + +function infoSuffix(info: readonly string[] | undefined): string { + return info && info.length > 0 ? `; ${info.join('; ')}` : ''; +} + +export class PostgresPgssProbeRunner implements HistoricSqlProbeRunner { + readonly dialect = 'postgres' as const; + readonly catalogName = 'pg_stat_statements'; + + private readonly reader: { probe(client: unknown): Promise }; + private readonly createClient: ( + input: HistoricSqlProbeInput & { connection: KtxPostgresConnectionConfig }, + ) => ClientHandle; + + constructor(options: PostgresPgssProbeRunnerOptions = {}) { + this.reader = options.reader ?? new PostgresPgssReader(); + this.createClient = + options.createClient ?? + ((input) => { + const client = new KtxPostgresHistoricSqlQueryClient({ + connectionId: input.connectionId, + connection: input.connection, + env: input.env, + }); + return { client, cleanup: () => client.cleanup() }; + }); + } + + async run(input: HistoricSqlProbeInput): Promise { + const inputDriver = input.connection.driver ?? 'unknown'; + if (!isKtxPostgresConnectionConfig(input.connection)) { + throw new Error(`Native PostgreSQL connector cannot run driver "${inputDriver}"`); + } + const handle = this.createClient({ + ...input, + connection: input.connection, + }); + try { + return await this.reader.probe(handle.client); + } finally { + await handle.cleanup(); + } + } + + formatSuccessDetail(result: unknown): HistoricSqlSuccessDetail { + const pgssResult = result as PostgresPgssProbeResult; + return { + detail: `pg_stat_statements ready (${pgssResult.pgServerVersion})${infoSuffix(pgssResult.info)}`, + warnings: pgssResult.warnings, + }; + } + + fixAdvice(error: unknown): HistoricSqlFixAdvice { + if (error instanceof HistoricSqlExtensionMissingError) { + return { + failHeadline: 'pg_stat_statements extension is missing', + remediation: error.remediation, + }; + } + if (error instanceof HistoricSqlGrantsMissingError) { + return { + failHeadline: 'Postgres connection role lacks pg_read_all_stats', + remediation: error.remediation, + }; + } + if (error instanceof HistoricSqlVersionUnsupportedError) { + return { + failHeadline: 'Postgres version too old', + remediation: 'Use PostgreSQL 14 or newer, or disable query history for this connection', + }; + } + return genericAdvice(error, this.catalogName); + } +} diff --git a/packages/cli/src/context/ingest/historic-sql-probes/snowflake-runner.test.ts b/packages/cli/src/context/ingest/historic-sql-probes/snowflake-runner.test.ts new file mode 100644 index 00000000..2d6835bf --- /dev/null +++ b/packages/cli/src/context/ingest/historic-sql-probes/snowflake-runner.test.ts @@ -0,0 +1,82 @@ +import { describe, expect, it, vi } from 'vitest'; +import { HistoricSqlGrantsMissingError } from '../adapters/historic-sql/errors.js'; +import { SnowflakeAccountUsageProbeRunner } from './snowflake-runner.js'; + +describe('SnowflakeAccountUsageProbeRunner', () => { + it('runs the account usage reader and cleans up the client', async () => { + const cleanup = vi.fn(async () => undefined); + const reader = { + probe: vi.fn(async () => ({ warnings: [], info: ['query history available'] })), + }; + const runner = new SnowflakeAccountUsageProbeRunner({ + reader, + createClient: () => ({ client: { executeQuery: vi.fn() }, cleanup }), + }); + + await expect( + runner.run({ + projectDir: '/work/project', + connectionId: 'warehouse', + connection: { + driver: 'snowflake', + account: 'ACCT', + warehouse: 'WH', + database: 'ANALYTICS', + username: 'reader', + }, + env: {}, + }), + ).resolves.toEqual({ warnings: [], info: ['query history available'] }); + expect(reader.probe).toHaveBeenCalledOnce(); + expect(cleanup).toHaveBeenCalledOnce(); + }); + + it('rejects non-Snowflake connections', async () => { + const runner = new SnowflakeAccountUsageProbeRunner({ + reader: { probe: vi.fn() }, + createClient: () => ({ client: {}, cleanup: vi.fn() }), + }); + + await expect( + runner.run({ + projectDir: '/work/project', + connectionId: 'warehouse', + connection: { driver: 'postgres' }, + env: {}, + }), + ).rejects.toThrow('Native Snowflake connector cannot run driver "postgres"'); + }); + + it('formats successful Snowflake details', () => { + const runner = new SnowflakeAccountUsageProbeRunner(); + + expect( + runner.formatSuccessDetail({ + warnings: ['query history is delayed'], + info: ['warehouse: WH'], + }), + ).toEqual({ + detail: 'SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY ready; warehouse: WH', + warnings: ['query history is delayed'], + }); + }); + + it('maps Snowflake grant errors to runner advice', () => { + const runner = new SnowflakeAccountUsageProbeRunner(); + + expect( + runner.fixAdvice( + new HistoricSqlGrantsMissingError({ + dialect: 'snowflake', + message: 'role cannot read account usage', + remediation: + 'GRANT IMPORTED PRIVILEGES ON DATABASE SNOWFLAKE TO ROLE ;', + }), + ), + ).toEqual({ + failHeadline: 'Snowflake role cannot read SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY', + remediation: + 'GRANT IMPORTED PRIVILEGES ON DATABASE SNOWFLAKE TO ROLE ;', + }); + }); +}); diff --git a/packages/cli/src/context/ingest/historic-sql-probes/snowflake-runner.ts b/packages/cli/src/context/ingest/historic-sql-probes/snowflake-runner.ts new file mode 100644 index 00000000..415b46d6 --- /dev/null +++ b/packages/cli/src/context/ingest/historic-sql-probes/snowflake-runner.ts @@ -0,0 +1,96 @@ +import { HistoricSqlGrantsMissingError } from '../adapters/historic-sql/errors.js'; +import { SnowflakeHistoricSqlQueryHistoryReader } from '../adapters/historic-sql/snowflake-query-history-reader.js'; +import { + type HistoricSqlFixAdvice, + type HistoricSqlProbeInput, + type HistoricSqlProbeRunner, + type HistoricSqlSuccessDetail, +} from '../historic-sql-probes.js'; +import { + isKtxSnowflakeConnectionConfig, + type KtxSnowflakeConnectionConfig, +} from '../../../connectors/snowflake/connector.js'; +import { KtxSnowflakeHistoricSqlQueryClient } from '../../../connectors/snowflake/historic-sql-query-client.js'; + +interface GenericProbeResult { + warnings: string[]; + info?: string[]; +} + +interface ClientHandle { + client: unknown; + cleanup(): Promise; +} + +interface SnowflakeAccountUsageProbeRunnerOptions { + reader?: { probe(client: unknown): Promise }; + createClient?: ( + input: HistoricSqlProbeInput & { connection: KtxSnowflakeConnectionConfig }, + ) => ClientHandle; +} + +function infoSuffix(info: readonly string[] | undefined): string { + return info && info.length > 0 ? `; ${info.join('; ')}` : ''; +} + +export class SnowflakeAccountUsageProbeRunner implements HistoricSqlProbeRunner { + readonly dialect = 'snowflake' as const; + readonly catalogName = 'SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY'; + + private readonly reader: { probe(client: unknown): Promise }; + private readonly createClient: ( + input: HistoricSqlProbeInput & { connection: KtxSnowflakeConnectionConfig }, + ) => ClientHandle; + + constructor(options: SnowflakeAccountUsageProbeRunnerOptions = {}) { + this.reader = options.reader ?? new SnowflakeHistoricSqlQueryHistoryReader(); + this.createClient = + options.createClient ?? + ((input) => { + const client = new KtxSnowflakeHistoricSqlQueryClient({ + connectionId: input.connectionId, + connection: input.connection, + projectDir: input.projectDir, + env: input.env, + }); + return { client, cleanup: () => client.cleanup() }; + }); + } + + async run(input: HistoricSqlProbeInput): Promise { + const inputDriver = input.connection.driver ?? 'unknown'; + if (!isKtxSnowflakeConnectionConfig(input.connection)) { + throw new Error(`Native Snowflake connector cannot run driver "${inputDriver}"`); + } + const handle = this.createClient({ + ...input, + connection: input.connection, + }); + try { + return await this.reader.probe(handle.client); + } finally { + await handle.cleanup(); + } + } + + formatSuccessDetail(result: unknown): HistoricSqlSuccessDetail { + const probeResult = result as GenericProbeResult; + return { + detail: `${this.catalogName} ready${infoSuffix(probeResult.info)}`, + warnings: probeResult.warnings, + }; + } + + fixAdvice(error: unknown): HistoricSqlFixAdvice { + if (error instanceof HistoricSqlGrantsMissingError) { + return { + failHeadline: 'Snowflake role cannot read SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY', + remediation: error.remediation, + }; + } + return { + failHeadline: `${this.catalogName} readiness check failed`, + remediation: error instanceof Error ? error.message : String(error), + }; + } +} diff --git a/packages/cli/src/setup-databases.test.ts b/packages/cli/src/setup-databases.test.ts index 50a1c6ed..52dfc26f 100644 --- a/packages/cli/src/setup-databases.test.ts +++ b/packages/cli/src/setup-databases.test.ts @@ -2068,9 +2068,40 @@ describe('setup databases step', () => { expect(io.stdout()).toContain('│ Changes: 0 changes across 56 tables'); }); + function fakeHistoricSqlRunner( + dialect: 'postgres' | 'snowflake' | 'bigquery', + catalogName: string, + ) { + return { + dialect, + catalogName, + async run() { + return { warnings: [], info: [] }; + }, + formatSuccessDetail() { + return { detail: `${catalogName} ready`, warnings: [] }; + }, + fixAdvice() { + return { + failHeadline: `${catalogName} unavailable`, + remediation: 'Fix query-history grants.', + }; + }, + }; + } + it('writes query history config for supported Snowflake databases after validation succeeds', async () => { const io = makeIo(); - const historicSqlProbe = vi.fn(async () => ({ ok: true, lines: [] })); + const runner = fakeHistoricSqlRunner( + 'snowflake', + 'SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY', + ); + const historicSqlReadinessProbe = vi.fn(async () => ({ + ok: true as const, + dialect: 'snowflake' as const, + runner, + result: { warnings: [], info: [] }, + })); const result = await runKtxSetupDatabasesStep( { projectDir: tempDir, @@ -2088,7 +2119,7 @@ describe('setup databases step', () => { { testConnection: vi.fn(async () => 0), scanConnection: vi.fn(async () => 0), - historicSqlProbe, + historicSqlReadinessProbe, prompts: makePromptAdapter({ selectValues: ['password'], textValues: ['env:SNOWFLAKE_ACCOUNT', 'WH', 'ANALYTICS', 'reader', ''], @@ -2096,11 +2127,11 @@ describe('setup databases step', () => { }), }, ); - expect(historicSqlProbe).toHaveBeenCalledWith( + expect(historicSqlReadinessProbe).toHaveBeenCalledWith( expect.objectContaining({ projectDir: tempDir, connectionId: 'snowflake', - dialect: 'snowflake', + connection: expect.objectContaining({ driver: 'snowflake' }), }), ); @@ -2198,7 +2229,15 @@ describe('setup databases step', () => { { testConnection: vi.fn(async () => 0), scanConnection: vi.fn(async () => 0), - historicSqlProbe: vi.fn(async () => ({ ok: true, lines: [' OK pg_stat_statements ready (PostgreSQL 16.4)'] })), + historicSqlReadinessProbe: vi.fn(async () => { + const runner = fakeHistoricSqlRunner('postgres', 'pg_stat_statements'); + return { + ok: true as const, + dialect: 'postgres' as const, + runner, + result: { pgServerVersion: 'PostgreSQL 16.4', warnings: [], info: [] }, + }; + }), }, ); @@ -2269,7 +2308,13 @@ describe('setup databases step', () => { ); const io = makeIo(); const prompts = makePromptAdapter({ selectValues: ['yes', 'deep'] }); - const historicSqlProbe = vi.fn(async () => ({ ok: true, lines: [] })); + const runner = fakeHistoricSqlRunner('postgres', 'pg_stat_statements'); + const historicSqlReadinessProbe = vi.fn(async () => ({ + ok: true as const, + dialect: 'postgres' as const, + runner, + result: { pgServerVersion: 'PostgreSQL 16.4', warnings: [], info: [] }, + })); const result = await runKtxSetupDatabasesStep( { @@ -2284,7 +2329,7 @@ describe('setup databases step', () => { prompts, testConnection: vi.fn(async () => 0), scanConnection: vi.fn(async () => 0), - historicSqlProbe, + historicSqlReadinessProbe, }, ); @@ -2303,11 +2348,13 @@ describe('setup databases step', () => { message: expect.stringContaining('How much database context should KTX build?'), }), ); - expect(historicSqlProbe).toHaveBeenCalledWith({ - projectDir: tempDir, - connectionId: 'warehouse', - dialect: 'postgres', - }); + expect(historicSqlReadinessProbe).toHaveBeenCalledWith( + expect.objectContaining({ + projectDir: tempDir, + connectionId: 'warehouse', + connection: expect.objectContaining({ driver: 'postgres' }), + }), + ); const config = parseKtxProjectConfig(await readFile(join(tempDir, 'ktx.yaml'), 'utf-8')); expect(config.connections.warehouse).toMatchObject({ context: { @@ -2335,6 +2382,13 @@ describe('setup databases step', () => { 'utf-8', ); const io = makeIo(); + const runner = fakeHistoricSqlRunner('bigquery', 'INFORMATION_SCHEMA.JOBS_BY_PROJECT'); + const historicSqlReadinessProbe = vi.fn(async () => ({ + ok: true as const, + dialect: 'bigquery' as const, + runner, + result: { warnings: [], info: [] }, + })); const result = await runKtxSetupDatabasesStep( { @@ -2350,10 +2404,18 @@ describe('setup databases step', () => { { testConnection: vi.fn(async () => 0), scanConnection: vi.fn(async () => 0), + historicSqlReadinessProbe, }, ); expect(result.status).toBe('ready'); + expect(historicSqlReadinessProbe).toHaveBeenCalledWith( + expect.objectContaining({ + projectDir: tempDir, + connectionId: 'analytics', + connection: expect.objectContaining({ driver: 'bigquery' }), + }), + ); const configText = await readFile(join(tempDir, 'ktx.yaml'), 'utf-8'); const config = parseKtxProjectConfig(configText); expect(config.connections.analytics).toMatchObject({ @@ -2375,6 +2437,71 @@ describe('setup databases step', () => { expect(config.ingest.adapters).toEqual([]); }); + it('prints a non-blocking BigQuery query history probe failure with the grants remediation', async () => { + await writeFile( + join(tempDir, 'ktx.yaml'), + [ + 'connections:', + ' analytics:', + ' driver: bigquery', + ' dataset_id: analytics', + ' credentials_json: env:BIGQUERY_CREDENTIALS_JSON', + '', + ].join('\n'), + 'utf-8', + ); + const io = makeIo(); + const runner = { + ...fakeHistoricSqlRunner('bigquery', 'INFORMATION_SCHEMA.JOBS_BY_PROJECT'), + fixAdvice: () => ({ + failHeadline: 'BigQuery principal cannot read INFORMATION_SCHEMA.JOBS_BY_PROJECT', + remediation: + 'Grant roles/bigquery.resourceViewer on the BigQuery project, or grant a custom role containing bigquery.jobs.listAll.', + }), + }; + const error = new Error('access denied'); + const historicSqlReadinessProbe = vi.fn(async () => ({ + ok: false as const, + dialect: 'bigquery' as const, + runner, + error, + })); + + const result = await runKtxSetupDatabasesStep( + { + projectDir: tempDir, + inputMode: 'disabled', + databaseConnectionIds: ['analytics'], + databaseSchemas: [], + enableQueryHistory: true, + queryHistoryWindowDays: 45, + skipDatabases: false, + }, + io.io, + { + testConnection: vi.fn(async () => 0), + scanConnection: vi.fn(async () => 0), + historicSqlReadinessProbe, + }, + ); + + expect(result.status).toBe('ready'); + expect(historicSqlReadinessProbe).toHaveBeenCalledWith( + expect.objectContaining({ + projectDir: tempDir, + connectionId: 'analytics', + connection: expect.objectContaining({ driver: 'bigquery' }), + }), + ); + expect(io.stdout()).toContain('Query history probe...'); + expect(io.stdout()).toContain( + 'BigQuery principal cannot read INFORMATION_SCHEMA.JOBS_BY_PROJECT', + ); + expect(io.stdout()).toContain('roles/bigquery.resourceViewer'); + expect(io.stdout()).toContain('bigquery.jobs.listAll'); + expect(io.stdout()).toContain('Setup written; query history will be skipped until fixed.'); + }); + it('enables query history on an existing Postgres connection', async () => { await writeFile( join(tempDir, 'ktx.yaml'), @@ -2403,7 +2530,15 @@ describe('setup databases step', () => { { testConnection: vi.fn(async () => 0), scanConnection: vi.fn(async () => 0), - historicSqlProbe: vi.fn(async () => ({ ok: true, lines: [' OK pg_stat_statements ready (PostgreSQL 16.4)'] })), + historicSqlReadinessProbe: vi.fn(async () => { + const runner = fakeHistoricSqlRunner('postgres', 'pg_stat_statements'); + return { + ok: true as const, + dialect: 'postgres' as const, + runner, + result: { pgServerVersion: 'PostgreSQL 16.4', warnings: [], info: [] }, + }; + }), }, ); @@ -2471,7 +2606,15 @@ describe('setup databases step', () => { { testConnection: vi.fn(async () => 0), scanConnection: vi.fn(async () => 0), - historicSqlProbe: vi.fn(async () => ({ ok: true, lines: [] })), + historicSqlReadinessProbe: vi.fn(async () => { + const runner = fakeHistoricSqlRunner('postgres', 'pg_stat_statements'); + return { + ok: true as const, + dialect: 'postgres' as const, + runner, + result: { pgServerVersion: 'PostgreSQL 16.4', warnings: [], info: [] }, + }; + }), }, ), ).resolves.toMatchObject({ status: 'ready' }); @@ -2498,13 +2641,18 @@ describe('setup databases step', () => { it('prints a non-blocking Postgres query history probe failure after connection test succeeds', async () => { const io = makeIo(); - const historicSqlProbe = vi.fn(async () => ({ - ok: false, - lines: [ - ' FAIL pg_stat_statements extension is not installed in the connection database', - ' Fix: Run (against this database): CREATE EXTENSION pg_stat_statements;', - " Fix: Ensure shared_preload_libraries includes 'pg_stat_statements'.", - ], + const runner = { + ...fakeHistoricSqlRunner('postgres', 'pg_stat_statements'), + fixAdvice: () => ({ + failHeadline: 'pg_stat_statements extension is not installed in the connection database', + remediation: 'Run (against this database): CREATE EXTENSION pg_stat_statements;', + }), + }; + const historicSqlReadinessProbe = vi.fn(async () => ({ + ok: false as const, + dialect: 'postgres' as const, + runner, + error: new Error('missing extension'), })); const result = await runKtxSetupDatabasesStep( @@ -2522,16 +2670,16 @@ describe('setup databases step', () => { { testConnection: vi.fn(async () => 0), scanConnection: vi.fn(async () => 0), - historicSqlProbe, + historicSqlReadinessProbe, }, ); expect(result.status).toBe('ready'); - expect(historicSqlProbe).toHaveBeenCalledWith( + expect(historicSqlReadinessProbe).toHaveBeenCalledWith( expect.objectContaining({ projectDir: tempDir, connectionId: 'warehouse', - dialect: 'postgres', + connection: expect.objectContaining({ driver: 'postgres' }), }), ); expect(io.stdout()).toContain('Query history probe...'); @@ -2542,12 +2690,19 @@ describe('setup databases step', () => { it('prints a non-blocking Snowflake query history probe failure with the grants remediation', async () => { const io = makeIo(); - const historicSqlProbe = vi.fn(async () => ({ - ok: false, - lines: [ - ' FAIL Snowflake role cannot read SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY', - ' Fix: Run (as ACCOUNTADMIN): GRANT IMPORTED PRIVILEGES ON DATABASE SNOWFLAKE TO ROLE ;', - ], + const runner = { + ...fakeHistoricSqlRunner('snowflake', 'SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY'), + fixAdvice: () => ({ + failHeadline: 'Snowflake role cannot read SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY', + remediation: + 'Run (as ACCOUNTADMIN): GRANT IMPORTED PRIVILEGES ON DATABASE SNOWFLAKE TO ROLE ;', + }), + }; + const historicSqlReadinessProbe = vi.fn(async () => ({ + ok: false as const, + dialect: 'snowflake' as const, + runner, + error: new Error('role cannot read SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY'), })); const result = await runKtxSetupDatabasesStep( @@ -2564,7 +2719,7 @@ describe('setup databases step', () => { { testConnection: vi.fn(async () => 0), scanConnection: vi.fn(async () => 0), - historicSqlProbe, + historicSqlReadinessProbe, prompts: makePromptAdapter({ textValues: ['env:SNOWFLAKE_ACCOUNT', 'WH', 'ANALYTICS', 'reader', ''], passwordValues: ['env:SNOWFLAKE_PASSWORD'], @@ -2573,11 +2728,11 @@ describe('setup databases step', () => { ); expect(result.status).toBe('ready'); - expect(historicSqlProbe).toHaveBeenCalledWith( + expect(historicSqlReadinessProbe).toHaveBeenCalledWith( expect.objectContaining({ projectDir: tempDir, connectionId: 'warehouse', - dialect: 'snowflake', + connection: expect.objectContaining({ driver: 'snowflake' }), }), ); expect(io.stdout()).toContain('Query history probe...'); @@ -2588,7 +2743,15 @@ describe('setup databases step', () => { it('does not run the query history probe when the regular connection test fails', async () => { const io = makeIo(); - const historicSqlProbe = vi.fn(async () => ({ ok: true, lines: [] })); + const historicSqlReadinessProbe = vi.fn(async () => { + const runner = fakeHistoricSqlRunner('postgres', 'pg_stat_statements'); + return { + ok: true as const, + dialect: 'postgres' as const, + runner, + result: { pgServerVersion: 'PostgreSQL 16.4', warnings: [], info: [] }, + }; + }); const result = await runKtxSetupDatabasesStep( { @@ -2605,12 +2768,12 @@ describe('setup databases step', () => { { testConnection: vi.fn(async () => 1), scanConnection: vi.fn(async () => 0), - historicSqlProbe, + historicSqlReadinessProbe, }, ); expect(result.status).toBe('failed'); - expect(historicSqlProbe).not.toHaveBeenCalled(); + expect(historicSqlReadinessProbe).not.toHaveBeenCalled(); }); it('returns missing input when non-interactive database flags are incomplete', async () => { diff --git a/packages/cli/src/setup-databases.ts b/packages/cli/src/setup-databases.ts index 30d4fa20..5793d5e6 100644 --- a/packages/cli/src/setup-databases.ts +++ b/packages/cli/src/setup-databases.ts @@ -3,7 +3,13 @@ import { readFile, writeFile } from 'node:fs/promises'; import { delimiter, dirname, join } from 'node:path'; import { fileURLToPath } from 'node:url'; import { promisify } from 'node:util'; +import { queryHistoryDialectForConnection } from './context/ingest/adapters/historic-sql/connection-dialect.js'; import type { HistoricSqlDialect } from './context/ingest/adapters/historic-sql/types.js'; +import { + runHistoricSqlReadinessProbe, + type HistoricSqlProbeOutcome, + type HistoricSqlReadinessProbe, +} from './context/ingest/historic-sql-probes.js'; import { type KtxProjectConnectionConfig, serializeKtxProjectConfig } from './context/project/config.js'; import { loadKtxProject } from './context/project/project.js'; import { markKtxSetupStateStepComplete, setKtxSetupDatabaseConnectionIds } from './context/project/setup-config.js'; @@ -84,19 +90,11 @@ export interface KtxSetupDatabasesPromptAdapter { cancel(message: string): void; } -interface KtxSetupHistoricSqlProbeInput { - projectDir: string; - connectionId: string; - dialect: HistoricSqlDialect; -} - interface KtxSetupHistoricSqlProbeResult { ok: boolean; lines: string[]; } -type KtxSetupHistoricSqlProbe = (input: KtxSetupHistoricSqlProbeInput) => Promise; - export interface KtxSetupDatabasesDeps { prompts?: KtxSetupDatabasesPromptAdapter; testConnection?: (projectDir: string, connectionId: string, io: KtxCliIo) => Promise; @@ -105,7 +103,7 @@ export interface KtxSetupDatabasesDeps { listSchemas?: (projectDir: string, connectionId: string) => Promise; listTables?: (projectDir: string, connectionId: string, schemas?: string[]) => Promise; pickDatabaseScope?: (args: PickDatabaseScopeArgs, io: KtxCliIo) => Promise; - historicSqlProbe?: KtxSetupHistoricSqlProbe; + historicSqlReadinessProbe?: HistoricSqlReadinessProbe; } const DRIVER_OPTIONS: Array<{ value: KtxSetupDatabaseDriver; label: string }> = [ @@ -334,121 +332,24 @@ function migrateLegacyHistoricSqlConnection(connection: KtxProjectConnectionConf return withQueryHistoryConfig(connection, queryHistory); } -function historicSqlProbeFailureLines(error: unknown): string[] { - if (error instanceof Error && error.name === 'HistoricSqlExtensionMissingError') { - return [ - ' FAIL pg_stat_statements extension is not installed in the connection database', - ' Fix: Run (against this database): CREATE EXTENSION pg_stat_statements;', - " Fix: Ensure shared_preload_libraries includes 'pg_stat_statements'.", - ]; +function setupHistoricSqlProbeResult( + outcome: HistoricSqlProbeOutcome | null, +): KtxSetupHistoricSqlProbeResult { + if (!outcome) { + return { ok: true, lines: [] }; } - if (error instanceof Error && error.name === 'HistoricSqlGrantsMissingError') { - const dialect = (error as { dialect?: unknown }).dialect; - if (dialect === 'snowflake') { - return [ - ' FAIL Snowflake role cannot read SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY', - ' Fix: Run (as ACCOUNTADMIN): GRANT IMPORTED PRIVILEGES ON DATABASE SNOWFLAKE TO ROLE ;', - ]; - } - return [ - ' FAIL Postgres connection role lacks pg_read_all_stats', - ' Fix: Run: GRANT pg_read_all_stats TO ;', - ]; - } - if (error instanceof Error && error.name === 'HistoricSqlVersionUnsupportedError') { - return [` FAIL ${error.message}`]; - } - return [` FAIL Query history probe failed: ${error instanceof Error ? error.message : String(error)}`]; -} - -async function defaultHistoricSqlProbe(input: KtxSetupHistoricSqlProbeInput): Promise { - if (input.dialect === 'postgres') { - return probePostgresHistoricSql(input); - } - if (input.dialect === 'snowflake') { - return probeSnowflakeHistoricSql(input); - } - return { ok: true, lines: [] }; -} - -async function probePostgresHistoricSql( - input: KtxSetupHistoricSqlProbeInput, -): Promise { - const project = await loadKtxProject({ projectDir: input.projectDir }); - const connection = project.config.connections[input.connectionId]; - const [{ PostgresPgssReader }, { KtxPostgresHistoricSqlQueryClient }, { isKtxPostgresConnectionConfig }] = - await Promise.all([ - import('./context/ingest/adapters/historic-sql/postgres-pgss-reader.js'), - import('./connectors/postgres/historic-sql-query-client.js'), - import('./connectors/postgres/connector.js'), - ]); - - const postgresConnection = connection as Parameters[0]; - if (!isKtxPostgresConnectionConfig(postgresConnection)) { - return { - ok: false, - lines: [` FAIL Connection ${input.connectionId} is not a native Postgres connection.`], - }; - } - - const client = new KtxPostgresHistoricSqlQueryClient({ - connectionId: input.connectionId, - connection: postgresConnection, - }); - try { - const result = await new PostgresPgssReader().probe(client); + if (outcome.ok) { + const { detail, warnings } = outcome.runner.formatSuccessDetail(outcome.result); return { ok: true, - lines: [ - ` OK pg_stat_statements ready (${result.pgServerVersion})`, - ...result.warnings.map((warning: string) => ` ! ${warning}`), - ], - }; - } catch (error) { - return { ok: false, lines: historicSqlProbeFailureLines(error) }; - } finally { - await client.cleanup(); - } -} - -async function probeSnowflakeHistoricSql( - input: KtxSetupHistoricSqlProbeInput, -): Promise { - const project = await loadKtxProject({ projectDir: input.projectDir }); - const connection = project.config.connections[input.connectionId]; - const [{ SnowflakeHistoricSqlQueryHistoryReader }, { KtxSnowflakeHistoricSqlQueryClient }, { isKtxSnowflakeConnectionConfig }] = - await Promise.all([ - import('./context/ingest/adapters/historic-sql/snowflake-query-history-reader.js'), - import('./connectors/snowflake/historic-sql-query-client.js'), - import('./connectors/snowflake/connector.js'), - ]); - - if (!isKtxSnowflakeConnectionConfig(connection)) { - return { - ok: false, - lines: [` FAIL Connection ${input.connectionId} is not a native Snowflake connection.`], + lines: [` OK ${detail}`, ...warnings.map((warning) => ` ! ${warning}`)], }; } - - const client = new KtxSnowflakeHistoricSqlQueryClient({ - connectionId: input.connectionId, - connection, - projectDir: input.projectDir, - }); - try { - const result = await new SnowflakeHistoricSqlQueryHistoryReader().probe(client); - return { - ok: true, - lines: [ - ' OK SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY accessible', - ...result.warnings.map((warning: string) => ` ! ${warning}`), - ], - }; - } catch (error) { - return { ok: false, lines: historicSqlProbeFailureLines(error) }; - } finally { - await client.cleanup(); - } + const advice = outcome.runner.fixAdvice(outcome.error); + return { + ok: false, + lines: [` FAIL ${advice.failHeadline}`, ` Fix: ${advice.remediation}`], + }; } async function defaultListSchemas(projectDir: string, connectionId: string): Promise { @@ -1770,23 +1671,27 @@ async function maybeRunHistoricSqlSetupProbe(input: { const project = await loadKtxProject({ projectDir: input.projectDir }); const connection = project.config.connections[input.connectionId]; const queryHistory = queryHistoryConfigRecord(connection) ?? historicSqlConfigRecord(connection); - const driver = normalizeDriver(connection?.driver); if (queryHistory?.enabled !== true) { return; } - const dialect: 'postgres' | 'snowflake' | null = - driver === 'postgres' ? 'postgres' : driver === 'snowflake' ? 'snowflake' : null; + if (!connection) { + return; + } + const dialect = queryHistoryDialectForConnection(connection); if (!dialect) { return; } input.io.stdout.write('│ Query history probe...\n'); - const probe = input.deps.historicSqlProbe ?? defaultHistoricSqlProbe; - const result = await probe({ - projectDir: input.projectDir, - connectionId: input.connectionId, - dialect, - }); + const probe = input.deps.historicSqlReadinessProbe ?? runHistoricSqlReadinessProbe; + const result = setupHistoricSqlProbeResult( + await probe({ + projectDir: input.projectDir, + connectionId: input.connectionId, + connection, + env: process.env, + }), + ); for (const line of result.lines) { input.io.stdout.write(`│${line}\n`); } diff --git a/packages/cli/src/status-project.test.ts b/packages/cli/src/status-project.test.ts index 83862bfb..8f35cfe8 100644 --- a/packages/cli/src/status-project.test.ts +++ b/packages/cli/src/status-project.test.ts @@ -197,26 +197,58 @@ function withMysqlQueryHistory(config: KtxProjectConfig): KtxProjectConfig { }; } +function fakeStatusRunner( + dialect: 'postgres' | 'snowflake' | 'bigquery', + catalogName: string, +) { + return { + dialect, + catalogName, + async run() { + return { warnings: [], info: [] }; + }, + formatSuccessDetail(result: unknown) { + const typed = result as { warnings: string[]; info?: string[]; pgServerVersion?: string }; + const info = typed.info && typed.info.length > 0 ? `; ${typed.info.join('; ')}` : ''; + const base = + dialect === 'postgres' + ? `pg_stat_statements ready (${typed.pgServerVersion ?? 'PostgreSQL 16.4'})` + : `${catalogName} ready`; + return { detail: `${base}${info}`, warnings: typed.warnings }; + }, + fixAdvice(error: unknown) { + return { + failHeadline: error instanceof Error ? error.message : String(error), + remediation: 'Fix query-history grants.', + }; + }, + }; +} + describe('buildProjectStatus query history dispatch', () => { - it('runs the snowflake probe for snowflake connections, not the postgres one', async () => { - let postgresCalls = 0; - let snowflakeCalls = 0; + it('runs the shared probe for snowflake connections', async () => { + let probeCalls = 0; + const runner = fakeStatusRunner( + 'snowflake', + 'SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY', + ); const project = projectWithConfig(withSnowflakeQueryHistory(baseProjectConfig())); const status = await buildProjectStatus(project, { claudeCodeAuthProbe: stubClaudeCodeAuthProbe, - postgresQueryHistoryProbe: async () => { - postgresCalls += 1; - throw new Error('postgres probe should not run for snowflake'); - }, - snowflakeQueryHistoryProbe: async () => { - snowflakeCalls += 1; - return { warnings: [], info: [] }; + queryHistoryReadinessProbe: async (input) => { + probeCalls += 1; + expect(input.connectionId).toBe('warehouse'); + return { + ok: true, + dialect: 'snowflake', + runner, + result: { warnings: [], info: [] }, + }; }, }); - expect(postgresCalls).toBe(0); - expect(snowflakeCalls).toBe(1); + expect(probeCalls).toBe(1); expect(status.queryHistory).toHaveLength(1); expect(status.queryHistory[0]).toMatchObject({ connection: 'warehouse', @@ -231,19 +263,21 @@ describe('buildProjectStatus query history dispatch', () => { it('reports snowflake probe failures with the reader-provided remediation', async () => { const project = projectWithConfig(withSnowflakeQueryHistory(baseProjectConfig())); - const { HistoricSqlGrantsMissingError } = await import( - './context/ingest/adapters/historic-sql/errors.js' - ); const status = await buildProjectStatus(project, { claudeCodeAuthProbe: stubClaudeCodeAuthProbe, - snowflakeQueryHistoryProbe: async () => { - throw new HistoricSqlGrantsMissingError({ - dialect: 'snowflake', - message: 'role cannot read SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY', - remediation: 'GRANT IMPORTED PRIVILEGES ON DATABASE SNOWFLAKE TO ROLE ktx;', - }); - }, + queryHistoryReadinessProbe: async () => ({ + ok: false, + dialect: 'snowflake', + runner: { + ...fakeStatusRunner('snowflake', 'SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY'), + fixAdvice: () => ({ + failHeadline: 'Snowflake role cannot read SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY', + remediation: 'GRANT IMPORTED PRIVILEGES ON DATABASE SNOWFLAKE TO ROLE ktx;', + }), + }, + error: new Error('role cannot read SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY'), + }), }); expect(status.queryHistory[0]).toMatchObject({ @@ -257,18 +291,25 @@ describe('buildProjectStatus query history dispatch', () => { }); it('runs the bigquery probe for bigquery connections', async () => { - let bigqueryCalls = 0; + let probeCalls = 0; + const runner = fakeStatusRunner('bigquery', 'INFORMATION_SCHEMA.JOBS_BY_PROJECT'); const project = projectWithConfig(withBigQueryQueryHistory(baseProjectConfig())); const status = await buildProjectStatus(project, { claudeCodeAuthProbe: stubClaudeCodeAuthProbe, - bigqueryQueryHistoryProbe: async () => { - bigqueryCalls += 1; - return { warnings: [], info: [] }; + queryHistoryReadinessProbe: async (input) => { + probeCalls += 1; + expect(input.connectionId).toBe('bq'); + return { + ok: true, + dialect: 'bigquery', + runner, + result: { warnings: [], info: [] }, + }; }, }); - expect(bigqueryCalls).toBe(1); + expect(probeCalls).toBe(1); expect(status.queryHistory[0]).toMatchObject({ connection: 'bq', driver: 'bigquery', @@ -283,7 +324,7 @@ describe('buildProjectStatus query history dispatch', () => { const status = await buildProjectStatus(project, { claudeCodeAuthProbe: stubClaudeCodeAuthProbe, - postgresQueryHistoryProbe: async () => { + queryHistoryReadinessProbe: async () => { throw new Error('postgres probe must not run for mysql'); }, }); @@ -306,7 +347,7 @@ describe('buildProjectStatus query history dispatch', () => { describe('buildProjectStatus --fast', () => { it('skips claude-code probe and Postgres query-history probe', async () => { let claudeProbeCalls = 0; - let pgProbeCalls = 0; + let queryHistoryProbeCalls = 0; const project = projectWithConfig(withPostgresQueryHistory(baseProjectConfig())); const status = await buildProjectStatus(project, { @@ -316,14 +357,14 @@ describe('buildProjectStatus --fast', () => { claudeProbeCalls += 1; return { ok: true }; }, - postgresQueryHistoryProbe: async () => { - pgProbeCalls += 1; + queryHistoryReadinessProbe: async () => { + queryHistoryProbeCalls += 1; throw new Error('should not be called'); }, }); expect(claudeProbeCalls).toBe(0); - expect(pgProbeCalls).toBe(0); + expect(queryHistoryProbeCalls).toBe(0); expect(status.llm.status).toBe('skipped'); expect(status.llm.detail).toMatch(/--fast/); expect(status.queryHistory).toHaveLength(1); @@ -340,7 +381,7 @@ describe('buildProjectStatus --fast', () => { env: { ANALYTICS_DATABASE_URL: 'postgres://example' }, fast: true, claudeCodeAuthProbe: stubClaudeCodeAuthProbe, - postgresQueryHistoryProbe: async () => { + queryHistoryReadinessProbe: async () => { throw new Error('should not be called'); }, }); diff --git a/packages/cli/src/status-project.ts b/packages/cli/src/status-project.ts index 9b5f1af4..11648e10 100644 --- a/packages/cli/src/status-project.ts +++ b/packages/cli/src/status-project.ts @@ -1,14 +1,18 @@ import { stat as statAsync, readdir as readdirAsync } from 'node:fs/promises'; import { basename, join } from 'node:path'; import { runClaudeCodeAuthProbe } from './context/llm/claude-code-runtime.js'; -import type { KtxConfigIssue, KtxProjectConfig, KtxProjectConnectionConfig, KtxProjectEmbeddingConfig, KtxProjectLlmConfig } from './context/project/config.js'; +import type { KtxConfigIssue, KtxProjectConfig, KtxProjectEmbeddingConfig, KtxProjectLlmConfig } from './context/project/config.js'; import type { KtxLocalProject } from './context/project/project.js'; import { ktxLocalStateDbPath } from './context/project/local-state-db.js'; -import type { PostgresPgssProbeResult } from './context/ingest/adapters/historic-sql/types.js'; import { isQueryHistoryEnabled, queryHistoryDialectForConnection, } from './context/ingest/adapters/historic-sql/connection-dialect.js'; +import { + historicSqlProbeCatalogName, + runHistoricSqlReadinessProbe, + type HistoricSqlReadinessProbe, +} from './context/ingest/historic-sql-probes.js'; import { formatClaudeCodePromptCachingFix, formatClaudeCodePromptCachingWarning, @@ -178,6 +182,13 @@ function resolveRef(value: unknown, env: NodeJS.ProcessEnv): { resolved: string; return { resolved: trimmed, via: 'literal' }; } +function failureDetail(error: unknown): string { + if (error instanceof Error && error.message.trim().length > 0) { + return error.message.trim().split('\n')[0] ?? error.message.trim(); + } + return String(error); +} + function envHint(value: unknown): string | undefined { if (typeof value === 'string' && value.trim().startsWith('env:')) { return value.trim().slice(4).trim(); @@ -401,232 +412,6 @@ function buildConnectionStatus( } } -interface QueryHistoryProbeInput { - projectDir: string; - connectionId: string; - connection: KtxProjectConnectionConfig; - env: NodeJS.ProcessEnv; -} - -interface GenericProbeResult { - warnings: string[]; - info?: string[]; -} - -type PostgresQueryHistoryProbe = (input: QueryHistoryProbeInput) => Promise; -type SnowflakeQueryHistoryProbe = (input: QueryHistoryProbeInput) => Promise; -type BigQueryQueryHistoryProbe = (input: QueryHistoryProbeInput) => Promise; - -function failureDetail(error: unknown): string { - if (error instanceof Error && error.message.trim().length > 0) { - return error.message.trim().split('\n')[0] ?? error.message.trim(); - } - return String(error); -} - -function postgresReadinessDetail(result: PostgresPgssProbeResult): string { - const warningText = result.warnings.length > 0 ? ` with warnings: ${result.warnings.join('; ')}` : ''; - const info = result.info ?? []; - const infoText = info.length > 0 ? `; info: ${info.join('; ')}` : ''; - return `pg_stat_statements ready (${result.pgServerVersion})${warningText}${infoText}`; -} - -function genericReadinessDetail(label: string, result: GenericProbeResult): string { - const warningText = result.warnings.length > 0 ? ` with warnings: ${result.warnings.join('; ')}` : ''; - const info = result.info ?? []; - const infoText = info.length > 0 ? `; info: ${info.join('; ')}` : ''; - return `${label} ready${warningText}${infoText}`; -} - -function probeFailureFix(error: unknown, dialect: string, connectionId: string, projectDir: string): string { - if (error instanceof Error && error.name === 'HistoricSqlExtensionMissingError' && 'remediation' in error) { - return String(error.remediation); - } - if (error instanceof Error && error.name === 'HistoricSqlGrantsMissingError' && 'remediation' in error) { - return String(error.remediation); - } - if (error instanceof Error && error.name === 'HistoricSqlVersionUnsupportedError') { - return 'Use PostgreSQL 14 or newer, or disable query history for this connection'; - } - return `Fix connections.${connectionId} ${dialect} settings, then rerun \`ktx status --project-dir ${projectDir}\``; -} - -async function defaultPostgresQueryHistoryProbe( - input: QueryHistoryProbeInput, -): Promise { - const [{ PostgresPgssReader }, { KtxPostgresHistoricSqlQueryClient }, { isKtxPostgresConnectionConfig }] = - await Promise.all([ - import('./context/ingest/adapters/historic-sql/postgres-pgss-reader.js'), - import('./connectors/postgres/historic-sql-query-client.js'), - import('./connectors/postgres/connector.js'), - ]); - - const inputDriver = input.connection.driver ?? 'unknown'; - if (!isKtxPostgresConnectionConfig(input.connection)) { - throw new Error(`Native PostgreSQL connector cannot run driver "${inputDriver}"`); - } - - const client = new KtxPostgresHistoricSqlQueryClient({ - connectionId: input.connectionId, - connection: input.connection, - env: input.env, - }); - try { - return await new PostgresPgssReader().probe(client); - } finally { - await client.cleanup(); - } -} - -async function defaultSnowflakeQueryHistoryProbe( - input: QueryHistoryProbeInput, -): Promise { - const [{ SnowflakeHistoricSqlQueryHistoryReader }, { KtxSnowflakeHistoricSqlQueryClient }, { isKtxSnowflakeConnectionConfig }] = - await Promise.all([ - import('./context/ingest/adapters/historic-sql/snowflake-query-history-reader.js'), - import('./connectors/snowflake/historic-sql-query-client.js'), - import('./connectors/snowflake/connector.js'), - ]); - - const inputDriver = input.connection.driver ?? 'unknown'; - if (!isKtxSnowflakeConnectionConfig(input.connection)) { - throw new Error(`Native Snowflake connector cannot run driver "${inputDriver}"`); - } - - const client = new KtxSnowflakeHistoricSqlQueryClient({ - connectionId: input.connectionId, - connection: input.connection, - projectDir: input.projectDir, - env: input.env, - }); - try { - return await new SnowflakeHistoricSqlQueryHistoryReader().probe(client); - } finally { - await client.cleanup(); - } -} - -async function defaultBigQueryQueryHistoryProbe( - input: QueryHistoryProbeInput, -): Promise { - const [ - { BigQueryHistoricSqlQueryHistoryReader }, - { KtxBigQueryScanConnector, isKtxBigQueryConnectionConfig }, - { resolveKtxConfigReference }, - ] = await Promise.all([ - import('./context/ingest/adapters/historic-sql/bigquery-query-history-reader.js'), - import('./connectors/bigquery/connector.js'), - import('./context/core/config-reference.js'), - ]); - - const inputDriver = input.connection.driver ?? 'unknown'; - if (!isKtxBigQueryConnectionConfig(input.connection)) { - throw new Error(`Native BigQuery connector cannot run driver "${inputDriver}"`); - } - - const rawCredentials = typeof input.connection.credentials_json === 'string' ? input.connection.credentials_json : ''; - const resolvedCredentials = resolveKtxConfigReference(rawCredentials, input.env); - if (!resolvedCredentials) { - throw new Error(`Query history BigQuery connection ${input.connectionId} requires credentials_json`); - } - const parsed = JSON.parse(resolvedCredentials) as { project_id?: unknown }; - if (typeof parsed.project_id !== 'string' || parsed.project_id.trim().length === 0) { - throw new Error(`Query history BigQuery connection ${input.connectionId} requires credentials_json.project_id`); - } - const region = - typeof input.connection.location === 'string' && input.connection.location.trim().length > 0 - ? input.connection.location.trim() - : 'us'; - - const connector = new KtxBigQueryScanConnector({ - connectionId: input.connectionId, - connection: input.connection, - }); - try { - return await new BigQueryHistoricSqlQueryHistoryReader({ - projectId: parsed.project_id, - region, - }).probe({ - async executeQuery(sql: string) { - const result = await connector.executeReadOnly({ connectionId: input.connectionId, sql }, {} as never); - return { - headers: result.headers, - rows: result.rows, - totalRows: result.totalRows, - }; - }, - }); - } finally { - await connector.cleanup(); - } -} - -interface DispatchedProbe { - label: string; - spinnerLabel: string; - fastSkipDetail: string; - run: () => Promise<{ status: ProjectStatusLevel; detail: string; fix?: string }>; -} - -function postgresProbeDispatch( - input: QueryHistoryProbeInput, - probe: PostgresQueryHistoryProbe, -): DispatchedProbe { - return { - label: 'postgres', - spinnerLabel: `Probing pg_stat_statements on ${input.connectionId}`, - fastSkipDetail: 'pg_stat_statements probe skipped (--fast)', - run: async () => { - const result = await probe(input); - return { - status: result.warnings.length > 0 ? 'warn' : 'ok', - detail: postgresReadinessDetail(result), - ...(result.warnings.length > 0 - ? { - fix: `Update the Postgres parameter group or config, then rerun \`ktx status --project-dir ${input.projectDir}\``, - } - : {}), - }; - }, - }; -} - -function snowflakeProbeDispatch( - input: QueryHistoryProbeInput, - probe: SnowflakeQueryHistoryProbe, -): DispatchedProbe { - return { - label: 'snowflake', - spinnerLabel: `Probing SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY on ${input.connectionId}`, - fastSkipDetail: 'SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY probe skipped (--fast)', - run: async () => { - const result = await probe(input); - return { - status: result.warnings.length > 0 ? 'warn' : 'ok', - detail: genericReadinessDetail('SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY', result), - }; - }, - }; -} - -function bigqueryProbeDispatch( - input: QueryHistoryProbeInput, - probe: BigQueryQueryHistoryProbe, -): DispatchedProbe { - return { - label: 'bigquery', - spinnerLabel: `Probing INFORMATION_SCHEMA.JOBS_BY_PROJECT on ${input.connectionId}`, - fastSkipDetail: 'INFORMATION_SCHEMA.JOBS_BY_PROJECT probe skipped (--fast)', - run: async () => { - const result = await probe(input); - return { - status: result.warnings.length > 0 ? 'warn' : 'ok', - detail: genericReadinessDetail('INFORMATION_SCHEMA.JOBS_BY_PROJECT', result), - }; - }, - }; -} - async function buildQueryHistoryStatus( project: KtxLocalProject, options: BuildProjectStatusOptions, @@ -635,9 +420,7 @@ async function buildQueryHistoryStatus( .filter(([, connection]) => isQueryHistoryEnabled(connection)) .sort(([left], [right]) => left.localeCompare(right)); - const postgresProbe = options.postgresQueryHistoryProbe ?? defaultPostgresQueryHistoryProbe; - const snowflakeProbe = options.snowflakeQueryHistoryProbe ?? defaultSnowflakeQueryHistoryProbe; - const bigqueryProbe = options.bigqueryQueryHistoryProbe ?? defaultBigQueryQueryHistoryProbe; + const probe = options.queryHistoryReadinessProbe ?? runHistoricSqlReadinessProbe; const env = options.env ?? process.env; const statuses: QueryHistoryStatus[] = []; @@ -657,18 +440,7 @@ async function buildQueryHistoryStatus( continue; } - const probeInput: QueryHistoryProbeInput = { - projectDir: project.projectDir, - connectionId, - connection, - env, - }; - const dispatched = - dialect === 'postgres' - ? postgresProbeDispatch(probeInput, postgresProbe) - : dialect === 'snowflake' - ? snowflakeProbeDispatch(probeInput, snowflakeProbe) - : bigqueryProbeDispatch(probeInput, bigqueryProbe); + const catalogName = historicSqlProbeCatalogName(dialect); if (options.fast === true) { statuses.push({ @@ -676,29 +448,61 @@ async function buildQueryHistoryStatus( driver, dialect, status: 'skipped', - detail: dispatched.fastSkipDetail, + detail: `${catalogName} probe skipped (--fast)`, }); continue; } - try { - const outcome = await withSpinner(options.useSpinner === true, dispatched.spinnerLabel, dispatched.run); + const outcome = await withSpinner( + options.useSpinner === true, + `Probing ${catalogName} on ${connectionId}`, + () => + probe({ + projectDir: project.projectDir, + connectionId, + connection, + env, + }), + ); + + if (!outcome) { statuses.push({ connection: connectionId, driver, - dialect, - ...outcome, - }); - } catch (error) { - statuses.push({ - connection: connectionId, - driver, - dialect, + dialect: driver, status: 'fail', - detail: failureDetail(error), - fix: probeFailureFix(error, dispatched.label, connectionId, project.projectDir), + detail: `query history is not supported for driver "${driver}"`, + fix: `Disable connections.${connectionId}.context.queryHistory, or use a postgres, snowflake, or bigquery connection`, }); + continue; } + + if (outcome.ok) { + const { detail, warnings } = outcome.runner.formatSuccessDetail(outcome.result); + statuses.push({ + connection: connectionId, + driver, + dialect, + status: warnings.length > 0 ? 'warn' : 'ok', + detail, + ...(dialect === 'postgres' && warnings.length > 0 + ? { + fix: `Update the Postgres parameter group or config, then rerun \`ktx status --project-dir ${project.projectDir}\``, + } + : {}), + }); + continue; + } + + const advice = outcome.runner.fixAdvice(outcome.error); + statuses.push({ + connection: connectionId, + driver, + dialect, + status: 'fail', + detail: advice.failHeadline, + fix: advice.remediation, + }); } return statuses; @@ -882,9 +686,7 @@ function buildVerdict( export interface BuildProjectStatusOptions { env?: NodeJS.ProcessEnv; - postgresQueryHistoryProbe?: PostgresQueryHistoryProbe; - snowflakeQueryHistoryProbe?: SnowflakeQueryHistoryProbe; - bigqueryQueryHistoryProbe?: BigQueryQueryHistoryProbe; + queryHistoryReadinessProbe?: HistoricSqlReadinessProbe; claudeCodeAuthProbe?: ClaudeCodeAuthProbe; configIssues?: KtxConfigIssue[]; fast?: boolean;